File: codec.py

package info (click to toggle)
python-a38 0.1.8-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 440 kB
  • sloc: python: 4,065; xml: 174; makefile: 80; sh: 14
file content (339 lines) | stat: -rw-r--r-- 10,119 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
from __future__ import annotations

import io
import json
import logging
import os
import subprocess
import tempfile

try:
    from defusedxml import ElementTree as ET
except ModuleNotFoundError:
    import xml.etree.ElementTree as ET

from typing import (Any, BinaryIO, Dict, List, Optional, Sequence, TextIO,
                    Type, Union)

try:
    import ruamel.yaml
    yaml = None
except ModuleNotFoundError:
    ruamel = None
    try:
        import yaml
    except ModuleNotFoundError:
        yaml = None

from . import crypto
from .fattura import auto_from_dict, auto_from_etree
from .models import Model

log = logging.getLogger("codec")

# Pick the YAML backend once, at import time: ruamel.yaml is preferred, PyYAML
# is the fallback, and without either the helpers raise NotImplementedError
# when they are called.
if ruamel is not None:
    def _load_yaml(fd: TextIO):
        """
        Parse the YAML document in `fd` with ruamel.yaml's safe, pure-Python
        loader and return the resulting data.
        """
        yaml_loader = ruamel.yaml.YAML(typ="safe", pure=True)
        return yaml_loader.load(fd)

    def _write_yaml(data: Dict[str, Any], file: TextIO):
        """
        Serialize `data` to `file` as block-style YAML with an explicit
        document start marker, using ruamel.yaml.
        """
        # Named yaml_writer so the module-level `yaml` name is not shadowed
        yaml_writer = ruamel.yaml.YAML(typ="safe")
        yaml_writer.default_flow_style = False
        yaml_writer.allow_unicode = True
        yaml_writer.explicit_start = True
        yaml_writer.dump(data, file)
elif yaml is not None:
    def _load_yaml(fd: TextIO):
        """
        Parse the YAML document in `fd` with PyYAML and return the resulting
        data.
        """
        # CLoader only exists when PyYAML was built against LibYAML: fall back
        # to the pure-Python Loader instead of failing with AttributeError.
        # NOTE(review): Loader/CLoader are not the "safe" loaders; consider
        # CSafeLoader/SafeLoader if untrusted files may be loaded.
        loader = getattr(yaml, "CLoader", yaml.Loader)
        return yaml.load(fd, Loader=loader)

    def _write_yaml(data: Dict[str, Any], file: TextIO):
        """
        Serialize `data` to `file` as block-style YAML with an explicit
        document start marker, using PyYAML.
        """
        # Same fallback as for loading: CDumper needs the LibYAML bindings
        dumper = getattr(yaml, "CDumper", yaml.Dumper)
        yaml.dump(
            data, stream=file, default_flow_style=False, sort_keys=False,
            allow_unicode=True, explicit_start=True, Dumper=dumper)
else:
    def _load_yaml(fd: TextIO):
        """
        Placeholder used when no YAML library is installed: always raises
        NotImplementedError.
        """
        raise NotImplementedError("loading YAML requires ruamel.yaml or PyYAML to be installed")

    def _write_yaml(data: Dict[str, Any], file: TextIO):
        """
        Placeholder used when no YAML library is installed: always raises
        NotImplementedError.
        """
        raise NotImplementedError("writing YAML requires ruamel.yaml or PyYAML to be installed")


class Codec:
    """
    Base class for format-specific reading and writing of fatture.

    `edit_buffer` reads `self.EXTENSIONS[0]` to build the suffix of the
    temporary file it hands to the editor, so subclasses that are used for
    editing need to define an `EXTENSIONS` sequence.
    """
    # If True, file objects are expected to be open in binary mode
    binary = False

    def load(
            self,
            pathname: str,
            model: Optional[Type[Model]] = None) -> Model:
        """
        Load a fattura from a file.

        If model is provided it will be used for loading, otherwise the Model
        type will be autodetected.

        The base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(f"{self.__class__.__name__}.load is not implemented")

    def write_file(self, f: Model, file: Union[TextIO, BinaryIO]):
        """
        Write a fattura to the given file descriptor.

        The base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(f"{self.__class__.__name__}.write_file is not implemented")

    def save(self, f: Model, pathname: str):
        """
        Write a fattura to the file at `pathname`.

        The file is opened in binary or text mode according to `self.binary`,
        then handed to `write_file`.
        """
        with open(pathname, "wb" if self.binary else "wt") as fd:
            self.write_file(f, fd)

    def interactive_edit(self, f: Model) -> Optional[Model]:
        """
        Edit the given model in an interactive editor, using the format of this
        codec.

        Returns what `edit_buffer` returns: the edited fattura, or None if the
        contents were not changed.
        """
        # NOTE(review): the model is serialized into a text buffer, so this
        # presumably only works for codecs whose `binary` is False -- confirm
        # before using it with a binary codec.
        with io.StringIO() as orig:
            self.write_file(f, orig)
            return self.edit_buffer(orig.getvalue(), model=f.__class__)

    def edit_buffer(self, buf: str, model: Optional[Type[Model]] = None) -> Optional[Model]:
        """
        Open an editor on buf and return the edited fattura.

        The editor is taken from the EDITOR environment variable, defaulting
        to `sensible-editor`. If the edited file cannot be loaded, the error
        is logged and the editor is opened again on `buf` (not on the rejected
        edit), followed by the error message in `# ERROR: ` lines.

        Return None if editing did not change the contents.

        Raises subprocess.CalledProcessError if the editor exits with a
        non-zero status.
        """
        editor = os.environ.get("EDITOR", "sensible-editor")

        current = buf
        error = None

        while True:
            with tempfile.NamedTemporaryFile(
                    mode="wt",
                    suffix=f".{self.EXTENSIONS[0]}") as tf:
                # Write out the current buffer
                tf.write(current)
                # What reading the file back yields if the user changes nothing
                unchanged = current
                if error is not None:
                    # Start the error report on a line of its own, so that it
                    # does not get glued to the last line of the buffer
                    if current and not current.endswith("\n"):
                        tf.write("\n")
                        unchanged = current + "\n"
                    # Prefix every line of the message: a multi-line error
                    # would otherwise leak unmarked lines into the document
                    for error_line in error.splitlines() or [""]:
                        tf.write(f"# ERROR: {error_line}\n")
                    error = None
                tf.flush()

                # Run the editor on it
                subprocess.run([editor, tf.name], check=True)

                # Reopen by name in case the editor did not write on the same
                # inode
                with open(tf.name, "rt") as fd:
                    # Error report lines are not part of the edited contents
                    edited = "".join(
                        line for line in fd
                        if not line.startswith("# ERROR: "))

                if edited == unchanged:
                    return None

                try:
                    return self.load(tf.name, model=model)
                except Exception as e:
                    # Keep looping: the next iteration shows the error to the
                    # user inside the file being edited
                    log.error("%s: cannot load edited file: %s", tf.name, e)
                    error = str(e)


class P7M(Codec):
    """
    Codec for P7M files. Only `load` is provided here; writing is left to the
    base class.
    """
    EXTENSIONS = ("p7m",)

    def load(
            self,
            pathname: str,
            model: Optional[Type[Model]] = None) -> Model:
        """
        Open `pathname` with crypto.P7M and return the fattura it contains.

        `model` is accepted for compatibility with the other codecs and is not
        used.
        """
        return crypto.P7M(pathname).get_fattura()


class JSON(Codec):
    """
    Codec that reads and writes fatture as JSON.

    `indent` is passed to json.dump as the indentation of the JSON structure;
    use None to output everything in a single line.

    `end` is a string written right after the JSON structure; use None to
    write nothing.
    """
    EXTENSIONS = ("json",)

    def __init__(self, indent: Optional[int] = 1, end="\n"):
        self.indent, self.end = indent, end

    def load(
            self,
            pathname: str,
            model: Optional[Type[Model]] = None) -> Model:
        """
        Parse the JSON file at `pathname` and build a fattura from it.

        With `model`, the parsed data is passed to it as keyword arguments;
        without, the data is handed to auto_from_dict.
        """
        with open(pathname, "rt") as fd:
            parsed = json.load(fd)
        if not model:
            return auto_from_dict(parsed)
        return model(**parsed)

    def write_file(self, f: Model, file: TextIO):
        """
        Dump the jsonable form of `f` to `file`, followed by `self.end` unless
        it is None.
        """
        json.dump(f.to_jsonable(), file, indent=self.indent)
        if self.end is None:
            return
        file.write(self.end)


class YAML(Codec):
    """
    Codec that reads and writes fatture as YAML, through the module-level
    _load_yaml and _write_yaml helpers.
    """
    EXTENSIONS = ("yaml", "yml")

    def load(
            self,
            pathname: str,
            model: Optional[Type[Model]] = None) -> Model:
        """
        Parse the YAML file at `pathname` and build a fattura from it.

        With `model`, the parsed data is passed to it as keyword arguments;
        without, the data is handed to auto_from_dict.
        """
        with open(pathname, "rt") as fd:
            parsed = _load_yaml(fd)
        if not model:
            return auto_from_dict(parsed)
        return model(**parsed)

    def write_file(self, f: Model, file: TextIO):
        """
        Write the jsonable form of `f` to `file` as YAML.
        """
        jsonable = f.to_jsonable()
        _write_yaml(jsonable, file)


class Python(Codec):
    """
    Python codec.

    `namespace` defines what namespace is used to refer to `a38` models. `None`
    means use a default, `False` means not to use a namespace, a string defines
    which namespace to use.

    `unformatted` can be set to True to skip code formatting.

    The code will be written with just the expression to build the fattura.

    The code assumes `import datetime` and `from decimal import Decimal`.

    If loadable is True, the file is written as a Python source that
    creates a `fattura` variable with the fattura, with all the imports that
    are needed. This generates a python file that can be loaded with load().

    Note that loading Python fatture executes arbitrary Python code!
    """
    EXTENSIONS = ("py",)

    def __init__(
            self, namespace: Union[None, bool, str] = "a38",
            unformatted: bool = False,
            loadable: bool = False):
        self.namespace = namespace
        self.unformatted = unformatted
        self.loadable = loadable

    def load(
            self,
            pathname: str,
            model: Optional[Type[Model]] = None) -> Model:
        """
        Execute the Python source at `pathname` and return the value it binds
        to the name `fattura`.

        `model` is accepted for compatibility with the other codecs and is not
        used. Raises KeyError if the source does not define `fattura`.

        WARNING: this runs arbitrary Python code: only load trusted files.
        """
        with open(pathname, "rt") as fd:
            code = compile(fd.read(), pathname, 'exec')

        # Use one dict as both globals and locals. With two separate dicts the
        # code runs as if it were a class body, and functions or
        # comprehensions in the file cannot see its imports and top-level
        # names.
        scope: Dict[str, Any] = {}
        exec(code, scope)
        return scope["fattura"]

    def write_file(self, f: Model, file: TextIO):
        """
        Write the Python expression that builds `f` to `file`.

        The code is formatted with yapf unless `self.unformatted` is set or
        yapf is not installed. If `self.loadable` is set, the expression is
        preceded by the needed imports and assigned to `fattura`.
        """
        code = f.to_python(namespace=self.namespace)

        if not self.unformatted:
            try:
                from yapf.yapflib import yapf_api
            except ModuleNotFoundError:
                # Formatting is optional: without yapf, write the code as is
                pass
            else:
                code, _changed = yapf_api.FormatCode(code)

        if self.loadable:
            print("import datetime", file=file)
            print("from decimal import Decimal", file=file)
            if self.namespace:
                print("import", self.namespace, file=file)
            elif self.namespace is False:
                print("from a38.fattura import *", file=file)
            else:
                print("import a38", file=file)
            print(file=file)
            # No newline here: the expression continues on the same line
            print("fattura = ", file=file, end="")
        print(code, file=file)


class XML(Codec):
    """
    Codec that reads and writes fatture as XML. Files are handled in binary
    mode.
    """
    EXTENSIONS = ("xml",)

    binary = True

    def load(
            self,
            pathname: str,
            model: Optional[Type[Model]] = None) -> Model:
        """
        Parse the XML file at `pathname` and build a fattura from its root
        element with auto_from_etree.

        `model` is accepted for compatibility with the other codecs and is not
        used.
        """
        root = ET.parse(pathname).getroot()
        return auto_from_etree(root)

    def write_file(self, f: Model, file: BinaryIO):
        """
        Write the element tree built from `f` to `file`, UTF-8 encoded and
        with an XML declaration, followed by a newline.
        """
        f.build_etree().write(file, encoding="utf-8", xml_declaration=True)
        file.write(b"\n")


class Codecs:
    """
    A collection of codecs
    """
    ALL_CODECS = (XML, P7M, JSON, YAML, Python)

    def __init__(
            self,
            include: Optional[Sequence[Type[Codec]]] = None,
            exclude: Optional[Sequence[Type[Codec]]] = (Python,)):
        """
        Select the codecs that are part of this collection.

        If `include` is not None, only codecs in that list are used. Since
        `exclude` is not None by default, `exclude=None` needs to be passed
        explicitly together with `include`.

        If `exclude` is not None, all codecs are used except the given ones.

        If both `include` and `exclude` are None, all codecs are used.

        By default, `exclude` is not None but it is set to exclude Python.

        Raises ValueError if neither `include` nor `exclude` is None.
        """
        self.codecs: List[Type[Codec]]

        if include is not None and exclude is not None:
            raise ValueError("include and exclude cannot both be set")
        elif include is not None:
            self.codecs = list(include)
        elif exclude is not None:
            self.codecs = [c for c in self.ALL_CODECS if c not in exclude]
        else:
            self.codecs = list(self.ALL_CODECS)

    def codec_from_filename(self, pathname: str) -> Optional[Type[Codec]]:
        """
        Infer a Codec class from the extension of the file at `pathname`.

        The extension is what follows the last dot, compared case
        insensitively with the EXTENSIONS of each codec in the collection.

        Returns None if `pathname` has no extension, or if no codec in the
        collection handles it.
        """
        _, dot, ext = pathname.rpartition(".")
        if not dot:
            # No dot at all: there is no extension to look up
            return None
        ext = ext.lower()

        for c in self.codecs:
            if ext in c.EXTENSIONS:
                return c

        return None