File: test_big_message.py

package info (click to toggle)
dbus-fast 3.1.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,804 kB
  • sloc: python: 9,997; xml: 39; makefile: 29; sh: 5
file content (79 lines) | stat: -rw-r--r-- 2,291 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import asyncio
import sys

import pytest

from dbus_fast import Message, MessageType, aio, glib
from dbus_fast.service import ServiceInterface, dbus_method
from tests.util import check_gi_repository, skip_reason_no_gi

# Evaluated once at import time; used by the skipif marker on the glib test below.
# NOTE(review): presumably truthy when the GObject-introspection (gi) bindings
# are importable — confirm against tests.util.check_gi_repository.
has_gi = check_gi_repository()


class ExampleInterface(ServiceInterface):
    """Service interface exposing a single echo method used by the tests."""

    def __init__(self):
        interface_name = "example.interface"
        super().__init__(interface_name)

    @dbus_method()
    def echo_bytes(self, what: "ay") -> "ay":
        """Return the received ``ay`` argument unchanged."""
        return what


@pytest.mark.asyncio
async def test_aio_big_message():
    """Check that nonblocking reads and writes actually work for aio.

    One connection exports ``ExampleInterface``; a second connection calls
    ``echo_bytes`` on it with a two-megabyte payload and expects the same
    bytes back in a method-return message.
    """
    bus1 = await aio.MessageBus().connect()
    bus2 = await aio.MessageBus().connect()
    try:
        interface = ExampleInterface()
        bus1.export("/test/path", interface)

        # two megabytes (all zero bytes)
        big_body = [bytes(2_000_000)]
        result = await bus2.call(
            Message(
                destination=bus1.unique_name,
                path="/test/path",
                interface=interface.name,
                member="echo_bytes",
                signature="ay",
                body=big_body,
            )
        )
        # On failure, surface the first body element in the assertion message.
        assert result.message_type == MessageType.METHOD_RETURN, result.body[0]
        assert result.body[0] == big_body[0]
    finally:
        # Tear both connections down even when the call or an assertion
        # fails, so a failing test does not leave open bus connections behind.
        bus1.disconnect()
        bus2.disconnect()
        await asyncio.wait_for(bus1.wait_for_disconnect(), timeout=1)
        await asyncio.wait_for(bus2.wait_for_disconnect(), timeout=1)


@pytest.mark.skipif(not has_gi, reason=skip_reason_no_gi)
@pytest.mark.skipif(
    sys.version_info.minor in (10, 11, 12, 13),
    reason="segfaults on py3.10,py3.11,py3.12,py3.13",
)
def test_glib_big_message():
    """Check that nonblocking reads and writes actually work for glib.

    One connection exports ``ExampleInterface``; a second connection calls
    ``echo_bytes`` on it synchronously with a two-megabyte payload and
    expects the same bytes back in a method-return message.
    """
    bus1 = glib.MessageBus().connect_sync()
    bus2 = glib.MessageBus().connect_sync()
    try:
        interface = ExampleInterface()
        bus1.export("/test/path", interface)

        # two megabytes (all zero bytes)
        big_body = [bytes(2_000_000)]
        result = bus2.call_sync(
            Message(
                destination=bus1.unique_name,
                path="/test/path",
                interface=interface.name,
                member="echo_bytes",
                signature="ay",
                body=big_body,
            )
        )
        # On failure, surface the first body element in the assertion message.
        assert result.message_type == MessageType.METHOD_RETURN, result.body[0]
        assert result.body[0] == big_body[0]
    finally:
        # Disconnect both buses even when the call or an assertion fails,
        # so a failing test does not leave open bus connections behind.
        bus1.disconnect()
        bus2.disconnect()