File: streams.py

package info (click to toggle)
python-hl7 0.4.5-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 508 kB
  • sloc: python: 3,833; makefile: 160
file content (313 lines) | stat: -rw-r--r-- 11,091 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
import warnings
from asyncio import (
    LimitOverrunError,
    StreamReader,
    StreamReaderProtocol,
    StreamWriter,
    get_event_loop,
    iscoroutine,
)
from asyncio.streams import _DEFAULT_LIMIT

from hl7.mllp.exceptions import InvalidBlockError
from hl7.parser import parse as hl7_parse

START_BLOCK = b"\x0b"
END_BLOCK = b"\x1c"
CARRIAGE_RETURN = b"\x0d"


async def open_hl7_connection(
    host=None,
    port=None,
    *,
    loop=None,
    limit=_DEFAULT_LIMIT,
    encoding=None,
    encoding_errors=None,
    **kwds
):
    """A wrapper for `loop.create_connection()` returning a (reader, writer) pair.

    The reader returned is a :py:class:`hl7.mllp.HL7StreamReader` instance; the writer is a
    :py:class:`hl7.mllp.HL7StreamWriter` instance.

    The arguments are all the usual arguments to create_connection()
    except `protocol_factory`; most common are positional `host` and `port`,
    with various optional keyword arguments following.

    Additional optional keyword arguments are `loop` (to set the event loop
    instance to use), `limit` (to set the buffer limit passed to the
    :py:class:`hl7.mllp.HL7StreamReader`), `encoding` (to set the encoding on the :py:class:`hl7.mllp.HL7StreamReader`
    and :py:class:`hl7.mllp.HL7StreamWriter`) and `encoding_errors` (to set the encoding_errors on the :py:class:`hl7.mllp.HL7StreamReader`
    and :py:class:`hl7.mllp.HL7StreamWriter`).
    """
    if loop is None:
        loop = get_event_loop()
    else:
        warnings.warn(
            "The loop argument is deprecated since Python 3.8, "
            "and scheduled for removal in Python 3.10.",
            DeprecationWarning,
            stacklevel=2,
        )
    reader = HL7StreamReader(
        limit=limit, loop=loop, encoding=encoding, encoding_errors=encoding_errors
    )
    protocol = HL7StreamProtocol(
        reader, loop=loop, encoding=encoding, encoding_errors=encoding_errors
    )
    transport, _ = await loop.create_connection(lambda: protocol, host, port, **kwds)
    writer = HL7StreamWriter(
        transport, protocol, reader, loop, encoding, encoding_errors
    )
    return reader, writer


async def start_hl7_server(
    client_connected_cb,
    host=None,
    port=None,
    *,
    loop=None,
    limit=_DEFAULT_LIMIT,
    encoding=None,
    encoding_errors=None,
    **kwds
):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    `client_reader`, `client_writer`.  `client_reader` is a
    :py:class:`hl7.mllp.HL7StreamReader` object, while `client_writer`
    is a :py:class:`hl7.mllp.HL7StreamWriter` object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    `Task`.

    The rest of the arguments are all the usual arguments to
    `loop.create_server()` except `protocol_factory`; most common are
    positional `host` and `port`, with various optional keyword arguments
    following.

    The return value is the same as `loop.create_server()`.
    Additional optional keyword arguments are `loop` (to set the event loop
    instance to use) and `limit` (to set the buffer limit passed to the
    StreamReader).

    The return value is the same as `loop.create_server()`, i.e. a
    `Server` object which can be used to stop the service.
    """
    if loop is None:
        loop = get_event_loop()
    else:
        warnings.warn(
            "The loop argument is deprecated since Python 3.8, "
            "and scheduled for removal in Python 3.10.",
            DeprecationWarning,
            stacklevel=2,
        )

    def factory():
        reader = HL7StreamReader(
            limit=limit, loop=loop, encoding=encoding, encoding_errors=encoding_errors
        )
        protocol = HL7StreamProtocol(
            reader,
            client_connected_cb,
            loop=loop,
            encoding=encoding,
            encoding_errors=encoding_errors,
        )
        return protocol

    return await loop.create_server(factory, host, port, **kwds)


class MLLPStreamReader(StreamReader):
    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        super().__init__(limit, loop)

    async def readblock(self):
        """Read a chunk of data from the stream until the block termination
        separator (b'\x1c\x0d') are found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will not include the
        separator at the end or the MLLP start block character (b'\x0b') at the
        beginning.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If limit is reached, ValueError will be raised. In that case, if
        block termination separator was found, complete line including separator
        will be removed from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without separator.

        If the block is invalid (missing required start block character) and InvalidBlockError
        will be raised.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = END_BLOCK + CARRIAGE_RETURN
        seplen = len(sep)
        try:
            block = await self.readuntil(sep)
        except LimitOverrunError as loe:
            if self._buffer.startswith(sep, loe.consumed):
                del self._buffer[: loe.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(loe.args[0])
        if not block or block[0:1] != START_BLOCK:
            raise InvalidBlockError(
                "Block does not begin with Start Block character <VT>"
            )
        return block[1:-2]


class MLLPStreamWriter(StreamWriter):
    def __init__(self, transport, protocol, reader, loop):
        super().__init__(transport, protocol, reader, loop)

    def writeblock(self, data):
        """Write a block of data to the stream,
        encapsulating the block with b'\x0b' at the beginning
        and b'\x1c\x0d' at the end.
        """
        self.write(START_BLOCK + data + END_BLOCK + CARRIAGE_RETURN)


class HL7StreamProtocol(StreamReaderProtocol):
    def __init__(
        self,
        stream_reader,
        client_connected_cb=None,
        loop=None,
        encoding=None,
        encoding_errors=None,
    ):
        super().__init__(stream_reader, client_connected_cb, loop)
        self._encoding = encoding
        self._encoding_errors = encoding_errors

    def connection_made(self, transport):
        # _reject_connection not added until 3.8
        if getattr(self, "_reject_connection", False):
            context = {
                "message": (
                    "An open stream was garbage collected prior to "
                    "establishing network connection; "
                    'call "stream.close()" explicitly.'
                )
            }
            if self._source_traceback:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info("sslcontext") is not None
        if self._client_connected_cb is not None:
            self._stream_writer = HL7StreamWriter(
                transport,
                self,
                reader,
                self._loop,
                self._encoding,
                self._encoding_errors,
            )
            res = self._client_connected_cb(reader, self._stream_writer)
            if iscoroutine(res):
                self._loop.create_task(res)
            self._strong_reader = None


class HL7StreamReader(MLLPStreamReader):
    def __init__(
        self, limit=_DEFAULT_LIMIT, loop=None, encoding=None, encoding_errors=None
    ):
        super().__init__(limit=limit, loop=loop)
        self.encoding = encoding
        self.encoding_errors = encoding_errors

    @property
    def encoding(self):
        return self._encoding

    @encoding.setter
    def encoding(self, encoding):
        if encoding and not isinstance(encoding, str):
            raise TypeError("encoding must be a str or None")
        self._encoding = encoding or "ascii"

    @property
    def encoding_errors(self):
        return self._encoding_errors

    @encoding_errors.setter
    def encoding_errors(self, encoding_errors):
        if encoding_errors and not isinstance(encoding_errors, str):
            raise TypeError("encoding_errors must be a str or None")
        self._encoding_errors = encoding_errors or "strict"

    async def readmessage(self):
        """Reads a full HL7 message from the stream.

        This will return an :py:class:`hl7.Message`.

        If `limit` is reached, `ValueError` will be raised. In that case, if
        block termination separator was found, complete line including separator
        will be removed from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without separator.

        If an invalid MLLP block is encountered, :py:class:`hl7.mllp.InvalidBlockError` will be
        raised.
        """
        block = await self.readblock()
        return hl7_parse(block.decode(self.encoding, self.encoding_errors))


class HL7StreamWriter(MLLPStreamWriter):
    def __init__(
        self, transport, protocol, reader, loop, encoding=None, encoding_errors=None
    ):
        super().__init__(transport, protocol, reader, loop)
        self.encoding = encoding
        self.encoding_errors = encoding_errors

    @property
    def encoding(self):
        return self._encoding

    @encoding.setter
    def encoding(self, encoding):
        if encoding and not isinstance(encoding, str):
            raise TypeError("encoding must be a str or None")
        self._encoding = encoding or "ascii"

    @property
    def encoding_errors(self):
        return self._encoding_errors

    @encoding_errors.setter
    def encoding_errors(self, encoding_errors):
        if encoding_errors and not isinstance(encoding_errors, str):
            raise TypeError("encoding_errors must be a str or None")
        self._encoding_errors = encoding_errors or "strict"

    def writemessage(self, message):
        """Writes an :py:class:`hl7.Message` to the stream."""
        self.writeblock(str(message).encode(self.encoding, self.encoding_errors))