1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
import re
from typing import Any, Final, Optional
from jsonpath_ng.ext import parse
from jsonpath_ng.jsonpath import Index
from moto.stepfunctions.parser.asl.utils.encoding import to_json_str
# Matches a literal array index access such as "[0]" at the very end of a path.
_PATTERN_SINGLETON_ARRAY_ACCESS_OUTPUT: Final[str] = r"\[\d+\]$"
# Matches a path made of "$", then either a dotted prefix followed by a
# wildcard "[*]" or slice "[a:b]" access (both bounds optional), or a
# root-level "[*]", followed only by dotted segments containing no "[" up to
# the end of the string. A slice applied directly to the root (e.g. "$[0:2]")
# does not match.
_PATTERN_SLICE_OR_WILDCARD_ACCESS = (
    r"\$(?:\.[^[]+\[(?:\*|\d*:\d*)\]|\[\*\])(?:\.[^[]+)*$"
)
def _is_singleton_array_access(path: str) -> bool:
    """Report whether *path* ends with a literal index access such as ``[0]``.

    Args:
        path: The JSONPath expression to inspect.

    Returns:
        True when the final characters of the path are a bracketed run of
        digits, False otherwise.
    """
    trailing_index = re.search(_PATTERN_SINGLETON_ARRAY_ACCESS_OUTPUT, path)
    # Match objects are always truthy, so "found" is the same as "not None".
    return trailing_index is not None
def _contains_slice_or_wildcard_array(path: str) -> bool:
    """Report whether *path* uses a slice or wildcard array access.

    A slice applied directly to the root is not recognised, whereas a
    wildcard applied to the root is.

    Args:
        path: The JSONPath expression to inspect.

    Returns:
        True when the slice/wildcard pattern is found in the path.
    """
    return re.search(_PATTERN_SLICE_OR_WILDCARD_ACCESS, path) is not None
class NoSuchJsonPathError(Exception):
    """Error reporting that a JSONPath could not be found in some input data.

    The human-readable message is built on first access and then cached, so
    the input is only serialised to JSON if the message is actually needed.
    """

    # The JSONPath expression that was looked up.
    json_path: Final[str]
    # The input the expression was evaluated against.
    data: Final[Any]
    # Cached message text; None until `message` is first read.
    _message: Optional[str]

    def __init__(self, json_path: str, data: Any):
        self._message = None
        self.json_path = json_path
        self.data = data

    @property
    def message(self) -> str:
        """Return the error text, building and caching it on first use."""
        if self._message is not None:
            return self._message
        data_json_str = to_json_str(self.data)
        self._message = f"The JSONPath '{self.json_path}' could not be found in the input '{data_json_str}'"
        return self._message

    def __str__(self) -> str:
        return self.message
def extract_json(path: str, data: Any) -> Any:
    """Evaluate the JSONPath *path* against *data* and return what it selects.

    Args:
        path: The JSONPath expression to evaluate.
        data: The input document to search.

    Returns:
        An empty list when nothing matches and the path uses a slice or
        wildcard array access; a list of matched values when the result is
        array-shaped; the single matched value otherwise.

    Raises:
        NoSuchJsonPathError: If nothing matches and the path is not a
            slice/wildcard array access.
    """
    found = parse(path).find(data)

    if not found:
        if not _contains_slice_or_wildcard_array(path):
            raise NoSuchJsonPathError(json_path=path, data=data)
        return []

    first = found[0]
    array_shaped = (
        len(found) > 1
        or isinstance(first.path, Index)
        # Last condition is different from LS and fixes a very specific bug
        # https://github.com/getmoto/moto/issues/7825
        or (first.context and isinstance(first.context.path, Index))
    )
    if not array_shaped:
        return first.value

    values = [hit.value for hit in found]
    # AWS StepFunctions breaks jsonpath specifications and instead
    # unpacks literal singleton array accesses.
    if len(values) == 1 and _is_singleton_array_access(path=path):
        return values[0]
    return values
|