// Package frames parses and encodes AMQP and SASL frames.
package frames
import (
"encoding/binary"
"errors"
"fmt"
"math"
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
)
// HeaderSize is the size, in bytes, of the mandatory frame header that
// starts every frame.
const HeaderSize = 8
// Frame structure:
//
// header (8 bytes)
// 0-3: SIZE (total size, at least 8 bytes for header, uint32)
// 4: DOFF (data offset, at least 2, count of 4-byte words, uint8)
// 5: TYPE (frame type)
// 0x0: AMQP
// 0x1: SASL
// 6-7: type dependent (channel for AMQP)
// extended header (opt)
// body (opt)
// Header is the fixed 8-byte frame header. Its fields are declared in wire
// order, which makes the structure appropriate for use with binary.Read().
type Header struct {
// size: an unsigned 32-bit integer that MUST contain the total frame size of the frame header,
// extended header, and frame body. The frame is malformed if the size is less than the size of
// the frame header (8 bytes).
Size uint32
// doff: gives the position of the body within the frame. The value of the data offset is an
// unsigned, 8-bit integer specifying a count of 4-byte words. Due to the mandatory 8-byte
// frame header, the frame is malformed if the value is less than 2.
DataOffset uint8
// type: identifies the kind of frame; 0x0 is an AMQP frame and 0x1 is a SASL frame.
FrameType uint8
// The last two header bytes are type dependent; for AMQP frames they carry the channel.
Channel uint16
}
// ParseHeader reads the fixed-size frame header from r and returns the
// decoded result.
//
// An error is returned when r.Next cannot supply HeaderSize bytes, when the
// declared frame size is smaller than HeaderSize, or when the data offset is
// less than 2. In the latter two cases the decoded Header is returned
// alongside the error so the offending values remain available to the caller.
// No other validation is done; in particular the size and data offset are not
// checked against each other or against the remaining contents of r.
func ParseHeader(r *buffer.Buffer) (Header, error) {
	buf, ok := r.Next(HeaderSize)
	if !ok {
		return Header{}, errors.New("invalid frameHeader")
	}
	// Touch the last header byte up front so the fixed-offset reads below are
	// known to be in range.
	_ = buf[HeaderSize-1]

	fh := Header{
		Size:       binary.BigEndian.Uint32(buf[0:4]),
		DataOffset: buf[4],
		FrameType:  buf[5],
		Channel:    binary.BigEndian.Uint16(buf[6:8]),
	}

	// The size covers the header itself, so anything smaller is malformed.
	if fh.Size < HeaderSize {
		return fh, fmt.Errorf("received frame header with invalid size %d", fh.Size)
	}

	// The data offset counts 4-byte words and must at least span the header.
	if fh.DataOffset < 2 {
		return fh, fmt.Errorf("received frame header with invalid data offset %d", fh.DataOffset)
	}

	return fh, nil
}
// ParseBody decodes the frame body held in r into its concrete frame type.
//
// The unread bytes of r must be at least three long and start with a zero
// byte followed by the small-ulong type code; the third byte then selects
// which performative or SASL frame to unmarshal. Bodies that do not match
// this shape, or whose third byte is not a known frame type, are rejected
// with an error and a nil FrameBody. Otherwise the selected body is returned
// together with whatever error its Unmarshal method reports.
func ParseBody(r *buffer.Buffer) (FrameBody, error) {
	// unmarshaler is the common shape shared by every body handled below.
	type unmarshaler interface {
		FrameBody
		Unmarshal(*buffer.Buffer) error
	}

	raw := r.Bytes()
	if r.Len() < 3 || raw[0] != 0 || encoding.AMQPType(raw[1]) != encoding.TypeCodeSmallUlong {
		return nil, errors.New("invalid frame body header")
	}

	// Pick the destination type first; the decode itself is identical for
	// all of them and happens once after the switch.
	var body unmarshaler
	switch code := encoding.AMQPType(raw[2]); code {
	case encoding.TypeCodeOpen:
		body = new(PerformOpen)
	case encoding.TypeCodeBegin:
		body = new(PerformBegin)
	case encoding.TypeCodeAttach:
		body = new(PerformAttach)
	case encoding.TypeCodeFlow:
		body = new(PerformFlow)
	case encoding.TypeCodeTransfer:
		body = new(PerformTransfer)
	case encoding.TypeCodeDisposition:
		body = new(PerformDisposition)
	case encoding.TypeCodeDetach:
		body = new(PerformDetach)
	case encoding.TypeCodeEnd:
		body = new(PerformEnd)
	case encoding.TypeCodeClose:
		body = new(PerformClose)
	case encoding.TypeCodeSASLMechanism:
		body = new(SASLMechanisms)
	case encoding.TypeCodeSASLChallenge:
		body = new(SASLChallenge)
	case encoding.TypeCodeSASLOutcome:
		body = new(SASLOutcome)
	default:
		return nil, fmt.Errorf("unknown performative type %02x", code)
	}

	decodeErr := body.Unmarshal(r)
	return body, decodeErr
}
// Write serializes fr into buf as a frame header followed by the marshaled
// body. It is kept separate from conn.WriteFrame for testing purposes.
//
// The size field is back-filled into the first four bytes of buf.Bytes() with
// the full length of those bytes, so buf is expected to hold nothing other
// than this frame. An error is returned if the body fails to marshal or the
// encoded length does not fit in 32 bits.
func Write(buf *buffer.Buffer, fr Frame) error {
	// Header bytes 0-5: a zeroed size placeholder (patched below), a data
	// offset of 2 (see Header.DataOffset) and the frame type.
	header := [6]byte{4: 2, 5: uint8(fr.Type)}
	buf.Append(header[:])
	// Header bytes 6-7: the channel.
	buf.AppendUint16(fr.Channel)

	// Frame body.
	if marshalErr := encoding.Marshal(buf, fr.Body); marshalErr != nil {
		return marshalErr
	}

	// The size field is only 32 bits wide.
	if uint(buf.Len()) > math.MaxUint32 {
		return errors.New("frame too large")
	}

	// Overwrite the placeholder with the real total size.
	encoded := buf.Bytes()
	binary.BigEndian.PutUint32(encoded, uint32(len(encoded)))
	return nil
}
|