File: frame.py

package info (click to toggle)
python-aioamqp 0.15.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 456 kB
  • sloc: python: 2,741; makefile: 187
file content (99 lines) | stat: -rw-r--r-- 2,879 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""
Helper functions to read and write AMQP frames

AMQP Frame implementations


0      1         3         7                      size+7      size+8
+------+---------+---------+    +-------------+     +-----------+
| type | channel |   size  |    |   payload   |     | frame-end |
+------+---------+---------+    +-------------+     +-----------+
 octets   short     long         'size' octets          octet

The frame-end octet MUST always be the hexadecimal value %xCE

type:

Type = 1, "METHOD": method frame.
Type = 2, "HEADER": content header frame.
Type = 3, "BODY": content body frame.
Type = 8, "HEARTBEAT": heartbeat frame.


Method Payload

0          2           4
+----------+-----------+-------------- - -
| class-id | method-id | arguments...
+----------+-----------+-------------- - -
    short     short       ...

Content Payload

0          2        4           12               14
+----------+--------+-----------+----------------+------------- - -
| class-id | weight | body size | property flags | property list...
+----------+--------+-----------+----------------+------------- - -
   short     short    long long       short        remainder...

"""

import asyncio

import pamqp.encode
import pamqp.frame

from . import exceptions
from . import constants as amqp_constants


# Module-level debug switch, off by default. It is not referenced by the
# functions in this module; presumably other code reads it to decide whether
# to dump raw frames -- TODO confirm against the callers.
DUMP_FRAMES = False


def write(writer, channel, encoder):
    """Marshal a pamqp frame and pass the resulting bytes to the writer

        writer:     asyncio StreamWriter
        channel:    amqp Channel identifier
        encoder:    frame encoder from pamqp which can be marshalled

    Returns whatever ``writer.write`` returns (``None`` for an asyncio
    StreamWriter, which only buffers the data).
    """
    # pamqp expects the frame first and the channel number second.
    marshalled = pamqp.frame.marshal(encoder, channel)
    return writer.write(marshalled)


async def _read_exactly(reader, size):
    """Read exactly `size` bytes from `reader`

        reader:     asyncio StreamReader
        size:       number of bytes to read

    Returns the bytes read.

    Raises AmqpClosedConnection if the stream ends before `size` bytes are
    available or the underlying transport fails.
    """
    try:
        return await reader.readexactly(size)
    except (asyncio.IncompleteReadError, OSError) as ex:
        raise exceptions.AmqpClosedConnection() from ex


async def read(reader):
    """Read a new frame from the wire

        reader:     asyncio StreamReader

    Returns (channel, frame) a tuple containing both channel and the pamqp frame,
                             the object describing the frame. `frame` is None
                             when the frame type is not one of the four
                             handled below.

    Raises AmqpClosedConnection if `reader` is falsy or if the connection is
    lost while reading any part of the frame (header, payload or frame-end).
    Raises AssertionError if the frame-end octet is not FRAME_END.
    """
    if not reader:
        raise exceptions.AmqpClosedConnection()

    # Fixed-size frame header: type (1 octet), channel (2), payload size (4).
    data = await _read_exactly(reader, 7)
    frame_type, channel, frame_length = pamqp.frame.frame_parts(data)

    # A connection dropped in the middle of a frame is reported the same way
    # as one dropped between frames, so callers only have one error to handle.
    payload_data = await _read_exactly(reader, frame_length)
    frame = None

    if frame_type == amqp_constants.TYPE_METHOD:
        frame = pamqp.frame._unmarshal_method_frame(payload_data)

    elif frame_type == amqp_constants.TYPE_HEADER:
        frame = pamqp.frame._unmarshal_header_frame(payload_data)

    elif frame_type == amqp_constants.TYPE_BODY:
        frame = pamqp.frame._unmarshal_body_frame(payload_data)

    elif frame_type == amqp_constants.TYPE_HEARTBEAT:
        frame = pamqp.heartbeat.Heartbeat()

    frame_end = await _read_exactly(reader, 1)
    # Explicit check instead of `assert` so the validation still runs under
    # `python -O`; AssertionError is kept so existing callers see no change.
    if frame_end != amqp_constants.FRAME_END:
        raise AssertionError(
            "invalid frame-end octet: {!r}".format(frame_end))
    return channel, frame