File: segments.py

package info (click to toggle)
python-jsonpath 2.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,028 kB
  • sloc: python: 9,473; makefile: 6
file content (131 lines) | stat: -rw-r--r-- 4,458 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""JSONPath child and descendant segment definitions."""

from __future__ import annotations

from abc import ABC
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import AsyncIterable
from typing import Iterable
from typing import Mapping
from typing import Sequence
from typing import Tuple

from .exceptions import JSONPathRecursionError

if TYPE_CHECKING:
    from .env import JSONPathEnvironment
    from .match import JSONPathMatch
    from .selectors import JSONPathSelector
    from .token import Token


class JSONPathSegment(ABC):
    """Base class for all JSONPath segments."""

    __slots__ = ("env", "token", "selectors")

    def __init__(
        self,
        *,
        env: JSONPathEnvironment,
        token: Token,
        selectors: Tuple[JSONPathSelector, ...],
    ) -> None:
        self.env = env
        self.token = token
        self.selectors = selectors

    @abstractmethod
    def resolve(self, nodes: Iterable[JSONPathMatch]) -> Iterable[JSONPathMatch]:
        """Apply this segment to each `JSONPathMatch` in _nodes_."""

    @abstractmethod
    def resolve_async(
        self, nodes: AsyncIterable[JSONPathMatch]
    ) -> AsyncIterable[JSONPathMatch]:
        """An async version of `resolve`."""


class JSONPathChildSegment(JSONPathSegment):
    """The JSONPath child selection segment."""

    def resolve(self, nodes: Iterable[JSONPathMatch]) -> Iterable[JSONPathMatch]:
        """Select children of each node in _nodes_."""
        for node in nodes:
            for selector in self.selectors:
                yield from selector.resolve(node)

    async def resolve_async(
        self, nodes: AsyncIterable[JSONPathMatch]
    ) -> AsyncIterable[JSONPathMatch]:
        """An async version of `resolve`."""
        async for node in nodes:
            for selector in self.selectors:
                async for match in selector.resolve_async(node):
                    yield match

    def __str__(self) -> str:
        return f"[{', '.join(str(itm) for itm in self.selectors)}]"

    def __eq__(self, __value: object) -> bool:
        return (
            isinstance(__value, JSONPathChildSegment)
            and self.selectors == __value.selectors
            and self.token == __value.token
        )

    def __hash__(self) -> int:
        return hash((self.selectors, self.token))


class JSONPathRecursiveDescentSegment(JSONPathSegment):
    """The JSONPath recursive descent segment."""

    def resolve(self, nodes: Iterable[JSONPathMatch]) -> Iterable[JSONPathMatch]:
        """Select descendants of each node in _nodes_."""
        for node in nodes:
            for _node in self._visit(node):
                for selector in self.selectors:
                    yield from selector.resolve(_node)

    async def resolve_async(
        self, nodes: AsyncIterable[JSONPathMatch]
    ) -> AsyncIterable[JSONPathMatch]:
        """An async version of `resolve`."""
        async for node in nodes:
            for _node in self._visit(node):
                for selector in self.selectors:
                    async for match in selector.resolve_async(_node):
                        yield match

    def _visit(self, node: JSONPathMatch, depth: int = 1) -> Iterable[JSONPathMatch]:
        """Depth-first, pre-order node traversal."""
        if depth > self.env.max_recursion_depth:
            raise JSONPathRecursionError("recursion limit exceeded", token=self.token)

        yield node

        if isinstance(node.obj, Mapping):
            for name, val in node.obj.items():
                if isinstance(val, (Mapping, Sequence)):
                    _node = node.new_child(val, name)
                    yield from self._visit(_node, depth + 1)
        elif isinstance(node.obj, Sequence) and not isinstance(node.obj, str):
            for i, item in enumerate(node.obj):
                if isinstance(item, (Mapping, Sequence)):
                    _node = node.new_child(item, i)
                    yield from self._visit(_node, depth + 1)

    def __str__(self) -> str:
        return f"..[{', '.join(str(itm) for itm in self.selectors)}]"

    def __eq__(self, __value: object) -> bool:
        return (
            isinstance(__value, JSONPathRecursiveDescentSegment)
            and self.selectors == __value.selectors
            and self.token == __value.token
        )

    def __hash__(self) -> int:
        return hash(("..", self.selectors, self.token))