File: unpack.py

package info (click to toggle)
nfstest 3.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,580 kB
  • sloc: python: 24,102; makefile: 3
file content (394 lines) | stat: -rw-r--r-- 14,321 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
#===============================================================================
# Copyright 2012 NetApp, Inc. All Rights Reserved,
# contribution by Jorge Mora <mora@netapp.com>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#===============================================================================
"""
Unpack module

Provides the object for managing and unpacking raw data from a working buffer.
"""
import struct
import nfstest_config as c

# Module constants
# Module metadata; the author e-mail address is taken from nfstest_config
__author__    = "Jorge Mora (%s)" % c.NFSTEST_AUTHOR_EMAIL
__copyright__ = "Copyright (C) 2012 NetApp, Inc."
__license__   = "GPL v2"
__version__   = "2.4"

# Module variables
# Consulted by Unpack.unpack_array() when unpacking an item fails:
# if True the error is re-raised, if False unpacking stops and the
# items unpacked so far are returned
UNPACK_ERROR = False  # Raise unpack error when True

class Unpack(object):
    """Unpack object

       Manages a working buffer of raw data together with an offset pointer
       and provides methods to decode basic types, opaques, strings, arrays
       and lists from the buffer in network byte order.

       Usage:
           from packet.unpack import Unpack

           x = Unpack(buffer)

           # Get 32 bytes from the working buffer and move the offset pointer
           data = x.read(32)

           # Get all the unprocessed bytes from the working buffer
           # (all bytes starting from the offset pointer)
           # Do not move the offset pointer
           data = x.getbytes()

           # Get all bytes from the working buffer from the given offset
           # Do not move the offset pointer
           data = x.getbytes(offset)

           # Return the number of unprocessed bytes left in the working buffer
           size = x.size()
           size = len(x)

           # Get the offset pointer
           offset = x.tell()

           # Set the offset pointer
           x.seek(offset)

           # Append the given data to the working buffer
           x.append(data)

           # Insert the given data to the working buffer right before the
           # offset pointer. This resets the working buffer completely
           # and the offset pointer is initialized to zero. It is like
           # re-instantiating the object like:
           #   x = Unpack(data + x.getbytes())
           x.insert(data)

           # Save state
           sid = x.save_state()

           # Restore state
           x.restore_state(sid)

           # Unpack an 'unsigned short' (2 bytes in network order)
           short_int = x.unpack(2, '!H')[0]

           # Unpack different basic types
           char      = x.unpack_char()
           uchar     = x.unpack_uchar()
           short     = x.unpack_short()
           ushort    = x.unpack_ushort()
           int       = x.unpack_int()
           uint      = x.unpack_uint()
           int64     = x.unpack_int64()
           uint64    = x.unpack_uint64()
           data1     = x.unpack_opaque()
           data2     = x.unpack_opaque(64)  # Length of opaque must be <= 64
           data3     = x.unpack_fopaque(32)

           # Get string where length is given as an unsigned integer
           buffer = x.unpack_string()
           # Get string of fixed length
           buffer = x.unpack_string(32)
           # Get string where length is given as a short integer
           buffer = x.unpack_string(Unpack.unpack_short)
           buffer = x.unpack_string(ltype=Unpack.unpack_short)
           # Get string padded to a 4 byte boundary, discard padding bytes
           buffer = x.unpack_string(pad=4)

           # Get an array of unsigned integers
           alist = x.unpack_array()
           # Get a fixed length array of unsigned integers
           alist = x.unpack_array(ltype=10)
           # Get an array of short integers
           alist = x.unpack_array(Unpack.unpack_short)
           # Get an array of strings, the length of the array is given
           # by a short integer
           alist = x.unpack_array(Unpack.unpack_string, Unpack.unpack_short)
           # Get an array of strings, the length of each string is given by
           # a short integer and each string is padded to a 4 byte boundary
           alist = x.unpack_array(Unpack.unpack_string, uargs={'ltype':Unpack.unpack_short, 'pad':4})
           # Get an array of objects decoded by item_obj where the first
           # argument to item_obj is the unpack object, e.g., item = item_obj(x)
           alist = x.unpack_array(item_obj)

           # Get a list of unsigned integers
           alist = x.unpack_list()
           # Get a list of short integers
           alist = x.unpack_list(Unpack.unpack_short)
           # Get a list of strings, the next item flag is given
           # by a short integer
           alist = x.unpack_list(Unpack.unpack_string, Unpack.unpack_short)
           # Get a list of strings, the length of each string is given by
           # a short integer and each string is padded to a 4 byte boundary
           alist = x.unpack_list(Unpack.unpack_string, uargs={'ltype':Unpack.unpack_short, 'pad':4})

           # Unpack a conditional, it unpacks a conditional flag first and
           # if it is true it unpacks the item given and returns it. If the
           # conditional flag decoded is false, the method returns None
           buffer = x.unpack_conditional(Unpack.unpack_opaque)

           # Unpack an array of unsigned integers and convert array into
           # a single long integer
           bitmask = x.unpack_bitmap()
    """
    def __init__(self, data):
        """Constructor

           Initialize object's private data.

           data:
               Raw packet data
        """
        # Position of the next unprocessed byte in the working buffer
        self._offset = 0
        # Working buffer
        self._data = data
        # Stack of saved states, each entry is [sid, offset] and it gets
        # the working buffer appended by insert() if the buffer is replaced
        self._state = []

    def _get_ltype(self, ltype):
        """Get length of element

           ltype:
               Either an integer which is returned as is, or a function
               which is called with this object as its only argument and
               whose result is returned
        """
        if isinstance(ltype, int):
            # An integer is given, just return it
            return ltype
        # A function is given, return output of function
        return ltype(self)

    def size(self):
        """Return the number of unprocessed bytes left in the working buffer"""
        return len(self._data) - self._offset
    __len__ = size

    def tell(self):
        """Get the offset pointer."""
        return self._offset

    def seek(self, offset):
        """Set the offset pointer.

           offset:
               New position of the offset pointer. It is clamped so the
               pointer always stays within the working buffer: a value
               past the end is set to the end of the buffer and a negative
               value is set to zero
        """
        self._offset = max(0, min(offset, len(self._data)))

    def append(self, data):
        """Append data to the working buffer."""
        self._data += data

    def insert(self, data):
        """Insert data to the beginning of the current working buffer.

           The working buffer is replaced by the given data followed by
           all the unprocessed bytes and the offset pointer is set to zero.

           data:
               Raw data to insert right before the offset pointer
        """
        if self._state:
            # Save working buffer in the saved state since the buffer
            # will be overwritten
            state = self._state[-1]
            if len(state) == 2:
                # Only the first buffer seen after the state was saved is
                # kept, that is the one restore_state() must bring back
                state.append(self._data)
        self._data = data + self._data[self._offset:]
        self._offset = 0

    def save_state(self):
        """Save state and return the state id"""
        sid = len(self._state)
        self._state.append([sid, self._offset])
        return sid

    def restore_state(self, sid):
        """Restore state given by the state id

           All states saved after the given state are discarded as well.
           Nothing is done if the state id has not been saved.

           sid:
               State id returned by save_state()
        """
        while sid < len(self._state):
            # Unwind from the most recent state down to the requested one
            state = self._state.pop()
            self._offset = state[1]
            if len(state) == 3:
                # The working buffer was replaced by insert() after this
                # state was saved, so restore the buffer too
                self._data = state[2]

    def getbytes(self, offset=None):
        """Get all the bytes from the working buffer starting at the given
           offset. Do not move the offset pointer.

           offset:
               Starting offset of data to return [default: current offset]
        """
        if offset is None:
            offset = self._offset
        return self._data[offset:]

    def read(self, size, pad=0):
        """Get the number of bytes given from the working buffer.
           Move the offset pointer.

           Less than size bytes are returned if there are not enough bytes
           left in the working buffer. The offset pointer is never moved
           past the end of the working buffer.

           size:
               Length of data to get, a negative length is taken as zero
           pad:
               Get and discard padding bytes [default: 0]
               If given, data is padded to this byte boundary
        """
        if size < 0:
            # A negative length (e.g., a signed length decoded from bad
            # data) must not move the offset pointer backwards
            size = 0
        buf = self._data[self._offset:self._offset+size]
        if pad > 0:
            # Discard padding bytes: round size up to a multiple of pad
            size += -size % pad
        self._offset = min(self._offset + size, len(self._data))
        return buf

    def unpack(self, size, fmt):
        """Get the number of bytes given from the working buffer and process
           it according to the given format.
           Return a tuple of unpack items, see struct.unpack.

           size:
               Length of data to process
           fmt:
               Format string on how to process data
        """
        return struct.unpack(fmt, self.read(size))

    def unpack_char(self):
        """Get a signed char"""
        return self.unpack(1, '!b')[0]

    def unpack_uchar(self):
        """Get an unsigned char"""
        return self.unpack(1, '!B')[0]

    def unpack_short(self):
        """Get a signed short integer"""
        return self.unpack(2, '!h')[0]

    def unpack_ushort(self):
        """Get an unsigned short integer"""
        return self.unpack(2, '!H')[0]

    def unpack_int(self):
        """Get a signed integer"""
        return self.unpack(4, '!i')[0]

    def unpack_uint(self):
        """Get an unsigned integer"""
        return self.unpack(4, '!I')[0]

    def unpack_int64(self):
        """Get a signed 64 bit integer"""
        return self.unpack(8, '!q')[0]

    def unpack_uint64(self):
        """Get an unsigned 64 bit integer"""
        return self.unpack(8, '!Q')[0]

    def unpack_opaque(self, maxcount=0):
        """Get a variable length opaque up to a maximum length of maxcount

           The length is decoded first as an unsigned integer and the
           opaque is padded to a 4 byte boundary.

           maxcount:
               Maximum length of opaque [default: any length]
               An exception is raised if the length decoded is larger
        """
        size = self.unpack_uint()
        if maxcount > 0 and size > maxcount:
            raise Exception("Opaque exceeds maximum length")
        return self.read(size, pad=4)

    def unpack_fopaque(self, size):
        """Get a fixed length opaque

           size:
               Length of opaque, the opaque is padded to a 4 byte boundary
        """
        return self.read(size, pad=4)

    def unpack_utf8(self, maxcount=0):
        """Get a variable length utf8 string up to a maximum length of maxcount"""
        return self.unpack_opaque(maxcount).decode()

    def unpack_futf8(self, size):
        """Get a fixed length utf8 string"""
        return self.unpack_fopaque(size).decode()

    def unpack_string(self, ltype=unpack_uint, pad=0, maxcount=0):
        """Get a variable length string

           ltype:
               Function to decode length of string [default: unpack_uint]
               Could also be given as an integer to have a fixed length string
           pad:
               Get and discard padding bytes [default: 0]
               If given, string is padded to this byte boundary
           maxcount:
               Maximum length of string [default: any length]
               An exception is raised if the length is larger
        """
        slen = self._get_ltype(ltype)
        if maxcount > 0 and slen > maxcount:
            raise Exception("String exceeds maximum length")
        return self.read(slen, pad)

    def unpack_array(self, unpack_item=unpack_uint, ltype=unpack_uint, uargs=None, maxcount=0, islist=False):
        """Get a variable length array, the type of objects in the array
           is given by the unpacking function unpack_item and the type
           to decode the length of the array is given by ltype

           If unpacking an item fails, the items unpacked so far are
           returned unless the module variable UNPACK_ERROR is true in
           which case the error is raised.

           unpack_item:
               Unpack function for each item in the array [default: unpack_uint]
           ltype:
               Function to decode length of array [default: unpack_uint]
               Could also be given as an integer to have a fixed length array
           uargs:
               Named arguments to pass to unpack_item function
               [default: no named arguments]
           maxcount:
               Maximum length of array [default: any length]
               An exception is raised if the length is larger
           islist:
               If true, ltype decodes a next item flag after each item
               instead of the length of the array [default: False]
        """
        if uargs is None:
            uargs = {}
        ret = []
        # Get length of array
        slen = self._get_ltype(ltype)
        if maxcount > 0 and slen > maxcount:
            raise Exception("Array exceeds maximum length")
        while slen > 0:
            try:
                # Unpack each item in the array
                ret.append(unpack_item(self, **uargs))
                if islist:
                    # Decode the next item flag
                    slen = self._get_ltype(ltype)
                else:
                    slen -= 1
            except Exception:
                # Best effort: keep what has been decoded so far, but do
                # not swallow KeyboardInterrupt or SystemExit
                if UNPACK_ERROR:
                    raise
                break
        return ret

    def unpack_list(self, *kwts, **kwds):
        """Get an indeterminate size list, the type of objects in the list
           is given by the unpacking function unpack_item and the type
           to decode the next item flag is given by ltype

           Arguments are the same as in unpack_array()

           unpack_item:
               Unpack function for each item in the list [default: unpack_uint]
           ltype:
               Function to decode the next item flag [default: unpack_uint]
           uargs:
               Named arguments to pass to unpack_item function
               [default: no named arguments]
        """
        kwds['islist'] = True
        return self.unpack_array(*kwts, **kwds)

    def unpack_conditional(self, unpack_item=unpack_uint, ltype=unpack_uint, uargs=None):
        """Get an item if condition flag given by ltype is true, if condition
           flag is false then return None

           unpack_item:
               Unpack function for item if condition is true [default: unpack_uint]
           ltype:
               Function to decode the condition flag [default: unpack_uint]
           uargs:
               Named arguments to pass to unpack_item function
               [default: no named arguments]
        """
        if uargs is None:
            uargs = {}
        # Get condition flag
        if self._get_ltype(ltype):
            # Unpack item if condition is true
            return unpack_item(self, **uargs)
        return None

    def unpack_bitmap(self):
        """Unpack an array of unsigned integers and convert array into
           a single long integer

           The first integer in the array holds the least significant
           32 bits of the result.
        """
        bitmask = 0
        # Unpack array of uint32, each one is shifted 32 bits further
        for index, bint in enumerate(self.unpack_array()):
            bitmask |= bint << (32 * index)
        return bitmask