File: wire.py

package info (click to toggle)
electrum 4.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 35,248 kB
  • sloc: python: 222,785; sh: 165; java: 73; javascript: 10; makefile: 9
file content (82 lines) | stat: -rw-r--r-- 2,457 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license

import contextlib
import struct

import dns.exception
import dns.name

class Parser:
    def __init__(self, wire, current=0):
        self.wire = wire
        self.current = 0
        self.end = len(self.wire)
        if current:
            self.seek(current)
        self.furthest = current

    def remaining(self):
        return self.end - self.current

    def get_bytes(self, size):
        if size > self.remaining():
            raise dns.exception.FormError
        output = self.wire[self.current:self.current + size]
        self.current += size
        self.furthest = max(self.furthest, self.current)
        return output

    def get_counted_bytes(self, length_size=1):
        length = int.from_bytes(self.get_bytes(length_size), 'big')
        return self.get_bytes(length)

    def get_remaining(self):
        return self.get_bytes(self.remaining())

    def get_uint8(self):
        return struct.unpack('!B', self.get_bytes(1))[0]

    def get_uint16(self):
        return struct.unpack('!H', self.get_bytes(2))[0]

    def get_uint32(self):
        return struct.unpack('!I', self.get_bytes(4))[0]

    def get_struct(self, format):
        return struct.unpack(format, self.get_bytes(struct.calcsize(format)))

    def get_name(self, origin=None):
        name = dns.name.from_wire_parser(self)
        if origin:
            name = name.relativize(origin)
        return name

    def seek(self, where):
        # Note that seeking to the end is OK!  (If you try to read
        # after such a seek, you'll get an exception as expected.)
        if where < 0 or where > self.end:
            raise dns.exception.FormError
        self.current = where

    @contextlib.contextmanager
    def restrict_to(self, size):
        if size > self.remaining():
            raise dns.exception.FormError
        saved_end = self.end
        try:
            self.end = self.current + size
            yield
            # We make this check here and not in the finally as we
            # don't want to raise if we're already raising for some
            # other reason.
            if self.current != self.end:
                raise dns.exception.FormError
        finally:
            self.end = saved_end

    @contextlib.contextmanager
    def restore_furthest(self):
        try:
            yield None
        finally:
            self.current = self.furthest