File: writer.py

package info (click to toggle)
python-ipfix 0.9.7-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 380 kB
  • sloc: python: 1,825; makefile: 149
file content (138 lines) | stat: -rw-r--r-- 5,201 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#
# python-ipfix (c) 2013 Brian Trammell.
#
# Many thanks to the mPlane consortium (http://www.ict-mplane.eu) for
# its material support of this effort.
# 
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
#

from . import message

class MessageStreamWriter:
    """
    Writes records to a stream of IPFIX messages.

    Uses an :class:`ipfix.message.MessageBuffer` internally, and continually
    writes records into messages, exporting messages to the stream each time
    the maximum message size (MTU) is reached. Use :func:`to_stream` to get
    an instance.

    Suitable for writing to IPFIX files (see :rfc:`5655`) as well as to TCP
    sockets. When writing a stream to a file, use mode='wb'.

    .. warning:: This class is not yet suitable for UDP export; this is an
                 open issue to be fixed in a subsequent release.

    """
    def __init__(self, stream, mtu=65535):
        """
        Create a writer wrapped around a stream.

        :param stream: the stream object handed to the internal buffer's
                       ``write_message`` whenever a message is exported.
        :param mtu: maximum message size in bytes, stored on the internal
                    message buffer.

        """
        self.stream = stream
        self.msg = message.MessageBuffer()
        self.msg.mtu = mtu
        # Number of messages written to the stream by this writer.
        self.msgcount = 0
        # Template ID used by the export_* methods; stays None until
        # set_export_template() has been called.
        self.curtid = None

    def _retry_after_flush(self, fn, *args, **kwargs):
        """
        Call ``fn(*args, **kwargs)`` and return its result. If the call
        raises :class:`message.EndOfMessage`, flush the in-progress message
        and retry the call exactly once; a failure of the retry propagates
        to the caller.
        """
        try:
            return fn(*args, **kwargs)
        except message.EndOfMessage:
            self.flush()
            return fn(*args, **kwargs)

    def _ensure_export_set(self):
        """
        Make sure the buffer's current set matches the selected export
        template before a record is written.

        :raises AttributeError: if no export template has been selected yet.
        """
        if self.curtid is None:
            # AttributeError is kept as the failure type because that is
            # what callers saw when curtid was not initialised up front;
            # only the message is new.
            raise AttributeError(
                "no export template selected; "
                "call set_export_template() first")
        self._retry_after_flush(self.msg.export_ensure_set, self.curtid)

    def set_domain(self, odid):
        """
        Sets the observation domain for subsequent messages sent with
        this Writer. If the internal buffer reports that it needs a flush,
        the in-progress message is written to the stream first.

        :param odid: Observation domain ID to use for export. Note that
                     templates are scoped to observation domain, so
                     templates will need to be added after switching to a
                     new observation domain ID.

        """
        if self.msg.export_needs_flush():
            self.msg.write_message(self.stream)
            # Count this message as well, so that msgcount reflects every
            # message written to the stream, not only those from flush().
            self.msgcount += 1
        self.msg.begin_export(odid)

    def add_template(self, tmpl):
        """
        Add a template to this Writer. Adding a template makes it
        available for use for exporting records; see
        :meth:`set_export_template`.

        :param tmpl: the template to add

        """
        self.msg.add_template(tmpl)

    def set_export_template(self, tid):
        """
        Set the template to be used for export by subsequent calls to
        :meth:`export_namedict` and :meth:`export_tuple`.

        :param tid: Template ID of the Template that will be used to encode
                    records to the Writer. The corresponding Template must
                    have already been added to the Writer, see
                    :meth:`add_template`.
        """
        self.curtid = tid
        self._retry_after_flush(self.msg.export_ensure_set, tid)

    def export_namedict(self, rec):
        """
        Export a record to the message, using the current template.
        The record is a dictionary mapping IE names to values. The
        dictionary must contain a value for each IE in the template. Keys in
        the dictionary not in the template will be ignored.

        :param rec: the record to export, as a dictionary
        :raises AttributeError: if :meth:`set_export_template` has not been
                                called yet.

        """
        self._ensure_export_set()
        self._retry_after_flush(self.msg.export_namedict, rec)

    def export_tuple(self, rec):
        """
        Export a record to the message, using the current template.
        The record is passed unchanged to the internal buffer's
        ``export_tuple`` (presumably one value per IE, in template order;
        confirm against :class:`ipfix.message.MessageBuffer`).

        :param rec: the record to export, as a tuple
        :raises AttributeError: if :meth:`set_export_template` has not been
                                called yet.

        """
        self._ensure_export_set()
        self._retry_after_flush(self.msg.export_tuple, rec)

    def flush(self):
        """
        Export an in-progress Message immediately.

        Used internally to manage message boundaries, but
        can also be used to force immediate export (e.g. to reduce delay
        due to buffer dwell time), as well as to finish write operations on
        a Writer before closing the underlying stream.
        """
        # Remember the set ID in use so it can be re-established in the
        # fresh message started below.
        setid = self.msg.cursetid
        self.msg.write_message(self.stream)
        self.msgcount += 1
        # NOTE(review): begin_export() is called without an observation
        # domain ID; presumably the buffer keeps the current one - confirm.
        self.msg.begin_export()
        self.msg.export_ensure_set(setid)

def to_stream(stream, mtu=65535):
    """
    Build a :class:`MessageStreamWriter` that writes to the given stream.

    :param stream: stream to write
    :param mtu: maximum message size in bytes; defaults to 65535,
                the largest possible ipfix message.
    :return: a :class:`MessageStreamWriter` wrapped around the stream.

    """
    writer = MessageStreamWriter(stream, mtu)
    return writer