
declearn.communication.api.NetworkServer

Abstract class defining an API for server-side communication endpoints.

This class defines the key methods used to communicate between a server and its clients during a federated learning process, agnostic to the actual communication protocol in use.

Instantiating a NetworkServer does not instantly serve the declearn messaging program on the selected host and port. To enable clients to connect to the server via a NetworkServer object, its start method must first be awaited, and conversely, its stop method should be awaited to close the connection:

>>> server = ServerSubclass(
...     "example.domain.com", 8765, "cert_path", "pkey_path"
... )
>>> await server.start()
>>> try:
>>>     await server.wait_for_clients(...)
>>>     ...
>>> finally:
>>>     await server.stop()

An alternative syntax that achieves the same result is to use the server object as an asynchronous context manager:

>>> async with ServerSubclass(...) as server:
>>>     await server.wait_for_clients(...)
>>>     ...

Note that a NetworkServer manages an allow-list of clients. This list is built from the requests emitted by NetworkClient.register(...) during a registration phase, which is only open while the awaitable wait_for_clients method is running.
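The start/stop lifecycle and its context-manager shortcut can be illustrated without declearn itself. The following is a minimal sketch in which `ToyServer` is a hypothetical stand-in (not a declearn class) that only records the calls it receives; it assumes nothing about any actual network protocol.

```python
import asyncio
from typing import List


class ToyServer:
    """Hypothetical stand-in for a NetworkServer subclass (not declearn code)."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def start(self) -> None:
        self.events.append("start")

    async def stop(self) -> None:
        self.events.append("stop")

    async def wait_for_clients(self, min_clients: int = 1) -> None:
        # A real server would block here until clients have registered.
        self.events.append(f"wait_for_clients({min_clients})")

    async def __aenter__(self) -> "ToyServer":
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        # Runs even if the body raised, like the try/finally form.
        await self.stop()


async def main() -> List[str]:
    async with ToyServer() as server:
        await server.wait_for_clients(2)
    return server.events


events = asyncio.run(main())
```

Note that `wait_for_clients` is a coroutine function, so it has to be awaited for the registration phase to actually run.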

Source code in declearn/communication/api/_server.py
@create_types_registry
class NetworkServer(metaclass=abc.ABCMeta):
    """Abstract class defining an API for server-side communication endpoints.

    This class defines the key methods used to communicate between
    a server and its clients during a federated learning process,
    agnostic to the actual communication protocol in use.

    Instantiating a `NetworkServer` does not instantly serve the declearn
    messaging program on the selected host and port. To enable clients
    to connect to the server via a `NetworkServer` object, its `start`
    method must first be awaited, and conversely, its `stop` method
    should be awaited to close the connection:
    ```
    >>> server = ServerSubclass(
    ...     "example.domain.com", 8765, "cert_path", "pkey_path"
    ... )
    >>> await server.start()
    >>> try:
    >>>     await server.wait_for_clients(...)
    >>>     ...
    >>> finally:
    >>>     await server.stop()
    ```

    An alternative syntax that achieves the same result is to use the
    server object as an asynchronous context manager:
    ```
    >>> async with ServerSubclass(...) as server:
    >>>     await server.wait_for_clients(...)
    >>>     ...
    ```

    Note that a `NetworkServer` manages an allow-list of clients,
    which is defined based on `NetworkClient.register(...)`-emitted
    requests during a registration phase restricted to the context
    of the awaitable `wait_for_clients` method.
    """

    protocol: ClassVar[str] = NotImplemented
    """Protocol name identifier, unique across NetworkServer classes."""

    def __init_subclass__(
        cls,
        register: bool = True,
        **kwargs: Any,
    ) -> None:
        """Automate the type-registration of NetworkServer subclasses."""
        super().__init_subclass__(**kwargs)
        if register:
            register_type(cls, cls.protocol, group="NetworkServer")

    def __init__(
        self,
        host: str,
        port: int,
        certificate: Optional[str] = None,
        private_key: Optional[str] = None,
        password: Optional[str] = None,
        heartbeat: float = 1.0,
        logger: Union[logging.Logger, str, None] = None,
    ) -> None:
        """Instantiate the server-side communications handler.

        Parameters
        ----------
        host: str
            Host name (e.g. IP address) of the server.
        port: int
            Communications port to use.
        certificate: str or None, default=None
            Path to the server certificate (public key) to use SSL/TLS
            communications encryption. If provided, `private_key` must
            be set as well.
        private_key: str or None, default=None
            Path to the server private key to use SSL/TLS communications
            encryption. If provided, `certificate` must be set as well.
        password: str or None, default=None
            Optional password used to access `private_key`, or path to a
            file from which to read such a password.
            If None but a password is needed, an input will be prompted.
        heartbeat: float, default=1.0
            Delay (in seconds) between verifications when checking for a
            message having been received from or collected by a client.
        logger: logging.Logger or str or None, default=None
            Logger to use, or name of a logger to set up with
            `declearn.utils.get_logger`. If None, use `type(self).__name__`.
        """
        # arguments serve modularity; pylint: disable=too-many-arguments
        self.host = host
        self.port = port
        self._ssl = self._setup_ssl(certificate, private_key, password)
        if isinstance(logger, logging.Logger):
            self.logger = logger
        else:
            self.logger = get_logger(logger or f"{type(self).__name__}")
        self.handler = MessagesHandler(logger=self.logger, heartbeat=heartbeat)

    @property
    @abc.abstractmethod
    def uri(self) -> str:
        """URI on which this server is exposed, to be requested by clients."""

    @property
    def client_names(self) -> Set[str]:
        """Set of registered clients' names."""
        return self.handler.client_names

    def _setup_ssl(
        self,
        certificate: Optional[str] = None,
        private_key: Optional[str] = None,
        password: Optional[str] = None,
    ) -> Any:
        """Set up and return an (optional) SSL context object.

        The return type is communication-protocol dependent.
        """
        if (certificate is None) and (private_key is None):
            return None
        if (certificate is None) or (private_key is None):
            raise ValueError(
                "Both 'certificate' and 'private_key' are required "
                "to set up SSL encryption."
            )
        return self._setup_ssl_context(certificate, private_key, password)

    @staticmethod
    @abc.abstractmethod
    def _setup_ssl_context(
        certificate: str,
        private_key: str,
        password: Optional[str] = None,
    ) -> Any:
        """Set up and return a SSL context object suitable for this class."""

    @abc.abstractmethod
    async def start(
        self,
    ) -> None:
        """Initialize the server and start welcoming communications."""

    @abc.abstractmethod
    async def stop(
        self,
    ) -> None:
        """Stop the server and purge information about clients."""

    async def __aenter__(
        self,
    ) -> Self:
        await self.start()
        return self

    async def __aexit__(
        self,
        exc_type: Type[Exception],
        exc_value: Exception,
        exc_tb: types.TracebackType,
    ) -> None:
        await self.stop()

    async def wait_for_clients(
        self,
        min_clients: int = 1,
        max_clients: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> None:
        """Wait for clients to register for training, with given criteria.

        Parameters
        ----------
        min_clients: int, default=1
            Minimum number of clients required. Corrected to be >= 1.
            If `timeout` is None, used as the exact number of clients
            required - once reached, registration will be closed.
        max_clients: int or None, default=None
            Maximum number of clients authorized to register.
        timeout: float or None, default=None
            Optional maximum waiting time (in seconds) beyond which
            to close registration and either return or raise.

        Raises
        ------
        RuntimeError
            If the number of registered clients does not abide by the
            provided boundaries at the end of the process.
        """
        await self.handler.wait_for_clients(min_clients, max_clients, timeout)

    async def send_message(
        self,
        message: Message,
        client: str,
        timeout: Optional[float] = None,
    ) -> None:
        """Send a message to a given client and wait for it to be collected.

        Parameters
        ----------
        message: Message
            Message instance that is to be delivered to the client.
        client: str
            Identifier of the client to whom the message is addressed.
        timeout: float or None, default=None
            Optional maximum delay (in seconds) beyond which to stop
            waiting for collection and raise an asyncio.TimeoutError.

        Raises
        ------
        asyncio.TimeoutError
            If `timeout` is set and is reached while the message is
            yet to be collected by the client.
        """
        await self.handler.send_message(message.to_string(), client, timeout)

    async def send_messages(
        self,
        messages: Mapping[str, Message],
        timeout: Optional[float] = None,
    ) -> None:
        """Send messages to an ensemble of clients and await their collection.

        Parameters
        ----------
        messages: dict[str, Message]
            Dict mapping client names to the messages addressed to them.
        timeout: float or None, default=None
            Optional maximum delay (in seconds) beyond which to stop
            waiting for collection and raise an asyncio.TimeoutError.

        Raises
        ------
        asyncio.TimeoutError
            If `timeout` is set and is reached while the message is
            yet to be collected by at least one of the clients.
        """
        routines = [
            self.send_message(message, client, timeout)
            for client, message in messages.items()
        ]
        await asyncio.gather(*routines, return_exceptions=False)

    async def broadcast_message(
        self,
        message: Message,
        clients: Optional[Set[str]] = None,
        timeout: Optional[float] = None,
    ) -> None:
        """Send a message to an ensemble of clients and await its collection.

        Parameters
        ----------
        message: Message
            Message instance that is to be delivered to the clients.
        clients: set[str] or None, default=None
            Optional subset of registered clients to whom the message
            is to be sent. If None, set to `self.client_names`.
        timeout: float or None, default=None
            Optional maximum delay (in seconds) beyond which to stop
            waiting for collection and raise an asyncio.TimeoutError.

        Raises
        ------
        asyncio.TimeoutError
            If `timeout` is set and is reached while the message is
            yet to be collected by at least one of the clients.
        """
        if clients is None:
            clients = self.client_names
        messages = {client: message for client in clients}
        await self.send_messages(messages, timeout)

    async def wait_for_messages(
        self,
        clients: Optional[Set[str]] = None,
    ) -> Dict[str, SerializedMessage]:
        """Wait for messages from (a subset of) all clients.

        Parameters
        ----------
        clients: set[str] or None, default=None
            Optional subset of registered clients, messages from
            whom to wait for. If None, set to `self.client_names`.

        Returns
        -------
        messages: dict[str, SerializedMessage]
            A dictionary mapping clients' names to the serialized
            messages they sent to the server.
        """
        if clients is None:
            clients = self.client_names
        routines = [self.handler.recv_message(client) for client in clients]
        received = await asyncio.gather(*routines, return_exceptions=False)
        return {
            client: SerializedMessage.from_message_string(string)
            for client, string in zip(clients, received)
        }

    async def wait_for_messages_with_timeout(
        self,
        timeout: float,
        clients: Optional[Set[str]] = None,
    ) -> Tuple[Dict[str, SerializedMessage], List[str]]:
        """Wait for an ensemble of clients to have sent a message.

        Parameters
        ----------
        timeout: float
            Maximum waiting delay (in seconds) before returning
            received messages, even if some are missing.
        clients: set[str] or None, default=None
            Optional subset of registered clients, messages from
            whom to wait for. If None, set to `self.client_names`.

        Returns
        -------
        messages: dict[str, SerializedMessage]
            A dictionary where the keys are the clients' names and the
            values are the serialized messages they sent to the server.
        timeouts: list[str]
            List of names of clients that failed to send a message
            prior to `timeout` being reached.
        """
        if clients is None:
            clients = self.client_names
        routines = [
            self.handler.recv_message(client, timeout) for client in clients
        ]
        received = await asyncio.gather(*routines, return_exceptions=True)
        messages = {}  # type: Dict[str, SerializedMessage]
        timeouts = []  # type: List[str]
        for client, output in zip(clients, received):
            if isinstance(output, asyncio.TimeoutError):
                timeouts.append(client)
            elif isinstance(output, BaseException):
                raise output
            else:
                messages[client] = SerializedMessage.from_message_string(
                    output
                )
        return messages, timeouts

client_names: Set[str] property

Set of registered clients' names.

protocol: ClassVar[str] = NotImplemented class-attribute

Protocol name identifier, unique across NetworkServer classes.

uri: str abstractmethod property

URI on which this server is exposed, to be requested by clients.

__init__(host, port, certificate=None, private_key=None, password=None, heartbeat=1.0, logger=None)

Instantiate the server-side communications handler.

Parameters:

Name Type Description Default
host str

Host name (e.g. IP address) of the server.

required
port int

Communications port to use.

required
certificate Optional[str]

Path to the server certificate (public key) to use SSL/TLS communications encryption. If provided, private_key must be set as well.

None
private_key Optional[str]

Path to the server private key to use SSL/TLS communications encryption. If provided, certificate must be set as well.

None
password Optional[str]

Optional password used to access private_key, or path to a file from which to read such a password. If None but a password is needed, an input will be prompted.

None
heartbeat float

Delay (in seconds) between verifications when checking for a message having been received from or collected by a client.

1.0
logger Union[logging.Logger, str, None]

Logger to use, or name of a logger to set up with declearn.utils.get_logger. If None, use type(self).__name__.

None
Source code in declearn/communication/api/_server.py
def __init__(
    self,
    host: str,
    port: int,
    certificate: Optional[str] = None,
    private_key: Optional[str] = None,
    password: Optional[str] = None,
    heartbeat: float = 1.0,
    logger: Union[logging.Logger, str, None] = None,
) -> None:
    """Instantiate the server-side communications handler.

    Parameters
    ----------
    host: str
        Host name (e.g. IP address) of the server.
    port: int
        Communications port to use.
    certificate: str or None, default=None
        Path to the server certificate (public key) to use SSL/TLS
        communications encryption. If provided, `private_key` must
        be set as well.
    private_key: str or None, default=None
        Path to the server private key to use SSL/TLS communications
        encryption. If provided, `certificate` must be set as well.
    password: str or None, default=None
        Optional password used to access `private_key`, or path to a
        file from which to read such a password.
        If None but a password is needed, an input will be prompted.
    heartbeat: float, default=1.0
        Delay (in seconds) between verifications when checking for a
        message having been received from or collected by a client.
    logger: logging.Logger or str or None, default=None
        Logger to use, or name of a logger to set up with
        `declearn.utils.get_logger`. If None, use `type(self).__name__`.
    """
    # arguments serve modularity; pylint: disable=too-many-arguments
    self.host = host
    self.port = port
    self._ssl = self._setup_ssl(certificate, private_key, password)
    if isinstance(logger, logging.Logger):
        self.logger = logger
    else:
        self.logger = get_logger(logger or f"{type(self).__name__}")
    self.handler = MessagesHandler(logger=self.logger, heartbeat=heartbeat)
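
The argument handling performed at instantiation can be sketched in isolation. In this minimal sketch, `resolve_logger` and `check_ssl_args` are hypothetical helper names, and the standard `logging.getLogger` stands in for `declearn.utils.get_logger`; the checks mirror the constructor and `_setup_ssl` logic shown in the class source, without building any actual SSL context.

```python
import logging
from typing import Optional, Union


def resolve_logger(
    logger: Union[logging.Logger, str, None], owner: object
) -> logging.Logger:
    """Return a Logger from an instance, a name, or the owner's class name."""
    if isinstance(logger, logging.Logger):
        return logger
    # Assumption: logging.getLogger replaces declearn.utils.get_logger here.
    return logging.getLogger(logger or type(owner).__name__)


def check_ssl_args(
    certificate: Optional[str], private_key: Optional[str]
) -> bool:
    """Return True if SSL should be set up, False if it is disabled."""
    if certificate is None and private_key is None:
        return False
    if certificate is None or private_key is None:
        raise ValueError(
            "Both 'certificate' and 'private_key' are required "
            "to set up SSL encryption."
        )
    return True


class Demo:
    """Placeholder owner class, used only to derive a default logger name."""


default_logger = resolve_logger(None, Demo())
named_logger = resolve_logger("my-server", Demo())
```

Passing only one of the two SSL paths is treated as a configuration error rather than silently disabling encryption.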

__init_subclass__(register=True, **kwargs)

Automate the type-registration of NetworkServer subclasses.

Source code in declearn/communication/api/_server.py
def __init_subclass__(
    cls,
    register: bool = True,
    **kwargs: Any,
) -> None:
    """Automate the type-registration of NetworkServer subclasses."""
    super().__init_subclass__(**kwargs)
    if register:
        register_type(cls, cls.protocol, group="NetworkServer")
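
The registration hook relies on the standard `__init_subclass__` mechanism, where keyword arguments given in the class statement are forwarded to the hook. The sketch below reproduces the pattern with a plain dictionary; `REGISTRY`, `Base`, `GrpcLike` and `Unlisted` are hypothetical names, and the dictionary is only an assumed simplification of what declearn's `register_type` does.

```python
from typing import Any, ClassVar, Dict

# Hypothetical registry; declearn uses `register_type` with a group name.
REGISTRY: Dict[str, type] = {}


class Base:
    """Stand-in base class that registers subclasses under `protocol`."""

    protocol: ClassVar[str] = NotImplemented

    def __init_subclass__(cls, register: bool = True, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        if register:
            REGISTRY[cls.protocol] = cls


class GrpcLike(Base):
    protocol = "grpc-like"


class Unlisted(Base, register=False):
    # Opts out of registration, e.g. for an intermediate abstract class.
    protocol = "unlisted"
```

Writing `class Unlisted(Base, register=False)` is how a subclass would skip registration under this pattern.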

broadcast_message(message, clients=None, timeout=None) async

Send a message to an ensemble of clients and await its collection.

Parameters:

Name Type Description Default
message Message

Message instance that is to be delivered to the clients.

required
clients Optional[Set[str]]

Optional subset of registered clients to whom the message is to be sent. If None, set to self.client_names.

None
timeout Optional[float]

Optional maximum delay (in seconds) beyond which to stop waiting for collection and raise an asyncio.TimeoutError.

None

Raises:

Type Description
asyncio.TimeoutError

If timeout is set and is reached while the message is yet to be collected by at least one of the clients.

Source code in declearn/communication/api/_server.py
async def broadcast_message(
    self,
    message: Message,
    clients: Optional[Set[str]] = None,
    timeout: Optional[float] = None,
) -> None:
    """Send a message to an ensemble of clients and await its collection.

    Parameters
    ----------
    message: Message
        Message instance that is to be delivered to the clients.
    clients: set[str] or None, default=None
        Optional subset of registered clients to whom the message
        is to be sent. If None, set to `self.client_names`.
    timeout: float or None, default=None
        Optional maximum delay (in seconds) beyond which to stop
        waiting for collection and raise an asyncio.TimeoutError.

    Raises
    ------
    asyncio.TimeoutError
        If `timeout` is set and is reached while the message is
        yet to be collected by at least one of the clients.
    """
    if clients is None:
        clients = self.client_names
    messages = {client: message for client in clients}
    await self.send_messages(messages, timeout)
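
The delegation chain (broadcast builds a per-client mapping, which is then fanned out into individual sends) can be sketched with plain strings in place of `Message` objects. `ToySender` is a hypothetical stand-in that records deliveries in a list instead of using a messages handler.

```python
import asyncio
from typing import Iterable, List, Mapping, Optional, Set, Tuple


class ToySender:
    """Hypothetical stand-in recording (client, message) deliveries."""

    def __init__(self, client_names: Iterable[str]) -> None:
        self.client_names: Set[str] = set(client_names)
        self.sent: List[Tuple[str, str]] = []

    async def send_message(
        self, message: str, client: str, timeout: Optional[float] = None
    ) -> None:
        await asyncio.sleep(0)  # yield control, as a real send would
        self.sent.append((client, message))

    async def send_messages(
        self, messages: Mapping[str, str], timeout: Optional[float] = None
    ) -> None:
        routines = [
            self.send_message(message, client, timeout)
            for client, message in messages.items()
        ]
        await asyncio.gather(*routines, return_exceptions=False)

    async def broadcast_message(
        self,
        message: str,
        clients: Optional[Set[str]] = None,
        timeout: Optional[float] = None,
    ) -> None:
        if clients is None:
            clients = self.client_names
        # The same message object is shared by all recipients.
        await self.send_messages({c: message for c in clients}, timeout)


sender = ToySender({"alice", "bob", "carol"})
asyncio.run(sender.broadcast_message("hello", clients={"alice", "carol"}))
```

Since `clients` is a set, the delivery order is not specified; only the set of recipients is.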

send_message(message, client, timeout=None) async

Send a message to a given client and wait for it to be collected.

Parameters:

Name Type Description Default
message Message

Message instance that is to be delivered to the client.

required
client str

Identifier of the client to whom the message is addressed.

required
timeout Optional[float]

Optional maximum delay (in seconds) beyond which to stop waiting for collection and raise an asyncio.TimeoutError.

None

Raises:

Type Description
asyncio.TimeoutError

If timeout is set and is reached while the message is yet to be collected by the client.

Source code in declearn/communication/api/_server.py
async def send_message(
    self,
    message: Message,
    client: str,
    timeout: Optional[float] = None,
) -> None:
    """Send a message to a given client and wait for it to be collected.

    Parameters
    ----------
    message: Message
        Message instance that is to be delivered to the client.
    client: str
        Identifier of the client to whom the message is addressed.
    timeout: float or None, default=None
        Optional maximum delay (in seconds) beyond which to stop
        waiting for collection and raise an asyncio.TimeoutError.

    Raises
    ------
    asyncio.TimeoutError
        If `timeout` is set and is reached while the message is
        yet to be collected by the client.
    """
    await self.handler.send_message(message.to_string(), client, timeout)

send_messages(messages, timeout=None) async

Send messages to an ensemble of clients and await their collection.

Parameters:

Name Type Description Default
messages Mapping[str, Message]

Dict mapping client names to the messages addressed to them.

required
timeout Optional[float]

Optional maximum delay (in seconds) beyond which to stop waiting for collection and raise an asyncio.TimeoutError.

None

Raises:

Type Description
asyncio.TimeoutError

If timeout is set and is reached while the message is yet to be collected by at least one of the clients.

Source code in declearn/communication/api/_server.py
async def send_messages(
    self,
    messages: Mapping[str, Message],
    timeout: Optional[float] = None,
) -> None:
    """Send messages to an ensemble of clients and await their collection.

    Parameters
    ----------
    messages: dict[str, Message]
        Dict mapping client names to the messages addressed to them.
    timeout: float or None, default=None
        Optional maximum delay (in seconds) beyond which to stop
        waiting for collection and raise an asyncio.TimeoutError.

    Raises
    ------
    asyncio.TimeoutError
        If `timeout` is set and is reached while the message is
        yet to be collected by at least one of the clients.
    """
    routines = [
        self.send_message(message, client, timeout)
        for client, message in messages.items()
    ]
    await asyncio.gather(*routines, return_exceptions=False)
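
Because the sends are gathered with `return_exceptions=False`, a timeout on any single client surfaces as an exception from the whole call. The sketch below illustrates that behavior with standard asyncio only; `deliver` and `send_all` are hypothetical names, and `asyncio.sleep` merely simulates the time a client takes to collect its message.

```python
import asyncio
from typing import Mapping, Optional


async def deliver(client: str, delay: float, timeout: Optional[float]) -> str:
    """Simulate a send that completes once the client has collected it."""
    await asyncio.wait_for(asyncio.sleep(delay), timeout)
    return client


async def send_all(delays: Mapping[str, float], timeout: Optional[float]) -> bool:
    """Return True if every simulated client collected in time."""
    routines = [deliver(c, d, timeout) for c, d in delays.items()]
    try:
        await asyncio.gather(*routines, return_exceptions=False)
    except asyncio.TimeoutError:
        # At least one client was too slow; the error is not per-client.
        return False
    return True


all_in_time = asyncio.run(send_all({"alice": 0.0, "bob": 0.0}, timeout=1.0))
one_too_slow = asyncio.run(send_all({"alice": 0.0, "bob": 0.5}, timeout=0.05))
```

A caller that needs to know which client timed out would have to send to clients individually or gather with `return_exceptions=True`.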

start() abstractmethod async

Initialize the server and start welcoming communications.

Source code in declearn/communication/api/_server.py
@abc.abstractmethod
async def start(
    self,
) -> None:
    """Initialize the server and start welcoming communications."""

stop() abstractmethod async

Stop the server and purge information about clients.

Source code in declearn/communication/api/_server.py
@abc.abstractmethod
async def stop(
    self,
) -> None:
    """Stop the server and purge information about clients."""

wait_for_clients(min_clients=1, max_clients=None, timeout=None) async

Wait for clients to register for training, with given criteria.

Parameters:

Name Type Description Default
min_clients int

Minimum number of clients required. Corrected to be >= 1. If timeout is None, used as the exact number of clients required - once reached, registration will be closed.

1
max_clients Optional[int]

Maximum number of clients authorized to register.

None
timeout Optional[float]

Optional maximum waiting time (in seconds) beyond which to close registration and either return or raise.

None

Raises:

Type Description
RuntimeError

If the number of registered clients does not abide by the provided boundaries at the end of the process.

Source code in declearn/communication/api/_server.py
async def wait_for_clients(
    self,
    min_clients: int = 1,
    max_clients: Optional[int] = None,
    timeout: Optional[float] = None,
) -> None:
    """Wait for clients to register for training, with given criteria.

    Parameters
    ----------
    min_clients: int, default=1
        Minimum number of clients required. Corrected to be >= 1.
        If `timeout` is None, used as the exact number of clients
        required - once reached, registration will be closed.
    max_clients: int or None, default=None
        Maximum number of clients authorized to register.
    timeout: float or None, default=None
        Optional maximum waiting time (in seconds) beyond which
        to close registration and either return or raise.

    Raises
    ------
    RuntimeError
        If the number of registered clients does not abide by the
        provided boundaries at the end of the process.
    """
    await self.handler.wait_for_clients(min_clients, max_clients, timeout)
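
A caller may want to turn the documented `RuntimeError` into a boolean outcome. The following is a minimal usage sketch: `open_registration` is a hypothetical helper, and `FakeServer` is a stub that only imitates the documented signature and error, not declearn's actual registration logic.

```python
import asyncio
from typing import Any, Optional


async def open_registration(
    server: Any,
    min_clients: int,
    max_clients: Optional[int] = None,
    timeout: Optional[float] = None,
) -> bool:
    """Return True if registration succeeded, False if bounds were not met."""
    try:
        await server.wait_for_clients(
            min_clients=min_clients, max_clients=max_clients, timeout=timeout
        )
    except RuntimeError:
        return False
    return True


class FakeServer:
    """Hypothetical stub exposing only `wait_for_clients`."""

    def __init__(self, registered: int) -> None:
        self.registered = registered

    async def wait_for_clients(
        self,
        min_clients: int = 1,
        max_clients: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> None:
        # Assumed simplification: fail if fewer clients than required.
        if self.registered < min_clients:
            raise RuntimeError("Not enough clients registered.")


enough = asyncio.run(open_registration(FakeServer(3), min_clients=2))
too_few = asyncio.run(open_registration(FakeServer(1), min_clients=2, timeout=0.1))
```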

wait_for_messages(clients=None) async

Wait for messages from (a subset of) all clients.

Parameters:

Name Type Description Default
clients Optional[Set[str]]

Optional subset of registered clients, messages from whom to wait for. If None, set to self.client_names.

None

Returns:

Name Type Description
messages Dict[str, SerializedMessage]

A dictionary mapping clients' names to the serialized messages they sent to the server.

Source code in declearn/communication/api/_server.py
async def wait_for_messages(
    self,
    clients: Optional[Set[str]] = None,
) -> Dict[str, SerializedMessage]:
    """Wait for messages from (a subset of) all clients.

    Parameters
    ----------
    clients: set[str] or None, default=None
        Optional subset of registered clients, messages from
        whom to wait for. If None, set to `self.client_names`.

    Returns
    -------
    messages: dict[str, SerializedMessage]
        A dictionary mapping clients' names to the serialized
        messages they sent to the server.
    """
    if clients is None:
        clients = self.client_names
    routines = [self.handler.recv_message(client) for client in clients]
    received = await asyncio.gather(*routines, return_exceptions=False)
    return {
        client: SerializedMessage.from_message_string(string)
        for client, string in zip(clients, received)
    }

wait_for_messages_with_timeout(timeout, clients=None) async

Wait for an ensemble of clients to have sent a message.

Parameters:

Name Type Description Default
timeout float

Maximum waiting delay (in seconds) before returning received messages, even if some are missing.

required
clients Optional[Set[str]]

Optional subset of registered clients, messages from whom to wait for. If None, set to self.client_names.

None

Returns:

Name Type Description
messages dict[str, SerializedMessage]

A dictionary where the keys are the clients' names and the values are the serialized messages they sent to the server.

timeouts list[str]

List of names of clients that failed to send a message prior to timeout being reached.

Source code in declearn/communication/api/_server.py
async def wait_for_messages_with_timeout(
    self,
    timeout: float,
    clients: Optional[Set[str]] = None,
) -> Tuple[Dict[str, SerializedMessage], List[str]]:
    """Wait for an ensemble of clients to have sent a message.

    Parameters
    ----------
    timeout: float
        Maximum waiting delay (in seconds) before returning
        received messages, even if some are missing.
    clients: set[str] or None, default=None
        Optional subset of registered clients, messages from
        whom to wait for. If None, set to `self.client_names`.

    Returns
    -------
    messages: dict[str, SerializedMessage]
        A dictionary where the keys are the clients' names and the
        values are the serialized messages they sent to the server.
    timeouts: list[str]
        List of names of clients that failed to send a message
        prior to `timeout` being reached.
    """
    if clients is None:
        clients = self.client_names
    routines = [
        self.handler.recv_message(client, timeout) for client in clients
    ]
    received = await asyncio.gather(*routines, return_exceptions=True)
    messages = {}  # type: Dict[str, SerializedMessage]
    timeouts = []  # type: List[str]
    for client, output in zip(clients, received):
        if isinstance(output, asyncio.TimeoutError):
            timeouts.append(client)
        elif isinstance(output, BaseException):
            raise output
        else:
            messages[client] = SerializedMessage.from_message_string(
                output
            )
    return messages, timeouts
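
The partitioning of results into received messages and timed-out clients can be reproduced with standard asyncio alone. In this sketch, `recv` and `collect` are hypothetical names, plain strings replace `SerializedMessage` objects, and `asyncio.sleep` simulates the time each client takes to reply.

```python
import asyncio
from typing import Dict, List, Mapping, Tuple


async def recv(client: str, delay: float, timeout: float) -> str:
    """Simulate receiving a reply, raising TimeoutError if it is too slow."""
    await asyncio.wait_for(asyncio.sleep(delay), timeout)
    return f"reply from {client}"


async def collect(
    delays: Mapping[str, float], timeout: float
) -> Tuple[Dict[str, str], List[str]]:
    clients = list(delays)
    routines = [recv(client, delays[client], timeout) for client in clients]
    # Exceptions are returned in place of results rather than raised.
    received = await asyncio.gather(*routines, return_exceptions=True)
    messages: Dict[str, str] = {}
    timeouts: List[str] = []
    for client, output in zip(clients, received):
        if isinstance(output, asyncio.TimeoutError):
            timeouts.append(client)
        elif isinstance(output, BaseException):
            raise output  # any other failure is still propagated
        else:
            messages[client] = output
    return messages, timeouts


messages, timeouts = asyncio.run(
    collect({"alice": 0.0, "bob": 0.5}, timeout=0.05)
)
```

Only timeouts are tolerated under this pattern; any other exception raised while receiving is re-raised to the caller.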