File: parser.py

package info (click to toggle)
python-hl7 0.4.5-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 508 kB
  • sloc: python: 3,833; makefile: 160
file content (437 lines) | stat: -rw-r--r-- 15,668 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
# -*- coding: utf-8 -*-
from string import whitespace

from .containers import Factory
from .exceptions import ParseException
from .util import isbatch, isfile, ishl7

# Characters that are safe to strip from the ends of HL7 text: everything in
# ``string.whitespace`` except the carriage return ("\r"), which is kept
# because it is the character used to separate segments.
_HL7_WHITESPACE = whitespace.replace("\r", "")


def parse_hl7(line, encoding="utf-8", factory=Factory):
    """Parse *line* as an HL7 message, batch or file, whichever it turns
    out to be, and return the matching :py:class:`hl7.Message`,
    :py:class:`hl7.Batch` or :py:class:`hl7.File`.

    The returned container allows indexed access to the data elements,
    messages or batches respectively. A custom :py:class:`hl7.Factory`
    subclass can be passed in to be used when constructing the
    message/batch/file and its components.

    .. note::

        HL7 usually contains only ASCII, but can use other character
        sets (HL7 Standards Document, Section 1.7.1), however as of v2.8,
        UTF-8 is the preferred character set [#]_.

        python-hl7 works on Python unicode strings. :py:func:`hl7.parse_hl7`
        accepts a unicode string, or decodes a bytestring using the
        optional ``encoding`` parameter. ``encoding`` defaults to UTF-8;
        for other character sets like 'cp1252' or 'latin1' it must be set
        appropriately.

    >>> h = hl7.parse_hl7(message)

    To decode a non-UTF-8 byte string::

       hl7.parse_hl7(message, encoding='latin1')

    :raises ValueError: if the input is not recognised as a message,
        a batch or a file
    :rtype: :py:class:`hl7.Message` | :py:class:`hl7.Batch` | :py:class:`hl7.File`

    .. [#] http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages

    """
    # The parsers work on text, so decode bytestrings up front
    text = line.decode(encoding) if isinstance(line, bytes) else line
    # Tried in this order: plain message first, then batch, then file
    dispatch = (
        (ishl7, parse),
        (isbatch, parse_batch),
        (isfile, parse_file),
    )
    for recognises, parser in dispatch:
        if recognises(text):
            return parser(text, encoding=encoding, factory=factory)
    # Nothing recognised the input
    raise ValueError("line is not HL7")


def parse(lines, encoding="utf-8", factory=Factory):
    """Parse a single HL7 message and return an :py:class:`hl7.Message`
    that allows indexed access to the data elements.

    A custom :py:class:`hl7.Factory` subclass can be passed in to be used
    when constructing the message and its components.

    .. note::

        HL7 usually contains only ASCII, but can use other character
        sets (HL7 Standards Document, Section 1.7.1), however as of v2.8,
        UTF-8 is the preferred character set [#]_.

        python-hl7 works on Python unicode strings. :py:func:`hl7.parse`
        accepts a unicode string, or decodes a bytestring using the
        optional ``encoding`` parameter. ``encoding`` defaults to UTF-8;
        for other character sets like 'cp1252' or 'latin1' it must be set
        appropriately.

    >>> h = hl7.parse(message)

    To decode a non-UTF-8 byte string::

       hl7.parse(message, encoding='latin1')

    :rtype: :py:class:`hl7.Message`

    .. [#] http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages

    """
    # Work on text only; bytestrings are decoded with the given encoding
    text = lines.decode(encoding) if isinstance(lines, bytes) else lines
    # Surrounding whitespace is not part of the message
    message = text.strip()
    # The plan is derived from the message itself and then drives the
    # recursive split into segments, fields, repetitions and components
    return _split(message, create_parse_plan(message, factory))


def _create_batch(batch, messages, encoding, factory):
    """Build a :py:class:`hl7.Batch` from raw text.

    *batch* is the text of the BHS (and optional BTS) segments, or a falsy
    value when the batch has no header. *messages* is a list of raw
    message strings; each is parsed with :py:func:`parse`.
    """
    options = {
        "sequence": [
            parse(raw, encoding=encoding, factory=factory) for raw in messages
        ],
    }
    envelope = None
    if batch:
        # A BHS/BTS pair was supplied: take the batch settings from it
        envelope = parse(batch, encoding=encoding, factory=factory)
        options.update(
            esc=envelope.esc,
            separators=envelope.separators,
            factory=envelope.factory,
        )
    result = factory.create_batch(**options)
    if not envelope:
        # No header was supplied, so the factory defaults stand
        return result
    result.header = envelope.segment("BHS")
    try:
        result.trailer = envelope.segment("BTS")
    except KeyError:
        # Header without a trailer: attach a bare BTS segment
        result.trailer = result.create_segment([result.create_field(["BTS"])])
    return result


def parse_batch(lines, encoding="utf-8", factory=Factory):
    """Parse an HL7 batch and return an :py:class:`hl7.Batch` that allows
    indexed access to the messages.

    A custom :py:class:`hl7.Factory` subclass can be passed in to be used
    when constructing the batch and its components.

    .. note::

        HL7 usually contains only ASCII, but can use other character
        sets (HL7 Standards Document, Section 1.7.1), however as of v2.8,
        UTF-8 is the preferred character set [#]_.

        python-hl7 works on Python unicode strings. :py:func:`hl7.parse_batch`
        accepts a unicode string, or decodes a bytestring using the
        optional ``encoding`` parameter. ``encoding`` defaults to UTF-8;
        for other character sets like 'cp1252' or 'latin1' it must be set
        appropriately.

    >>> h = hl7.parse_batch(message)

    To decode a non-UTF-8 byte string::

       hl7.parse_batch(message, encoding='latin1')

    :raises ParseException: on a second BHS segment, or on a segment that
        appears before any MSH segment
    :rtype: :py:class:`hl7.Batch`

    .. [#] http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages

    """
    # Work on text only; bytestrings are decoded with the given encoding
    text = lines.decode(encoding) if isinstance(lines, bytes) else lines
    header = None
    messages = []
    # Walk the batch line by line, keeping the line endings
    for raw in text.strip(_HL7_WHITESPACE).splitlines(keepends=True):
        # Drop surrounding whitespace but never the '\r'
        segment = raw.strip(_HL7_WHITESPACE)
        tag = segment[:3]
        if tag == "BHS":
            if header:
                raise ParseException("Batch cannot have more than one BHS segment")
            header = segment
        elif tag == "BTS":
            # A trailer is only kept when a header exists and no trailer
            # has been attached yet; any other BTS is ignored
            if header and "\rBTS" not in header:
                header += segment
        elif tag == "MSH":
            messages.append(segment)
        elif messages:
            # Any other segment belongs to the most recent message
            messages[-1] += segment
        else:
            raise ParseException(
                "Segment received before message header {}".format(segment)
            )
    return _create_batch(header, messages, encoding, factory)


def _create_file(file, batches, encoding, factory):
    """Build a :py:class:`hl7.File` from raw text.

    *file* is the text of the FHS (and optional FTS) segments, or a falsy
    value when the file has no header. *batches* is a list of
    ``[batch_header_text, message_texts]`` pairs, each of which is handed
    to :py:func:`_create_batch`.
    """
    options = {
        "sequence": [
            _create_batch(header, messages, encoding, factory)
            for header, messages in (
                (entry[0], entry[1]) for entry in batches
            )
        ],
    }
    envelope = None
    if file:
        # An FHS/FTS pair was supplied: take the file settings from it
        envelope = parse(file, encoding=encoding, factory=factory)
        options.update(
            esc=envelope.esc,
            separators=envelope.separators,
            factory=envelope.factory,
        )
    result = factory.create_file(**options)
    if not envelope:
        # No header was supplied, so the factory defaults stand
        return result
    result.header = envelope.segment("FHS")
    try:
        result.trailer = envelope.segment("FTS")
    except KeyError:
        # Header without a trailer: attach a bare FTS segment
        result.trailer = result.create_segment([result.create_field(["FTS"])])
    return result


def parse_file(lines, encoding="utf-8", factory=Factory):  # noqa: C901
    """Parse an HL7 file and return an :py:class:`hl7.File` that allows
    indexed access to the batches.

    Messages that appear outside of any BHS/BTS pair are collected into
    one extra, header-less batch that is appended after the others.

    A custom :py:class:`hl7.Factory` subclass can be passed in to be used
    when constructing the file and its components.

    .. note::

        HL7 usually contains only ASCII, but can use other character
        sets (HL7 Standards Document, Section 1.7.1), however as of v2.8,
        UTF-8 is the preferred character set [#]_.

        python-hl7 works on Python unicode strings. :py:func:`hl7.parse_file`
        accepts a unicode string, or decodes a bytestring using the
        optional ``encoding`` parameter. ``encoding`` defaults to UTF-8;
        for other character sets like 'cp1252' or 'latin1' it must be set
        appropriately.

    >>> h = hl7.parse_file(message)

    To decode a non-UTF-8 byte string::

       hl7.parse_file(message, encoding='latin1')

    :raises ParseException: on a second FHS segment, on a BHS segment
        inside an open batch, or on a segment that appears before any
        MSH segment
    :rtype: :py:class:`hl7.File`

    .. [#] http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages

    """
    # Work on text only; bytestrings are decoded with the given encoding
    text = lines.decode(encoding) if isinstance(lines, bytes) else lines
    envelope = None
    batches = []  # [batch header text, [message text, ...]] pairs
    loose_messages = []  # messages found outside of any batch
    batch_open = False
    # Walk the file line by line, keeping the line endings
    for raw in text.strip(_HL7_WHITESPACE).splitlines(keepends=True):
        # Drop surrounding whitespace but never the '\r'
        segment = raw.strip(_HL7_WHITESPACE)
        tag = segment[:3]
        if tag == "FHS":
            if envelope:
                raise ParseException("File cannot have more than one FHS segment")
            envelope = segment
        elif tag == "FTS":
            # A trailer is only kept when a header exists and no trailer
            # has been attached yet; any other FTS is ignored
            if envelope and "\rFTS" not in envelope:
                envelope += segment
        elif tag == "BHS":
            if batch_open:
                raise ParseException("Batch cannot have more than one BHS segment")
            batches.append([segment, []])
            batch_open = True
        elif tag == "BTS":
            # A trailer without an open batch is ignored
            if batch_open:
                batches[-1][0] += segment
                batch_open = False
        else:
            # Message content goes to the open batch if there is one,
            # otherwise to the "default" batch
            target = batches[-1][1] if batch_open else loose_messages
            if tag == "MSH":
                target.append(segment)
            elif target:
                target[-1] += segment
            else:
                raise ParseException(
                    "Segment received before message header {}".format(segment)
                )
    if loose_messages:  # add the default batch, if we have one
        batches.append([None, loose_messages])
    return _create_file(envelope, batches, encoding, factory)


def _split(text, plan):
    """Recursive function to split the *text* into an n-deep list,
    according to the :py:class:`hl7._ParsePlan`.

    :param text: the piece of the message handled at the current level
    :param plan: the plan for the current level, or ``None`` once every
        level has been consumed
    :returns: *text* itself when *plan* is exhausted, otherwise the
        container built by ``plan.container``
    """
    # Base condition, if we have used up all the plans
    if not plan:
        return text

    # None of the remaining separators occur: wrap the text as-is
    if not plan.applies(text):
        return plan.container([text])

    data = []
    # Parsing of the first segment is awkward because it contains
    # the separator characters in a field
    if plan.containers[0] == plan.factory.create_segment and text[:3] in (
        "MSH",
        "BHS",
        "FHS",
    ):
        seg = text[:3]
        sep0 = text[3]
        sep_end_off = text.find(sep0, 4)
        if sep_end_off == -1:
            # The header stops right after the encoding characters
            # (e.g. "FHS|^~\&"). Everything that is left is the encoding
            # field and there is nothing more to split; slicing with -1
            # would drop its last character and re-split the whole segment.
            seps = text[4:]
            text = ""
        else:
            seps = text[4:sep_end_off]
            text = text[sep_end_off + 1 :]
        # Segment id, field separator and encoding characters each
        # become a field of their own
        data = [
            plan.factory.create_field(
                sequence=[value], esc=plan.esc, separators=plan.separators
            )
            for value in (seg, sep0, seps)
        ]

    if text:
        child_plan_source = plan
        data = data + [
            _split(x, child_plan_source.next()) for x in text.split(plan.separator)
        ]
    # Return the instance of the current message part according
    # to the plan
    return plan.container(data)


def create_parse_plan(strmsg, factory=Factory):
    """Creates a plan on how to parse the HL7 message according to
    the details stored within the message.

    The separators are read from the header segment (MSH, FHS or BHS):
    the character right after the segment id is the field separator, and
    the characters up to its next occurrence are, in order, the component
    separator, the repetition separator, the escape character and the
    sub-component separator. Missing characters fall back to ``^``, ``~``,
    ``\\`` and ``&``.

    :param strmsg: the message text, starting with its header segment
    :param factory: factory whose ``create_*`` callables build the
        containers of each level
    :raises ParseException: if the message does not start with MSH, FHS
        or BHS, or if the header segment has no field separator
    :returns: the :py:class:`hl7._ParsePlan` for the message level
    """
    # We will always use a carriage return to separate segments
    segment_separator = "\r"

    segment_id = strmsg[:3]
    if segment_id not in ("MSH", "FHS", "BHS"):
        raise ParseException(
            "First segment is {}, must be one of MSH, FHS or BHS".format(segment_id)
        )

    # Only the header segment defines the separators. Looking past it
    # would pick up characters of the following segment when the header
    # carries fewer encoding characters than usual.
    header = strmsg.split(segment_separator, 1)[0]
    if len(header) < 4:
        raise ParseException(
            "{} segment does not define a field separator".format(segment_id)
        )
    sep0 = header[3]
    sep_end = header.find(sep0, 4)
    if sep_end == -1:
        # Header ends right after the encoding characters; use them all
        # instead of letting the -1 chop off the last one
        sep_end = len(header)
    seps = header[3:sep_end]

    def pick(index, default):
        """Return the separator at *index*, or *default* if not present."""
        return seps[index] if len(seps) > index else default

    # Order expected by the plan: segment, field, repetition, component,
    # sub-component
    separators = (
        segment_separator
        + seps[0]  # field separator
        + pick(2, "~")  # repetition separator
        + pick(1, "^")  # component separator
        + pick(4, "&")  # sub-component separator
    )
    esc = pick(3, "\\")

    # The ordered list of containers to create
    containers = [
        factory.create_message,
        factory.create_segment,
        factory.create_field,
        factory.create_repetition,
        factory.create_component,
    ]
    return _ParsePlan(separators[0], separators, containers, esc, factory)


class _ParsePlan(object):
    """One level of the recipe used to split an HL7 message.

    A plan pairs the separator of the current level with the container
    callable that wraps the pieces produced at that level. Typically this
    object should be created via :func:`hl7.create_parse_plan`.
    """

    # field, component, repetition, escape, subcomponent

    def __init__(self, seperator, separators, containers, esc, factory):
        # TODO test to see performance implications of the assertion
        # since we generate the ParsePlan, this should never be in
        # invalid state
        assert len(containers) == len(separators[separators.find(seperator) :])
        self.factory = factory
        self.esc = esc
        self.containers = containers
        self.separators = separators
        self.separator = seperator

    def _active_separators(self):
        """Separators of this level and of every level below it."""
        start = self.separators.find(self.separator)
        return self.separators[start:]

    def container(self, data):
        """Wrap *data* in the container that belongs to the current
        level of the plan and return it.
        """
        build = self.containers[0]
        return build(
            sequence=data,
            esc=self.esc,
            separators=self.separators,
            factory=self.factory,
        )

    def next(self):
        """Return the plan for the next level down: a copy of this plan
        whose separator and containers start one position further on.
        Returns ``None`` when this is already the last level.
        """
        if len(self.containers) <= 1:
            # No containers left below this level, nothing further to do
            return None
        following = self.separators[self.separators.find(self.separator) + 1]
        # Use self.__class__() in case :class:`hl7.ParsePlan` is subclassed
        return self.__class__(
            following,
            self.separators,
            self.containers[1:],
            self.esc,
            self.factory,
        )

    def applies(self, text):
        """Return True if the separator of this level, or of any level
        below it, occurs in *text*.
        """
        return any(sep in text for sep in self._active_separators())