File: _pattern.py

package info (click to toggle)
python-jsonpath 2.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,028 kB
  • sloc: python: 9,473; makefile: 6
file content (112 lines) | stat: -rw-r--r-- 3,267 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from typing import List
from typing import Optional

# Prefer the third-party `regex` package when it is installed, binding it to
# the name `re` so the rest of this module uses a single alias. Otherwise fall
# back to the standard library `re` module. `REGEX_AVAILABLE` records which
# one was imported.
try:
    import regex as re

    REGEX_AVAILABLE = True
except ImportError:
    import re  # type: ignore

    REGEX_AVAILABLE = False

# `iregexp_check` is optional too. When it is missing, `check` is left
# undefined, so code must test `IREGEXP_AVAILABLE` before calling `check`.
try:
    from iregexp_check import check

    IREGEXP_AVAILABLE = True
except ImportError:
    IREGEXP_AVAILABLE = False

from jsonpath.exceptions import JSONPathError
from jsonpath.function_extensions import ExpressionType
from jsonpath.function_extensions import FilterFunction
from jsonpath.lru_cache import LRUCache
from jsonpath.lru_cache import ThreadSafeLRUCache


class AbstractRegexFilterFunction(FilterFunction):
    """Base class for filter functions that accept regular expression arguments.

    Compiled patterns are memoized in an LRU cache keyed by the pattern string
    exactly as it was passed to `check_cache`. Patterns that are rejected or
    that fail to compile are cached as `None`, so they are not validated again
    on every call.

    Arguments:
        cache_capacity: The size of the regular expression cache.
        debug: When `True`, raise an exception when regex pattern compilation
            fails. The default - as required by RFC 9535 - is `False`, which
            silently ignores bad patterns.
        thread_safe: When `True`, use a `ThreadSafeLRUCache` instead of an
            instance of `LRUCache`.
    """

    arg_types = [ExpressionType.VALUE, ExpressionType.VALUE]
    return_type = ExpressionType.LOGICAL

    def __init__(
        self,
        *,
        cache_capacity: int = 300,
        debug: bool = False,
        thread_safe: bool = False,
    ):
        # Maps the original pattern string to its compiled form, or to `None`
        # when the pattern is known to be unusable.
        self.cache: LRUCache[str, Optional[re.Pattern]] = (  # type: ignore
            ThreadSafeLRUCache(capacity=cache_capacity)
            if thread_safe
            else LRUCache(capacity=cache_capacity)
        )

        self.debug = debug

    def check_cache(self, pattern: str) -> Optional[re.Pattern]:  # type: ignore
        """Return a compiled re pattern if `pattern` is valid, or `None` otherwise.

        Arguments:
            pattern: The pattern string to look up or compile.

        Returns:
            The compiled pattern, or `None` if `pattern` was rejected by the
            I-Regexp check or could not be compiled.

        Raises:
            JSONPathError: In debug mode, if `iregexp_check` is available and
                rejects `pattern`.
            re.error: In debug mode, if compiling the pattern fails.
        """
        try:
            _pattern = self.cache[pattern]
        except KeyError:
            if IREGEXP_AVAILABLE and not check(pattern):
                if self.debug:
                    raise JSONPathError(
                        "search pattern is not a valid I-Regexp", token=None
                    ) from None
                _pattern = None
            else:
                # Keep `pattern` untouched: it is the cache key. Storing the
                # result under the rewritten text would make every later
                # lookup with the original string miss the cache.
                source = map_re(pattern) if REGEX_AVAILABLE else pattern

                try:
                    _pattern = re.compile(source)
                except re.error:
                    if self.debug:
                        raise
                    _pattern = None

            # Cache failures (`None`) as well as successes.
            self.cache[pattern] = _pattern

        return _pattern


def map_re(pattern: str) -> str:
    """Translate an I-Regexp pattern into its Python regex equivalent.

    Every unescaped `.` that appears outside a character class is replaced
    with an expression built from `\\p{Cs}` / `\\P{Cs}` property escapes that
    excludes `\\r` and `\\n`. All other characters, including a `.` inside
    `[...]` or following a backslash, are copied through unchanged.

    Arguments:
        pattern: The I-Regexp pattern text.

    Returns:
        The rewritten pattern text.
    """
    any_char = r"(?:(?![\r\n])\P{Cs}|\p{Cs}\p{Cs})"

    out: List[str] = []
    in_class = False
    after_backslash = False

    for char in pattern:
        if after_backslash:
            # The character following a backslash is always taken literally.
            after_backslash = False
        elif char == "\\":
            after_backslash = True
        elif char == "[":
            in_class = True
        elif char == "]":
            in_class = False
        elif char == "." and not in_class:
            out.append(any_char)
            continue

        out.append(char)

    return "".join(out)