File: app_group.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (212 lines) | stat: -rw-r--r-- 7,474 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# helper functions for interp_group.py that are exposed via a gateway.applevel


def check_new_args(cls, message, exceptions):
    if not isinstance(message, str):
        raise TypeError(f"argument 1 must be str, not {type(message)}")
    if not isinstance(exceptions, (list, tuple)):
        from collections.abc import Sequence
        if not isinstance(exceptions, Sequence):
            raise TypeError("second argument (exceptions) must be a sequence")
    if not exceptions or not exceptions.count:
        raise ValueError("second argument (exceptions) must be a non-empty sequence")

    for i, exc in enumerate(exceptions):
        if not isinstance(exc, BaseException):
            raise ValueError(f"Item {i} of second argument (exceptions) is not an exception")

    if cls is BaseExceptionGroup:
        if all(isinstance(exc, Exception) for exc in exceptions):
            cls = ExceptionGroup

    if issubclass(cls, Exception):
        for exc in exceptions:
            if not isinstance(exc, Exception):
                if cls is ExceptionGroup:
                    raise TypeError("Cannot nest BaseExceptions in an ExceptionGroup")
                else:
                    raise TypeError(f"Cannot nest BaseExceptions in {cls.__name__!r}")
    return cls, tuple(exceptions)

def subgroup(self, condition):
    condition = get_condition_filter(condition)
    modified = False
    if condition(self):
        return self

    exceptions = []
    for exc in self.exceptions:
        if isinstance(exc, BaseExceptionGroup):
            subgroup = exc.subgroup(condition)
            if subgroup is not None:
                exceptions.append(subgroup)
            if subgroup is not exc:
                modified = True
        elif condition(exc):
            exceptions.append(exc)
        else:
            modified = True

    if not modified:
        # this is the difference to split!
        return self
    elif exceptions:
        group = _derive_and_copy_attrs(self, exceptions)
        return group
    else:
        return None

def split(self, condition):
    condition = get_condition_filter(condition)
    if condition(self):
        return self, None

    matching_exceptions = []
    nonmatching_exceptions = []
    for exc in self.exceptions:
        if isinstance(exc, BaseExceptionGroup):
            matching, nonmatching = exc.split(condition)
            if matching is not None:
                matching_exceptions.append(matching)
            if nonmatching is not None:
                nonmatching_exceptions.append(nonmatching)
        elif condition(exc):
            matching_exceptions.append(exc)
        else:
            nonmatching_exceptions.append(exc)

    matching_group = None
    if matching_exceptions:
        matching_group = _derive_and_copy_attrs(self, matching_exceptions)

    nonmatching_group = None
    if nonmatching_exceptions:
        nonmatching_group = _derive_and_copy_attrs(self, nonmatching_exceptions)

    return (matching_group, nonmatching_group)

def __str__(self):
    suffix = "" if len(self.exceptions) == 1 else "s"
    return f"{self.message} ({len(self.exceptions)} sub-exception{suffix})"

def __repr__(self):
    return f"{self.__class__.__name__}({self.message!r}, {list(self.exceptions)!r})"

def _derive_and_copy_attrs(self, excs):
    eg = self.derive(excs)
    if hasattr(self, "__notes__"):
        # Create a new list so that add_note() only affects one exceptiongroup
        try:
            eg.__notes__ = list(self.__notes__)
        except TypeError:
            # ignore non-sequence __notes__
            pass
    eg.__cause__ = self.__cause__
    eg.__context__ = self.__context__
    eg.__traceback__ = self.__traceback__
    return eg


_SENTINEL = object()

def _is_same_exception_metadata(exc1, exc2):
    # TODO: Exception or BaseException?
    assert isinstance(exc1, Exception)
    assert isinstance(exc2, Exception)

    return (getattr(exc1, '__notes__', _SENTINEL) is getattr(exc2, '__notes__', _SENTINEL) and
            exc1.__traceback__ is exc2.__traceback__ and
            exc1.__cause__     is exc2.__cause__ and
            exc1.__context__   is exc2.__context__)

def get_condition_filter(condition):
    if isinstance(condition, type) and issubclass(condition, BaseException):
        return lambda exc: isinstance(exc, condition)
    elif isinstance(condition, tuple) and \
            all((isinstance(c, type) and issubclass(c, BaseException)) for c in condition):
        return lambda exc: isinstance(exc, condition)
    elif callable(condition):
        return condition
    # otherwise
    raise TypeError("expected a function, exception type or tuple of exception types")

# helper functions for the interpreter

def _exception_group_projection(eg, keep_list):
    """
    From CPython:
    /* This function is used by the interpreter to construct reraised
    * exception groups. It takes an exception group eg and a list
    * of exception groups keep and returns the sub-exception group
    * of eg which contains all leaf exceptions that are contained
    * in any exception group in keep.
    */
    """
    from __pypy__ import identity_dict
    # TODO: muss es nicht eigentlich anders herum sein
    # (return rest instead of match)?
    assert isinstance(eg, BaseExceptionGroup)
    assert isinstance(keep_list, list)

    resultset = identity_dict()
    for keep in keep_list:
        _collect_eg_leafs(keep, resultset)

    # TODO: maybe don't construct rest eg
    split_match, _ = eg.split(lambda element: element in resultset)

    return split_match

def _collect_eg_leafs(eg_or_exc, resultset):
    if eg_or_exc is None:
        # empty exception groups appear as a result
        # of matches (split, subgroup) and thus are valid
        pass
    elif isinstance(eg_or_exc, BaseExceptionGroup):
        # recursively collect children of eg
        for subexc in eg_or_exc.exceptions:
            _collect_eg_leafs(subexc, resultset)
    elif isinstance(eg_or_exc, BaseException):
        # we have a single exception (not a group),
        # return a singleton list containing the exc
        resultset[eg_or_exc] = None
    else:
        raise TypeError(f"expected BaseException, got {type(eg_or_exc)}")

def _prep_reraise_star(orig, exc_list):
    assert isinstance(orig, BaseException)
    assert isinstance(exc_list, list)

    # TODO: test this:
    if len(exc_list) < 1:
        return None

    for exc in exc_list: assert isinstance(exc, BaseException) or exc is None

    if not isinstance(orig, BaseExceptionGroup):
        # a naked exception was caught and wrapped. Only one except* clause
        # could have executed,so there is at most one exception to raise.
        assert len(exc_list) == 1 or (len(exc_list) == 2 and exc_list[1] is None)
        return exc_list[0]

    raised_list = []
    reraised_list = []
    for exc in exc_list:
        if exc is not None:
            if _is_same_exception_metadata(exc, orig):
                reraised_list.append(exc)
            else:
                raised_list.append(exc)

    reraised_eg = _exception_group_projection(orig, reraised_list)
    if reraised_eg is not None:
        assert _is_same_exception_metadata(reraised_eg, orig)

    if not raised_list:
        return reraised_eg
    if reraised_eg is not None:
        raised_list.append(reraised_eg)
    if len(raised_list) == 1:
        return raised_list[0]
    return ExceptionGroup("", raised_list)