1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
from typing import List, Any, Union
import inspect
from ..signature import SignatureTree, Variant
import ast
def signature_contains_type(signature: Union[str, SignatureTree], body: List[Any],
                            token: str) -> bool:
    '''Report whether `token` occurs anywhere in the signature or its body.

    The signature tree is searched first. A variant ('v') does not expose
    the type it carries in the signature, so when the signature holds at
    least one variant the body is walked as well and every `Variant` found
    in it is checked recursively against its own signature.

    A `str` signature is first resolved through `SignatureTree._get`.
    '''
    if type(signature) is str:
        signature = SignatureTree._get(signature)

    # Phase 1: depth-first walk over the type nodes of the signature.
    saw_variant = False
    pending_types = list(signature.types)
    while pending_types:
        node = pending_types.pop()
        if node.token == token:
            return True
        if node.token == 'v':
            saw_variant = True
        pending_types.extend(node.children)

    # Without variants the signature alone is conclusive.
    if not saw_variant:
        return False

    # Phase 2: walk the body, descending into lists and dict values and
    # asking each Variant about the signature it carries.
    pending_values = list(body)
    while pending_values:
        item = pending_values.pop()
        kind = type(item)
        if kind is Variant:
            if signature_contains_type(item.signature, [item.value], token):
                return True
        elif kind is list:
            pending_values.extend(item)
        elif kind is dict:
            pending_values.extend(item.values())

    return False
def replace_fds_with_idx(signature: Union[str, SignatureTree],
                         body: List[Any]) -> (List[Any], List[int]):
    '''Convert a high level body into the low level body format.

    In the high level format a value of type 'h' is the file descriptor
    itself. Each such value is replaced by its position in a list of unix
    fds, and that list is returned so it can be set on the Message. A fd
    that appears more than once is stored once and shares one index.

    Returns the tuple `(body, unix_fds)`. `body` is the same object that was
    passed in; it is handed to `_replace_fds` for rewriting. When the
    signature and body contain no 'h' the body is returned unchanged
    together with an empty list.
    '''
    tree = SignatureTree._get(signature) if type(signature) is str else signature

    collected = []
    if signature_contains_type(tree, body, 'h'):

        def _index_of(fd):
            # First sighting: store the fd and hand out the new last index.
            if fd not in collected:
                collected.append(fd)
                return len(collected) - 1
            return collected.index(fd)

        _replace_fds(body, tree.types, _index_of)

    return body, collected
def replace_idx_with_fds(signature: Union[str, SignatureTree], body: List[Any],
                         unix_fds: List[int]) -> List[Any]:
    '''Convert a low level body into the high level body format.

    In the low level format a value of type 'h' is an index into
    `unix_fds`. Each such value is replaced by the file descriptor stored
    at that index, or by `None` when the lookup raises `IndexError`.

    Returns `body`, the same object that was passed in; it is handed to
    `_replace_fds` for rewriting. When the signature and body contain no
    'h' the body is returned unchanged.
    '''
    tree = SignatureTree._get(signature) if type(signature) is str else signature

    if signature_contains_type(tree, body, 'h'):

        def _fd_at(position):
            try:
                fd = unix_fds[position]
            except IndexError:
                fd = None
            return fd

        _replace_fds(body, tree.types, _fd_at)

    return body
def parse_annotation(annotation: str) -> str:
    '''Normalise a service annotation to a plain string.

    Because of PEP 563, when `from __future__ import annotations` is in
    effect the `inspect` module reports annotations as "forward
    definitions": the source text of the annotation rather than its value,
    so the annotation `'s'` arrives as the string `"'s'"`. In that case the
    text is parsed and unwrapped, which is only done when it is a single
    string constant.

    Returns:
      * `''` when the annotation is falsy or is `inspect.Signature.empty`;
      * the inner string when the annotation is the source text of a single
        string literal;
      * the annotation unchanged in every other case, including text that
        is not valid Python.

    Raises `ValueError` when the annotation is not a string, or when it is
    the source text of a single constant that is not a string (e.g. `'1'`).
    '''
    def raise_value_error():
        raise ValueError(f'service annotations must be a string constant (got {annotation})')

    if not annotation or annotation is inspect.Signature.empty:
        return ''
    if not isinstance(annotation, str):
        raise_value_error()

    # Only the parse itself may fail with SyntaxError, so keep the try
    # block limited to it.
    try:
        statements = ast.parse(annotation).body
    except SyntaxError:
        return annotation

    if len(statements) == 1:
        statement = statements[0]
        # Require a bare expression statement. Other statement kinds either
        # have no `value` attribute (`pass`, `import x`) or have one with a
        # different meaning (the right-hand side of `x = 's'`).
        if isinstance(statement, ast.Expr) and isinstance(statement.value, ast.Constant):
            constant = statement.value.value
            if not isinstance(constant, str):
                raise_value_error()
            return constant

    return annotation
def _replace_fds(body_obj: List[Any], children, replace_fn):
    '''Replace any type 'h' with the value returned by replace_fn() given the
    value of the fd field. This is used by the high level interfaces which
    allow type 'h' to be the fd directly instead of an index in an external
    array such as in the spec.

    The replacement is done in place and nothing is returned: the lists,
    dicts and variant objects reachable from `body_obj` are mutated.

    `children` is the sequence of signature type nodes describing
    `body_obj`; each node is read through its `signature`, `token` and
    `children` attributes. `body_obj` is indexed positionally against
    `children`, except for a dict entry type ('{'), where `body_obj` is the
    dict itself. Variant values are read through their `signature`, `value`
    and `type` attributes.
    '''
    for index, st in enumerate(children):
        # Only a subtree that mentions 'h', or a variant that might carry
        # one, can need any work.
        if 'h' not in st.signature and 'v' not in st.signature:
            continue

        if st.signature == 'h':
            body_obj[index] = replace_fn(body_obj[index])

        elif st.token == 'a':
            if st.children[0].token == '{':
                # Array of dict entries: the value is a dict, handled by the
                # '{' branch of the recursive call.
                _replace_fds(body_obj[index], st.children, replace_fn)
            elif st.signature == 'ah':
                items = body_obj[index]
                for i, child in enumerate(items):
                    items[i] = replace_fn(child)
            else:
                # Elements are containers or variants; wrap each one so the
                # recursion can address it by position and mutate it.
                for child in body_obj[index]:
                    _replace_fds([child], st.children, replace_fn)

        elif st.token == '(':
            _replace_fds(body_obj[index], st.children, replace_fn)

        elif st.token == '{':
            key_type = st.children[0]
            value_type = st.children[1]
            # Build the rewritten mapping separately. Re-inserting into the
            # dict while walking it lets a replaced key overwrite an
            # original key that has not been processed yet, which silently
            # drops that entry.
            rebuilt = {}
            for key, value in body_obj.items():
                if key_type.signature == 'h':
                    key = replace_fn(key)
                if value_type.signature == 'h':
                    value = replace_fn(value)
                else:
                    _replace_fds([value], [value_type], replace_fn)
                rebuilt[key] = value
            # Keep the caller's dict object; only its contents change.
            body_obj.clear()
            body_obj.update(rebuilt)

        elif st.signature == 'v':
            variant = body_obj[index]
            if variant.signature == 'h':
                variant.value = replace_fn(variant.value)
            else:
                # The variant carries its own type; recurse with it.
                _replace_fds([variant.value], [variant.type], replace_fn)

        elif st.children:
            _replace_fds(body_obj[index], st.children, replace_fn)
|