Skip to content

declearn.communication.api.backend.MessagesHandler

Minimal protocol-agnostic server-side messages handler.

Source code in declearn/communication/api/backend/_handler.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
class MessagesHandler:
    """Minimal protocol-agnostic server-side messages handler.

    This class records which clients are registered (mapping a
    protocol-specific context object to a unique client name) and
    holds at most one pending message per client in each direction.
    Waiting is implemented by polling these containers, sleeping
    `heartbeat` seconds between two checks.
    """

    def __init__(
        self,
        logger: logging.Logger,
        heartbeat: float = 1.0,
    ) -> None:
        """Instantiate the messages handler.

        Parameters
        ----------
        logger: logging.Logger
            Logger used to record registration and messaging events.
        heartbeat: float, default=1.0
            Delay (in seconds) between two consecutive checks when
            waiting for a message or for clients to register.
        """
        # Assign parameters as attributes.
        self.logger = logger
        self.heartbeat = heartbeat
        # Set up containers for client identifiers and pending messages.
        # registered_clients: {context: client name}
        # outgoing_messages: {client name: message awaiting collection}
        # incoming_messages: {client name: message received from client}
        self.registered_clients = {}  # type: Dict[Any, str]
        self.outgoing_messages = {}  # type: Dict[str, str]
        self.incoming_messages = {}  # type: Dict[str, str]
        # Mark client-registration as unopened.
        self.registration_status = flags.REGISTRATION_UNSTARTED

    @property
    def client_names(self) -> Set[str]:
        """Names of the registered clients."""
        return set(self.registered_clients.values())

    def _get_countdown(
        self,
        timeout: Optional[float],
    ) -> int:
        """Convert a timeout (in seconds) into a number of heartbeats.

        Return -1 when `timeout` is None or zero, so that a loop that
        decrements the value never reaches zero (i.e. waits without
        limit). Otherwise, return a strictly positive number of steps.
        """
        if not timeout:
            return -1
        return max(math.ceil(timeout / self.heartbeat), 1)

    async def purge(
        self,
    ) -> None:
        """Purge information about users and pending messages.

        This empties the registered-clients and pending-messages
        containers, and marks user registration as unstarted, as
        though the instance had just been initialized.
        """
        self.registered_clients.clear()
        self.outgoing_messages.clear()
        self.incoming_messages.clear()
        self.registration_status = flags.REGISTRATION_UNSTARTED

    async def handle_message(
        self,
        string: str,
        context: Any,
    ) -> ActionMessage:
        """Handle an incoming message from a client.

        Parameters
        ----------
        string: str
            Received message, as a string that can be parsed back
            into an `ActionMessage` instance.
        context: hashable
            Communications-protocol-specific hashable object that
            may be used to uniquely identify (and thereby contact)
            the client that sent the message being handled.

        Returns
        -------
        message: ActionMessage
            Message to return to the sender, the specific type of
            which depends on the type of incoming request, errors
            encountered, etc.
        """
        # Parse the incoming message. If it is incorrect, reject it.
        try:
            message = parse_action_from_string(string)
        except (KeyError, TypeError, ValueError) as exc:
            self.logger.info(
                "Exception encountered while parsing received message: %s",
                repr(exc),
            )
            return Reject(flags.INVALID_MESSAGE)
        except LegacyMessageError as exc:
            self.logger.info(repr(exc))
            return LegacyReject()
        # Case: join request from a (new) client. Handle it.
        if isinstance(message, Join):
            return await self._handle_join_request(message, context)
        # Case: unregistered client. Reject message.
        if context not in self.registered_clients:
            return Reject(flags.REJECT_UNREGISTERED)
        # Case: registered client. Handle it.
        return await self._handle_registered_client_message(message, context)

    async def _handle_registered_client_message(
        self,
        message: ActionMessage,
        context: Any,
    ) -> ActionMessage:
        """Dispatch a message from a registered client to its handler.

        Unsupported message types are logged and rejected.
        """
        # Case: message-receiving request. Handle it.
        if isinstance(message, Recv):
            return await self._handle_recv_request(message, context)
        # Case: message-sending request. Handle it.
        if isinstance(message, Send):
            return await self._handle_send_request(message, context)
        # Case: drop message from a client. Handle it.
        if isinstance(message, Drop):
            return await self._handle_drop_request(message, context)
        # Case: ping request. Ping back.
        if isinstance(message, Ping):
            return Ping()
        # Case: unsupported message. Reject it.
        self.logger.error(
            "TypeError: received a message of unexpected type '%s'",
            type(message).__name__,
        )
        return Reject(flags.INVALID_MESSAGE)

    async def _handle_join_request(
        self,
        message: Join,
        context: Any,
    ) -> Union[Accept, Reject]:
        """Handle a join request, returning an Accept or Reject message."""
        # Case when client is already registered: warn but send OK.
        if context in self.registered_clients:
            self.logger.info(
                "Client %s is already registered.",
                self.registered_clients[context],
            )
            return Accept(flags.REGISTERED_ALREADY)
        # Case when registration is not opened: warn and reject.
        # The current status flag is sent back as the rejection reason.
        if self.registration_status != flags.REGISTRATION_OPEN:
            self.logger.info("Rejecting registration request.")
            return Reject(flag=self.registration_status)
        # Case when the client uses an incompatible declearn version.
        if (err := self._verify_version_compatibility(message)) is not None:
            return err
        # Case when registration is opened: register the client.
        self._register_client(message, context)
        return Accept(flag=flags.REGISTERED_WELCOME)

    def _verify_version_compatibility(
        self,
        message: Join,
    ) -> Optional[Reject]:
        """Return a `Reject` if a `Join` request has an incompatible version.

        Versions are deemed compatible when their first two
        dot-separated components match those of `VERSION`.
        Return None when the versions are compatible.
        """
        if message.version.split(".")[:2] == VERSION.split(".")[:2]:
            return None
        self.logger.info(
            "Received a registration request under name %s, that is "
            "invalid due to the client using DecLearn '%s'.",
            message.name,
            message.version,
        )
        return Reject(flags.REJECT_INCOMPATIBLE_VERSION)

    def _register_client(
        self,
        message: Join,
        context: Any,
    ) -> None:
        """Register a user based on their Join request and context object.

        If the requested name is already used, it is suffixed with
        a dot and an integer index so that the registered name is
        unique among currently-registered clients.
        """
        # Alias the user name if needed to avoid duplication issues.
        name = message.name
        used = self.client_names
        if name in used:
            idx = sum(other.rsplit(".", 1)[0] == name for other in used)
            # The count-based index may match an alias that is still in
            # use (e.g. after a client with a lower index has dropped):
            # increment it until the resulting alias is actually free.
            while f"{name}.{idx}" in used:
                idx += 1
            name = f"{name}.{idx}"
        # Register the user, recording context and received data information.
        self.logger.info("Registering client '%s' for training.", name)
        self.registered_clients[context] = name

    async def _handle_send_request(
        self,
        message: Send,
        context: Any,
    ) -> Union[Ping, Reject]:
        """Handle a message-sending request (client-to-server).

        Wait (without time limit) for any previously-received and
        yet-uncollected message from that client to be collected,
        then store the new one and ping back.
        """
        name = self.registered_clients[context]
        # Wait for any previous message from this client to be collected.
        while self.incoming_messages.get(name):
            await asyncio.sleep(self.heartbeat)
        # Record the received message, and return a ping-back response.
        self.incoming_messages[name] = message.content
        return Ping()

    async def _handle_recv_request(
        self,
        message: Recv,
        context: Any,
    ) -> Union[Send, Reject]:
        """Handle a message-receiving request (server-to-client).

        Wait for a message to be posted for the client, up to the
        request's optional timeout. Return it wrapped as a `Send`,
        or a timeout-flagged `Reject` if none is available.
        """
        # Set up the optional timeout mechanism.
        countdown = self._get_countdown(message.timeout)
        # Wait for a message to be available or timeout to be reached.
        name = self.registered_clients[context]
        while (not self.outgoing_messages.get(name)) and countdown:
            await asyncio.sleep(self.heartbeat)
            countdown -= 1
        # Either send back the collected message, or a timeout error.
        try:
            content = self.outgoing_messages.pop(name)
        except KeyError:
            return Reject(flags.CHECK_MESSAGE_TIMEOUT)
        return Send(content)

    async def _handle_drop_request(
        self,
        message: Drop,
        context: Any,
    ) -> Ping:
        """Handle a drop request from a client.

        The client is removed from the registered clients; pending
        messages stored under its name are left untouched.
        """
        name = self.registered_clients.pop(context)
        reason = (
            f"reason: '{message.reason}'" if message.reason else "no reason"
        )
        self.logger.info("Client %s has dropped with %s.", name, reason)
        return Ping()

    def post_message(
        self,
        message: str,
        client: str,
    ) -> None:
        """Post a message to be requested by a given client.

        Parameters
        ----------
        message: str
            Message string that is to be posted for the client to collect.
        client: str
            Name of the client to whom the message is addressed.

        Raises
        ------
        KeyError
            If `client` is not the name of a registered client.

        Notes
        -----
        This method merely makes the message available for the client
        to request, without any guarantee that it is received.
        Any pending message that the client has not collected yet is
        overwritten (with a logged warning).
        See the `send_message` async method to wait for the posted
        message to have been requested by and thus sent to the client.
        """
        if client not in self.client_names:
            raise KeyError(f"Unkown destinatory client '{client}'.")
        if client in self.outgoing_messages:
            self.logger.warning(
                "Overwriting pending message uncollected by client '%s'.",
                client,
            )
        self.outgoing_messages[client] = message

    async def send_message(
        self,
        message: str,
        client: str,
        timeout: Optional[float] = None,
    ) -> None:
        """Post a message for a client and wait for it to be collected.

        Parameters
        ----------
        message: str
            Message string that is to be posted for the client to collect.
        client: str
            Name of the client to whom the message is addressed.
        timeout: float or None, default=None
            Optional maximum delay (in seconds) beyond which to stop
            waiting for collection and raise an asyncio.TimeoutError.

        Raises
        ------
        KeyError
            If `client` is not the name of a registered client.
        asyncio.TimeoutError
            If `timeout` is set and is reached while the message is
            yet to be collected by the client.

        Notes
        -----
        See the `post_message` method to synchronously post a message
        and move on without guarantees that it was collected.
        """
        # Post the message. Wait for it to have been collected.
        self.post_message(message, client)
        countdown = self._get_countdown(timeout)
        while self.outgoing_messages.get(client, False) and countdown:
            await asyncio.sleep(self.heartbeat)
            countdown -= 1
        # If the message is still there, raise a TimeoutError.
        if self.outgoing_messages.get(client):
            raise asyncio.TimeoutError(
                "Timeout reached before the sent message was collected."
            )

    def check_message(
        self,
        client: str,
    ) -> Optional[str]:
        """Check whether a message was received from a given client.

        Parameters
        ----------
        client: str
            Name of the client whose emitted message to check for.

        Returns
        -------
        message:
            Collected message that was sent by `client`, if any.
            In case no message is available, return None.

        Raises
        ------
        KeyError
            If `client` is not the name of a registered client.

        Notes
        -----
        A returned message is removed from the pending ones.
        See the `recv_message` async method to wait for a message
        from the client to be available, collect and return it.
        """
        if client not in self.client_names:
            raise KeyError(f"Unregistered checked-for client '{client}'.")
        return self.incoming_messages.pop(client, None)

    async def recv_message(
        self,
        client: str,
        timeout: Optional[float] = None,
    ) -> str:
        """Wait for a message to be received from a given client.

        Parameters
        ----------
        client: str
            Name of the client whose emitted message to check for.
        timeout: float or None, default=None
            Optional maximum delay (in seconds) beyond which to stop
            waiting for a message and raise an asyncio.TimeoutError.

        Raises
        ------
        KeyError
            If `client` is not the name of a registered client.
        asyncio.TimeoutError
            If `timeout` is set and is reached while no message has
            been received from the client.

        Returns
        -------
        message:
            Collected message that was sent by `client`.

        Notes
        -----
        See the `check_message` method to synchronously check whether
        a message from the client is available and return it or None.
        """
        countdown = self._get_countdown(timeout)
        while countdown:
            message = self.check_message(client)
            if message is not None:
                return message
            await asyncio.sleep(self.heartbeat)
            countdown -= 1
        raise asyncio.TimeoutError(
            "Timeout reached before a message was received."
        )

    def open_clients_registration(
        self,
    ) -> None:
        """Make this servicer accept registration of new clients."""
        self.registration_status = flags.REGISTRATION_OPEN

    def close_clients_registration(
        self,
    ) -> None:
        """Make this servicer reject registration of new clients."""
        self.registration_status = flags.REGISTRATION_CLOSED

    async def wait_for_clients(
        self,
        min_clients: int = 1,
        max_clients: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> None:
        """Wait for clients to register for training, with given criteria.

        Parameters
        ----------
        min_clients: int, default=1
            Minimum number of clients required. Corrected to be >= 1.
            If `timeout` is None, used as the exact number of clients
            required - once reached, registration will be closed.
        max_clients: int or None, default=None
            Maximum number of clients authorized to register.
        timeout: float or None, default=None
            Optional maximum waiting time (in seconds) beyond which
            to close registration and either return or raise.

        Raises
        ------
        RuntimeError
            If the number of registered clients does not abide by the
            provided boundaries at the end of the process.
        """
        # Ensure any collected information is purged in case of failure
        # (due to raised errors or wrong number of registered clients).
        try:
            await self._wait_for_clients(min_clients, max_clients, timeout)
        except Exception:  # re-raise; pylint: disable=broad-except
            await self.purge()
            # Bare raise: propagate the error with its original traceback.
            raise

    async def _wait_for_clients(
        self,
        min_clients: int = 1,
        max_clients: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> None:
        """Backend of `wait_for_clients` method, without safeguards."""
        # Parse information on the required number of clients.
        # A missing or negative `max_clients` means: exactly `min_clients`
        # when there is no timeout, else no upper bound.
        min_clients = max(min_clients, 1)
        max_clients = -1 if max_clients is None else max_clients
        if max_clients < 0:
            max_clients = (
                min_clients if timeout is None else math.inf  # type: ignore
            )
        else:
            max_clients = max(min_clients, max_clients)
        # Wait for the required number of clients to have joined.
        self.open_clients_registration()
        countdown = self._get_countdown(timeout)
        while countdown and (len(self.registered_clients) < max_clients):
            await asyncio.sleep(self.heartbeat)
            countdown -= 1
        self.close_clients_registration()
        # Check whether all requirements have been checked.
        n_clients = len(self.registered_clients)
        if not min_clients <= n_clients <= max_clients:
            raise RuntimeError(
                f"The number of registered clients is {n_clients}, which "
                f"is out of the [{min_clients}, {max_clients}] range."
            )

client_names: Set[str] property

Names of the registered clients.

check_message(client)

Check whether a message was received from a given client.

Parameters:

Name Type Description Default
client str

Name of the client whose emitted message to check for.

required

Returns:

Name Type Description
message Optional[str]

Collected message that was sent by client, if any. In case no message is available, return None.

Notes

See the recv_message async method to wait for a message from the client to be available, collect and return it.

Source code in declearn/communication/api/backend/_handler.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def check_message(
    self,
    client: str,
) -> Optional[str]:
    """Collect the pending message emitted by a client, if any.

    Parameters
    ----------
    client: str
        Name of the client whose emitted message to look up.

    Returns
    -------
    message:
        Message received from `client`, which is removed from the
        pending ones, or None if there is no such message.

    Raises
    ------
    KeyError
        If `client` is not among the registered clients' names.

    Notes
    -----
    The `recv_message` async method may be used to wait for such
    a message to be available rather than merely check for it.
    """
    known = self.client_names
    if client in known:
        # Pop the message so that it is only ever collected once.
        return self.incoming_messages.pop(client, None)
    raise KeyError(f"Unregistered checked-for client '{client}'.")

close_clients_registration()

Make this servicer reject registration of new clients.

Source code in declearn/communication/api/backend/_handler.py
403
404
405
406
407
def close_clients_registration(
    self,
) -> None:
    """Switch the registration status flag to its 'closed' value.

    Join requests from new clients are rejected from then on.
    """
    closed_flag = flags.REGISTRATION_CLOSED
    self.registration_status = closed_flag

handle_message(string, context) async

Handle an incoming message from a client.

Parameters:

Name Type Description Default
string str

Received message, as a string that can be parsed back into an ActionMessage instance.

required
context Any

Communications-protocol-specific hashable object that may be used to uniquely identify (and thereby contact) the client that sent the message being handled.

required

Returns:

Name Type Description
message ActionMessage

Message to return to the sender, the specific type of which depends on the type of incoming request, errors encountered, etc.

Source code in declearn/communication/api/backend/_handler.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
async def handle_message(
    self,
    string: str,
    context: Any,
) -> ActionMessage:
    """Process a message received from a client and build a reply.

    Parameters
    ----------
    string: str
        Received message, in string form, to be parsed back into
        an `ActionMessage` instance.
    context: hashable
        Protocol-specific hashable object identifying the client
        that sent the message.

    Returns
    -------
    message: ActionMessage
        Reply to send back to the client. Its type depends on the
        kind of request received and on errors encountered.
    """
    # Step 1: parse the message, rejecting it if that fails.
    try:
        message = parse_action_from_string(string)
    except (KeyError, TypeError, ValueError) as error:
        details = repr(error)
        self.logger.info(
            "Exception encountered while parsing received message: %s",
            details,
        )
        return Reject(flags.INVALID_MESSAGE)
    except LegacyMessageError as error:
        self.logger.info(repr(error))
        return LegacyReject()
    # Step 2: route the message. Join requests are handled whether or
    # not the sender is registered; anything else requires registration.
    if isinstance(message, Join):
        reply = await self._handle_join_request(message, context)
    elif context in self.registered_clients:
        reply = await self._handle_registered_client_message(
            message, context
        )
    else:
        reply = Reject(flags.REJECT_UNREGISTERED)
    return reply

open_clients_registration()

Make this servicer accept registration of new clients.

Source code in declearn/communication/api/backend/_handler.py
397
398
399
400
401
def open_clients_registration(
    self,
) -> None:
    """Switch the registration status flag to its 'open' value.

    Join requests from new clients are accepted from then on.
    """
    open_flag = flags.REGISTRATION_OPEN
    self.registration_status = open_flag

post_message(message, client)

Post a message to be requested by a given client.

Parameters:

Name Type Description Default
message str

Message string that is to be posted for the client to collect.

required
client str

Name of the client to whom the message is addressed.

required

Notes

This method merely makes the message available for the client to request, without any guarantee that it is received. See the send_message async method to wait for the posted message to have been requested by and thus sent to the client.

Source code in declearn/communication/api/backend/_handler.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def post_message(
    self,
    message: str,
    client: str,
) -> None:
    """Make a message available for a given client to collect.

    Parameters
    ----------
    message: str
        Message string to store until the client requests it.
    client: str
        Name of the client the message is addressed to.

    Raises
    ------
    KeyError
        If `client` is not among the registered clients' names.

    Notes
    -----
    Posting does not guarantee that the client ever collects the
    message. A previously-posted message that is still pending is
    replaced, and a warning is logged. Use the `send_message` async
    method to wait until the posted message has been collected.
    """
    registered = self.client_names
    if client not in registered:
        raise KeyError(f"Unkown destinatory client '{client}'.")
    has_pending = client in self.outgoing_messages
    if has_pending:
        self.logger.warning(
            "Overwriting pending message uncollected by client '%s'.",
            client,
        )
    self.outgoing_messages[client] = message

purge() async

Close opened connections and purge information about users.

This resets the instance as though it was first initialized. User registration will be marked as unstarted.

Source code in declearn/communication/api/backend/_handler.py
66
67
68
69
70
71
72
73
74
75
76
77
async def purge(
    self,
) -> None:
    """Reset the handler's records about clients and messages.

    The registered-clients and pending-messages containers are
    emptied in place, and the registration status is set back to
    its 'unstarted' value, as after initialization.
    """
    containers = (
        self.registered_clients,
        self.outgoing_messages,
        self.incoming_messages,
    )
    for container in containers:
        container.clear()
    self.registration_status = flags.REGISTRATION_UNSTARTED

recv_message(client, timeout=None) async

Wait for a message to be received from a given client.

Parameters:

Name Type Description Default
client str

Name of the client whose emitted message to check for.

required
timeout Optional[float]

Optional maximum delay (in seconds) beyond which to stop waiting for a message and raise an asyncio.TimeoutError.

None

Raises:

Type Description
asyncio.TimeoutError

If timeout is set and is reached while no message has been received from the client.

Returns:

Name Type Description
message str

Collected message that was sent by client.

Notes

See the check_message method to synchronously check whether a message from the client is available and return it or None.

Source code in declearn/communication/api/backend/_handler.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
async def recv_message(
    self,
    client: str,
    timeout: Optional[float] = None,
) -> str:
    """Poll until a message from a given client can be collected.

    Parameters
    ----------
    client: str
        Name of the client whose emitted message to wait for.
    timeout: float or None, default=None
        Optional maximum waiting time (in seconds). When None (or
        zero), wait without any time limit.

    Raises
    ------
    asyncio.TimeoutError
        If `timeout` is set and elapses before any message from
        the client is available.

    Returns
    -------
    message:
        Message that was sent by `client`.

    Notes
    -----
    Each polling step relies on the synchronous `check_message`
    method, followed by a sleep of `heartbeat` seconds.
    """
    # Convert the timeout into a number of polling steps; -1 never
    # reaches zero when decremented, hence means "no limit".
    if timeout:
        remaining = max(math.ceil(timeout / self.heartbeat), 1)
    else:
        remaining = -1
    while remaining:
        received = self.check_message(client)
        if received is not None:
            return received
        await asyncio.sleep(self.heartbeat)
        remaining -= 1
    raise asyncio.TimeoutError(
        "Timeout reached before a message was received."
    )

send_message(message, client, timeout=None) async

Post a message for a client and wait for it to be collected.

Parameters:

Name Type Description Default
message str

Message string that is to be posted for the client to collect.

required
client str

Name of the client to whom the message is addressed.

required
timeout Optional[float]

Optional maximum delay (in seconds) beyond which to stop waiting for collection and raise an asyncio.TimeoutError.

None

Raises:

Type Description
asyncio.TimeoutError

If timeout is set and is reached while the message is yet to be collected by the client.

Notes

See the post_message method to synchronously post a message and move on without guarantees that it was collected.

Source code in declearn/communication/api/backend/_handler.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
async def send_message(
    self,
    message: str,
    client: str,
    timeout: Optional[float] = None,
) -> None:
    """Post a message for a client, then poll until it is collected.

    Parameters
    ----------
    message: str
        Message string to post for the client to collect.
    client: str
        Name of the client the message is addressed to.
    timeout: float or None, default=None
        Optional maximum waiting time (in seconds). When None (or
        zero), wait without any time limit.

    Raises
    ------
    asyncio.TimeoutError
        If `timeout` is set and elapses while the message is still
        pending collection by the client.

    Notes
    -----
    Posting is delegated to the synchronous `post_message` method,
    which may be used directly to avoid waiting for collection.
    """
    self.post_message(message, client)
    # Convert the timeout into a number of polling steps; -1 never
    # reaches zero when decremented, hence means "no limit".
    if timeout:
        remaining = max(math.ceil(timeout / self.heartbeat), 1)
    else:
        remaining = -1
    # Sleep for as long as a message is pending and steps remain.
    while self.outgoing_messages.get(client, False) and remaining:
        await asyncio.sleep(self.heartbeat)
        remaining -= 1
    # A message still pending at this point means time ran out.
    still_pending = self.outgoing_messages.get(client)
    if still_pending:
        raise asyncio.TimeoutError(
            "Timeout reached before the sent message was collected."
        )

wait_for_clients(min_clients=1, max_clients=None, timeout=None) async

Wait for clients to register for training, with given criteria.

Parameters:

Name Type Description Default
min_clients int

Minimum number of clients required. Corrected to be >= 1. If timeout is None, used as the exact number of clients required - once reached, registration will be closed.

1
max_clients Optional[int]

Maximum number of clients authorized to register.

None
timeout Optional[float]

Optional maximum waiting time (in seconds) beyond which to close registration and either return or raise.

None

Raises:

Type Description
RuntimeError

If the number of registered clients does not abide by the provided boundaries at the end of the process.

Source code in declearn/communication/api/backend/_handler.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
async def wait_for_clients(
    self,
    min_clients: int = 1,
    max_clients: Optional[int] = None,
    timeout: Optional[float] = None,
) -> None:
    """Await clients' registration, purging records upon failure.

    Parameters
    ----------
    min_clients: int, default=1
        Minimum number of clients required. Corrected to be >= 1.
        If `timeout` is None, used as the exact number of clients
        required - once reached, registration will be closed.
    max_clients: int or None, default=None
        Maximum number of clients authorized to register.
    timeout: float or None, default=None
        Optional maximum waiting time (in seconds) beyond which
        to close registration and either return or raise.

    Raises
    ------
    RuntimeError
        If the number of registered clients does not abide by the
        provided boundaries at the end of the process.

    Notes
    -----
    The waiting logic is delegated to the `_wait_for_clients` method.
    Any `Exception` it raises is re-raised, after the `purge` method
    has been awaited.
    """
    try:
        pending = self._wait_for_clients(min_clients, max_clients, timeout)
        await pending
    except Exception as error:  # re-raise; pylint: disable=broad-except
        # Purge first, then let the error propagate to the caller.
        await self.purge()
        raise error