File: connection.py

package info (click to toggle)
python-websockets 15.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,948 kB
  • sloc: python: 25,105; javascript: 350; ansic: 148; makefile: 43
file content (115 lines) | stat: -rw-r--r-- 3,323 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import asyncio
import contextlib

from websockets.asyncio.connection import Connection


class InterceptingConnection(Connection):
    """
    Connection subclass that can intercept outgoing packets.

    By interfacing with this connection, we simulate network conditions
    affecting what the component being tested receives during a test.

    """

    def connection_made(self, transport):
        super().connection_made(InterceptingTransport(transport))

    @contextlib.contextmanager
    def delay_frames_sent(self, delay):
        """
        Add a delay before sending frames.

        This can result in out-of-order writes, which is unrealistic.

        """
        assert self.transport.delay_write is None
        self.transport.delay_write = delay
        try:
            yield
        finally:
            self.transport.delay_write = None

    @contextlib.contextmanager
    def delay_eof_sent(self, delay):
        """
        Add a delay before sending EOF.

        This can result in out-of-order writes, which is unrealistic.

        """
        assert self.transport.delay_write_eof is None
        self.transport.delay_write_eof = delay
        try:
            yield
        finally:
            self.transport.delay_write_eof = None

    @contextlib.contextmanager
    def drop_frames_sent(self):
        """
        Prevent frames from being sent.

        Since TCP is reliable, sending frames or EOF afterwards is unrealistic.

        """
        assert not self.transport.drop_write
        self.transport.drop_write = True
        try:
            yield
        finally:
            self.transport.drop_write = False

    @contextlib.contextmanager
    def drop_eof_sent(self):
        """
        Prevent EOF from being sent.

        Since TCP is reliable, sending frames or EOF afterwards is unrealistic.

        """
        assert not self.transport.drop_write_eof
        self.transport.drop_write_eof = True
        try:
            yield
        finally:
            self.transport.drop_write_eof = False


class InterceptingTransport:
    """
    Transport wrapper that intercepts calls to ``write()`` and ``write_eof()``.

    This is coupled to the implementation, which relies on these two methods.

    Since ``write()`` and ``write_eof()`` are not coroutines, this effect is
    achieved by scheduling writes at a later time, after the methods return.
    This can easily result in out-of-order writes, which is unrealistic.

    """

    def __init__(self, transport):
        self.loop = asyncio.get_running_loop()
        self.transport = transport
        self.delay_write = None
        self.delay_write_eof = None
        self.drop_write = False
        self.drop_write_eof = False

    def __getattr__(self, name):
        return getattr(self.transport, name)

    def write(self, data):
        if not self.drop_write:
            if self.delay_write is not None:
                self.loop.call_later(self.delay_write, self.transport.write, data)
            else:
                self.transport.write(data)

    def write_eof(self):
        if not self.drop_write_eof:
            if self.delay_write_eof is not None:
                self.loop.call_later(self.delay_write_eof, self.transport.write_eof)
            else:
                self.transport.write_eof()