File: threading_server.py

package info (click to toggle)
jeepney 0.9.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 592 kB
  • sloc: python: 3,036; makefile: 206; xml: 115
file content (88 lines) | stat: -rw-r--r-- 2,479 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""A simple server built on Jeepney's blocking integration.

Start this, and then run blocking_server_client.py to send requests to it.
"""
import signal
from threading import Thread
import time

from jeepney import MessageType, HeaderFields, new_method_return, new_error
from jeepney.bus_messages import message_bus
from jeepney.io.threading import (
    open_dbus_connection, DBusRouter, Proxy, ReceiveStopped,
)

SERVER_NAME = "io.gitlab.takluyver.jeepney.examples.Server"

def slow_double(n, delay=5):
    """Return ``n * 2`` after pausing for *delay* seconds.

    The pause stands in for slow work. *delay* defaults to 5 seconds, so
    existing callers that pass only *n* behave as before; passing a smaller
    value (e.g. 0) lets the function be exercised quickly.
    """
    # Simulate something slow which releases the GIL so other threads can run
    time.sleep(delay)
    return n * 2

def double(n):
    """Return *n* multiplied by two."""
    doubled = n * 2
    return doubled

def divide(n, d):
    """Return the true-division quotient of *n* by *d*.

    No check is made on *d*; dividing by zero raises ZeroDivisionError,
    which the caller is expected to handle.
    """
    quotient = n / d
    return quotient


def _make_reply(msg, method):
    """Run the function named by *method* and build the reply to *msg*.

    Returns a method-return message on success, or an error message for a
    division by zero or an unrecognised method name. IndexError/TypeError
    caused by missing or unsuitable arguments in ``msg.body`` are left for
    the caller to handle.
    """
    if method == 'slow_double':
        res = slow_double(msg.body[0])
        print(f"Delayed reply to {msg.header.serial}: {res}")
        return new_method_return(msg, 'i', (res,))

    if method == 'double':
        res = double(msg.body[0])
        return new_method_return(msg, 'i', (res,))

    if method == 'divide':
        try:
            res = divide(*msg.body)
        except ZeroDivisionError:
            return new_error(msg, SERVER_NAME + ".Error.DivideByZero")
        return new_method_return(msg, 'd', (res,))

    return new_error(msg, SERVER_NAME + '.Error.NoMethod')


def serve(conn, i):
    """Receive method calls on *conn* and reply to them until stopped.

    Intended to run as the target of a worker thread.

    conn: the connection to receive from and send replies on.
    i: a label for this worker, used only in the log output.

    Returns None once ``conn.receive()`` raises ReceiveStopped.
    """
    while True:
        try:
            msg = conn.receive()
        except ReceiveStopped:
            return

        if msg.header.message_type != MessageType.method_call:
            print("Received non-method-call message:", msg)
            # Only method calls get a reply. Other message types may not
            # carry a member field at all, so skip the dispatch below.
            continue

        method = msg.header.fields[HeaderFields.member]
        print(f"Thread {i}: Message {msg.header.serial} calls {method}")

        # Dispatch to different methods
        try:
            rep = _make_reply(msg, method)
        except (IndexError, TypeError):
            # Missing or unsuitable arguments: tell the caller with the
            # standard D-Bus error name instead of letting this worker die.
            rep = new_error(msg, "org.freedesktop.DBus.Error.InvalidArgs")

        conn.send(rep)


# Script body: connect, ask for the well-known name, then answer requests on
# four worker threads until interrupted from the keyboard.
with open_dbus_connection() as conn:
    print("My unique name is:", conn.unique_name)

    # Request an additional name on the message bus. The router is used just
    # for this one request and is closed before the workers start receiving.
    with DBusRouter(conn) as router:
        bus = Proxy(message_bus, router, timeout=10)
        name_reply = bus.RequestName(SERVER_NAME)
        # 1 == DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER
        if name_reply == (1,):
            print("Got name", SERVER_NAME)

    # Create every worker first, then start them all.
    workers = []
    for worker_id in range(4):
        workers.append(Thread(target=serve, args=(conn, worker_id)))
    for worker in workers:
        worker.start()

    try:
        signal.pause()  # Wait for Ctrl-C
    except KeyboardInterrupt:
        pass

    # Presumably this makes the workers' blocked receive() calls raise
    # ReceiveStopped so serve() returns - confirm against the jeepney docs.
    conn.interrupt()
    for worker in workers:
        worker.join()