File: named_pipe.py

package info (click to toggle)
libsearpc 3.2.1-1%2Breally3.2%2Bgit20220902.15f6f0b-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 524 kB
  • sloc: ansic: 4,094; python: 863; makefile: 111; sh: 68
file content (166 lines) | stat: -rw-r--r-- 4,948 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
RPC client/server implementation based on named pipe transport.
"""

import json
import logging
import os
import socket
import struct
from threading import Thread
import queue

from .client import SearpcClient
from .server import searpc_server
from .transport import SearpcTransport
from .utils import make_socket_closeonexec, recvall, sendall

logger = logging.getLogger(__name__)


class NamedPipeException(Exception):
    pass


class NamedPipeTransport(SearpcTransport):
    """
    This transport uses named pipes on windows and unix domain socket
    on linux/mac.

    It's compatible with the c implementation of named pipe transport.
    in lib/searpc-named-pipe-transport.[ch] files.

    The protocol is:
    - request: <32b length header><json request>
    - response: <32b length header><json response>
    """

    def __init__(self, socket_path):
        self.socket_path = socket_path
        self.pipe = None

    def connect(self):
        self.pipe = socket.socket(socket.AF_UNIX)
        self.pipe.connect(self.socket_path)

    def stop(self):
        if self.pipe:
            self.pipe.close()
            self.pipe = None

    def send(self, service, fcall_str):
        body = json.dumps({
            'service': service,
            'request': fcall_str,
        })
        body_utf8 = body.encode(encoding='utf-8')
        # "I" for unsiged int
        header = struct.pack('=I', len(body_utf8))
        sendall(self.pipe, header)
        sendall(self.pipe, body_utf8)

        resp_header = recvall(self.pipe, 4)
        # logger.info('resp_header is %s', resp_header)
        resp_size, = struct.unpack('=I', resp_header)
        # logger.info('resp_size is %s', resp_size)
        resp = recvall(self.pipe, resp_size)
        # logger.info('resp is %s', resp)
        return resp.decode(encoding='utf-8')


class NamedPipeClient(SearpcClient):
    def __init__(self, socket_path, service_name, pool_size=5):
        self.socket_path = socket_path
        self.service_name = service_name
        self.pool_size = pool_size
        self._pool = queue.Queue(pool_size)

    def _create_transport(self):
        transport = NamedPipeTransport(self.socket_path)
        transport.connect()
        return transport

    def _get_transport(self):
        try:
            transport = self._pool.get(False)
        except:
            transport = self._create_transport()
        return transport

    def _return_transport(self, transport):
        try:
            self._pool.put(transport, False)
        except queue.Full:
            transport.stop()

    def call_remote_func_sync(self, fcall_str):
        transport = self._get_transport()
        ret_str = transport.send(self.service_name, fcall_str)
        self._return_transport(transport)
        return ret_str


class NamedPipeServer(object):
    """
    Searpc server based on named pipe transport. Note this server is
    very basic and is written for testing purpose only.
    """
    def __init__(self, socket_path):
        self.socket_path = socket_path
        self.pipe = None
        self.thread = Thread(target=self.accept_loop)
        self.thread.setDaemon(True)

    def start(self):
        self.init_socket()
        self.thread.start()

    def stop(self):
        pass

    def init_socket(self):
        if os.path.exists(self.socket_path):
            try:
                os.unlink(self.socket_path)
            except OSError:
                raise NamedPipeException(
                    'Failed to remove existing unix socket {}'.
                    format(self.socket_path)
                )
        self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
        make_socket_closeonexec(self.pipe)
        self.pipe.bind(self.socket_path)
        self.pipe.listen(10)
        logger.info('Server now listening at %s', self.socket_path)

    def accept_loop(self):
        logger.info('Waiting for clients')
        while True:
            connfd, _ = self.pipe.accept()
            logger.info('New pip client')
            t = PipeHandlerThread(connfd)
            t.start()


class PipeHandlerThread(Thread):
    def __init__(self, pipe):
        Thread.__init__(self)
        self.setDaemon(True)
        self.pipe = pipe

    def run(self):
        while True:
            req_header = recvall(self.pipe, 4)
            # logger.info('Got req header %s', req_header)
            req_size, = struct.unpack('I', req_header)
            # logger.info('req size is %s', req_size)
            req = recvall(self.pipe, req_size)
            # logger.info('req is %s', req)

            data = json.loads(req.decode(encoding='utf-8'))
            resp = searpc_server.call_function(data['service'], data['request'])
            # logger.info('resp is %s', resp)

            resp_header = struct.pack('I', len(resp))
            sendall(self.pipe, resp_header)
            sendall(self.pipe, resp.encode(encoding='utf-8'))