File: header.py

package info (click to toggle)
python-pamqp 3.3.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 708 kB
  • sloc: python: 7,042; makefile: 135; xml: 85; sh: 6
file content (121 lines) | stat: -rw-r--r-- 4,076 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# -*- encoding: utf-8 -*-
"""
AMQP Header Class Definitions

For encoding AMQP Header frames into binary AMQP stream data and decoding AMQP
binary data into AMQP Header frames.

"""
import struct
import typing

from pamqp import commands, constants, decode

# Type alias used in annotations below: either a ``commands.Basic.Properties``
# instance or ``None``.
BasicProperties = typing.Optional[commands.Basic.Properties]


class ProtocolHeader:
    """Class that represents the AMQP Protocol Header"""
    name = 'ProtocolHeader'

    def __init__(self,
                 major_version: int = constants.VERSION[0],
                 minor_version: int = constants.VERSION[1],
                 revision: int = constants.VERSION[2]):
        """Construct a Protocol Header frame object for the specified AMQP
        version.

        Each value defaults to the matching element of ``constants.VERSION``.

        :param major_version: The AMQP major version (``0``)
        :param minor_version: The AMQP minor version (``9``)
        :param revision: The AMQP revision (``1``)

        """
        self.major_version = major_version
        self.minor_version = minor_version
        self.revision = revision

    def marshal(self) -> bytes:
        """Return the full AMQP wire protocol frame data representation of the
        ProtocolHeader frame.

        The result is ``constants.AMQP`` followed by four unsigned bytes: a
        zero byte, the major version, the minor version and the revision.

        """
        return constants.AMQP + struct.pack('BBBB', 0, self.major_version,
                                            self.minor_version, self.revision)

    def unmarshal(self, data: bytes) -> int:
        """Decode the version fields of a protocol header frame and store
        them on this object.

        Only bytes 5 through 7 of ``data`` are read; the bytes before them are
        not inspected by this method.

        :param data: The frame value to unpack
        :returns: The number of bytes the protocol header occupies (``8``)
        :raises: ValueError

        """
        try:
            (self.major_version, self.minor_version,
             self.revision) = struct.unpack('BBB', data[5:8])
        except struct.error as error:
            # Chain the low-level error so the original cause stays visible
            # in the traceback.
            raise ValueError(
                'Could not unpack protocol header from {!r}'.format(
                    data)) from error
        return 8


class ContentHeader:
    """Represent a content header frame

    A Content Header frame is received after a Basic.Deliver or Basic.GetOk
    frame and has the data and properties for the Content Body frames that
    follow.

    """
    name = 'ContentHeader'

    def __init__(self,
                 weight: int = 0,
                 body_size: int = 0,
                 properties: typing.Optional[BasicProperties] = None):
        """Initialize the ContentHeader class

        Weight is unused and must be `0`

        :param weight: The unused content weight field
        :param body_size: The size in bytes of the message body
        :param properties: The message properties; when omitted (or falsy) a
            new ``commands.Basic.Properties`` instance is created

        """
        # class_id is only filled in by unmarshal(); marshal() does not use it
        self.class_id = None
        self.weight = weight
        self.body_size = body_size
        self.properties = properties or commands.Basic.Properties()

    def marshal(self) -> bytes:
        """Return the AMQP binary encoded value of the frame

        The fixed part is packed big-endian as the ``commands.Basic`` frame
        id, two zero pad bytes in the weight position and the 64-bit body
        size, followed by the marshaled properties.

        """
        return struct.pack('>HxxQ', commands.Basic.frame_id,
                           self.body_size) + self.properties.marshal()

    def unmarshal(self, data: bytes) -> None:
        """Decode a content header frame payload and store its values on this
        object.

        The first 12 bytes hold the class id, weight and body size; they are
        followed by the property flag word(s) and then the property values,
        which are handed to ``self.properties.unmarshal``.

        :param data: The raw frame data to unmarshal
        :raises: struct.error if ``data`` is shorter than the 12 byte fixed
            part

        """
        self.class_id, self.weight, self.body_size = struct.unpack(
            '>HHQ', data[0:12])
        offset, flags = self._get_flags(data[12:])
        self.properties.unmarshal(flags, data[12 + offset:])

    @staticmethod
    def _get_flags(data: bytes) -> typing.Tuple[int, int]:
        """Decode the flags from the data returning the bytes consumed and
        flags.

        Flag words are read one after another for as long as the lowest bit
        of the word just read is set. Each word is merged into the result 16
        bits above the previous one.

        :param data: The frame data starting at the first flag word
        :returns: A tuple of the number of bytes consumed and the combined
            flags value

        """
        bytes_consumed, flags, flagword_index = 0, 0, 0
        while True:
            # Read the next flag word from where the previous one ended.
            # Decoding the unsliced buffer here would re-read the first word
            # forever whenever its continuation bit is set.
            consumed, partial_flags = decode.short_int(data[bytes_consumed:])
            bytes_consumed += consumed
            flags |= (partial_flags << (flagword_index * 16))
            if not partial_flags & 1:  # pragma: nocover
                break
            flagword_index += 1  # pragma: nocover
        return bytes_consumed, flags