1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
"""JSONPath child and descendant segment definitions."""
from __future__ import annotations
from abc import ABC
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import AsyncIterable
from typing import Iterable
from typing import Mapping
from typing import Sequence
from typing import Tuple
from .exceptions import JSONPathRecursionError
if TYPE_CHECKING:
from .env import JSONPathEnvironment
from .match import JSONPathMatch
from .selectors import JSONPathSelector
from .token import Token
class JSONPathSegment(ABC):
"""Base class for all JSONPath segments."""
__slots__ = ("env", "token", "selectors")
def __init__(
self,
*,
env: JSONPathEnvironment,
token: Token,
selectors: Tuple[JSONPathSelector, ...],
) -> None:
self.env = env
self.token = token
self.selectors = selectors
@abstractmethod
def resolve(self, nodes: Iterable[JSONPathMatch]) -> Iterable[JSONPathMatch]:
"""Apply this segment to each `JSONPathMatch` in _nodes_."""
@abstractmethod
def resolve_async(
self, nodes: AsyncIterable[JSONPathMatch]
) -> AsyncIterable[JSONPathMatch]:
"""An async version of `resolve`."""
class JSONPathChildSegment(JSONPathSegment):
"""The JSONPath child selection segment."""
def resolve(self, nodes: Iterable[JSONPathMatch]) -> Iterable[JSONPathMatch]:
"""Select children of each node in _nodes_."""
for node in nodes:
for selector in self.selectors:
yield from selector.resolve(node)
async def resolve_async(
self, nodes: AsyncIterable[JSONPathMatch]
) -> AsyncIterable[JSONPathMatch]:
"""An async version of `resolve`."""
async for node in nodes:
for selector in self.selectors:
async for match in selector.resolve_async(node):
yield match
def __str__(self) -> str:
return f"[{', '.join(str(itm) for itm in self.selectors)}]"
def __eq__(self, __value: object) -> bool:
return (
isinstance(__value, JSONPathChildSegment)
and self.selectors == __value.selectors
and self.token == __value.token
)
def __hash__(self) -> int:
return hash((self.selectors, self.token))
class JSONPathRecursiveDescentSegment(JSONPathSegment):
"""The JSONPath recursive descent segment."""
def resolve(self, nodes: Iterable[JSONPathMatch]) -> Iterable[JSONPathMatch]:
"""Select descendants of each node in _nodes_."""
for node in nodes:
for _node in self._visit(node):
for selector in self.selectors:
yield from selector.resolve(_node)
async def resolve_async(
self, nodes: AsyncIterable[JSONPathMatch]
) -> AsyncIterable[JSONPathMatch]:
"""An async version of `resolve`."""
async for node in nodes:
for _node in self._visit(node):
for selector in self.selectors:
async for match in selector.resolve_async(_node):
yield match
def _visit(self, node: JSONPathMatch, depth: int = 1) -> Iterable[JSONPathMatch]:
"""Depth-first, pre-order node traversal."""
if depth > self.env.max_recursion_depth:
raise JSONPathRecursionError("recursion limit exceeded", token=self.token)
yield node
if isinstance(node.obj, Mapping):
for name, val in node.obj.items():
if isinstance(val, (Mapping, Sequence)):
_node = node.new_child(val, name)
yield from self._visit(_node, depth + 1)
elif isinstance(node.obj, Sequence) and not isinstance(node.obj, str):
for i, item in enumerate(node.obj):
if isinstance(item, (Mapping, Sequence)):
_node = node.new_child(item, i)
yield from self._visit(_node, depth + 1)
def __str__(self) -> str:
return f"..[{', '.join(str(itm) for itm in self.selectors)}]"
def __eq__(self, __value: object) -> bool:
return (
isinstance(__value, JSONPathRecursiveDescentSegment)
and self.selectors == __value.selectors
and self.token == __value.token
)
def __hash__(self) -> int:
return hash(("..", self.selectors, self.token))
|