File: dump.py

package info (click to toggle)
galileo 0.5.1-4
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 392 kB
  • ctags: 578
  • sloc: python: 3,462; xml: 23; makefile: 14
file content (130 lines) | stat: -rw-r--r-- 3,980 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import base64
import logging

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

from .utils import a2x, a2lsbi, a2b


class CRC16(object):
    """ Configurable, bit-by-bit CRC16 accumulator.

    :param poly: generator polynomial XORed in whenever a set bit is
        shifted out of the register.
    :param Invert: when true, bytes are processed most significant bit
        first (register shifted left); otherwise least significant bit
        first (register shifted right).
    :param IV: initial value of the CRC register.
    :param FV: value XORed with the register by :meth:`final`.
    """
    def __init__(self, poly=0x1021, Invert=True, IV=0x0000, FV=0x0000):
        self.poly = poly
        self.FV = FV
        self.value = IV
        # The bit order is chosen once, here; ``update`` then goes through
        # ``update_byte`` without caring which variant is in use.
        self.update_byte = (self.update_byte_MSB if Invert
                            else self.update_byte_LSB)

    def update_byte_MSB(self, byte):
        """ Feed one byte into the register, most significant bit first. """
        crc = self.value ^ (byte << 8)
        for _ in range(8):
            crc <<= 1
            # Bit 16 now holds what was the top bit of the 16-bit register
            # before the shift.
            if crc & 0x10000:
                crc ^= self.poly
        # Drop everything that was shifted out of the 16-bit register.
        self.value = crc & 0xffff

    def update_byte_LSB(self, byte):
        """ Feed one byte into the register, least significant bit first. """
        crc = self.value ^ byte
        for _ in range(8):
            shifted_out = crc & 0x0001
            crc >>= 1
            if shifted_out:
                crc ^= self.poly
        self.value = crc

    def update(self, array):
        """ Feed every element of ``array`` (an iterable of byte values)
        into the register, in order. """
        feed = self.update_byte
        for octet in array:
            feed(octet)

    def final(self):
        """ Return the current register value XORed with ``FV``; the
        register itself is left untouched. """
        return self.FV ^ self.value


class Dump(object):
    """ Accumulates the chunks of a dump of a given type.

    Payload chunks are collected in ``data`` while a CRC16 is computed
    over them; the chunk starting with 0xC0 is kept apart in ``footer``.
    As read by :meth:`isValid`, the footer carries the dump type at
    index 2, a CRC at indexes 3-4 and the payload length at indexes 5-8.
    """
    def __init__(self, _type):
        self._type = _type
        self.crc = CRC16()
        self.data = []
        self.footer = []
        # Escape counters: [number of 0xDC escapes, number of 0xDD escapes]
        self.esc = [0, 0]

    def unSLIP1(self, data):
        """ Undo the escaping of the first byte of ``data``.

        The protocol uses a particular version of SLIP (RFC 1055) applied
        only on the first byte of the data: a leading 0xDB 0xDC stands
        for 0xC0 and a leading 0xDB 0xDD stands for 0xDB.

        Returns ``data`` itself when it does not start with the escape
        byte, otherwise a new list with the two leading bytes replaced by
        the byte they encode. The matching counter in ``esc`` is
        incremented for every escape found.
        """
        SLIP_END = 0xC0
        SLIP_ESC = 0xDB
        if data[0] != SLIP_ESC:
            return data
        code = data[1]
        # count the escape before translating it
        self.esc[code - 0xDC] += 1
        decoded = {0xDC: SLIP_END, 0xDD: SLIP_ESC}
        return [decoded[code]] + data[2:]

    def add(self, data):
        """ Add one received chunk to the dump.

        A chunk starting with 0xC0 is stored as the footer (only one is
        expected). Any other chunk is un-escaped, fed to the CRC and
        appended to the payload.
        """
        if data[0] != 0xc0:
            payload = self.unSLIP1(data)
            self.crc.update(payload)
            self.data.extend(payload)
            return
        assert self.footer == []
        self.footer = data

    @property
    def len(self):
        """ Number of payload bytes collected so far. """
        return len(self.data)

    def isValid(self):
        """ Check the collected payload against the footer.

        Returns False (logging an error, except when the footer is simply
        missing) if there is no footer, or if the type, the CRC or the
        length announced by the footer do not match; True otherwise.
        """
        footer = self.footer
        if not footer:
            return False

        announcedType = footer[2]
        if announcedType != self._type:
            logger.error('Dump is not of requested type: %x != %x',
                         announcedType, self._type)
            return False

        # a2lsbi comes from .utils; presumably it decodes a little-endian
        # integer from the given bytes -- TODO confirm
        computedCRC = self.crc.final()
        announcedCRC = a2lsbi(footer[3:5])
        if announcedCRC != computedCRC:
            logger.error("Error in communication, Expected CRC: 0x%04X,"
                         " received 0x%04X", computedCRC, announcedCRC)
            return False

        announcedLen = a2lsbi(footer[5:9])
        receivedLen = self.len
        if receivedLen != announcedLen:
            logger.error("Error in communication, Expected length: %d bytes,"
                         " received %d bytes", announcedLen, receivedLen)
            return False

        return True

    def toFile(self, filename):
        """ Write the dump to ``filename`` as text: the payload in rows of
        at most 20 bytes, then the footer on a last line, each row being
        formatted by ``a2x``. """
        logger.debug("Dumping megadump to %s", filename)
        rowLen = 20
        with open(filename, 'wt') as out:
            for start in range(0, self.len, rowLen):
                row = self.data[start:start + rowLen]
                out.write(a2x(row) + '\n')
            out.write(a2x(self.footer) + '\n')

    def toBase64(self):
        """ Return the payload followed by the footer, base64-encoded, as
        a text string. """
        raw = a2b(self.data + self.footer)
        encoded = base64.b64encode(raw)
        return encoded.decode('utf-8')

class DumpResponse(object):
    """ Iterator cutting ``data`` into chunks of ``chunk_len`` elements.

    When the first byte of a chunk is 0xC0 or 0xDB, it is replaced by the
    two-byte escape sequence 0xDB 0xDC or 0xDB 0xDD respectively, and the
    chunk then carries one byte less of ``data`` so that its length stays
    ``chunk_len``. The last chunk may be shorter.
    """
    def __init__(self, data, chunk_len):
        self.data = data
        self._chunk_len = chunk_len
        self.__index = 0

    def __iter__(self):
        return self

    def __next__(self):
        """ Return the next chunk, escaping its first byte if needed. """
        start = self.__index
        if start >= len(self.data):
            raise StopIteration

        first = self.data[start]
        if first == 0xC0:
            escape_code = 0xDC
        elif first == 0xDB:
            escape_code = 0xDD
        else:
            # Nothing to escape: hand out a plain slice
            stop = start + self._chunk_len
            self.__index = stop
            return self.data[start:stop]

        # The escape sequence takes two slots but consumes a single byte
        # of data, hence the chunk only advances by chunk_len - 1.
        stop = start + self._chunk_len - 1
        self.__index = stop
        return [0xDB, escape_code] + self.data[start + 1:stop]
    # For python2
    next = __next__