Skip to content

declearn.communication.websockets.WebsocketsServer

Bases: NetworkServer

Server-side communication endpoint using WebSockets.

Source code in declearn/communication/websockets/_server.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
class WebsocketsServer(NetworkServer):
    """Server-side communication endpoint using WebSockets.

    The server is created by `start`, which keeps a handle to it in
    `self._server`, and is shut down by `stop`. Each incoming client
    connection is served by `_handle_connection`.
    """

    protocol = "websockets"

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8765,
        certificate: Optional[str] = None,
        private_key: Optional[str] = None,
        password: Optional[str] = None,
        heartbeat: float = 1.0,
        logger: Union[logging.Logger, str, None] = None,
    ) -> None:
        """Instantiate the server-side WebSockets communications handler.

        Parameters
        ----------
        host: str, default='localhost'
            Host name (e.g. IP address) of the server.
        port: int, default=8765
            Communications port to use.
        certificate: str or None, default=None
            Path to the server certificate (public key) to use SSL/TLS
            communications encryption. If provided, `private_key` must
            be set as well.
        private_key: str or None, default=None
            Path to the server private key to use SSL/TLS communications
            encryption. If provided, `certificate` must be set as well.
        password: str or None, default=None
            Optional password used to access `private_key`, or path to a
            file from which to read such a password.
            If None but a password is needed, an input will be prompted.
        heartbeat: float, default=1.0
            Delay (in seconds) between verifications when checking for a
            message having been received from or collected by a client.
        logger: logging.Logger or str or None, default=None,
            Logger to use, or name of a logger to set up with
            `declearn.utils.get_logger`. If None, use `type(self)`.
        """
        # inherited signature; pylint: disable=too-many-arguments
        # All arguments are forwarded, in order, to the parent constructor.
        parent_args = (
            host,
            port,
            certificate,
            private_key,
            password,
            heartbeat,
            logger,
        )
        super().__init__(*parent_args)
        # Handle to the running server: set by `start`, reset by `stop`.
        self._server = None  # type: Optional[WebSocketServer]

    @property
    def uri(self) -> str:
        """URI of this server, using 'wss' when SSL is set and 'ws' otherwise."""
        scheme = "wss" if self._ssl is not None else "ws"
        return f"{scheme}://{self.host}:{self.port}"

    @staticmethod
    def _setup_ssl_context(
        certificate: str,
        private_key: str,
        password: Optional[str] = None,
    ) -> Optional[ssl.SSLContext]:
        """Build a server-side SSLContext loaded with the certificate chain.

        Parameters
        ----------
        certificate: str
            Path to the certificate file.
        private_key: str
            Path to the private key file.
        password: str or None, default=None
            Password of the private key. If it is the path to an existing
            file, the password is read from that file instead (stripped
            of leading and trailing newlines).

        Returns
        -------
        ssl_context: ssl.SSLContext
            Context using `ssl.PROTOCOL_TLS_SERVER`, with the certificate
            and private key loaded.
        """
        # A password that points to an existing file is replaced with
        # that file's contents.
        if password and os.path.isfile(password):
            with open(password, mode="r", encoding="utf-8") as handle:
                password = handle.read().strip("\n")
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certificate, private_key, password)
        return context

    async def start(
        self,
    ) -> None:
        """Start the websockets server and keep a handle to it."""
        # Prepare the server; it only starts once awaited below.
        pending = ws.serve(  # type: ignore  # pylint: disable=no-member
            self._handle_connection,
            host=self.host,
            port=self.port,
            logger=self.logger,
            ssl=self._ssl,
            ping_timeout=None,  # disable timeout on keep-alive pings
        )
        self.logger.info("Server is now starting...")
        # Awaiting starts the server; the handle is later used by `stop`.
        self._server = await pending

    async def _handle_connection(
        self,
        socket: WebSocketServerProtocol,
    ) -> None:
        """Serve a single client connection until it ends.

        For each incoming frame, the (possibly chunked) message is
        received, passed to `self.handler.handle_message`, and the
        reply is sent back. The loop stops when a chunks-streaming
        request is refused, or when the socket is not among the
        handler's registered clients after a reply was sent. The
        socket is closed in all cases.

        Parameters
        ----------
        socket: WebSocketServerProtocol
            Socket of the client connection to serve.
        """
        self.logger.info("New connection from %s", socket.remote_address)
        try:
            async for frame in socket:
                # Chunked messages are only allowed for registered clients.
                is_registered = socket in self.handler.registered_clients
                try:
                    message = await receive_websockets_message(
                        frame, socket, allow_chunks=is_registered
                    )
                except StreamRefusedError:
                    self.logger.warning(
                        "Refused a chunks-streaming request from client %s",
                        socket.remote_address,
                    )
                    break
                # Process the message and send the resulting reply back.
                reply = await self.handler.handle_message(message, socket)
                await send_websockets_message(reply.to_string(), socket)
                # Stop serving sockets that are not (or no longer) registered.
                if socket not in self.handler.registered_clients:
                    break
        except (ConnectionClosedOK, ConnectionClosedError) as exc:
            # Drop the client from the registry; when it was not registered,
            # fall back to its remote address for the log message.
            client = self.handler.registered_clients.pop(
                socket, socket.remote_address
            )
            self.logger.error(
                "Connection from client '%s' was closed unexpectedly: %s",
                client,
                exc,
            )
        finally:
            await socket.close()

    async def stop(
        self,
    ) -> None:
        """Stop the websockets server and purge information about clients.

        The handler is purged even if no server is currently running.
        """
        running = self._server
        if running is not None:
            running.close()
            await running.wait_closed()
            self._server = None
        await self.handler.purge()

__init__(host='localhost', port=8765, certificate=None, private_key=None, password=None, heartbeat=1.0, logger=None)

Instantiate the server-side WebSockets communications handler.

Parameters:

Name Type Description Default
host str

Host name (e.g. IP address) of the server.

'localhost'
port int

Communications port to use.

8765
certificate Optional[str]

Path to the server certificate (publickey) to use SSL/TLS communications encryption. If provided, private_key must be set as well.

None
private_key Optional[str]

Path to the server private key to use SSL/TLS communications encryption. If provided, certificate must be set as well.

None
password Optional[str]

Optional password used to access private_key, or path to a file from which to read such a password. If None but a password is needed, an input will be prompted.

None
heartbeat float

Delay (in seconds) between verifications when checking for a message having been received from or collected by a client.

1.0
logger Union[logging.Logger, str, None]

Logger to use, or name of a logger to set up with declearn.utils.get_logger. If None, use type(self).

None
Source code in declearn/communication/websockets/_server.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(
    self,
    host: str = "localhost",
    port: int = 8765,
    certificate: Optional[str] = None,
    private_key: Optional[str] = None,
    password: Optional[str] = None,
    heartbeat: float = 1.0,
    logger: Union[logging.Logger, str, None] = None,
) -> None:
    """Instantiate the server-side WebSockets communications handler.

    Parameters
    ----------
    host: str, default='localhost'
        Host name (e.g. IP address) of the server.
    port: int, default=8765
        Communications port to use.
    certificate: str or None, default=None
        Path to the server certificate (public key) to use SSL/TLS
        communications encryption. If provided, `private_key` must
        be set as well.
    private_key: str or None, default=None
        Path to the server private key to use SSL/TLS communications
        encryption. If provided, `certificate` must be set as well.
    password: str or None, default=None
        Optional password used to access `private_key`, or path to a
        file from which to read such a password.
        If None but a password is needed, an input will be prompted.
    heartbeat: float, default=1.0
        Delay (in seconds) between verifications when checking for a
        message having been received from or collected by a client.
    logger: logging.Logger or str or None, default=None,
        Logger to use, or name of a logger to set up with
        `declearn.utils.get_logger`. If None, use `type(self)`.
    """
    # inherited signature; pylint: disable=too-many-arguments
    # All arguments are forwarded, in order, to the parent constructor.
    parent_args = (
        host,
        port,
        certificate,
        private_key,
        password,
        heartbeat,
        logger,
    )
    super().__init__(*parent_args)
    # No server handle is held at instantiation.
    self._server = None  # type: Optional[WebSocketServer]

start() async

Start the websockets server.

Source code in declearn/communication/websockets/_server.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
async def start(
    self,
) -> None:
    """Start the websockets server and keep a handle to it."""
    # Prepare the server; it only starts once awaited below.
    pending = ws.serve(  # type: ignore  # pylint: disable=no-member
        self._handle_connection,
        host=self.host,
        port=self.port,
        logger=self.logger,
        ssl=self._ssl,
        ping_timeout=None,  # disable timeout on keep-alive pings
    )
    self.logger.info("Server is now starting...")
    # Awaiting starts the server; the resulting handle is stored on self.
    self._server = await pending

stop() async

Stop the websockets server and purge information about clients.

Source code in declearn/communication/websockets/_server.py
163
164
165
166
167
168
169
170
171
async def stop(
    self,
) -> None:
    """Stop the websockets server and purge information about clients.

    The handler is purged even if no server is currently running.
    """
    running = self._server
    if running is not None:
        # Request the shutdown, wait for it to complete, then drop the handle.
        running.close()
        await running.wait_closed()
        self._server = None
    await self.handler.purge()