File: types.py

package info (click to toggle)
python-ipfix 0.9.7-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 380 kB
  • sloc: python: 1,825; makefile: 149
file content (444 lines) | stat: -rw-r--r-- 15,023 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
#
# python-ipfix (c) 2013-2014 Brian Trammell.
#
# Many thanks to the mPlane consortium (http://www.ict-mplane.eu) for
# its material support of this effort.
# 
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
Implementation of IPFIX abstract data types (ADT) and mappings to Python types.

Maps each IPFIX ADT to the corresponding Python type, as below:

======================= =============
       IPFIX Type        Python Type
======================= =============
octetArray              bytes
unsigned8               int
unsigned16              int
unsigned32              int
unsigned64              int
signed8                 int
signed16                int
signed32                int
signed64                int
float32                 float
float64                 float
boolean                 bool
macAddress              bytes
string                  str
dateTimeSeconds         datetime
dateTimeMilliseconds    datetime
dateTimeMicroseconds    datetime
dateTimeNanoseconds     datetime
ipv4Address             ipaddress
ipv6Address             ipaddress
======================= =============

Though client code generally will not use this module directly, it defines how
each IPFIX abstract data type will be represented in Python, and the concrete
IPFIX representation of each type. Type methods operate on buffers, as used
internally by the :class:`ipfix.message.MessageBuffer` class, so we'll create 
one to illustrate encoding and decoding:

>>> import ipfix.types
>>> buf = memoryview(bytearray(16))

Each of the encoding methods returns the offset into the buffer of the first
byte after the encoded value; since we're always encoding to the beginning
of the buffer in this example, this is equivalent to the length. 
We use this to bound the encoded value on subsequent decode.

Integers are represented by the python int type:

>>> unsigned32 = ipfix.types.for_name("unsigned32")
>>> length = unsigned32.encode_single_value_to(42, buf, 0)
>>> buf[0:length].tolist()
[0, 0, 0, 42]
>>> unsigned32.decode_single_value_from(buf, 0, length)
42

...floats by the float type, with the usual caveats about precision:

>>> float32 = ipfix.types.for_name("float32")
>>> length = float32.encode_single_value_to(42.03579, buf, 0)
>>> buf[0:length].tolist()
[66, 40, 36, 166]
>>> float32.decode_single_value_from(buf, 0, length)
42.035789489746094

...strings by the str type, encoded as UTF-8:

>>> string = ipfix.types.for_name("string")
>>> length = string.encode_single_value_to("Grüezi", buf, 0)
>>> buf[0:length].tolist()
[71, 114, 195, 188, 101, 122, 105]
>>> string.decode_single_value_from(buf, 0, length)
'Grüezi'

...addresses as the IPv4Address and IPv6Address types in the ipaddress module:

>>> from ipaddress import ip_address
>>> ipv4Address = ipfix.types.for_name("ipv4Address")
>>> length = ipv4Address.encode_single_value_to(ip_address("198.51.100.27"), buf, 0)
>>> buf[0:length].tolist()
[198, 51, 100, 27]
>>> ipv4Address.decode_single_value_from(buf, 0, length)
IPv4Address('198.51.100.27')
>>> ipv6Address = ipfix.types.for_name("ipv6Address")
>>> length = ipv6Address.encode_single_value_to(ip_address("2001:db8::c0:ffee"), buf, 0)
>>> buf[0:length].tolist()
[32, 1, 13, 184, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 255, 238]
>>> ipv6Address.decode_single_value_from(buf, 0, length)
IPv6Address('2001:db8::c0:ffee')

...and the timestamps of various precision as a python datetime, 
encoded as per RFC5101bis:

>>> from datetime import datetime
>>> from datetime import timezone
>>> dtfmt = "%Y-%m-%d %H:%M:%S.%f"
>>> dt = datetime.strptime("2013-06-21 14:00:03.456789", dtfmt)

dateTimeSeconds truncates microseconds:

>>> dateTimeSeconds = ipfix.types.for_name("dateTimeSeconds")
>>> length = dateTimeSeconds.encode_single_value_to(dt, buf, 0)
>>> buf[0:length].tolist()
[81, 196, 92, 99]
>>> dateTimeSeconds.decode_single_value_from(buf, 0, length).strftime(dtfmt)
'2013-06-21 14:00:03.000000'

dateTimeMilliseconds truncates microseconds to the nearest millisecond:

>>> dateTimeMilliseconds = ipfix.types.for_name("dateTimeMilliseconds")
>>> length = dateTimeMilliseconds.encode_single_value_to(dt, buf, 0)
>>> buf[0:length].tolist()
[0, 0, 1, 63, 103, 8, 228, 128]
>>> dateTimeMilliseconds.decode_single_value_from(buf, 0, length).strftime(dtfmt)
'2013-06-21 14:00:03.456000'

dateTimeMicroseconds exports microseconds fully in NTP format:

>>> dateTimeMicroseconds = ipfix.types.for_name("dateTimeMicroseconds")
>>> length = dateTimeMicroseconds.encode_single_value_to(dt, buf, 0)
>>> buf[0:length].tolist()
[81, 196, 92, 99, 116, 240, 32, 0]
>>> dateTimeMicroseconds.decode_single_value_from(buf, 0, length).strftime(dtfmt)
'2013-06-21 14:00:03.456789'

dateTimeNanoseconds is also supported, but is identical to
dateTimeMicroseconds, as the datetime class in Python only supports
microsecond-level timing.

"""
from datetime import datetime, timedelta, timezone
from functools import total_ordering
from ipaddress import ip_address
import binascii
import struct
import math

VARLEN = 65535

ISO8601_FMT = "%Y-%m-%d %H:%M:%S"

class IpfixTypeError(ValueError):
    """Raised when attempting to do an unsupported operation on a type"""
    def __init__(self, *args):
        super().__init__(args)

# Table for downconverting struct elements for reduced length encoding
_stel_rle = { ('H', 1) : 'B',
              ('L', 2) : 'H',
              ('L', 1) : 'B',
              ('Q', 4) : 'L',
              ('Q', 2) : 'H',
              ('Q', 1) : 'B',
              ('h', 1) : 'b',
              ('l', 2) : 'h',
              ('l', 1) : 'b',
              ('q', 4) : 'l',
              ('q', 2) : 'h',
              ('q', 1) : 'b',
              ('d', 4) : 'f'}

# Builtin structs for varlen information
_varlen1_st = struct.Struct("!B")
_varlen2_st = struct.Struct("!H")

# builtin default encode/decode function
def _identity(x):
    return x

# Builtin type implementation
@total_ordering
class IpfixType:
    """Abstract interface for all IPFIX types. Used internally. """
    def __init__(self, name, num, valenc, valdec, valstr, valparse, roottype=None):
        self.name = name
        self.num = num
        self.valenc = valenc
        self.valdec = valdec
        self.valstr = valstr
        self.valparse = valparse
        if roottype:
            self.roottype = roottype
        else:
            self.roottype = self
        self.length = 0

    def __eq__(self, other):
        return (self.num, self.length) == (other.num, other.length)
    
    def __lt__(self, other):
        return (self.num, self.length) < (other.num, other.length)
    
    def __str__(self):
        return "<%s>" % self.name

    def __repr__(self):
        return "ipfix.types.for_name(%s)" % repr(self.name)

class StructType(IpfixType):
    """Type encoded by struct packing. Used internally."""
    def __init__(self, name, num, stel, valenc=_identity, valdec=_identity, valstr=str, valparse=int, roottype=None):
        super().__init__(name, num, valenc, valdec, valstr, valparse, roottype)
        self.stel = stel
        self.st = struct.Struct("!"+stel)
        self.length = self.st.size
        self.skipel = str(self.length)+"x"

    def for_length(self, length):
        if not length or length == self.length:
            return self
        elif self.roottype is _roottypes[0]:
            # FIXME this is kind of a hack to allow any-length encoding of octet arrays
            return StructType(self.name, self.num, str(length)+"s",
                              self.valenc, self.valdec, self.roottype)
        else:
            try:
                return StructType(self.name, self.num, 
                                  _stel_rle[(self.stel, length)], 
                                  self.valenc, self.valdec, self.roottype)
            except KeyError:
                raise IpfixTypeError("No RLE for <%s>[%u]" % 
                                     (self.name, length))

    def encode_single_value_to(self, val, buf, offset):
        self.st.pack_into(buf, offset, self.valenc(val))
        return offset + self.length
    
    def decode_single_value_from(self, buf, offset, length):
        assert(self.length == length)
        return self.valdec(self.st.unpack_from(buf, offset)[0])

class OctetArrayType(IpfixType):
    """Type encoded by byte array packing. Used internally."""
    def __init__(self, name, num, valenc=_identity, valdec=_identity, valstr=binascii.hexlify, valparse=binascii.unhexlify, roottype=None):
        super().__init__(name, num, valenc, valdec, valstr, valparse, roottype)
        self.length = VARLEN
    
    def for_length(self, length):
        if not length or length == self.length:
            return self
        else:
            return StructType(self.name, self.num, str(length)+"s", 
                              self.valenc, self.valdec, self.roottype)

    def encode_single_value_to(self, val, buf, offset):
        enc = self.valenc(val)
        buf[offset:offset+len(enc)] = enc
        return offset + len(enc)

    def decode_single_value_from(self, buf, offset, length):
        return self.valdec(buf[offset:offset+length].tobytes())

# Utility calls for buildin encoders/decoders
def dt2epoch(dt):
    return (dt - datetime(1970,1,1,0,0,tzinfo=None)).total_seconds()

# Builtin encoders/decoders

def _encode_smibool(bool):
    if bool:
        return 1
    else:
        return 2

def _decode_smibool(byte):
    if byte == 1:
        return True
    else:
        return False

def _str_bool(bool):
    if bool:
        return "true"
    else:
        return "false"

def _parse_bool(string):
    if string == 'true':
        return True
    else:
        return False

def _encode_utf8(string):   
    return string.encode()

def _decode_utf8(octets):
    return octets.decode()

def _encode_sec(dt):
    return int(dt2epoch(dt))
    
def _decode_sec(epoch):
    return datetime.utcfromtimestamp(epoch)

def _str_sec(dt):
    return dt.strftime(ISO8601_FMT)

def _parse_sec(string):
    return datetime.strptime(string, ISO8601_FMT)

def _encode_msec(dt):
    return int(dt2epoch(dt) * 1000)
    
def _decode_msec(epoch):
    return datetime.utcfromtimestamp(epoch/1000)

def _str_msec(dt):
    return "%s.%03u" % (dt.strftime(ISO8601_FMT), int(dt.microsecond / 1000))

def _parse_msec(string):
    (ss, mss) = string.split(".")
    return datetime.strptime(ss, ISO8601_FMT).replace(microsecond = int(mss) * 1000)

def _encode_ntp(dt):
    (tsf, tsi) = math.modf(dt2epoch(dt))
    return int((int(tsi) << 32) + (tsf * 2**32))

def _decode_ntp(ntp):
    tsf = ntp & (2**32 - 1)
    tsi = ntp >> 32
    return datetime.utcfromtimestamp(tsi + tsf / 2**32)

def _str_usec(dt):
    return "%s.%06u" % (dt.strftime(ISO8601_FMT), dt.microsecond)

def _parse_usec(string):
    (ss, uss) = string.split(".")
    return datetime.strptime(ss, ISO8601_FMT).replace(microsecond = int(uss))

def _encode_ip(ipaddr):
    return ipaddr.packed
    
# builtin type registry
_roottypes = [
    OctetArrayType("octetArray", 0),
    StructType("unsigned8",  1, "B"),
    StructType("unsigned16", 2, "H"),
    StructType("unsigned32", 3, "L"),
    StructType("unsigned64", 4, "Q"),
    StructType("signed8",    5, "b"),
    StructType("signed16",   6, "h"),
    StructType("signed32",   7, "l"),
    StructType("signed64",   8, "q"),
    StructType("float32",    9, "f", valparse=float),
    StructType("float64",    10, "d", valparse=float),
    StructType("boolean",    11, "B", 
                valenc=_encode_smibool, valdec=_decode_smibool,
                valstr=_str_bool, valparse=_parse_bool),
    StructType("macAddress", 12, "6s"),
    OctetArrayType("string", 13, 
                valenc=_encode_utf8, valdec=_decode_utf8,
                valstr=_identity, valparse=_identity),
    StructType("dateTimeSeconds", 14, "L", 
                valenc=_encode_sec, valdec=_decode_sec,
                valstr=_str_sec, valparse=_parse_sec),
    StructType("dateTimeMilliseconds", 15, "Q", 
                valenc=_encode_msec, valdec=_decode_msec,
                valstr=_str_msec, valparse=_parse_msec),
    StructType("dateTimeMicroseconds", 16, "Q", 
                valenc=_encode_ntp, valdec=_decode_ntp,
                valstr=_str_usec, valparse=_parse_usec),
    StructType("dateTimeNanoseconds", 17, "Q", 
                valenc=_encode_ntp, valdec=_decode_ntp, 
                valstr=_str_usec, valparse=_parse_usec),
    StructType("ipv4Address", 18, "4s", 
                valenc=_encode_ip, valdec=ip_address, 
                valparse=ip_address),
    StructType("ipv6Address", 19, "16s", 
                valenc=_encode_ip, valdec=ip_address, 
                valparse=ip_address)
]

_TypeForName = { ietype.name: ietype for ietype in _roottypes }

def use_integer_ipv4():
    """
    Use integers instead of ipaddress.IPv4Address to store IPv4 addresses.
    Changes behavior globally; should be called before using any IPFIX types.
    Designed for use with numpy arrays, to not require a Python object for
    storing IP addresses.
    """
    _roottypes[18] = StructType("ipv4address", 18, "L")
    _TypeForName = { ietype.name: ietype for ietype in _roottypes }

def for_name(name):
    """
    Return an IPFIX type for a given type name
    
    :param name: the name of the type to look up
    :returns: IpfixType -- type instance for that name
    :raises: IpfixTypeError
    
    """
    try: 
        return _TypeForName[name]
    except KeyError:
        raise IpfixTypeError("no such type "+name)

def decode_varlen(buf, offset):
    """Decode a IPFIX varlen encoded length; used internally by template"""
    length = _varlen1_st.unpack_from(buf, offset)[0]
    offset += _varlen1_st.size
    if length == 255:
        length = _varlen2_st.unpack_from(buf, offset)[0]
        offset += _varlen2_st.size
    return (length, offset)
    
def encode_varlen(buf, offset, length):
    """Encode a IPFIX varlen encoded length; used internally by template"""
    if length >= 255:
        _varlen1_st.pack_into(buf, offset, 255)
        offset += _varlen1_st.size
        _varlen2_st.pack_into(buf, offset, length)
        offset += _varlen2_st.size
    else:
        _varlen1_st.pack_into(buf, offset, length)
        offset += _varlen1_st.size
    return offset
    
def test_types_internals():
    try:
        for_name("bogus")
        assert False
    except IpfixTypeError:
        pass