import json
from enum import Enum, auto
from typing import Any, Dict, List, Optional, Tuple, Union
import yaml
from yaml.constructor import ConstructorError
from yaml.nodes import MappingNode
from .exceptions import DataParsingError, DataValidationError
from .renaming import Renamed, renamed
# custom hook for 'json.loads()' to detect duplicate keys in data
# source: https://stackoverflow.com/q/14902299/12858520
def _json_raise_duplicates(pairs: List[Tuple[Any, Any]]) -> Dict[Any, Any]:
    dict_out: Dict[Any, Any] = {}
    for key, val in pairs:
        if key in dict_out:
            raise DataParsingError(f"Duplicate attribute key detected: {key}")
        dict_out[key] = val
    return dict_out
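
# Illustrative usage (commented out so the module stays import-safe): plain
# json.loads() silently keeps the last value of a repeated key, while this
# hook rejects such input, e.g.:
#
#     json.loads('{"a": 1, "a": 2}')  # -> {'a': 2}
#     json.loads('{"a": 1, "a": 2}', object_pairs_hook=_json_raise_duplicates)
#     # raises DataParsingError: Duplicate attribute key detected: a
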
# custom loader for 'yaml.load()' to detect duplicate keys in data
# source: https://gist.github.com/pypt/94d747fe5180851196eb
class _RaiseDuplicatesLoader(yaml.SafeLoader):
    def construct_mapping(self, node: Union[MappingNode, Any], deep: bool = False) -> Dict[Any, Any]:
        if not isinstance(node, MappingNode):
            raise ConstructorError(None, None, f"expected a mapping node, but found {node.id}", node.start_mark)
        mapping: Dict[Any, Any] = {}
        for key_node, value_node in node.value:
            key = self.construct_object(key_node, deep=deep)  # type: ignore
            # check that the key object is hashable, i.e. usable as a dict key
            try:
                _ = hash(key)  # type: ignore
            except TypeError as exc:
                raise ConstructorError(
                    "while constructing a mapping",
                    node.start_mark,
                    f"found unacceptable key ({exc})",
                    key_node.start_mark,
                ) from exc
            # check for duplicate keys
            if key in mapping:
                raise DataParsingError(f"duplicate key detected: {key_node.start_mark}")
            value = self.construct_object(value_node, deep=deep)  # type: ignore
            mapping[key] = value
        return mapping
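
# Illustrative usage: yaml.safe_load() silently keeps the last duplicate,
# whereas loading with _RaiseDuplicatesLoader raises, e.g.:
#
#     yaml.safe_load("a: 1\na: 2")  # -> {'a': 2}
#     yaml.load("a: 1\na: 2", Loader=_RaiseDuplicatesLoader)
#     # raises DataParsingError: duplicate key detected: <position mark>
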
class DataFormat(Enum):
    YAML = auto()
    JSON = auto()

    def parse_to_dict(self, text: str) -> Any:
        if self is DataFormat.YAML:
            # _RaiseDuplicatesLoader extends yaml.SafeLoader, so this is as safe as yaml.safe_load()
            # https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
            return renamed(yaml.load(text, Loader=_RaiseDuplicatesLoader))  # type: ignore
        if self is DataFormat.JSON:
            return renamed(json.loads(text, object_pairs_hook=_json_raise_duplicates))
        raise NotImplementedError(f"Parsing of format '{self}' is not implemented")

    def dict_dump(self, data: Union[Dict[str, Any], Renamed], indent: Optional[int] = None) -> str:
        if isinstance(data, Renamed):
            data = data.original()
        if self is DataFormat.YAML:
            return yaml.safe_dump(data, indent=indent)  # type: ignore
        if self is DataFormat.JSON:
            return json.dumps(data, indent=indent)
        raise NotImplementedError(f"Exporting to '{self}' format is not implemented")
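
# Illustrative round-trip, assuming renamed() wraps the parsed dict in a
# Renamed instance (dict_dump() unwraps it via original()):
#
#     obj = DataFormat.JSON.parse_to_dict('{"answer": 42}')
#     text = DataFormat.JSON.dict_dump(obj, indent=2)
#     # -> '{\n  "answer": 42\n}'
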
def parse_yaml(data: str) -> Any:
    return DataFormat.YAML.parse_to_dict(data)


def parse_json(data: str) -> Any:
    return DataFormat.JSON.parse_to_dict(data)
def try_to_parse(data: str) -> Any:
    """Attempt to parse the data as a JSON or YAML string."""
    try:
        return parse_json(data)
    except json.JSONDecodeError as je:
        try:
            return parse_yaml(data)
        except yaml.YAMLError as ye:
            # We do not raise-from here because there are two possible causes
            # and we cannot tell which one is the actual one.
            raise DataParsingError(  # pylint: disable=raise-missing-from
                f"failed to parse data, JSON: {je}, YAML: {ye}"
            )
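
# Illustrative behaviour: JSON is tried first, YAML second. Since most JSON
# documents are also valid YAML, the fallback mainly matters for YAML-only
# syntax, e.g.:
#
#     try_to_parse('{"a": 1}')  # parsed as JSON
#     try_to_parse("a: 1")      # JSON fails, parsed as YAML
#     try_to_parse("a: [1")     # invalid in both; raises DataParsingError
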
def data_combine(data: Dict[Any, Any], additional_data: Dict[Any, Any], object_path: str = "") -> Dict[Any, Any]:
    """Recursively combine two dictionaries, raising on conflicting keys."""
    for key in additional_data:
        if key in data:
            # if both values are dictionaries, we can try to combine them deeper
            if isinstance(data[key], dict) and isinstance(additional_data[key], dict):
                data[key] = data_combine(data[key], additional_data[key], f"{object_path}/{key}").copy()
                continue
            # otherwise we cannot combine them
            raise DataValidationError(f"duplicate key '{key}' with a value already present in data", object_path)
        val = additional_data[key]
        data[key] = val.copy() if hasattr(val, "copy") else val
    return data
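
# Illustrative usage: nested dicts are merged recursively, while a key that
# already holds a non-dict value is reported as a conflict, e.g.:
#
#     data_combine({"a": {"x": 1}}, {"a": {"y": 2}, "b": 3})
#     # -> {'a': {'x': 1, 'y': 2}, 'b': 3}
#     data_combine({"a": 1}, {"a": 2})
#     # raises DataValidationError: duplicate key 'a' ...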