File: test_message.py

package info (click to toggle)
python-snitun 0.45.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 640 kB
  • sloc: python: 6,681; sh: 5; makefile: 3
file content (53 lines) | stat: -rw-r--r-- 1,605 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from snitun.multiplexer.message import (
    FlowType,
    MultiplexerChannelId,
    MultiplexerMessage,
)


def test_multiplexer_channel_id() -> None:
    """Check the bytes, hex() and str() views of a MultiplexerChannelId."""
    raw_id = b"testtesttesttest"
    # Hex encoding of the 16 ASCII bytes above ("test" -> 74 65 73 74).
    expected_hex = "74657374746573747465737474657374"

    cid = MultiplexerChannelId(raw_id)

    assert cid.bytes == raw_id
    assert cid.hex() == expected_hex
    assert str(cid) == expected_hex


def test_message_types() -> None:
    """Check each FlowType member against its expected integer value.

    Every member must compare equal to the integer both directly and
    through its ``.value`` attribute.
    """
    # NOTE: 0x16 and 0x32 are hex literals (22 and 50 decimal), unlike the
    # single-bit values 0x01..0x08 used by the first four members.
    expected_values = (
        (FlowType.NEW, 0x01),
        (FlowType.DATA, 0x02),
        (FlowType.CLOSE, 0x04),
        (FlowType.PING, 0x08),
        (FlowType.PAUSE, 0x16),
        (FlowType.RESUME, 0x32),
    )
    for member, number in expected_values:
        assert member == number
        assert member.value == number


def test_message_repr() -> None:
    """Check repr() of MultiplexerMessage for two kinds of flow type.

    The first case passes a FlowType member; the second passes a plain
    int (255) in place of a member. Each case pairs the flow type with
    the exact repr string the message is expected to produce.
    """
    cases = (
        (
            FlowType.NEW,
            "MultiplexerMessage(id=74657374746573747465737474657374, flow_type="
            "<FlowType.NEW: 1>, data=b'test', extra=b'test')",
        ),
        (
            255,
            "MultiplexerMessage(id=74657374746573747465737474657374, flow_type="
            "255, data=b'test', extra=b'test')",
        ),
    )
    for flow_type, expected_repr in cases:
        # Only the flow type varies; id, data and extra stay the same.
        message = MultiplexerMessage(
            MultiplexerChannelId(b"testtesttesttest"),
            flow_type,
            b"test",
            b"test",
        )
        assert repr(message) == expected_repr