File: scanner.py

package info (click to toggle)
python-varlink 31.0.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 372 kB
  • sloc: python: 2,462; sh: 177; makefile: 31
file content (447 lines) | stat: -rw-r--r-- 14,750 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
#!-*-coding:utf8-*-
from __future__ import print_function
from __future__ import unicode_literals

try:
    from builtins import str
    from builtins import int
    from builtins import object
    from builtins import unicode
except ImportError:
    pass

import re

try:
    basestring
except NameError:
    basestring = str

try:
    from types import SimpleNamespace
except:  # Python 2
    from argparse import Namespace as SimpleNamespace

try:
    from collections.abc import (Set, Mapping)
except:  # Python 2
    from collections import (Set, Mapping)

from collections import OrderedDict

from .error import (MethodNotFound, InvalidParameter)


class Scanner(object):
    """Class for scanning a varlink interface definition.

    The scanner keeps a cursor (``pos``) into ``string``.  While skipping
    whitespace it collects the text of ``#`` comments into ``current_doc``;
    the ``read_*`` methods hand that text to the member they build and then
    reset it.
    """

    def __init__(self, string):
        """Compile the token patterns and place the cursor at the start of *string*."""
        # re.ASCII is not available on every Python version, hence the check.
        if hasattr(re, "ASCII"):
            ASCII = re.ASCII
        else:
            ASCII = 0
        self.whitespace = re.compile(r'([ \t\n]|#.*$)+', ASCII | re.MULTILINE)
        self.docstring = re.compile(r'(?:.?)+#(.*)(?:\n|\r\n)')
        # FIXME: nested ()
        self.method_signature = re.compile(r'([ \t\n]|#.*$)*(\([^)]*\))([ \t\n]|#.*$)*->([ \t\n]|#.*$)*(\([^)]*\))',
                                           ASCII | re.MULTILINE)

        self.keyword_pattern = re.compile(r'\b[a-z]+\b|[:,(){}]|->|\[\]|\?|\[string\]\(\)|\[string\]', ASCII)
        # Named token classes; any other value passed to get() is treated
        # as a literal keyword/punctuation token.
        self.patterns = {
            'interface-name': re.compile(r'[A-Za-z]([A-Za-z])*([.][A-Za-z0-9]([-]*[A-Za-z0-9])*)+|xn--([0-9a-z])*([.][A-Za-z0-9]([-]*[A-Za-z0-9])*)+'),
            'member-name': re.compile(r'\b[A-Z][A-Za-z0-9]*\b', ASCII),
            'identifier': re.compile(r'\b[A-Za-z]([_]?[A-Za-z0-9])*\b', ASCII),
        }

        self.string = string
        self.pos = 0
        self.current_doc = ""

    def _skip_whitespace(self):
        """Advance past whitespace and comments, appending comment text to current_doc."""
        m = self.whitespace.match(self.string, self.pos)
        if not m:
            return

        doc = self.docstring.findall(self.string[m.start():m.end()])
        if doc:
            try:
                self.current_doc += "\n".join(doc)
            except UnicodeError:
                # Fallback kept from the original code: decode each piece as
                # UTF-8 (presumably for Python 2 byte strings).
                self.current_doc += "\n".join(
                        [el.decode("utf-8") for el in doc])
        self.pos = m.end()

    def get(self, expected):
        """Try to consume the token *expected* at the cursor.

        *expected* is either a key of ``self.patterns`` (the matched text is
        returned) or a literal keyword/punctuation token (``True`` is
        returned).  Leading whitespace and comments are always consumed.
        Returns ``None`` and leaves the cursor after the whitespace when the
        token does not match.
        """
        self._skip_whitespace()

        pattern = self.patterns.get(expected)
        if pattern:
            m = pattern.match(self.string, self.pos)
            if m:
                self.pos = m.end()
                return m.group(0)
        else:
            m = self.keyword_pattern.match(self.string, self.pos)
            if m and m.group(0) == expected:
                self.pos = m.end()
                return True
        return None

    def expect(self, expected):
        """Like get(), but raise SyntaxError when *expected* is not next."""
        value = self.get(expected)
        if not value:
            raise SyntaxError("expected '{}'".format(expected))
        return value

    def end(self):
        """Skip trailing whitespace/comments and report whether the input is exhausted."""
        self._skip_whitespace()
        return self.pos >= len(self.string)

    def read_type(self, lastmaybe=False):
        """Read one type expression and return its marker object.

        Returns ``_Maybe``, ``_Dict``, ``_Array``, ``_Object`` or
        ``_CustomType`` instances, an empty ``set`` for ``[string]()``, the
        zero value of ``bool``/``int``/``float``/``str`` for the primitive
        types, or whatever read_struct() returns for an inline struct/enum.

        lastmaybe -- set when the enclosing type was already a ``?`` so that
        ``??`` is rejected with SyntaxError.
        """
        if self.get('?'):
            if lastmaybe:
                raise SyntaxError("double '??'")
            return _Maybe(self.read_type(lastmaybe=True))

        if self.get('[string]()'):
            return set()

        if self.get('[string]'):
            return _Dict(self.read_type())

        if self.get('[]'):
            return _Array(self.read_type())

        if self.get('object'):
            return _Object()

        if self.get('bool'):
            t = bool()
        elif self.get('int'):
            t = int()
        elif self.get('float'):
            t = float()
        elif self.get('string'):
            t = str()
        else:
            name = self.get('member-name')
            if name:
                t = _CustomType(name)
            else:
                t = self.read_struct()

        return t

    def read_struct(self):
        """Read a parenthesised struct or enum.

        The first field decides the kind: ``name:`` starts a struct,
        ``name,`` starts an enum.  Returns ``_Enum`` for an enum and
        ``_Struct`` otherwise (including the empty ``()``).  Raises
        SyntaxError on malformed input.
        """
        _isenum = None
        self.expect('(')
        fields = OrderedDict()
        if not self.get(')'):
            while True:
                name = self.expect('identifier')
                if _isenum is None:
                    # Kind not known yet: the token after the first name decides.
                    if self.get(':'):
                        _isenum = False
                        fields[name] = self.read_type()
                        if not self.get(','):
                            break
                        continue
                    elif self.get(','):
                        _isenum = True
                        fields[name] = True
                        continue
                    else:
                        raise SyntaxError("after '{}'".format(name))
                elif not _isenum:
                    try:
                        self.expect(':')
                        fields[name] = self.read_type()
                    except SyntaxError as e:
                        raise SyntaxError("after '{}': {}".format(name, e))
                else:
                    fields[name] = True

                if not self.get(','):
                    break
            self.expect(')')
        if _isenum:
            return _Enum(fields.keys())
        else:
            return _Struct(fields)

    def read_member(self):
        """Read one ``type``, ``method`` or ``error`` member.

        Returns an ``_Alias``, ``_Method`` or ``_Error`` carrying the comment
        text collected so far, and resets ``current_doc``.  Raises
        SyntaxError when none of the three keywords is next or the member is
        malformed.
        """
        if self.get('type'):
            try:
                _name = self.expect('member-name')
            except SyntaxError:
                # Extract the offending word for the error message: it runs
                # from the cursor (after any whitespace) to the next
                # whitespace, or to the end of the input if none follows.
                m = self.whitespace.match(self.string, self.pos)
                if m:
                    start = m.end()
                else:
                    start = self.pos
                m = self.whitespace.search(self.string, start)
                if m:
                    stop = m.start()
                else:
                    stop = len(self.string)

                raise SyntaxError("'{}' not a valid type name.".format(self.string[start:stop]))
            try:
                _type = self.read_type()
            except SyntaxError as e:
                raise SyntaxError("in '{}': {}".format(_name, e))
            doc = self.current_doc
            self.current_doc = ""
            return _Alias(_name, _type, doc)
        elif self.get('method'):
            name = self.expect('member-name')
            # FIXME
            # Capture the raw "(...) -> (...)" text before it is parsed; the
            # signature stays None when the pattern does not match.
            sig = self.method_signature.match(self.string, self.pos)
            if sig:
                sig = name + sig.group(0)
            in_type = self.read_struct()
            self.expect('->')
            out_type = self.read_struct()
            doc = self.current_doc
            self.current_doc = ""
            return _Method(name, in_type, out_type, sig, doc)
        elif self.get('error'):
            doc = self.current_doc
            self.current_doc = ""
            return _Error(self.expect('member-name'), self.read_type(), doc)
        else:
            raise SyntaxError('expected type, method, or error')


class _Object(object):
    """Marker returned by Scanner.read_type() for the ``object`` keyword; holds no data."""


class _Struct(object):
    """Struct type: an ordered mapping of field name to field type."""

    def __init__(self, fields):
        """Copy *fields* (a mapping or iterable of pairs) into a fresh OrderedDict."""
        ordered = OrderedDict()
        ordered.update(fields)
        self.fields = ordered


class _Enum(object):
    """Enum type: keeps the collection of member names it was given."""

    def __init__(self, fields):
        """Store *fields* as-is (no copy is made)."""
        # The same object is kept, not a copy.
        self.fields = fields


class _Array(object):
    """Array type (``[]`` prefix): wraps the type of its elements."""

    def __init__(self, element_type):
        """Remember *element_type*, the type marker that followed ``[]``."""
        self.element_type = element_type


class _Maybe(object):
    """Optional type (``?`` prefix): wraps the type that may be absent."""

    def __init__(self, element_type):
        """Remember *element_type*, the type marker that followed ``?``."""
        self.element_type = element_type


class _Dict(object):
    """String-keyed dictionary type (``[string]`` prefix): wraps the value type."""

    def __init__(self, element_type):
        """Remember *element_type*, the type marker that followed ``[string]``."""
        self.element_type = element_type


class _CustomType(object):
    """Reference to a type by its member name."""

    def __init__(self, name):
        """Remember *name*, the member name of the referenced type."""
        self.name = name


class _Alias(object):
    """A ``type`` member: a named type definition with optional documentation."""

    def __init__(self, name, varlink_type, doc=None):
        """Store the member name, the type it stands for and its doc text."""
        self.name = name
        # Exposed as ``type`` even though the parameter is ``varlink_type``.
        self.type = varlink_type
        self.doc = doc


class _Method(object):
    """A ``method`` member: name, input type, output type, signature text and doc."""

    def __init__(self, name, in_type, out_type, _signature, doc=None):
        """Store the method's parts; *_signature* is exposed as ``signature``."""
        self.name = name
        self.in_type = in_type
        self.out_type = out_type
        # The leading underscore is dropped for the public attribute.
        self.signature = _signature
        self.doc = doc


class _Error(object):
    """An ``error`` member: a named error with its parameter type and doc text."""

    def __init__(self, name, varlink_type, doc=None):
        """Store the error name, its parameter type and its doc text."""
        self.name = name
        # Exposed as ``type`` even though the parameter is ``varlink_type``.
        self.type = varlink_type
        self.doc = doc


class Interface(object):
    """Class for a parsed varlink interface definition."""

    def __init__(self, description):
        """description -- description string in varlink interface definition language

        Parses the description with Scanner and fills ``name``, ``doc`` and
        ``members`` (an OrderedDict of member name to member object).  Raises
        SyntaxError when the description cannot be parsed.
        """
        self.description = description

        scanner = Scanner(description)
        scanner.expect('interface')
        self.name = scanner.expect('interface-name')
        # Comments seen before the interface name become the interface doc.
        self.doc = scanner.current_doc
        scanner.current_doc = ""
        self.members = OrderedDict()
        while not scanner.end():
            member = scanner.read_member()
            self.members[member.name] = member

    def get_description(self):
        """return the description string in varlink interface definition language"""
        return self.description

    def get_method(self, name):
        """Return the method member called *name*; raise MethodNotFound otherwise."""
        method = self.members.get(name)
        if isinstance(method, _Method):
            return method
        raise MethodNotFound(name)

    def filter_params(self, parent_name, varlink_type, _namespaced, args, kwargs):
        """Convert *args* according to *varlink_type* and return the result.

        parent_name -- path of the value being filtered; passed to
                       InvalidParameter on failure
        varlink_type -- type marker as produced by Scanner.read_type()
        _namespaced -- if true, structs are returned as SimpleNamespace
                       objects, otherwise as dicts
        args -- the value to filter; for a struct, a tuple is treated as
                positional field values, a mapping or object as named fields
        kwargs -- keyword field values for a struct, used for the fields not
                  covered by a positional tuple; may be None

        Raises InvalidParameter when the value does not fit the type.
        """
        if isinstance(varlink_type, _Maybe):
            if args is None:
                return None
            return self.filter_params(parent_name, varlink_type.element_type, _namespaced, args, kwargs)

        if isinstance(varlink_type, _Dict):
            if args is None:
                return {}
            if not isinstance(args, Mapping):
                raise InvalidParameter(parent_name)
            # Build a new dict instead of overwriting the caller's mapping.
            return {
                k: self.filter_params(parent_name + '[' + k + ']', varlink_type.element_type, _namespaced, v, None)
                for (k, v) in args.items()
            }

        if isinstance(varlink_type, _CustomType):
            # Resolve the reference through this interface's members.
            return self.filter_params(parent_name, self.members.get(varlink_type.name), _namespaced, args, kwargs)

        if isinstance(varlink_type, _Alias):
            return self.filter_params(parent_name, varlink_type.type, _namespaced, args, kwargs)

        if isinstance(varlink_type, _Object):
            return args

        # basestring is the module-level alias that also works on Python 3,
        # where the name ``unicode`` may be undefined.
        if isinstance(varlink_type, _Enum) and isinstance(args, basestring):
            return args

        if isinstance(varlink_type, _Array):
            if args is None:
                return []

            return [self.filter_params(parent_name + '[]', varlink_type.element_type, _namespaced, x, None) for x in
                    args]

        if isinstance(varlink_type, Set):
            return set(args)

        if isinstance(varlink_type, basestring) and isinstance(args, basestring):
            return args

        if isinstance(varlink_type, float) and (isinstance(args, float) or isinstance(args, int)):
            return float(args)

        if isinstance(varlink_type, bool) and isinstance(args, bool):
            return args

        if isinstance(varlink_type, int) and (isinstance(args, float) or isinstance(args, int)):
            if isinstance(args, float):
                # NOTE(review): this rounds half up for non-negative values,
                # but e.g. -1.4 becomes 0 -- confirm whether negative floats
                # need to be handled.
                return int(args + 0.5)
            return int(args)

        if not isinstance(varlink_type, _Struct):
            raise InvalidParameter(parent_name)

        if _namespaced:
            out = SimpleNamespace()
        else:
            out = {}

        # A tuple supplies field values by position; anything else is a
        # mapping/object that supplies them by name.
        if isinstance(args, tuple):
            positional = args
            varlink_struct = None
        else:
            positional = None
            varlink_struct = args
        next_positional = 0

        for name, field_type in varlink_type.fields.items():
            if positional is not None:
                if next_positional < len(positional):
                    val = positional[next_positional]
                    next_positional += 1
                elif kwargs is not None and name in kwargs:
                    # Positional values are used up: fall back to keywords.
                    val = kwargs[name]
                else:
                    continue
            elif not varlink_struct:
                continue
            elif isinstance(varlink_struct, Mapping):
                if name not in varlink_struct:
                    continue
                val = varlink_struct[name]
            elif hasattr(varlink_struct, name):
                val = getattr(varlink_struct, name)
            else:
                continue

            ret = self.filter_params(parent_name + "." + name, field_type, _namespaced, val, None)
            # Fields whose filtered value is None are left out of the result.
            if ret is not None:
                if _namespaced:
                    setattr(out, name, ret)
                else:
                    out[name] = ret

        return out