File: utils.py

package info (click to toggle)
nfstest 3.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,580 kB
  • sloc: python: 24,102; makefile: 3
file content (386 lines) | stat: -rw-r--r-- 14,310 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
#===============================================================================
# Copyright 2014 NetApp, Inc. All Rights Reserved,
# contribution by Jorge Mora <mora@netapp.com>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#===============================================================================
"""
Pktt utilities module

The Packet trace utilities module has classes which augment functionality
of basic data types like displaying integers as their hex equivalent.
It also includes an Enum base class which displays the integer as its
string representation given by a mapping dictionary. There is also a
class to be used as a base class for an RPC payload object.
This module also includes some module variables to change how certain
objects are displayed.
"""
import nfstest_config as c
from packet.unpack import Unpack
from baseobj import BaseObj, fstrobj

# Module constants
__author__    = "Jorge Mora (%s)" % c.NFSTEST_AUTHOR_EMAIL
__copyright__ = "Copyright (C) 2014 NetApp, Inc."
__license__   = "GPL v2"
__version__   = "1.6"

# RPC type constants
RPC_CALL  = 0
RPC_REPLY = 1
rpc_type = {RPC_CALL:'call', RPC_REPLY:'reply'}

# Module variables that change the way an RPC packet is displayed
RPC_type = True  # Display RPC type, e.g., call or reply
RPC_load = True  # Display RPC load name, e.g., NFS, etc.
RPC_ver  = True  # Display RPC load version, e.g., v3, v4, etc.
RPC_xid  = True  # Display RPC xid

# Module variables that change the way an RPC payload is displayed
NFS_mainop = False # Display only the main operation in an NFS COMPOUND
LOAD_body  = True  # Display the body of layer/procedure/operation

# Module variables for Enum
ENUM_CHECK = False  # If True, Enums are strictly enforced
ENUM_REPR  = False  # If True, Enums are displayed as numbers

# Module variables for Bitmaps
BMAP_CHECK = False  # If True, bitmaps are strictly enforced

class ByteHex(int):
    """Byte integer object which is displayed in hex"""
    def __str__(self):
        return "0x%02x" % self
    __repr__ = __str__

class ShortHex(int):
    """Short integer object which is displayed in hex"""
    def __str__(self):
        return "0x%04x" % self
    __repr__ = __str__

class IntHex(int):
    """Integer object which is displayed in hex"""
    def __str__(self):
        return "0x%08x" % self
    __repr__ = __str__

class LongHex(int):
    """Long integer object which is displayed in hex"""
    def __str__(self):
        return "0x%016x" % self
    __repr__ = __str__

class DateStr(float):
    """Floating point object which is displayed as a date"""
    _strfmt = "{0:date}"
    def __str__(self):
        return repr(fstrobj.format(self._strfmt, self))

class StrHex(bytes):
    """String object which is displayed in hex"""
    def __str__(self):
        return "0x" + self.hex()

class EnumInval(Exception):
    """Exception for an invalid enum value"""
    pass

class Enum(int):
    """Enum base object
       This should only be used as a base class where the class attributes
       should be initialized
    """
    _offset = 0    # Strip the first bytes from the string name after conversion
    _enumdict = {} # Enum mapping dictionary to convert integer to string name

    def __new__(cls, unpack):
        """Constructor which checks if integer is a valid enum value"""
        if isinstance(unpack, int):
            # Value is given as an integer
            value = unpack
        else:
            # Unpack integer
            value = unpack.unpack_int()
        # Instantiate base class (integer class)
        obj = super(Enum, cls).__new__(cls, value)
        if ENUM_CHECK and obj._enumdict.get(value) is None:
            raise EnumInval("value=%s not in enum '%s'" % (value, obj.__class__.__name__))
        return obj

    def __str__(self):
        """Informal string representation, display value using the mapping
           dictionary provided as a class attribute
        """
        value = self._enumdict.get(self)
        if value is None:
            return int.__str__(int(self))
        else:
            return value[self._offset:]

    def __repr__(self):
        """Official string representation, display value using the mapping
           dictionary provided as a class attribute when ENUM_REPR is False
        """
        if ENUM_REPR:
            # Use base object representation
            return super(Enum, self).__repr__()
        else:
            return self.__str__()

class BitmapInval(Exception):
    """Exception for an invalid bit number"""
    pass

def bitmap_info(unpack, bitmap, key_enum=None, func_map=None):
    """Returns a list of bits set on the bitmap or a dictionary where the
       key is the bit number given by bitmap and the value is the decoded
       value by evaluating the function used for that specific bit number

       unpack:
           Unpack object
       bitmap:
           Unsigned integer where a value must be decoded for every bit that
           is set, starting from the least significant bit
       key_enum:
           Use Enum for bit number so the key could be displayed as the bit
           name instead of the bit number [default: None]
       func_map:
           Dictionary which maps a bit number to the function to be used for
           decoding the value for that bit number. The function must have
           the "unpack" object as the only argument. If this is None a list
           of bit attributes is returned instead [default: None]
    """
    ret = {}
    blist = []
    bitnum = 0

    if func_map:
        # Get size of opaque
        length = unpack.unpack_uint()
        # Save offset to make sure to consume all bytes
        offset = unpack.tell()

    while bitmap > 0:
        # Check if bit is set
        if bitmap & 0x01 == 1:
            if func_map:
                # Get decoding function for this bit number
                func = func_map.get(bitnum)
                if func is None:
                    if BMAP_CHECK:
                        raise BitmapInval("decoding function not found for bit number %d" % bitnum)
                    else:
                        break
                else:
                    if key_enum:
                        # Use Enum as the key instead of a plain number
                        ret[key_enum(bitnum)] = func(unpack)
                    else:
                        ret[bitnum] = func(unpack)
            else:
                # Add attribute to list
                blist.append(key_enum(bitnum))
        bitmap = bitmap >> 1
        bitnum += 1

    if func_map:
        count = length + offset - unpack.tell()
        if count > 0:
            # Read rest of data for bitmap
            pad = (4 - (length % 4)) if (length % 4) else 0
            unpack.read(count + pad)
        # Return bitmap info dictionary
        return ret
    else:
        # Return the list of bit attributes
        return blist

class OptionFlags(BaseObj):
    """OptionFlags base object

       This base class is used to have a set of raw flags represented by an
       integer and splits every bit into an object attribute according to the
       class attribute _bitnames where the key is the bit number and the value
       is the attribute name.

       This should only be used as a base class where the class attribute
       _bitnames should be initialized. The class attribute _reversed can
       also be initialized to reverse the _bitnames so the first bit becomes
       the last, e.g., _reversed = 31, bits are reversed on a 32 bit integer
       so 0 becomes 31, 1 becomes 30, etc.

       Usage:
           from packet.utils import OptionFlags

           class MyFlags(OptionFlags):
               _bitnames = {0:"bit0", 1:"bit1", 2:"bit2", 3:"bit3"}

           x = MyFlags(10) # 10 = 0b1010

           The attributes of object are:
               x.rawflags = 10, # Original raw flags
               x.bit0     = 0,
               x.bit1     = 1,
               x.bit2     = 0,
               x.bit3     = 1,
    """
    _strfmt1  = "{0}"
    _strfmt2  = "{0}"
    _rawfunc  = IntHex # Raw flags object modifier
    _attrlist = ("rawflags",)
    # Dictionary where key is bit number and value is attribute name
    _bitnames = {}
    # Bit numbers are reversed if > 0, this is the max number of bits in flags
    # if set to 31, bits are reversed on a 32 bit integer (0 becomes 31, etc.)
    _reversed = 0

    def __init__(self, options):
        """Initialize object's private data.

           options:
               Unsigned integer of raw flags
        """
        self.rawflags = self._rawfunc(options) # Raw option flags
        bitnames = self._bitnames
        for bit,name in bitnames.items():
            if self._reversed > 0:
                # Bit numbers are reversed
                bit = self._reversed - bit
            setattr(self, name, (options >> bit) & 0x01)
        # Get attribute list sorted by its bit number
        self._attrlist += tuple(bitnames[k] for k in sorted(bitnames))

    def str_flags(self):
        """Display the flag names which are set, e.g., in the above example
           the output will be "bit1,bit3" (bit1=1, bit3=1)
           Use "__str__ = OptionFlags.str_flags" to have it as the default
           string representation
        """
        ulist = []
        bitnames = self._bitnames
        for bit in sorted(bitnames):
            if self._reversed > 0:
                # Bit numbers are reversed
                bit = self._reversed - bit
            if (self.rawflags >> bit) & 0x01:
                ulist.append(bitnames[bit])
        return ",".join(ulist)

class RPCload(BaseObj):
    """RPC load base object
       This is used as a base class for an RPC payload object
    """
    # Class attributes
    _pindex  = 0    # Discard this number of characters from the procedure name
    _strname = None # Name to display in object's debug representation level=1

    def rpc_str(self, name=None):
        """Display RPC string"""
        out = ""
        rpc = self._rpc
        if name is None:
            self._strname = self.__class__.__name__
            name = self._strname
        if RPC_load:
            out += "%-5s " % name
        if RPC_ver:
            mvstr = ""
            minorversion = getattr(self, 'minorversion', None)
            if minorversion is not None and minorversion >= 0:
                mvstr = ".%d" % minorversion
            vers = "v%d%s" % (rpc.version, mvstr)
            out += "%-4s " % vers
        if RPC_type:
            out += "%-5s " % rpc_type.get(rpc.type)
        if RPC_xid:
            out += "xid:0x%08x " % rpc.xid
        return out

    def main_op(self):
        """Get the main NFS operation"""
        return self

    def __str__(self):
        """Informal string representation"""
        rdebug = self.debug_repr()
        if rdebug == 1:
            out = self.rpc_str(self._strname)
            out += "%-10s" % str(self.procedure)[self._pindex:]
            if LOAD_body and getattr(self, "switch", None) is not None:
                itemstr = str(self.switch)
                if len(itemstr):
                    out += " " + itemstr

            rpc = self._rpc
            if rpc.type and getattr(self, "status", 0) != 0:
                # Display the status of the packet only if it is an error
                out += " %s" % self.status
            return out
        else:
            return BaseObj.__str__(self)

class RDMAbase(BaseObj):
    """RDMA base object

       Base class for an RDMA reduced payload object having RDMA write
       chunks. An application having a DDP (direct data placement) item
       must inherit this class and use the rdma_opaque method as a
       dissecting function.

       Usage:
           from packet.utils import RDMAbase

           # For an original class definition with DDP items
           class APPobj(BaseObj):
               def __init__(self, unpack):
                   self.test = nfs_bool(unpack)
                   self.data = unpack.unpack_opaque()

           # Class definition to access RDMA chunk writes
           class APPobj(RDMAbase):
               def __init__(self, unpack):
                   self.test = self.rdma_opaque(nfs_bool, unpack)
                   self.data = self.rdma_opaque(unpack.unpack_opaque)
    """
    # Class attribute is shared by all instances
    rdma_write_chunks = []

    def rdma_opaque(self, func, *kwts, **kwds):
        """Dissecting method for a DDP item
           The first positional argument is the original dissecting
           function to be called when there is no RDMA write chunks.
           The rest of the arguments (positional or named) are passed
           directly to the dissecting function.
        """
        if self.rdma_write_chunks:
            # There are RDMA write chunks, use the next chunk data
            # instead of calling the original decoding function
            data = b""
            for rsegment in self.rdma_write_chunks.pop(0):
                # Just get the bytes for the segment, dropping the
                # padding bytes if any
                data += rsegment.get_data(padding=False)
            unpack = None
            if len(kwts) == 0:
                # If no arguments are given check if the original function
                # is an instance method like unpack.unpack_opaque
                unpack = getattr(func, "__self__")
            elif isinstance(kwts[0], Unpack):
                # At least one positional argument is given and the first is
                # an instance of Unpack
                unpack = kwts[0]
            if unpack:
                # Throw away the opaque size
                unpack.unpack_uint()
            return data
        else:
            # Call original decoding function with all arguments given
            return func(*kwts, **kwds)