File: util.py

package info (click to toggle)
python-dbus-next 0.2.3-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 696 kB
  • sloc: python: 6,018; makefile: 45; xml: 29
file content (162 lines) | stat: -rw-r--r-- 5,622 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from typing import List, Any, Union
import inspect
from ..signature import SignatureTree, Variant
import ast


def signature_contains_type(signature: Union[str, SignatureTree], body: List[Any],
                            token: str) -> bool:
    '''Report whether ``token`` occurs anywhere in a signature or its body.

    The static type tree of ``signature`` is searched first. ``body`` is only
    inspected when that tree contains a variant ('v'), in which case every
    :class:`Variant` found in the body is checked recursively against its own
    signature.

    :param signature: A signature string (resolved through
        ``SignatureTree._get``) or an object exposing ``types``.
    :param body: The values that accompany ``signature``.
    :param token: The type token to look for.
    :returns: ``True`` when the token is found, ``False`` otherwise.
    '''
    if type(signature) is str:
        signature = SignatureTree._get(signature)

    # Pass 1: depth-first walk over the declared types.
    saw_variant = False
    pending = list(signature.types)
    while pending:
        node = pending.pop()
        if node.token == token:
            return True
        if node.token == 'v':
            saw_variant = True
        pending.extend(node.children)

    # Without a variant the declared types are the whole story.
    if not saw_variant:
        return False

    # Pass 2: walk the values, descending into lists and dict values, and
    # ask each Variant about the signature it carries.
    pending = list(body)
    while pending:
        item = pending.pop()
        item_type = type(item)
        if item_type is Variant:
            if signature_contains_type(item.signature, [item.value], token):
                return True
        elif item_type is list:
            pending.extend(item)
        elif item_type is dict:
            pending.extend(item.values())

    return False


def replace_fds_with_idx(signature: Union[str, SignatureTree],
                         body: List[Any]) -> (List[Any], List[int]):
    '''Convert a high level body into the low level body format.

    In the high level format a value of type 'h' is the file descriptor
    itself. Each such value is swapped (in place, via ``_replace_fds``) for
    its position in a list of unix fds, and that list is returned so it can
    be set on the Message. A descriptor that appears more than once is
    listed only once and shares a single index.

    :param signature: A signature string or an already parsed tree.
    :param body: The high level body; modified in place.
    :returns: A ``(body, unix_fds)`` tuple. ``unix_fds`` is empty when the
        signature and body contain no 'h' member.
    '''
    if type(signature) is str:
        signature = SignatureTree._get(signature)

    unix_fds = []

    if signature_contains_type(signature, body, 'h'):

        def _index_for(fd):
            # Reuse the slot of an fd that was already seen, otherwise
            # append it and hand out the new last position.
            try:
                position = unix_fds.index(fd)
            except ValueError:
                position = len(unix_fds)
                unix_fds.append(fd)
            return position

        _replace_fds(body, signature.types, _index_for)

    return body, unix_fds


def replace_idx_with_fds(signature: Union[str, SignatureTree], body: List[Any],
                         unix_fds: List[int]) -> List[Any]:
    '''Convert a low level body into the high level body format.

    In the low level format a value of type 'h' is an index into
    ``unix_fds``. Each such value is swapped (in place, via ``_replace_fds``)
    for the file descriptor stored at that index, or for ``None`` when the
    lookup raises ``IndexError``.

    :param signature: A signature string or an already parsed tree.
    :param body: The low level body; modified in place.
    :param unix_fds: The file descriptors the indexes refer to.
    :returns: The same ``body`` object.
    '''
    if type(signature) is str:
        signature = SignatureTree._get(signature)

    if signature_contains_type(signature, body, 'h'):

        def _fd_at(idx):
            # An index with no matching descriptor maps to None.
            try:
                fd = unix_fds[idx]
            except IndexError:
                fd = None
            return fd

        _replace_fds(body, signature.types, _fd_at)

    return body


def parse_annotation(annotation: str) -> str:
    '''
    Normalize a service annotation to the plain signature string.

    Because of PEP 563, if `from __future__ import annotations` is used in
    code, the `inspect` module returns annotations as "forward definitions",
    i.e. the source text of the annotation. In that case the text of a quoted
    annotation is itself a string literal (``"'s'"``), which is unwrapped
    here. The text is only parsed with `ast`, never evaluated.

    :param annotation: The annotation as reported by `inspect`. May be
        falsy or `inspect.Signature.empty` when there is no annotation.
    :returns: ``''`` when there is no annotation; the literal's value when
        the annotation is the source of a single string constant; otherwise
        the annotation unchanged.
    :raises ValueError: If the annotation is not a string, or if it is the
        source of a single constant that is not a string (e.g. ``"1"``).
    '''
    def raise_value_error():
        raise ValueError(f'service annotations must be a string constant (got {annotation})')

    if not annotation or annotation is inspect.Signature.empty:
        return ''
    if not isinstance(annotation, str):
        raise_value_error()

    # Many valid D-Bus signatures (such as "a{sv}") are not valid Python;
    # those are already the plain signature and are returned untouched.
    try:
        body = ast.parse(annotation).body
    except SyntaxError:
        return annotation

    # Only a lone expression statement can be a string constant. Checking
    # for ast.Expr first keeps statements that have no ``value`` attribute
    # (``pass``, ``import x``, ...) from raising AttributeError.
    if len(body) == 1 and isinstance(body[0], ast.Expr):
        expr = body[0].value
        if isinstance(expr, ast.Constant):
            if not isinstance(expr.value, str):
                raise_value_error()
            return expr.value

    return annotation


def _replace_fds(body_obj: List[Any], children, replace_fn):
    '''Replace any type 'h' with the value returned by replace_fn() given the
    value of the fd field. This is used by the high level interfaces which
    allow type 'h' to be the fd directly instead of an index in an external
    array such as in the spec.'''
    for index, st in enumerate(children):
        if not any(sig in st.signature for sig in 'hv'):
            continue
        if st.signature == 'h':
            body_obj[index] = replace_fn(body_obj[index])
        elif st.token == 'a':
            if st.children[0].token == '{':
                _replace_fds(body_obj[index], st.children, replace_fn)
            else:
                for i, child in enumerate(body_obj[index]):
                    if st.signature == 'ah':
                        body_obj[index][i] = replace_fn(child)
                    else:
                        _replace_fds([child], st.children, replace_fn)
        elif st.token in '(':
            _replace_fds(body_obj[index], st.children, replace_fn)
        elif st.token in '{':
            for key, value in list(body_obj.items()):
                body_obj.pop(key)
                if st.children[0].signature == 'h':
                    key = replace_fn(key)
                if st.children[1].signature == 'h':
                    value = replace_fn(value)
                else:
                    _replace_fds([value], [st.children[1]], replace_fn)
                body_obj[key] = value

        elif st.signature == 'v':
            if body_obj[index].signature == 'h':
                body_obj[index].value = replace_fn(body_obj[index].value)
            else:
                _replace_fds([body_obj[index].value], [body_obj[index].type], replace_fn)

        elif st.children:
            _replace_fds(body_obj[index], st.children, replace_fn)