File: parser.py

package info (click to toggle)
tap.py 3.2.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 476 kB
  • sloc: python: 1,808; makefile: 164; sh: 40
file content (196 lines) | stat: -rw-r--r-- 6,787 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import itertools
import re
import sys
from io import StringIO

from tap.directive import Directive
from tap.line import Bail, Diagnostic, Plan, Result, Unknown, Version

# Optional dependencies for TAP version 13 (YAML block) support. If either
# import fails, ENABLE_VERSION_13 is set to False instead of raising.
try:
    # NOTE(review): yaml is not referenced anywhere else in the visible part
    # of this module, so this import presumably acts only as an availability
    # check -- confirm against the consumers of raw YAML blocks.
    import yaml  # noqa
    from more_itertools import peekable

    ENABLE_VERSION_13 = True
except ImportError:  # pragma: no cover
    ENABLE_VERSION_13 = False


class Parser:
    """A parser for TAP files and lines.

    The ``parse*`` entry points return a generator that yields one
    ``tap.line`` object per input line. Indented YAML blocks following a
    result line are only extracted when the first line of the stream
    declares ``TAP version 13`` or later and the optional dependencies
    were importable (``ENABLE_VERSION_13``).
    """

    # ok and not ok share most of the same characteristics.
    result_base = r"""
        \s*                    # Optional whitespace.
        (?P<number>\d*)        # Optional test number.
        \s*                    # Optional whitespace.
        (?P<description>[^#]*) # Optional description before #.
        \#?                    # Optional directive marker.
        \s*                    # Optional whitespace.
        (?P<directive>.*)      # Optional directive text.
    """
    ok = re.compile(r"^ok" + result_base, re.VERBOSE)
    not_ok = re.compile(r"^not\ ok" + result_base, re.VERBOSE)
    # The dots of the "1.." prefix are escaped so that only a literal "1.."
    # introduces a plan; unescaped, any two characters would be accepted and
    # a line such as "12345" would be read as a plan for 45 tests.
    plan = re.compile(
        r"""
        ^1\.\.(?P<expected>\d+) # Match the plan details.
        [^#]*                 # Consume any non-hash character to confirm only
                              # directives appear with the plan details.
        \#?                   # Optional directive marker.
        \s*                   # Optional whitespace.
        (?P<directive>.*)     # Optional directive text.
    """,
        re.VERBOSE,
    )
    diagnostic = re.compile(r"^#")
    bail = re.compile(
        r"""
        ^Bail\ out!
        \s*            # Optional whitespace.
        (?P<reason>.*) # Optional reason.
    """,
        re.VERBOSE,
    )
    version = re.compile(r"^TAP version (?P<version>\d+)$")

    # A YAML block opens on an indented line whose first non-whitespace
    # character is "-"; the indent is captured so that it can be stripped
    # from the lines of the block.
    yaml_block_start = re.compile(r"^(?P<indent>\s+)-")
    # A YAML block closes on an indented line starting with "...".
    yaml_block_end = re.compile(r"^\s+\.\.\.")

    # Lowest version number a "TAP version N" line is allowed to declare.
    TAP_MINIMUM_DECLARED_VERSION = 13

    def parse_file(self, filename):
        """Parse a TAP file to an iterable of tap.line.Line objects.

        Returns a generator that will yield an object for each parsed
        line. The file given by `filename` is assumed to exist; it is
        opened as soon as this method is called and is closed by the
        generator (see `parse`), so it is only closed once the generator
        has been started and then exhausted or closed.
        """
        return self.parse(open(filename))

    def parse_stdin(self):
        """Parse a TAP stream from standard input.

        Note: this has the side effect of closing the standard input
        filehandle after parsing.
        """
        return self.parse(sys.stdin)

    def parse_text(self, text):
        """Parse a string containing one or more lines of TAP output."""
        return self.parse(StringIO(text))

    def parse(self, fh):
        """Generate tap.line.Line objects, given a file-like object `fh`.

        `fh` may be any object that implements both the iterator and
        context management protocol (i.e. it can be used in both a
        "with" statement and a "for...in" statement.)

        Trailing whitespace and newline characters will be automatically
        stripped from the input lines. An empty stream yields nothing.

        Raises ValueError (on iteration) if a line declares a TAP version
        lower than 13.
        """
        with fh:
            try:
                first_line = next(fh)
            except StopIteration:
                # Empty input: there is nothing to yield.
                return

            # The first line is inspected up front only to decide whether
            # look-ahead is needed; it is put back in front of the stream
            # and parsed again by the loop below.
            first_parsed = self.parse_line(first_line.rstrip())
            lines = itertools.chain([first_line], fh)
            if first_parsed.category == "version" and first_parsed.version >= 13:
                if ENABLE_VERSION_13:
                    # YAML block detection needs to look one line ahead.
                    lines = peekable(lines)
                else:  # pragma no cover
                    print(
                        """
WARNING: Optional imports not found, TAP 13 output will be
    ignored. To parse yaml, see requirements in docs:
    https://tappy.readthedocs.io/en/latest/consumers.html#tap-version-13"""
                    )

            for line in lines:
                yield self.parse_line(line.rstrip(), lines)

    def parse_line(self, text, fh=None):
        """Parse a line into whatever TAP category it belongs.

        `text` is matched, in order, against the ok, not ok, diagnostic,
        plan, bail and version patterns; the first match decides the
        returned object, and `Unknown` is returned when nothing matches.

        `fh` is the stream that `text` was read from. It is only handed to
        result parsing, which uses it to look ahead for a YAML block.

        Raises ValueError if `text` declares a TAP version lower than 13.
        """
        match = self.ok.match(text)
        if match:
            return self._parse_result(True, match, fh)

        match = self.not_ok.match(text)
        if match:
            return self._parse_result(False, match, fh)

        if self.diagnostic.match(text):
            return Diagnostic(text)

        match = self.plan.match(text)
        if match:
            return self._parse_plan(match)

        match = self.bail.match(text)
        if match:
            return Bail(match.group("reason"))

        match = self.version.match(text)
        if match:
            return self._parse_version(match)

        return Unknown()

    def _parse_plan(self, match):
        """Parse a matching plan line.

        Returns a `Plan`, or `Unknown` when the plan carries a directive
        other than SKIP.
        """
        expected_tests = int(match.group("expected"))
        directive = Directive(match.group("directive"))

        # Only SKIP directives are allowed in the plan.
        if directive.text and not directive.skip:
            return Unknown()

        return Plan(expected_tests, directive)

    def _parse_result(self, ok, match, fh=None):
        """Parse a matching result line into a result instance.

        `ok` is the pass/fail flag of the line and `match` the match object
        produced by the `ok` or `not_ok` pattern. When `fh` is a `peekable`
        stream and its next line opens a YAML block, that block is consumed
        from `fh` and attached to the result as `raw_yaml_block`.
        """
        yaml_start = None
        # Look-ahead is only possible on the peekable stream that `parse`
        # sets up for TAP version 13 input.
        if fh is not None and ENABLE_VERSION_13 and isinstance(fh, peekable):
            try:
                yaml_start = self.yaml_block_start.match(fh.peek())
            except StopIteration:
                # The result was the last line of the stream.
                pass

        # raw_yaml_block is only passed when a block was found, so that the
        # default of Result applies otherwise.
        extra = {}
        if yaml_start is not None:
            extra["raw_yaml_block"] = self._extract_yaml_block(
                yaml_start.group("indent"), fh
            )

        return Result(
            ok,
            number=match.group("number"),
            description=match.group("description").strip(),
            directive=Directive(match.group("directive")),
            **extra
        )

    def _extract_yaml_block(self, indent, fh):
        """Extract a raw yaml block from a file handler.

        `fh` must support `next()` and `peek()` and be positioned on the
        line that opens the block. That line is discarded, then every
        following line starting with `indent` is collected with the indent
        removed. Collection stops at the end marker (which is consumed), at
        the first line without the indent (which is left in `fh`), or at
        the end of the stream.

        Returns the collected lines joined into one string; an empty block
        gives an empty string.
        """
        raw_yaml = []
        try:
            next(fh)  # Discard the line that opened the block.
            while True:
                upcoming = fh.peek()
                # The end marker is tested before anything is collected so
                # that an empty block does not return "..." as content.
                if self.yaml_block_end.match(upcoming):
                    next(fh)
                    break
                if not upcoming.startswith(indent):
                    break
                raw_yaml.append(next(fh)[len(indent):])
        except StopIteration:
            # The stream ended before the block was terminated; keep what
            # has been collected so far.
            pass
        return "".join(raw_yaml)

    def _parse_version(self, match):
        """Parse a matching version line.

        Returns a `Version` for the declared number. Raises ValueError when
        that number is lower than `TAP_MINIMUM_DECLARED_VERSION`.
        """
        version = int(match.group("version"))
        if version < self.TAP_MINIMUM_DECLARED_VERSION:
            raise ValueError(
                "It is an error to explicitly specify any version lower than 13."
            )
        return Version(version)