Skip to content

declearn.communication.api.NetworkClient

Abstract class defining an API for client-side communication endpoints.

This class defines the key methods used to communicate between a client and the orchestrating server during a federated learning process, agnostic to the actual communication protocol in use.

Instantiating a NetworkClient does not trigger a connection to the target server. To enable communicating with the server via a NetworkClient object, its start method must first be awaited and conversely, its stop method should be awaited to close the connection:

>>> client = ClientSubclass("example.domain.com:8765", "name", "cert_path")
>>> await client.start()
>>> try:
>>>     client.register()
>>>     ...
>>> finally:
>>>     await client.stop()

An alternative syntax to achieve the former is using the client object as an asynchronous context manager:

>>> async with ClientSubclass(...) as client:
>>>     client.register()
>>>     ...

Note that a declearn NetworkServer manages an allow-list of clients, which is defined during a registration phase of limited time, based on requests emitted through the NetworkClient.register method. Any message emitted using NetworkClient.send_message will probably be rejected by the server if the client has not registered.

Source code in declearn/communication/api/_client.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
@create_types_registry
class NetworkClient(metaclass=abc.ABCMeta):
    """Abstract class defining an API for client-side communication endpoints.

    This class defines the key methods used to communicate between a
    client and the orchestrating server during a federated learning
    process, agnostic to the actual communication protocol in use.

    Instantiating a `NetworkClient` does not trigger a connection to
    the target server. To enable communicating with the server via a
    `NetworkClient` object, its `start` method must first be awaited
    and conversely, its `stop` method should be awaited to close the
    connection:
    ```
    >>> client = ClientSubclass("example.domain.com:8765", "name", "cert_path")
    >>> await client.start()
    >>> try:
    >>>     client.register()
    >>>     ...
    >>> finally:
    >>>     await client.stop()
    ```

    An alternative syntax to achieve the former is using the client
    object as an asynchronous context manager:
    ```
    >>> async with ClientSubclass(...) as client:
    >>>     client.register()
    >>>     ...
    ```

    Note that a declearn `NetworkServer` manages an allow-list of
    clients, which is defined during a registration phase of limited
    time, based on requests emitted through the `NetworkClient.register`
    method. Any message emitted using `NetworkClient.send_message` will
    probably be rejected by the server if the client has not registered.
    """

    protocol: ClassVar[str] = NotImplemented
    """Protocol name identifier, unique across NetworkClient classes."""

    def __init_subclass__(
        cls,
        register: bool = True,
        **kwargs: Any,
    ) -> None:
        """Automate the type-registration of NetworkClient subclasses."""
        super().__init_subclass__(**kwargs)
        if register:
            register_type(cls, cls.protocol, group="NetworkClient")

    def __init__(
        self,
        server_uri: str,
        name: str,
        certificate: Optional[str] = None,
        logger: Union[logging.Logger, str, None] = None,
    ) -> None:
        """Instantiate the client-side communications handler.

        Parameters
        ----------
        server_uri: str
            Public uri of the server to which this client is to connect.
        name: str
            Name of this client, reported to the server for logging and
            messages' addressing purposes.
        certificate: str or None, default=None,
            Path to a certificate (publickey) PEM file, to use SSL/TLS
            communcations encryption.
        logger: logging.Logger or str or None, default=None,
            Logger to use, or name of a logger to set up using
            `declearn.utils.get_logger`. If None, use `type(self)-name`.
        """
        self.server_uri = server_uri
        self.name = name
        self._ssl = self._setup_ssl_context(certificate)
        if isinstance(logger, logging.Logger):
            self.logger = logger
        else:
            self.logger = get_logger(logger or f"{type(self).__name__}-{name}")

    @staticmethod
    @abc.abstractmethod
    def _setup_ssl_context(
        certificate: Optional[str] = None,
    ) -> Any:
        """Set up and return an (optional) SSL context object.

        The return type is communication-protocol dependent.
        """

    # similar to NetworkServer API; pylint: disable=duplicate-code

    @abc.abstractmethod
    async def start(
        self,
    ) -> None:
        """Start the client, i.e. connect to the server.

        Note: this method can be called safely even if the
        client is already running (simply having no effect).
        """

    @abc.abstractmethod
    async def stop(
        self,
    ) -> None:
        """Stop the client, i.e. close all connections.

        Note: this method can be called safely even if the
        client is not running (simply having no effect).
        """

    async def __aenter__(
        self,
    ) -> Self:
        await self.start()
        return self

    async def __aexit__(
        self,
        exc_type: Type[Exception],
        exc_value: Exception,
        exc_tb: types.TracebackType,
    ) -> None:
        await self.stop()

    # pylint: enable=duplicate-code

    @abc.abstractmethod
    async def _send_message(
        self,
        message: str,
    ) -> str:
        """Send a message to the server and return the obtained reply.

        This method should be defined by concrete NetworkClient child
        classes, and implement communication-protocol-specific code
        to send a message to the server and await the primary reply
        from the `MessagesHandler` used by the server.
        """

    async def _exchange_action_messages(
        self,
        message: ActionMessage,
    ) -> ActionMessage:
        """Send an `ActionMessage` to the server and await its response."""
        query = message.to_string()
        reply = await self._send_message(query)
        try:
            return parse_action_from_string(reply)
        except Exception as exc:
            error = "Failed to decode a reply from the server."
            self.logger.critical(error)
            raise RuntimeError(error) from exc

    async def register(
        self,
        data_info: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Register to the server as a client.

        Returns
        -------
        accepted: bool
            Whether the registration request was accepted by the server.

        Raises
        -------
        TypeError
            If the server does not return a valid message.
            This is a failsafe and should never happen.
        """
        if data_info:
            warnings.warn(
                "Sending dataset information is no longer part of the client "
                "registration process. The argument was ignored, and will be "
                "removed in DecLearn version 2.4 and/or 3.0.",
                DeprecationWarning,
            )
        query = Join(name=self.name, version=VERSION)
        reply = await self._exchange_action_messages(query)
        # Case when registration was accepted.
        if isinstance(reply, Accept):
            self.logger.info("Registration was accepted: '%s'", reply.flag)
            return True
        # Case when registration was rejected.
        if isinstance(reply, Reject):
            self.logger.error("Registration was rejected: '%s'", reply.flag)
            return False
        # Otherwise, raise.
        error = (
            "Received an undue response type when attempting to register "
            f"with the server: '{type(reply)}'."
        )
        self.logger.critical(error)
        raise TypeError(error)

    async def send_message(
        self,
        message: Message,
    ) -> None:
        """Send a message to the server.

        Parameters
        ----------
        message: str
            Message instance that is to be delivered to the server.

        Raises
        ------
        RuntimeError
            If the server rejects the sent message.
        TypeError
            If the server returns neither a ping-back nor rejection message.
            This is a failsafe and should never happen.

        Note
        ----
        The message sent here is designed to be received using the
        `NetworkServer.wait_for_messages` method.
        """
        query = Send(message.to_string())
        reply = await self._exchange_action_messages(query)
        if isinstance(reply, Ping):
            return None
        if isinstance(reply, Reject):
            error = f"Message was rejected: {reply.flag}"
            self.logger.error(error)
            raise RuntimeError(error)
        error = (
            "Received an undue response type when attempting to send a "
            f"message: '{type(reply)}'."
        )
        self.logger.critical(error)
        raise TypeError(error)

    async def recv_message(
        self,
        timeout: Optional[float] = None,
    ) -> SerializedMessage:
        """Await a message from the server, with optional timeout.

        Parameters
        ----------
        timeout: float or None, default=None
            Optional timeout delay, after which the server will send
            a timeout notification to this client if no message is
            available for it.

        Returns
        -------
        message: SerializedMessage
            Serialized message received from the server.

        Note
        ----
        The message received here is expected to have been sent
        using one of the following `NetorkServer` methods:
        `send_message`, `send_messages`, or `broadcast_message`.

        Raises
        ------
        asyncio.TimeoutError
            If no message is available after the `timeout` delay.
        RuntimeError
            If the request is rejected by the server.
        TypeError
            If the server returns data of unproper type.
            This is a failsafe and should never happen.
        """
        # Send a query, get a reply and return its content when possible.
        query = Recv(timeout)
        reply = await self._exchange_action_messages(query)
        if isinstance(reply, Send):
            return SerializedMessage.from_message_string(reply.content)
        # Handle the various kinds of failures and raise accordingly.
        if isinstance(reply, Reject):
            if reply.flag == flags.CHECK_MESSAGE_TIMEOUT:
                error = "Message-retrieval request timed out."
                self.logger.error(error)
                raise asyncio.TimeoutError(error)
            error = f"Message-retrieval request was rejected: '{reply.flag}'."
            self.logger.error(error)
            raise RuntimeError(error)
        error = (
            "Received an undue response type when attempting to receive a "
            f"message: '{type(reply)}'."
        )
        self.logger.critical(error)
        raise TypeError(error)

    async def check_message(
        self,
        timeout: Optional[float] = None,
    ) -> SerializedMessage:
        """Await a message from the server, with optional timeout.

        This method is DEPRECATED in favor of the `recv_message` one.
        It acts as an alias and will be removed in v2.6 and/or v3.0.
        """
        warnings.warn(
            "'NetworkServer.check_message' was renamed as 'recv_message' "
            "in DecLearn 2.4. It now acts as an alias, but will be removed "
            "in version 2.6 and/or 3.0.",
            DeprecationWarning,
        )
        return await self.recv_message(timeout)

protocol: ClassVar[str] = NotImplemented class-attribute

Protocol name identifier, unique across NetworkClient classes.

__init__(server_uri, name, certificate=None, logger=None)

Instantiate the client-side communications handler.

Parameters:

Name Type Description Default
server_uri str

Public uri of the server to which this client is to connect.

required
name str

Name of this client, reported to the server for logging and messages' addressing purposes.

required
certificate Optional[str]

Path to a certificate (publickey) PEM file, to use SSL/TLS communcations encryption.

None
logger Union[logging.Logger, str, None]

Logger to use, or name of a logger to set up using declearn.utils.get_logger. If None, use type(self)-name.

None
Source code in declearn/communication/api/_client.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def __init__(
    self,
    server_uri: str,
    name: str,
    certificate: Optional[str] = None,
    logger: Union[logging.Logger, str, None] = None,
) -> None:
    """Instantiate the client-side communications handler.

    Parameters
    ----------
    server_uri: str
        Public uri of the server to which this client is to connect.
    name: str
        Name of this client, reported to the server for logging and
        messages' addressing purposes.
    certificate: str or None, default=None,
        Path to a certificate (publickey) PEM file, to use SSL/TLS
        communcations encryption.
    logger: logging.Logger or str or None, default=None,
        Logger to use, or name of a logger to set up using
        `declearn.utils.get_logger`. If None, use `type(self)-name`.
    """
    self.server_uri = server_uri
    self.name = name
    self._ssl = self._setup_ssl_context(certificate)
    if isinstance(logger, logging.Logger):
        self.logger = logger
    else:
        self.logger = get_logger(logger or f"{type(self).__name__}-{name}")

__init_subclass__(register=True, **kwargs)

Automate the type-registration of NetworkClient subclasses.

Source code in declearn/communication/api/_client.py
91
92
93
94
95
96
97
98
99
def __init_subclass__(
    cls,
    register: bool = True,
    **kwargs: Any,
) -> None:
    """Automate the type-registration of NetworkClient subclasses."""
    super().__init_subclass__(**kwargs)
    if register:
        register_type(cls, cls.protocol, group="NetworkClient")

check_message(timeout=None) async

Await a message from the server, with optional timeout.

This method is DEPRECATED in favor of the recv_message one. It acts as an alias and will be removed in v2.6 and/or v3.0.

Source code in declearn/communication/api/_client.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
async def check_message(
    self,
    timeout: Optional[float] = None,
) -> SerializedMessage:
    """Await a message from the server, with optional timeout.

    This method is DEPRECATED in favor of the `recv_message` one.
    It acts as an alias and will be removed in v2.6 and/or v3.0.
    """
    warnings.warn(
        "'NetworkServer.check_message' was renamed as 'recv_message' "
        "in DecLearn 2.4. It now acts as an alias, but will be removed "
        "in version 2.6 and/or 3.0.",
        DeprecationWarning,
    )
    return await self.recv_message(timeout)

recv_message(timeout=None) async

Await a message from the server, with optional timeout.

Parameters:

Name Type Description Default
timeout Optional[float]

Optional timeout delay, after which the server will send a timeout notification to this client if no message is available for it.

None

Returns:

Name Type Description
message SerializedMessage

Serialized message received from the server.

Note

The message received here is expected to have been sent using one of the following NetorkServer methods: send_message, send_messages, or broadcast_message.

Raises:

Type Description
asyncio.TimeoutError

If no message is available after the timeout delay.

RuntimeError

If the request is rejected by the server.

TypeError

If the server returns data of unproper type. This is a failsafe and should never happen.

Source code in declearn/communication/api/_client.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
async def recv_message(
    self,
    timeout: Optional[float] = None,
) -> SerializedMessage:
    """Await a message from the server, with optional timeout.

    Parameters
    ----------
    timeout: float or None, default=None
        Optional timeout delay, after which the server will send
        a timeout notification to this client if no message is
        available for it.

    Returns
    -------
    message: SerializedMessage
        Serialized message received from the server.

    Note
    ----
    The message received here is expected to have been sent
    using one of the following `NetorkServer` methods:
    `send_message`, `send_messages`, or `broadcast_message`.

    Raises
    ------
    asyncio.TimeoutError
        If no message is available after the `timeout` delay.
    RuntimeError
        If the request is rejected by the server.
    TypeError
        If the server returns data of unproper type.
        This is a failsafe and should never happen.
    """
    # Send a query, get a reply and return its content when possible.
    query = Recv(timeout)
    reply = await self._exchange_action_messages(query)
    if isinstance(reply, Send):
        return SerializedMessage.from_message_string(reply.content)
    # Handle the various kinds of failures and raise accordingly.
    if isinstance(reply, Reject):
        if reply.flag == flags.CHECK_MESSAGE_TIMEOUT:
            error = "Message-retrieval request timed out."
            self.logger.error(error)
            raise asyncio.TimeoutError(error)
        error = f"Message-retrieval request was rejected: '{reply.flag}'."
        self.logger.error(error)
        raise RuntimeError(error)
    error = (
        "Received an undue response type when attempting to receive a "
        f"message: '{type(reply)}'."
    )
    self.logger.critical(error)
    raise TypeError(error)

register(data_info=None) async

Register to the server as a client.

Returns:

Name Type Description
accepted bool

Whether the registration request was accepted by the server.

Raises:

Type Description
TypeError

If the server does not return a valid message. This is a failsafe and should never happen.

Source code in declearn/communication/api/_client.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
async def register(
    self,
    data_info: Optional[Dict[str, Any]] = None,
) -> bool:
    """Register to the server as a client.

    Returns
    -------
    accepted: bool
        Whether the registration request was accepted by the server.

    Raises
    -------
    TypeError
        If the server does not return a valid message.
        This is a failsafe and should never happen.
    """
    if data_info:
        warnings.warn(
            "Sending dataset information is no longer part of the client "
            "registration process. The argument was ignored, and will be "
            "removed in DecLearn version 2.4 and/or 3.0.",
            DeprecationWarning,
        )
    query = Join(name=self.name, version=VERSION)
    reply = await self._exchange_action_messages(query)
    # Case when registration was accepted.
    if isinstance(reply, Accept):
        self.logger.info("Registration was accepted: '%s'", reply.flag)
        return True
    # Case when registration was rejected.
    if isinstance(reply, Reject):
        self.logger.error("Registration was rejected: '%s'", reply.flag)
        return False
    # Otherwise, raise.
    error = (
        "Received an undue response type when attempting to register "
        f"with the server: '{type(reply)}'."
    )
    self.logger.critical(error)
    raise TypeError(error)

send_message(message) async

Send a message to the server.

Parameters:

Name Type Description Default
message Message

Message instance that is to be delivered to the server.

required

Raises:

Type Description
RuntimeError

If the server rejects the sent message.

TypeError

If the server returns neither a ping-back nor rejection message. This is a failsafe and should never happen.

Note

The message sent here is designed to be received using the NetworkServer.wait_for_messages method.

Source code in declearn/communication/api/_client.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
async def send_message(
    self,
    message: Message,
) -> None:
    """Send a message to the server.

    Parameters
    ----------
    message: str
        Message instance that is to be delivered to the server.

    Raises
    ------
    RuntimeError
        If the server rejects the sent message.
    TypeError
        If the server returns neither a ping-back nor rejection message.
        This is a failsafe and should never happen.

    Note
    ----
    The message sent here is designed to be received using the
    `NetworkServer.wait_for_messages` method.
    """
    query = Send(message.to_string())
    reply = await self._exchange_action_messages(query)
    if isinstance(reply, Ping):
        return None
    if isinstance(reply, Reject):
        error = f"Message was rejected: {reply.flag}"
        self.logger.error(error)
        raise RuntimeError(error)
    error = (
        "Received an undue response type when attempting to send a "
        f"message: '{type(reply)}'."
    )
    self.logger.critical(error)
    raise TypeError(error)

start() abstractmethod async

Start the client, i.e. connect to the server.

Note: this method can be called safely even if the client is already running (simply having no effect).

Source code in declearn/communication/api/_client.py
144
145
146
147
148
149
150
151
152
@abc.abstractmethod
async def start(
    self,
) -> None:
    """Start the client, i.e. connect to the server.

    Note: this method can be called safely even if the
    client is already running (simply having no effect).
    """

stop() abstractmethod async

Stop the client, i.e. close all connections.

Note: this method can be called safely even if the client is not running (simply having no effect).

Source code in declearn/communication/api/_client.py
154
155
156
157
158
159
160
161
162
@abc.abstractmethod
async def stop(
    self,
) -> None:
    """Stop the client, i.e. close all connections.

    Note: this method can be called safely even if the
    client is not running (simply having no effect).
    """