File: parsing.py

package info (click to toggle)
knot-resolver 6.0.17-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 16,376 kB
  • sloc: javascript: 42,732; ansic: 40,311; python: 12,580; cpp: 2,121; sh: 1,988; xml: 193; makefile: 181
file content (112 lines) | stat: -rw-r--r-- 4,614 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import json
from enum import Enum, auto
from typing import Any, Dict, List, Optional, Tuple, Union

import yaml
from yaml.constructor import ConstructorError
from yaml.nodes import MappingNode

from .exceptions import DataParsingError, DataValidationError
from .renaming import Renamed, renamed


# custom hook for 'json.loads()' to detect duplicate keys in data
# source: https://stackoverflow.com/q/14902299/12858520
def _json_raise_duplicates(pairs: List[Tuple[Any, Any]]) -> Optional[Any]:
    """Build a dict from decoded JSON ``(key, value)`` pairs, rejecting repeated keys.

    Intended to be passed as ``object_pairs_hook`` to ``json.loads()``.

    Args:
        pairs: The key/value pairs of one JSON object, in document order.

    Returns:
        A dictionary holding every pair from ``pairs``.

    Raises:
        DataParsingError: As soon as a key is seen for the second time.
    """
    collected: Dict[Any, Any] = {}
    for name, value in pairs:
        if name not in collected:
            collected[name] = value
            continue
        raise DataParsingError(f"Duplicate attribute key detected: {name}")
    return collected


# custom loader for 'yaml.load()' to detect duplicate keys in data
# source: https://gist.github.com/pypt/94d747fe5180851196eb
class _RaiseDuplicatesLoader(yaml.SafeLoader):
    """``yaml.SafeLoader`` subclass whose mappings refuse repeated keys."""

    def construct_mapping(self, node: Union[MappingNode, Any], deep: bool = False) -> Dict[Any, Any]:
        """Construct a dictionary from a YAML mapping node.

        Args:
            node: The node to convert; it must be a ``MappingNode``.
            deep: Forwarded to ``construct_object`` for keys and values.

        Returns:
            The constructed dictionary.

        Raises:
            ConstructorError: If ``node`` is not a mapping node, or a key cannot be hashed.
            DataParsingError: If the same key appears more than once in the mapping.
        """
        if not isinstance(node, MappingNode):
            raise ConstructorError(None, None, f"expected a mapping node, but found {node.id}", node.start_mark)

        built: Dict[Any, Any] = {}
        for k_node, v_node in node.value:
            name = self.construct_object(k_node, deep=deep)  # type: ignore

            # Only hashable objects can be used as dictionary keys.
            try:
                hash(name)  # type: ignore
            except TypeError as err:
                raise ConstructorError(
                    "while constructing a mapping",
                    node.start_mark,
                    f"found unacceptable key ({err})",
                    k_node.start_mark,
                ) from err

            # The duplicate check runs before the value node is constructed.
            if name in built:
                raise DataParsingError(f"duplicate key detected: {k_node.start_mark}")

            built[name] = self.construct_object(v_node, deep=deep)  # type: ignore
        return built


class DataFormat(Enum):
    """Text formats that data can be parsed from and dumped to."""

    YAML = auto()
    JSON = auto()

    def parse_to_dict(self, text: str) -> Any:
        """Parse ``text`` in this format and return the result wrapped by ``renamed()``.

        Both formats are parsed with duplicate-key detection enabled.

        Raises:
            NotImplementedError: If this member has no parser implemented.
        """
        if self is DataFormat.JSON:
            parsed = json.loads(text, object_pairs_hook=_json_raise_duplicates)
        elif self is DataFormat.YAML:
            # The loader derives from yaml.SafeLoader, so this should be safe
            # https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
            parsed = yaml.load(text, Loader=_RaiseDuplicatesLoader)  # type: ignore
        else:
            raise NotImplementedError(f"Parsing of format '{self}' is not implemented")
        return renamed(parsed)

    def dict_dump(self, data: Union[Dict[str, Any], Renamed], indent: Optional[int] = None) -> str:
        """Serialize ``data`` to a string in this format.

        Args:
            data: The data to export; a ``Renamed`` instance is replaced by
                the result of its ``original()`` method before dumping.
            indent: Indentation passed through to the underlying dumper.

        Raises:
            NotImplementedError: If this member has no exporter implemented.
        """
        plain = data.original() if isinstance(data, Renamed) else data

        if self is DataFormat.JSON:
            return json.dumps(plain, indent=indent)
        if self is DataFormat.YAML:
            return yaml.safe_dump(plain, indent=indent)  # type: ignore
        raise NotImplementedError(f"Exporting to '{self}' format is not implemented")


def parse_yaml(data: str) -> Any:
    """Parse ``data`` as YAML text via ``DataFormat.YAML``."""
    yaml_format = DataFormat.YAML
    return yaml_format.parse_to_dict(data)


def parse_json(data: str) -> Any:
    """Parse ``data`` as JSON text via ``DataFormat.JSON``."""
    json_format = DataFormat.JSON
    return json_format.parse_to_dict(data)


def try_to_parse(data: str) -> Any:
    """Parse ``data`` as JSON, falling back to YAML if JSON decoding fails.

    Raises:
        DataParsingError: If JSON decoding fails and the YAML parser then
            raises a ``yaml.YAMLError``; the message contains both errors.
    """
    try:
        parsed = parse_json(data)
    except json.JSONDecodeError as json_error:
        try:
            parsed = parse_yaml(data)
        except yaml.YAMLError as yaml_error:
            # Either parser may be the one the caller had in mind, so the
            # message reports both failures; the exception itself is chained
            # to the YAML error.
            raise DataParsingError(
                f"failed to parse data, JSON: {json_error}, YAML: {yaml_error}"
            ) from yaml_error
    return parsed


def data_combine(data: Dict[Any, Any], additional_data: Dict[Any, Any], object_path: str = "") -> Dict[Any, Any]:
    """Merge ``additional_data`` into ``data`` in place and return ``data``.

    Keys missing from ``data`` are added; values that have a ``copy``
    attribute are stored as the result of calling ``copy()``. When a key
    exists on both sides and both values are dictionaries, they are merged
    recursively.

    Args:
        data: Dictionary that receives the merged content (it is modified).
        additional_data: Dictionary whose entries are merged into ``data``.
        object_path: Path of the current nesting level, extended with
            ``/<key>`` on each recursive call and attached to the error.

    Returns:
        The same ``data`` object that was passed in.

    Raises:
        DataValidationError: If a key exists on both sides and the two values
            are not both dictionaries.
    """
    for name in additional_data:
        incoming = additional_data[name]

        # New key: nothing to reconcile, just take the incoming value.
        if name not in data:
            data[name] = incoming.copy() if hasattr(incoming, "copy") else incoming
            continue

        # Existing key: only two dictionaries can be combined any deeper.
        current = data[name]
        if not (isinstance(current, dict) and isinstance(incoming, dict)):
            raise DataValidationError(f"duplicity key '{name}' with value in data", object_path)

        data[name] = data_combine(current, incoming, f"{object_path}/{name}").copy()
    return data