File: reader.py

package info (click to toggle)
python-aioraven 0.7.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 216 kB
  • sloc: python: 2,428; makefile: 5
file content (101 lines) | stat: -rw-r--r-- 3,180 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Copyright 2022 Scott K Logan
# Licensed under the Apache License, Version 2.0

from asyncio.events import AbstractEventLoop
from asyncio.events import get_event_loop
from asyncio.futures import Future
from typing import Dict
from typing import List
from typing import Optional
import xml.etree.ElementTree as Et

from aioraven.data import RAVEnData


class RAVEnReader:
    """Read stanzas from a RAVEn device."""

    _exception: Optional[Exception]
    _waiters: Dict[Optional[str], List[Future[Optional[RAVEnData]]]]

    def __init__(self, loop: Optional[AbstractEventLoop] = None) -> None:
        """
        Construct a RAVEnReader.

        :param loop: The event loop instance to use.
        """
        if loop is None:
            self._loop = get_event_loop()
        else:
            self._loop = loop
        self._eof = False
        self._waiters = {None: []}
        self._exception = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._eof:
            info.append('eof')
        if self._exception:
            info.append('e=%r' % self._exception)
        num_waiters = sum(len(w) for w in self._waiters.values())
        if num_waiters:
            info.append('w=%d' % num_waiters)
        return '<%s>' % ' '.join(info)

    def exception(self) -> Optional[Exception]:
        return self._exception

    def set_exception(self, exc: Exception) -> None:
        # Only ParseError is recoverable
        if not isinstance(exc, Et.ParseError):
            self._exception = exc
        for waiters in self._waiters.values():
            while waiters:
                waiter = waiters.pop(0)
                if not waiter.cancelled():
                    waiter.set_exception(exc)

    def feed_eof(self) -> None:
        self._eof = True
        for waiters in self._waiters.values():
            while waiters:
                waiter = waiters.pop(0)
                if not waiter.cancelled():
                    waiter.set_result(None)

    def feed_element(self, data: Et.Element) -> None:
        self._waiters.setdefault(data.tag, [])
        waiters = self._waiters.get(data.tag, [])
        res: RAVEnData = {}
        for e in data:
            if e.tag in res:
                orig = res[e.tag]
                if not isinstance(orig, list):
                    res[e.tag] = [orig, e.text]
                else:
                    orig.append(e.text)
            else:
                res[e.tag] = e.text
        while waiters:
            waiter = waiters.pop(0)
            if not waiter.cancelled():
                waiter.set_result(res)
                break
        else:
            waiters = self._waiters[None]
            if waiters:
                waiters.pop(0).set_result(res)

    async def read_tag(
        self,
        tag: Optional[str] = None,
    ) -> Optional[RAVEnData]:
        if self._eof:
            return None
        if self._exception is not None:
            raise self._exception
        self._waiters.setdefault(tag, [])
        waiter: Future[Optional[RAVEnData]] = self._loop.create_future()
        self._waiters[tag].append(waiter)
        return await waiter