File: template.py

package info (click to toggle)
python-ipfix 0.9.7-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 380 kB
  • sloc: python: 1,825; makefile: 149
file content (381 lines) | stat: -rw-r--r-- 13,071 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
#
# python-ipfix (c) 2013 Brian Trammell.
#
# Many thanks to the mPlane consortium (http://www.ict-mplane.eu) for
# its material support of this effort.
# 
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
Representation of IPFIX templates.
Provides template-based packing and unpacking of data in IPFIX messages.

For reading, templates are handled internally. For writing, use 
:func:`from_ielist` to create a template. 

See :mod:`ipfix.message` for examples.

"""
from . import ie    
from . import types
from functools import lru_cache
import struct

# Builtin exceptions
class IpfixEncodeError(Exception):
    """Raised on internal encoding errors, or if message MTU is too small"""
    def __init__(self, *args):
        super().__init__(args)

class IpfixDecodeError(Exception):
    """Raised when decoding a malformed IPFIX message"""
    def __init__(self, *args):
        super().__init__(args)

# constants for v9
V9_TEMPLATE_SET_ID = 0
V9_OPTIONS_SET_ID = 1

# constants
TEMPLATE_SET_ID = 2
OPTIONS_SET_ID = 3

# template encoding/decoding structs
_tmplhdr_st = struct.Struct("!HH")
_otmplhdr_st = struct.Struct("!HHH")
_iespec_st = struct.Struct("!HH")
_iepen_st = struct.Struct("!L")

class TemplatePackingPlan:
    """
    Plan to pack/unpack a specific set of indices for a template.
    Used internally by Templates for efficient encoding and decoding.
    
    """
    def __init__(self, tmpl, indices):
        self.tmpl = tmpl
        self.indices = indices
        self.ranks = sorted(range(len(indices)), key=indices.__getitem__)
        self.valenc = []
        self.valdec = []
                
        packstring = "!"
        for i, t in enumerate(e.type for e in tmpl.ies):
            if i >= tmpl.fixlen_count():
                break
            if i in indices:
                packstring += t.stel
                self.valenc.append(t.valenc)
                self.valdec.append(t.valdec)
            else:
                packstring += t.skipel           

        self.st = struct.Struct(packstring)

    def __repr__(self):
        return "<TemplatePackingPlan "+repr(self.tmpl) +\
                " pack " + str(self.st.format) +\
                " indices " + " ".join(str(i) for i in self.indices)+">"

class Template:
    """
    An IPFIX Template.
    
    A template is an ordered list of IPFIX Information Elements with an ID.
    
    """
    def __init__(self, tid = 0, iterable = None):
        if tid < 256 or tid > 65535:
            raise ValueError("bad template ID "+str(tid))
        
        self.tid = tid
        self.minlength = 0
        self.enclength = 0
        self.scopecount = 0
        self.varlenslice = None
        self.packplan = None
        
        self.ies = []
        if iterable:
            if not isinstance(iterable, ie.InformationElementList):
                iterable = ie.InformationElementList(iterable)
            for elem in iterable:
                self.append(elem)
        
    def __repr__(self):
        return "<Template ID "+str(self.tid)+" count "+ \
               str(len(self.ies))+" scope "+str(self.scopecount)+">"

    def identical_to(self, other):
        """
        Determine if two templates are identical to each other.

        Two templates are considered identical if they contain the same 
        IEs in the same order, and the same scope count. Template ID
        is not considered as part of the test for template identity.
        """

        # FIXME this needs to check IE lengths as well
        return (self.ies == other.ies) and (self.scopecount == other.scopecount)

    def append(self, ie):
        """Append an IE to this Template"""
        self.ies.append(ie)

        if ie.length == types.VARLEN:
            self.minlength += 1
            if self.varlenslice is None:
                self.varlenslice = len(self.ies) - 1
        else:
            self.minlength += ie.length

        self.enclength += _iespec_st.size
        if (ie.pen):
            self.enclength += _iepen_st.size

    def count(self):
        """Count IEs in this template"""
        return len(self.ies)

    def fixlen_count(self):
        """
        Count of fixed-length IEs in this template before the first
        variable-length IE; this is the size of the portion of the template
        which can be encoded/decoded efficiently.
        
        """
        if self.varlenslice is not None:
            return self.varlenslice
        else:
            return self.count()

    def finalize(self):
        """Compile a default packing plan. Called after append()ing all IEs."""
        self.packplan = TemplatePackingPlan(self, range(self.count()))

    @lru_cache(maxsize = 32)
    def packplan_for_ielist(self, ielist):
        """
        Given a list of IEs, devise and cache a packing plan.
        Used by the tuple interfaces.
        
        """
        return TemplatePackingPlan(self, [self.ies.index(ie) for ie in ielist])
    
    def decode_from(self, buf, offset, packplan = None):
        """Decodes a record into a tuple containing values in template order"""

        # use default packplan unless someone hacked us not to
        if not packplan:
            packplan = self.packplan
        
        # decode fixed values 
        vals = [f(v) for f, v in zip(packplan.valdec, packplan.st.unpack_from(buf, offset))]
        offset += packplan.st.size
        
        # short circuit on no varlen
        if not self.varlenslice:
            return (vals, offset)
        
        # direct iteration over remaining IEs
        for i, ie in zip(range(self.varlenslice, self.count()), 
                         self.ies[self.varlenslice:]):
            length = ie.length
            if length == types.VARLEN:
                (length, offset) = types.decode_varlen(buf, offset)
            if i in packplan.indices:
                vals.append(ie.type.decode_single_value_from(
                                    buf, offset, length))
            offset += length
            
        return (vals, offset)

    def decode_namedict_from(self, buf, offset, recinf = None):
        """Decodes a record from a buffer into a dict keyed by IE name."""
        (vals, offset) = self.decode_from(buf, offset)
        return ({ k: v for k,v in zip((ie.name for ie in self.ies), vals)}, offset)
        
    def decode_tuple_from(self, buf, offset, recinf = None):
        """
        Decodes a record from a buffer into a tuple,
        ordered as the IEs in the InformationElementList given as recinf.

        """
        if recinf:
            packplan = self.packplan_for_ielist(recinf)
        else:
            packplan = self.packplan
            
        (vals, offset) = self.decode_from(buf, offset, packplan = packplan)
        
        outvals = tuple(v for i,v in sorted(zip(packplan.ranks, vals)))

        # re-sort values in same order as packplan indices
        return (outvals, offset)
        
    def encode_to(self, buf, offset, vals, packplan = None):
        """Encodes a record from a tuple containing values in template order"""
                
        # use default packplan unless someone hacked us not to
        if not packplan:
            packplan = self.packplan
        
        # encode fixed values
        fixvals = [f(v) for f,v in zip(packplan.valenc, vals)]
        packplan.st.pack_into(buf, offset, *fixvals)
        offset += packplan.st.size

        # shortcircuit no varlen
        if not self.varlenslice:
            return offset

        # direct iteration over remaining IEs
        for i, ie, val in zip(range(self.varlenslice, self.count()),
                              self.ies[self.varlenslice:],
                              vals[self.varlenslice:]):
            if i in packplan.indices:
                #print("    encoding "+str(ie))
                if ie.length == types.VARLEN:
                    # FIXME this arrangement requires double-encode of varlen
                    # values, one to get the length, one to do the encode. 
                    # Fixing this requires a rearrangement of type encoding
                    # though. For now we'll just say that if you're exporting
                    # varlen you get to put up with some inefficiency. :)
                    offset = types.encode_varlen(buf, offset, 
                                                 len(ie.type.valenc(val)))
                offset = ie.type.encode_single_value_to(val, buf, offset)
                
        return offset
    
    def encode_namedict_to(self, buf, offset, rec, recinf = None):
        """Encodes a record from a dict containing values keyed by IE name"""
        return self.encode_to(buf, offset, [rec[ie.name] for ie in self.ies])
        
    def encode_tuple_to(self, buf, offset, rec, recinf = None):
        """
        Encodes a record from a tuple containing values ordered as the IEs 
        in the template.
         
        """
        return self.encode_to(buf, offset, rec)
    
    def encode_template_to(self, buf, offset, setid):
        """
        Encodes the template to a buffer.
        Encodes as a Template if setid is TEMPLATE_SET_ID,
        as an Options Template if setid is OPTIONS_SET_ID.
        
        """
        if setid == TEMPLATE_SET_ID:
            _tmplhdr_st.pack_into(buf, offset, self.tid, self.count())
            offset += _tmplhdr_st.size
        elif setid == OPTIONS_SET_ID:
            _otmplhdr_st.pack_into(buf, offset, self.tid, self.count(), self.scopecount)
            offset += _otmplhdr_st.size
        else:
            raise IpfixEncodeError("bad template set id "+str(setid))
            
        for e in self.ies:
            if e.pen:
                _iespec_st.pack_into(buf, offset, e.num | 0x8000, e.length)
                offset += _iespec_st.size
                _iepen_st.pack_into(buf, offset, e.pen)
                offset += _iepen_st.size
            else: 
                _iespec_st.pack_into(buf, offset, e.num, e.length)
                offset += _iespec_st.size
        
        return offset
    
    def native_setid(self):
        if self.scopecount:
            return OPTIONS_SET_ID
        else:
            return TEMPLATE_SET_ID

def withdrawal_length(setid):
    if setid == TEMPLATE_SET_ID:
        return _tmplhdr_st.size
    elif setid == OPTIONS_SET_ID:
        return _otmplhdr_st.size
    else:
        return IpfixEncodeError("bad template set id "+str(setid))
        
def encode_withdrawal_to(buf, offset, setid, tid):
    if setid == TEMPLATE_SET_ID:
        _tmplhdr_st.pack_into(buf, offset, tid, 0)
        offset += _tmplhdr_st.size
    elif setid == OPTIONS_SET_ID:
        _otmplhdr_st.pack_into(buf, offset, tid, 0, 0)
        offset += _otmplhdr_st.size
    else:
        raise IpfixEncodeError("bad template set id "+str(setid))
    
    return offset
    
def decode_template_from(buf, offset, setid):
    """
    Decodes a template from a buffer.
    Decodes as a Template if setid is TEMPLATE_SET_ID,
    as an Options Template if setid is OPTIONS_SET_ID.
    
    """
    if (setid == TEMPLATE_SET_ID) or (setid == V9_TEMPLATE_SET_ID):
        (tid, count) = _tmplhdr_st.unpack_from(buf, offset);
        scopecount = 0
        offset += _tmplhdr_st.size
    elif (setid == OPTIONS_SET_ID) or (setid == V9_OPTIONS_SET_ID):
        (tid, count, scopecount) = _otmplhdr_st.unpack_from(buf, offset);
        offset += _otmplhdr_st.size
    else:
        raise IpfixDecodeError("bad template set id "+str(setid))
        
    tmpl = Template(tid)
    tmpl.scopecount = scopecount
    
    while count:
        (num, length) = _iespec_st.unpack_from(buf, offset)
        offset += _iespec_st.size
        if num & 0x8000:
            num &= 0x7fff
            pen = _iepen_st.unpack_from(buf, offset)[0]
            offset += _iespec_st.size
        else:
            pen = 0
        tmpl.append(ie.for_template_entry(pen, num, length))
        count -= 1

    tmpl.finalize()

    return (tmpl, offset)
    
def from_ielist(tid, ielist):
    
    tmpl = Template(tid, ielist)
    
    tmpl.finalize()
    
    return tmpl

def for_specs(tid, *specs):
    """
    Create a template from a template ID and a list of IESpecs
    
    :param tid: Template ID, must be between 256 and 65535.
    :param *specs: List of IESpecs 
    :return: A new Template, ready to use for writing to a Message
              
    """
    return from_ielist(tid, ie.spec_list(specs))