File: data_types.py

package info (click to toggle)
python-ical 12.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,776 kB
  • sloc: python: 15,157; sh: 9; makefile: 5
file content (187 lines) | stat: -rw-r--r-- 6,922 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Library for parsing and encoding rfc5545 types."""

from __future__ import annotations

import logging
from collections.abc import Callable
from typing import Any, Protocol, TypeVar, get_origin

from pydantic import BaseModel, SerializationInfo
from pydantic.fields import FieldInfo

from ical.parsing.property import ParsedProperty, ParsedPropertyParameter
from ical.util import get_field_type

# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)

# Type variable bound to ``type``; lets ``Registry.register`` decorators be
# annotated as returning the same class object they were given.
T_TYPE = TypeVar("T_TYPE", bound=type)


class DataType(Protocol):
    """Defines the protocol implemented by data types in this library.

    The methods defined in this protocol are all optional. `Registry.register`
    looks each one up with `getattr` and only records the hooks that the
    decorated class actually defines.
    """

    @classmethod
    def __property_type__(cls) -> type:
        """Return the python type to match, if different from the type itself.

        When present, the returned type (rather than the decorated class) is
        used as the key for every hook recorded in the registry.
        """

    @classmethod
    def __parse_property_value__(cls, prop: ParsedProperty) -> Any:
        """Parse the specified property value into a python value.

        The return annotation is `Any` to match how the registry stores this
        hook (`Callable[[ParsedProperty], Any]`): the result is a parsed
        value, not a class object.
        """

    @classmethod
    def __encode_property_json__(cls, value: Any) -> str | dict[str, str]:
        """Encode the property during pydantic serialization to object model."""

    @classmethod
    def __encode_property_value__(cls, value: Any) -> str | None:
        """Encode the property from the object model to the ics string value."""

    @classmethod
    def __encode_property_params__(
        cls, model_data: dict[str, Any]
    ) -> list[ParsedPropertyParameter]:
        """Encode the property parameters from the object model."""


class Registry:
    """Registry of data types.

    Maps python types (and, when a name is given, Property Data Type value
    names) to the optional `DataType` protocol hooks implemented by the
    classes registered through `register`.
    """

    def __init__(
        self,
    ) -> None:
        """Initialize Registry."""
        # Value type name -> registered class.
        self._items: dict[str, type] = {}
        # Hook tables, all keyed by the python type resolved at registration.
        self._parse_property_value: dict[type, Callable[[ParsedProperty], Any]] = {}
        self._parse_parameter_by_name: dict[str, Callable[[ParsedProperty], Any]] = {}
        self._encode_property_json: dict[
            type, Callable[[Any], str | dict[str, str]]
        ] = {}
        self._encode_property_value: dict[type, Callable[[Any], str | None]] = {}
        self._encode_property_params: dict[
            type, Callable[[dict[str, Any]], list[ParsedPropertyParameter]]
        ] = {}
        self._disable_value_param: set[type] = set()
        self._parse_order: dict[type, int] = {}

    def register(
        self,
        name: str | None = None,
        disable_value_param: bool = False,
        parse_order: int | None = None,
    ) -> Callable[[T_TYPE], T_TYPE]:
        """Return decorator to register a type.

        The name when specified is the Property Data Type value name.

        Args:
            name: Optional value type name. When set, the class is recorded
                under this name and its parse hook (if any) is also made
                available through `parse_parameter_by_name`.
            disable_value_param: When true, the type is added to
                `disable_value_param`.
            parse_order: Optional ordering value recorded in `parse_order`.
                Any integer, including 0, is stored; only `None` means
                "not specified".

        Returns:
            A decorator that records the class's hooks and returns the class
            unchanged.
        """

        def decorator(func: T_TYPE) -> T_TYPE:
            """Register the decorated class and return it unchanged."""
            if name:
                self._items[name] = func
            # Hooks are keyed by the type named by __property_type__ when the
            # class provides it, otherwise by the class itself.
            data_type = func
            if data_type_func := getattr(func, "__property_type__", None):
                data_type = data_type_func()
            if parse_property_value := getattr(func, "__parse_property_value__", None):
                self._parse_property_value[data_type] = parse_property_value
                if name:
                    self._parse_parameter_by_name[name] = parse_property_value
            if encode_property_json := getattr(func, "__encode_property_json__", None):
                self._encode_property_json[data_type] = encode_property_json
            if encode_property_value := getattr(
                func, "__encode_property_value__", None
            ):
                self._encode_property_value[data_type] = encode_property_value
            if encode_property_params := getattr(
                func, "__encode_property_params__", None
            ):
                self._encode_property_params[data_type] = encode_property_params
            if disable_value_param:
                self._disable_value_param.add(data_type)
            # Compare against None so an explicit parse_order of 0 is kept
            # instead of being discarded as falsy.
            if parse_order is not None:
                self._parse_order[data_type] = parse_order
            return func

        return decorator

    @property
    def parse_property_value(self) -> dict[type, Callable[[ParsedProperty], Any]]:
        """Registry of python types to functions to parse into pydantic model."""
        return self._parse_property_value

    @property
    def parse_parameter_by_name(self) -> dict[str, Callable[[ParsedProperty], Any]]:
        """Registry based on data value type string name."""
        return self._parse_parameter_by_name

    @property
    def encode_property_json(self) -> dict[type, Callable[[Any], str | dict[str, str]]]:
        """Registry of encoders run during pydantic json serialization."""
        return self._encode_property_json

    @property
    def encode_property_value(self) -> dict[type, Callable[[Any], str | None]]:
        """Registry of encoders that run on the output data model to ics."""
        return self._encode_property_value

    @property
    def encode_property_params(
        self,
    ) -> dict[type, Callable[[dict[str, Any]], list[ParsedPropertyParameter]]]:
        """Registry of property parameter encoders run on output data model."""
        return self._encode_property_params

    @property
    def disable_value_param(self) -> set[type]:
        """Return set of types that do not allow VALUE overrides by component parsing."""
        return self._disable_value_param

    @property
    def parse_order(self) -> dict[type, int]:
        """Return the parse ordering of the specified type."""
        return self._parse_order


# Module-level registry instance; the helper functions in this module look up
# their encoders through it.
DATA_TYPE: Registry = Registry()


def encode_model_property_params(
    fields: dict[str, FieldInfo], model_data: dict[str, Any]
) -> list[ParsedPropertyParameter]:
    """Build the property parameters for a pydantic model's fields.

    Each field is looked up in `model_data` under its alias (falling back to
    the field name). The `value` field and fields whose data is missing or
    `None` produce no parameter. Non-list fields are wrapped in a single
    element list, and `bool` fields are run through the registered `bool`
    value encoder.
    """
    encoded: list[ParsedPropertyParameter] = []
    for field_name, field_info in fields.items():
        param_name = field_info.alias or field_name
        if param_name == "value":
            continue
        raw = model_data.get(param_name)
        if raw is None:
            continue
        field_type = get_field_type(field_info.annotation)
        # Parameters always carry a list of values.
        param_values = raw if get_origin(field_type) is list else [raw]
        if field_type is bool:
            encode_bool = DATA_TYPE.encode_property_value[bool]
            param_values = [encode_bool(item) for item in param_values]
        encoded.append(ParsedPropertyParameter(name=param_name, values=param_values))
    return encoded


def _encode_property_json(item: Any) -> Any:
    """Encode one value with the first JSON encoder registered along its MRO.

    The final MRO entry (`object`) is not consulted. The value is returned
    unchanged when no class in its MRO has a registered encoder.
    """
    encoders = DATA_TYPE.encode_property_json
    for klass in item.__class__.__mro__[:-1]:
        encoder = encoders.get(klass)
        if encoder is not None:
            return encoder(item)
    return item


def serialize_field(self: BaseModel, value: Any, info: SerializationInfo) -> Any:
    """Serialize a field value, applying registered encoders for ics output.

    The value is returned untouched unless the serialization context has a
    truthy ``"ics"`` entry. In ics mode a list is encoded element by element
    and any other value is encoded directly.
    """
    context = info.context
    if not (context and context.get("ics")):
        return value
    if isinstance(value, list):
        return [_encode_property_json(element) for element in value]
    return _encode_property_json(value)