Skip to content

declearn.communication.grpc.GrpcServer

Bases: NetworkServer

Server-side communication endpoint using gRPC.

Source code in declearn/communication/grpc/_server.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class GrpcServer(NetworkServer):
    """Server-side communication endpoint using gRPC.

    The underlying `grpc.aio` server is built anew each time `start`
    is called, and is discarded when `stop` is called.
    """

    protocol = "grpc"

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8765,
        certificate: Optional[str] = None,
        private_key: Optional[str] = None,
        password: Optional[str] = None,
        heartbeat: float = 1.0,
        logger: Union[logging.Logger, str, None] = None,
    ) -> None:
        """Instantiate the server-side gRPC communications handler.

        Parameters
        ----------
        host: str, default='localhost'
            Host name (e.g. IP address) of the server.
        port: int, default=8765
            Communications port to use.
            If set to 0, the gRPC runtime will choose one when the
            server is first started.
        certificate: str or None, default=None
            Path to the server certificate (public key) to use SSL/TLS
            communications encryption. If provided, `private_key` must
            be set as well.
        private_key: str or None, default=None
            Path to the server private key to use SSL/TLS communications
            encryption. If provided, `certificate` must be set as well.
        password: str or None, default=None
            Optional password used to access `private_key`, or path to a
            file from which to read such a password.
            If None but a password is needed, an input will be prompted.
        heartbeat: float, default=1.0
            Delay (in seconds) between verifications when checking for a
            message having been received from or collected by a client.
        logger: logging.Logger or str or None, default=None
            Logger to use, or name of a logger to set up with
            `declearn.utils.get_logger`. If None, use `type(self)`.
        """
        # inherited signature; pylint: disable=too-many-arguments
        # All arguments are forwarded as-is to the parent constructor.
        super().__init__(
            host,
            port,
            certificate,
            private_key,
            password,
            heartbeat,
            logger,
        )
        # No gRPC server exists until `start` is called.
        self._server: Optional[grpc.Server] = None

    @property
    def uri(self) -> str:
        """Address of this server, formatted as '{host}:{port}'."""
        return f"{self.host}:{self.port}"

    @staticmethod
    def _setup_ssl_context(
        certificate: str,
        private_key: str,
        password: Optional[str] = None,
    ) -> Optional[grpc.ServerCredentials]:
        """Load PEM files and wrap them into grpc.ServerCredentials.

        Parameters
        ----------
        certificate: str
            Path to the server certificate PEM file.
        private_key: str
            Path to the server private key PEM file.
        password: str or None, default=None
            Optional password, passed to `load_pem_file` together
            with `private_key`.

        Returns
        -------
        credentials: grpc.ServerCredentials
            Credentials made of the single (key, certificate) pair,
            without root certificates nor client authentication.
        """
        chain = load_pem_file(certificate)
        key = load_pem_file(private_key, password)
        pairs = [(key, chain)]
        return grpc.ssl_server_credentials(
            private_key_certificate_chain_pairs=pairs,
            root_certificates=None,
            require_client_auth=False,
        )

    async def start(
        self,
    ) -> None:
        """Set up a new gRPC server and start it."""
        server = self._setup_server()
        self._server = server
        self.logger.info("Server is now starting...")
        await server.start()

    def _setup_server(
        self,
    ) -> grpc.Server:
        """Build the grpc Server that exposes this instance's handler.

        The `port` attribute is overwritten with the value returned
        by the gRPC port-binding call.
        """
        server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
        address = f"{self.host}:{self.port}"
        # Bind a secure port only when SSL credentials are available.
        if self._ssl is None:
            self.port = server.add_insecure_port(address)
        else:
            self.port = server.add_secure_port(address, self._ssl)
        # Register the message-board service wrapping the handler.
        add_MessageBoardServicer_to_server(GrpcServicer(self.handler), server)
        return server

    async def stop(
        self,
    ) -> None:
        """Stop the gRPC server (if any) and purge the handler.

        The handler's `purge` is awaited even when no server is set.
        """
        server = self._server
        if server is not None:
            # Passing `grace=None` requests a stop without grace period.
            await server.stop(grace=None)
            self._server = None
        await self.handler.purge()

__init__(host='localhost', port=8765, certificate=None, private_key=None, password=None, heartbeat=1.0, logger=None)

Instantiate the server-side gRPC communications handler.

Parameters:

Name Type Description Default
host str

Host name (e.g. IP address) of the server.

'localhost'
port int

Communications port to use. If set to 0, the gRPC runtime will choose one when the server is first started.

8765
certificate Optional[str]

Path to the server certificate (publickey) to use SSL/TLS communications encryption. If provided, private_key must be set as well.

None
private_key Optional[str]

Path to the server private key to use SSL/TLS communications encryption. If provided, certificate must be set as well.

None
password Optional[str]

Optional password used to access private_key, or path to a file from which to read such a password. If None but a password is needed, an input will be prompted.

None
heartbeat float

Delay (in seconds) between verifications when checking for a message having been received from or collected by a client.

1.0
logger Union[logging.Logger, str, None]

Logger to use, or name of a logger to set up with declearn.utils.get_logger. If None, use type(self).

None
Source code in declearn/communication/grpc/_server.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def __init__(
    self,
    host: str = "localhost",
    port: int = 8765,
    certificate: Optional[str] = None,
    private_key: Optional[str] = None,
    password: Optional[str] = None,
    heartbeat: float = 1.0,
    logger: Union[logging.Logger, str, None] = None,
) -> None:
    """Instantiate the server-side gRPC communications handler.

    Parameters
    ----------
    host: str, default='localhost'
        Host name (e.g. IP address) of the server.
    port: int, default=8765
        Communications port to use.
        If set to 0, the gRPC runtime will choose one when the
        server is first started.
    certificate: str or None, default=None
        Path to the server certificate (public key) to use SSL/TLS
        communications encryption. If provided, `private_key` must
        be set as well.
    private_key: str or None, default=None
        Path to the server private key to use SSL/TLS communications
        encryption. If provided, `certificate` must be set as well.
    password: str or None, default=None
        Optional password used to access `private_key`, or path to a
        file from which to read such a password.
        If None but a password is needed, an input will be prompted.
    heartbeat: float, default=1.0
        Delay (in seconds) between verifications when checking for a
        message having been received from or collected by a client.
    logger: logging.Logger or str or None, default=None
        Logger to use, or name of a logger to set up with
        `declearn.utils.get_logger`. If None, use `type(self)`.
    """
    # inherited signature; pylint: disable=too-many-arguments
    # All arguments are forwarded as-is to the parent constructor.
    super().__init__(
        host,
        port,
        certificate,
        private_key,
        password,
        heartbeat,
        logger,
    )
    # No gRPC server exists until `start` is called.
    self._server: Optional[grpc.Server] = None

start() async

Start the gRPC server.

Source code in declearn/communication/grpc/_server.py
138
139
140
141
142
143
144
async def start(
    self,
) -> None:
    """Set up a new gRPC server and start it.

    The server returned by `_setup_server` is stored as `_server`
    before being started.
    """
    server = self._setup_server()
    self._server = server
    self.logger.info("Server is now starting...")
    await server.start()

stop() async

Stop the gRPC server and purge information about clients.

Source code in declearn/communication/grpc/_server.py
161
162
163
164
165
166
167
168
async def stop(
    self,
) -> None:
    """Stop the gRPC server (if any) and purge the handler.

    The handler's `purge` is awaited even when no server is set.
    """
    server = self._server
    if server is not None:
        # Passing `grace=None` requests a stop without grace period.
        await server.stop(grace=None)
        self._server = None
    await self.handler.purge()