Skip to content

declearn.communication.grpc.GrpcClient

Bases: NetworkClient

Client-side communication endpoint using gRPC.

Source code in declearn/communication/grpc/_client.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class GrpcClient(NetworkClient):
    """Client-side communication endpoint using gRPC."""

    protocol: ClassVar[str] = "grpc"

    def __init__(
        self,
        server_uri: str,
        name: str,
        certificate: Optional[str] = None,
        logger: Union[logging.Logger, str, None] = None,
    ) -> None:
        """Instantiate the client-side gRPC communications handler.

        Parameters
        ----------
        server_uri: str
            Public uri of the gRPC server to which this client is to
            connect (e.g. "127.0.0.1:8765").
        name: str
            Name of this client, reported to the server for logging and
            messages' addressing purposes.
        certificate: str or None, default=None,
            Path to a certificate (publickey) PEM file, to use SSL/TLS
            communcations encryption.
        logger: logging.Logger or str or None, default=None,
            Logger to use, or name of a logger to set up using
            `declearn.utils.get_logger`. If None, use `type(self)-name`.
        """
        super().__init__(server_uri, name, certificate, logger)
        self._channel = None  # type: Optional[grpc.Channel]
        self._service = None  # type: Optional[MessageBoardStub]

    @staticmethod
    def _setup_ssl_context(
        certificate: Optional[str] = None,
    ) -> Optional[grpc.ChannelCredentials]:
        """Set up and return an (optional) grpc ChannelCredentials object."""
        if certificate is None:
            return None
        with open(certificate, mode="rb") as file:
            cert_bytes = file.read()
        return grpc.ssl_channel_credentials(cert_bytes)

    async def start(self) -> None:
        if self._channel is None:
            self._channel = (
                grpc.aio.secure_channel(self.server_uri, self._ssl)
                if (self._ssl is not None)
                else grpc.aio.insecure_channel(self.server_uri)
            )
        self._service = MessageBoardStub(self._channel)

    async def stop(self) -> None:
        if self._channel is not None:
            await self._channel.close()
            self._channel = None
            self._service = None

    async def _send_message(
        self,
        message: str,
    ) -> str:
        """Send a message to the server and return the obtained reply."""
        if self._service is None:
            raise RuntimeError("Cannot send messages while not connected.")
        # Send the message, as a unary or as a stream of message chunks.
        if len(message) <= CHUNK_LENGTH:
            message = message_pb2.Message(message=message)
            replies = self._service.send(message)
        else:
            chunks = (
                message_pb2.Message(message=message[idx : idx + CHUNK_LENGTH])
                for idx in range(0, len(message), CHUNK_LENGTH)
            )
            replies = self._service.send_stream(chunks)
        # Collect the reply from a stream of message chunks.
        buffer = ""
        async for chunk in replies:
            buffer += chunk.message
        return buffer

    async def register(
        self,
        data_info: Optional[Dict[str, Any]] = None,
    ) -> bool:
        try:
            return await super().register(data_info)
        except grpc.aio.AioRpcError as err:
            self.logger.error(
                "Connection failed during registration: %s %s",
                err.code(),
                err.details(),
            )
            return False

__init__(server_uri, name, certificate=None, logger=None)

Instantiate the client-side gRPC communications handler.

Parameters:

Name Type Description Default
server_uri str

Public uri of the gRPC server to which this client is to connect (e.g. "127.0.0.1:8765").

required
name str

Name of this client, reported to the server for logging and messages' addressing purposes.

required
certificate Optional[str]

Path to a certificate (publickey) PEM file, to use SSL/TLS communcations encryption.

None
logger Union[logging.Logger, str, None]

Logger to use, or name of a logger to set up using declearn.utils.get_logger. If None, use type(self)-name.

None
Source code in declearn/communication/grpc/_client.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def __init__(
    self,
    server_uri: str,
    name: str,
    certificate: Optional[str] = None,
    logger: Union[logging.Logger, str, None] = None,
) -> None:
    """Instantiate the client-side gRPC communications handler.

    Parameters
    ----------
    server_uri: str
        Public uri of the gRPC server to which this client is to
        connect (e.g. "127.0.0.1:8765").
    name: str
        Name of this client, reported to the server for logging and
        messages' addressing purposes.
    certificate: str or None, default=None,
        Path to a certificate (publickey) PEM file, to use SSL/TLS
        communcations encryption.
    logger: logging.Logger or str or None, default=None,
        Logger to use, or name of a logger to set up using
        `declearn.utils.get_logger`. If None, use `type(self)-name`.
    """
    super().__init__(server_uri, name, certificate, logger)
    self._channel = None  # type: Optional[grpc.Channel]
    self._service = None  # type: Optional[MessageBoardStub]