File: client.py

package info (click to toggle)
python-trio-websocket 0.12.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 432 kB
  • sloc: python: 2,900; makefile: 41; sh: 17
file content (146 lines) | stat: -rw-r--r-- 4,978 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
'''
This interactive WebSocket client allows the user to send frames to a WebSocket
server, including text message, ping, and close frames.

To use SSL/TLS: install the `trustme` package from PyPI and run the
`generate-cert.py` script in this directory.
'''
import argparse
import logging
import pathlib
import ssl
import sys
import urllib.parse
from typing import NoReturn

import trio
from trio_websocket import (
    open_websocket_url,
    ConnectionClosed,
    HandshakeError,
    WebSocketConnection,
    CloseReason,
)


logging.basicConfig(level=logging.DEBUG)
here = pathlib.Path(__file__).parent


def commands() -> None:
    ''' Print the supported commands. '''
    print('Commands: ')
    print('send <MESSAGE>   -> send message')
    print('ping <PAYLOAD>   -> send ping with payload')
    print('close [<REASON>] -> politely close connection with optional reason')
    print()


def parse_args() -> argparse.Namespace:
    ''' Parse command line arguments. '''
    parser = argparse.ArgumentParser(description='Example trio-websocket client')
    parser.add_argument('--heartbeat', action='store_true',
        help='Create a heartbeat task')
    parser.add_argument('url', help='WebSocket URL to connect to')
    return parser.parse_args()


async def main(args: argparse.Namespace) -> bool:
    ''' Main entry point, returning False in the case of logged error. '''
    if urllib.parse.urlsplit(args.url).scheme == 'wss':
        # Configure SSL context to handle our self-signed certificate. Most
        # clients won't need to do this.
        try:
            ssl_context = ssl.create_default_context()
            ssl_context.load_verify_locations(here / 'fake.ca.pem')
        except FileNotFoundError:
            logging.error('Did not find file "fake.ca.pem". You need to run'
                ' generate-cert.py')
            return False
    else:
        ssl_context = None
    try:
        logging.debug('Connecting to WebSocket…')
        async with open_websocket_url(args.url, ssl_context) as conn:
            await handle_connection(conn, args.heartbeat)
    except HandshakeError as e:
        logging.error('Connection attempt failed: %s', e)
        return False
    return True


async def handle_connection(ws: WebSocketConnection, use_heartbeat: bool) -> None:
    ''' Handle the connection. '''
    logging.debug('Connected!')
    try:
        async with trio.open_nursery() as nursery:
            if use_heartbeat:
                nursery.start_soon(heartbeat, ws, 1, 15)
            nursery.start_soon(get_commands, ws)
            nursery.start_soon(get_messages, ws)
    except ConnectionClosed as cc:
        assert isinstance(cc.reason, CloseReason)
        reason = '<no reason>' if cc.reason.reason is None else f'"{cc.reason.reason}"'
        print(f'Closed: {cc.reason.code}/{cc.reason.name} {reason}')


async def heartbeat(ws: WebSocketConnection, timeout: float, interval: float) -> NoReturn:
    '''
    Send periodic pings on WebSocket ``ws``.

    Wait up to ``timeout`` seconds to send a ping and receive a pong. Raises
    ``TooSlowError`` if the timeout is exceeded. If a pong is received, then
    wait ``interval`` seconds before sending the next ping.

    This function runs until cancelled.

    :param ws: A WebSocket to send heartbeat pings on.
    :param float timeout: Timeout in seconds.
    :param float interval: Interval between receiving pong and sending next
        ping, in seconds.
    :raises: ``ConnectionClosed`` if ``ws`` is closed.
    :raises: ``TooSlowError`` if the timeout expires.
    :returns: This function runs until cancelled.
    '''
    while True:
        with trio.fail_after(timeout):
            await ws.ping()
        await trio.sleep(interval)


async def get_commands(ws: WebSocketConnection) -> None:
    ''' In a loop: get a command from the user and execute it. '''
    while True:
        cmd = await trio.to_thread.run_sync(input, 'cmd> ')
        if cmd.startswith('ping'):
            payload = cmd[5:].encode('utf8') or None
            await ws.ping(payload)
        elif cmd.startswith('send'):
            message = cmd[5:] or None
            if message is None:
                logging.error('The "send" command requires a message.')
            else:
                await ws.send_message(message)
        elif cmd.startswith('close'):
            reason = cmd[6:] or None
            await ws.aclose(code=1000, reason=reason)
            break
        else:
            commands()
        # Allow time to receive response and log print logs:
        await trio.sleep(0.25)


async def get_messages(ws: WebSocketConnection) -> None:
    ''' In a loop: get a WebSocket message and print it out. '''
    while True:
        message = await ws.get_message()
        print(f'message: {message!r}')


if __name__ == '__main__':
    try:
        if not trio.run(main, parse_args()):
            sys.exit(1)
    except (KeyboardInterrupt, EOFError):
        print()