File: websocket-client.py

package info (click to toggle)
twisted 25.5.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,560 kB
  • sloc: python: 203,171; makefile: 200; sh: 92; javascript: 36; xml: 31
file content (44 lines) | stat: -rw-r--r-- 1,180 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from __future__ import annotations

from typing import Any

from twisted.internet.defer import Failure
from twisted.internet.task import LoopingCall, deferLater, react
from twisted.web.websocket import WebSocketClientEndpoint, WebSocketTransport


class WebSocketClientDemo:
    """
    Minimal WebSocket client protocol used by this example.

    It remembers the transport it is given, sends one greeting once
    negotiation has finished, and prints every text message it receives.
    All other callbacks are deliberate no-ops.

    NOTE(review): the callback names presumably match what
    ``twisted.web.websocket`` expects of a protocol object; confirm against
    that module's documentation.
    """

    @classmethod
    def buildProtocol(cls, uri: str) -> WebSocketClientDemo:
        """
        Create a fresh protocol instance; C{uri} is accepted but not used.
        """
        protocol = cls()
        return protocol

    def negotiationStarted(self, transport: WebSocketTransport) -> None:
        """
        Keep a reference to C{transport} so later callbacks can send on it.
        """
        self.transport = transport

    def negotiationFinished(self) -> None:
        """
        Send the greeting text message over the stored transport.
        """
        self.transport.sendTextMessage("hello, world!")

    def textMessageReceived(self, data: str) -> None:
        """
        Print the C{repr} of a received text message to standard output.
        """
        print(f"received text: {data!r}")

    def bytesMessageReceived(self, data: bytes) -> None:
        """
        Ignore binary messages; this demo only handles text.
        """

    def pongReceived(self, payload: bytes) -> None:
        """
        Ignore pong payloads.
        """

    def connectionLost(self, reason: Failure) -> None:
        """
        Do nothing when the connection goes away; C{reason} is discarded.
        """


async def main(reactor: Any) -> None:
    """
    Connect the demo client to a local WebSocket server, then linger.

    @param reactor: the reactor passed in by the caller; it is used both to
        build the client endpoint and to schedule the final delay.
    """
    serverURL = "ws://localhost:8080/websocket-server.rpy"
    lingerSeconds = 10

    clientEndpoint = WebSocketClientEndpoint.new(reactor, serverURL)

    print("connecting...")
    # The class itself is handed over as the protocol factory; presumably the
    # endpoint calls its ``buildProtocol`` classmethod -- confirm against the
    # twisted.web.websocket documentation.
    await clientEndpoint.connect(WebSocketClientDemo)
    print("connected!")

    # Hold this coroutine open for a while after connecting, so it does not
    # return immediately and incoming messages have a chance to be printed.
    await deferLater(reactor, lingerSeconds)


# Script entry point: hand ``main`` to Twisted's ``react`` helper, which is
# expected to start the reactor, call ``main(reactor)`` and shut the reactor
# down once the coroutine completes (see the ``react`` documentation).
# NOTE(review): this is an unguarded module-level call, so it also runs if the
# file is imported rather than executed.
react(main)