File: raw_dump.py

package info (click to toggle)
python-openflow 2021.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,224 kB
  • sloc: python: 6,906; sh: 4; makefile: 4
file content (63 lines) | stat: -rw-r--r-- 1,687 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""Help reading raw dump files."""
from pyof.v0x01.common.header import Header
from pyof.v0x01.common.utils import new_message_from_header


class RawDump:
    """A helper to deal with paths and reading raw files.

    Attributes:
        content (bytes): Raw file's content.
    """

    _HEADER_BYTES = 8  # According to OF Protocol specification

    def __init__(self, version, basename):
        """Information to locate the dump file.

        Args:
            version (str): OpenFlow protocol version, e.g. ``v0x01``.
            basename (str): Only the filename without extension.
                E.g. ``ofpt_echo_reply``.
        """
        self._path = 'raw/{}/{}.dat'.format(version, basename)

    def __repr__(self):
        return repr(self.unpack())

    def __bytes__(self):
        return self.read()

    def read(self):
        """Read the raw file.

        Returns:
            bytes: Raw file's content.
        """
        with open(self._path, 'rb') as file:
            return file.read()

    def unpack(self):
        """Unpack header and message from a byte sequence.

        Returns:
            The object type specified in the header with the corresponding
            header.
        """
        content = self.read()
        raw_header = content[:self._HEADER_BYTES]
        header = _unpack_header(raw_header)
        raw_msg = content[self._HEADER_BYTES:header.length.value]
        return _unpack_message(header, raw_msg)


def _unpack_header(raw_header):
    header = Header()
    header.unpack(raw_header)
    return header


def _unpack_message(header, raw_msg):
    msg = new_message_from_header(header)
    msg.unpack(raw_msg)
    return msg