File: __init__.py

package info (click to toggle)
streamlink 7.3.0-2~bpo12%2B1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm-backports
  • size: 5,584 kB
  • sloc: python: 49,172; sh: 184; makefile: 145
file content (30 lines) | stat: -rw-r--r-- 853 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from __future__ import annotations

import trio
from trio_websocket import CloseReason, ConnectionClosed  # type: ignore[import]


class FakeWebsocketConnection:
    """In-memory stand-in for a trio-websocket connection.

    Outgoing messages are only recorded in ``sent``. Incoming messages are
    read from a trio memory channel: whatever is pushed into ``sender`` is
    returned by :meth:`get_message`.
    """

    # Both ends of the in-memory channel that backs ``get_message()``.
    sender: trio.MemorySendChannel
    receiver: trio.MemoryReceiveChannel

    def __init__(self) -> None:
        # Buffer of 10 messages, so sends into ``sender`` don't block
        # until that many messages are waiting to be received.
        self.sender, self.receiver = trio.open_memory_channel(10)
        # Every message passed to ``send_message()``, in call order.
        self.sent: list[str] = []
        # Set to True once ``aclose()`` has run.
        self.closed: bool = False

    async def send_message(self, message: str) -> None:
        """Record ``message`` in ``sent``; nothing is actually transmitted."""
        self.sent.append(message)

    async def get_message(self):
        """Return the next message from the memory channel.

        Raises:
            ConnectionClosed: with close code 1000 once the channel has been
                closed (either end) and no message can be received anymore.
        """
        try:
            return await self.receiver.receive()
        except (trio.EndOfChannel, trio.ClosedResourceError, trio.BrokenResourceError):
            # Only translate channel-closure errors. ``trio.Cancelled``,
            # ``KeyboardInterrupt`` and ``SystemExit`` derive from
            # ``BaseException`` and must propagate untouched, otherwise
            # cancellation would be disguised as a regular websocket close.
            # noinspection PyTypeChecker
            raise ConnectionClosed(CloseReason(1000, None)) from None

    async def aclose(self) -> None:
        """Close both channel ends and mark the connection as closed."""
        # ``close()`` on memory channels is synchronous, hence no ``await``.
        self.sender.close()
        self.receiver.close()
        self.closed = True