File: helpers_test.go

package info (click to toggle)
golang-github-azure-go-amqp 1.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 4,192 kB
  • sloc: makefile: 22
file content (113 lines) | stat: -rw-r--r-- 4,061 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package amqp

import (
	"fmt"

	"github.com/Azure/go-amqp/internal/encoding"
	"github.com/Azure/go-amqp/internal/fake"
	"github.com/Azure/go-amqp/internal/frames"
	"github.com/stretchr/testify/require"
)

func newResponse(b []byte, err error) (fake.Response, error) {
	if err != nil {
		return fake.Response{}, err
	}
	return fake.Response{Payload: b}, nil
}

func sendInitialFlowFrame(t require.TestingT, channel uint16, netConn *fake.NetConn, handle uint32, credit uint32) {
	nextIncoming := uint32(0)
	count := uint32(0)
	available := uint32(0)
	b, err := fake.EncodeFrame(frames.TypeAMQP, 0, &frames.PerformFlow{
		NextIncomingID: &nextIncoming,
		IncomingWindow: 1000000,
		OutgoingWindow: 1000000,
		NextOutgoingID: nextIncoming + 1,
		Handle:         &handle,
		DeliveryCount:  &count,
		LinkCredit:     &credit,
		Available:      &available,
	})
	require.NoError(t, err)
	netConn.SendFrame(b)
}

// standard frame handler for connecting/disconnecting etc.
// returns zero-value, nil for unhandled frames.
func senderFrameHandler(channel uint16, ssm encoding.SenderSettleMode) func(uint16, frames.FrameBody) (fake.Response, error) {
	return func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
		switch tt := req.(type) {
		case *fake.AMQPProto:
			return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
		case *frames.PerformOpen:
			return newResponse(fake.PerformOpen("container"))
		case *frames.PerformClose:
			return newResponse(fake.PerformClose(nil))
		case *frames.PerformBegin:
			return newResponse(fake.PerformBegin(channel, remoteChannel))
		case *frames.PerformEnd:
			return newResponse(fake.PerformEnd(channel, nil))
		case *frames.PerformAttach:
			return newResponse(fake.SenderAttach(channel, tt.Name, 0, ssm))
		case *frames.PerformDetach:
			return newResponse(fake.PerformDetach(channel, 0, nil))
		default:
			return fake.Response{}, nil
		}
	}
}

// similar to senderFrameHandler but returns an error on unhandled frames
func senderFrameHandlerNoUnhandled(channel uint16, ssm encoding.SenderSettleMode) func(uint16, frames.FrameBody) (fake.Response, error) {
	return func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
		resp, err := senderFrameHandler(channel, ssm)(remoteChannel, req)
		if resp.Payload == nil && err == nil {
			return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
		}
		return resp, err
	}
}

// standard frame handler for connecting/disconnecting etc.
// returns zero-value, nil for unhandled frames.
func receiverFrameHandler(channel uint16, rsm encoding.ReceiverSettleMode) func(uint16, frames.FrameBody) (fake.Response, error) {
	return func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
		switch tt := req.(type) {
		case *fake.AMQPProto:
			return newResponse(fake.ProtoHeader(fake.ProtoAMQP))
		case *frames.PerformOpen:
			return newResponse(fake.PerformOpen("container"))
		case *frames.PerformClose:
			return newResponse(fake.PerformClose(nil))
		case *frames.PerformBegin:
			return newResponse(fake.PerformBegin(channel, remoteChannel))
		case *frames.PerformEnd:
			return newResponse(fake.PerformEnd(channel, nil))
		case *frames.PerformAttach:
			return newResponse(fake.ReceiverAttach(channel, tt.Name, 0, rsm, tt.Source.Filter))
		case *frames.PerformDetach:
			return newResponse(fake.PerformDetach(channel, 0, nil))
		default:
			return fake.Response{}, nil
		}
	}
}

// similar to receiverFrameHandler but returns an error on unhandled frames
// NOTE: consumes flow frames
func receiverFrameHandlerNoUnhandled(channel uint16, rsm encoding.ReceiverSettleMode) func(uint16, frames.FrameBody) (fake.Response, error) {
	return func(remoteChannel uint16, req frames.FrameBody) (fake.Response, error) {
		resp, err := receiverFrameHandler(channel, rsm)(remoteChannel, req)
		if resp.Payload != nil || err != nil {
			return resp, err
		}
		switch req.(type) {
		case *frames.PerformFlow, *fake.KeepAlive:
			return fake.Response{}, nil
		default:
			return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
		}
	}
}