1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
# -*- encoding: utf-8 -*-
"""Manage the marshaling and unmarshaling of AMQP frames
unmarshal will turn a raw AMQP byte stream into the appropriate AMQP objects
from the specification file.
marshal will take an object created from the specification file and turn it
into a raw byte stream.
"""
import logging
import struct
import typing
from pamqp import (base, body, commands, common, constants, decode, exceptions,
header, heartbeat)
LOGGER = logging.getLogger(__name__)
UNMARSHAL_FAILURE = 0, 0, None
FrameTypes = typing.Union[base.Frame, body.ContentBody, header.ContentHeader,
header.ProtocolHeader, heartbeat.Heartbeat]
def marshal(frame_value: FrameTypes, channel_id: int) -> bytes:
"""Marshal a frame to be sent over the wire.
:raises: ValueError
"""
if isinstance(frame_value, header.ProtocolHeader):
return frame_value.marshal()
elif isinstance(frame_value, base.Frame):
return _marshal_method_frame(frame_value, channel_id)
elif isinstance(frame_value, header.ContentHeader):
return _marshal_content_header_frame(frame_value, channel_id)
elif isinstance(frame_value, body.ContentBody):
return _marshal_content_body_frame(frame_value, channel_id)
elif isinstance(frame_value, heartbeat.Heartbeat):
return frame_value.marshal()
raise ValueError('Could not determine frame type: {}'.format(frame_value))
def unmarshal(data_in: bytes) -> typing.Tuple[int, int, FrameTypes]:
"""Takes in binary data and maps builds the appropriate frame type,
returning a frame object.
:returns: tuple of bytes consumed, channel, and a frame object
:raises: exceptions.UnmarshalingException
"""
try: # Look to see if it's a protocol header frame
value = _unmarshal_protocol_header_frame(data_in)
except ValueError as error:
raise exceptions.UnmarshalingException(header.ProtocolHeader, error)
else:
if value:
return 8, 0, value
frame_type, channel_id, frame_size = frame_parts(data_in)
# Heartbeats do not have frame length indicators
if frame_type == constants.FRAME_HEARTBEAT and frame_size == 0:
return 8, channel_id, heartbeat.Heartbeat()
if not frame_size:
raise exceptions.UnmarshalingException('Unknown', 'No frame size')
byte_count = constants.FRAME_HEADER_SIZE + frame_size + 1
if byte_count > len(data_in):
raise exceptions.UnmarshalingException('Unknown',
'Not all data received')
if data_in[byte_count - 1] != constants.FRAME_END:
raise exceptions.UnmarshalingException('Unknown', 'Last byte error')
frame_data = data_in[constants.FRAME_HEADER_SIZE:byte_count - 1]
if frame_type == constants.FRAME_METHOD:
return byte_count, channel_id, _unmarshal_method_frame(frame_data)
elif frame_type == constants.FRAME_HEADER:
return byte_count, channel_id, _unmarshal_header_frame(frame_data)
elif frame_type == constants.FRAME_BODY:
return byte_count, channel_id, _unmarshal_body_frame(frame_data)
raise exceptions.UnmarshalingException(
'Unknown', 'Unknown frame type: {}'.format(frame_type))
def frame_parts(data: bytes) -> typing.Tuple[int, int, typing.Optional[int]]:
"""Attempt to decode a low-level frame, returning frame parts"""
try: # Get the Frame Type, Channel Number and Frame Size
return struct.unpack('>BHI', data[0:constants.FRAME_HEADER_SIZE])
except struct.error: # Did not receive a full frame
return UNMARSHAL_FAILURE
def _marshal(frame_type: int, channel_id: int, payload: bytes) -> bytes:
"""Marshal the low-level AMQ frame"""
return b''.join([
struct.pack('>BHI', frame_type, channel_id, len(payload)), payload,
constants.FRAME_END_CHAR
])
def _marshal_content_body_frame(value: body.ContentBody,
channel_id: int) -> bytes:
"""Marshal as many content body frames as needed to transmit the content"""
return _marshal(constants.FRAME_BODY, channel_id, value.marshal())
def _marshal_content_header_frame(value: header.ContentHeader,
channel_id: int) -> bytes:
"""Marshal a content header frame"""
return _marshal(constants.FRAME_HEADER, channel_id, value.marshal())
def _marshal_method_frame(value: base.Frame, channel_id: int) -> bytes:
"""Marshal a method frame"""
return _marshal(constants.FRAME_METHOD, channel_id,
common.Struct.integer.pack(value.index) + value.marshal())
def _unmarshal_protocol_header_frame(data_in: bytes) \
-> typing.Optional[header.ProtocolHeader]:
"""Attempt to unmarshal a protocol header frame
The ProtocolHeader is abbreviated in size and functionality compared to
the rest of the frame types, so return UNMARSHAL_ERROR doesn't apply
as cleanly since we don't have all of the attributes to return even
regardless of success or failure.
:raises: ValueError
"""
if data_in[0:4] == constants.AMQP: # Do the first four bytes match?
frame = header.ProtocolHeader()
frame.unmarshal(data_in)
return frame
return None
def _unmarshal_method_frame(frame_data: bytes) -> base.Frame:
"""Attempt to unmarshal a method frame
:raises: pamqp.exceptions.UnmarshalingException
"""
bytes_used, method_index = decode.long_int(frame_data[0:4])
try:
method = commands.INDEX_MAPPING[method_index]()
except KeyError:
raise exceptions.UnmarshalingException(
'Unknown', 'Unknown method index: {}'.format(str(method_index)))
try:
method.unmarshal(frame_data[bytes_used:])
except struct.error as error:
raise exceptions.UnmarshalingException(method, error)
return method
def _unmarshal_header_frame(frame_data: bytes) -> header.ContentHeader:
"""Attempt to unmarshal a header frame
:raises: pamqp.exceptions.UnmarshalingException
"""
content_header = header.ContentHeader()
try:
content_header.unmarshal(frame_data)
except struct.error as error:
raise exceptions.UnmarshalingException('ContentHeader', error)
return content_header
def _unmarshal_body_frame(frame_data: bytes) -> body.ContentBody:
"""Attempt to unmarshal a body frame"""
content_body = body.ContentBody(b'')
content_body.unmarshal(frame_data)
return content_body
|