1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
|
package amqp
import (
"context"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// RequestFunc may take information from a publisher request and put it into a
// request context. In Subscribers, RequestFuncs are executed prior to invoking
// the endpoint.
type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context
// SubscriberResponseFunc may take information from a request context and use it to
// manipulate a Publisher. SubscriberResponseFuncs are only executed in
// subscribers, after invoking the endpoint but prior to publishing a reply.
type SubscriberResponseFunc func(context.Context,
*amqp.Delivery,
Channel,
*amqp.Publishing,
) context.Context
// PublisherResponseFunc may take information from an AMQP request and make the
// response available for consumption. PublisherResponseFunc are only executed
// in publishers, after a request has been made, but prior to it being decoded.
type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context
// SetPublishExchange returns a RequestFunc that sets the Exchange field
// of an AMQP Publish call.
func SetPublishExchange(publishExchange string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyExchange, publishExchange)
}
}
// SetPublishKey returns a RequestFunc that sets the Key field
// of an AMQP Publish call.
func SetPublishKey(publishKey string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyPublishKey, publishKey)
}
}
// SetPublishDeliveryMode sets the delivery mode of a Publishing.
// Please refer to AMQP delivery mode constants in the AMQP package.
func SetPublishDeliveryMode(dmode uint8) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
pub.DeliveryMode = dmode
return ctx
}
}
// SetNackSleepDuration returns a RequestFunc that sets the amount of time
// to sleep in the event of a Nack.
// This has to be used in conjunction with an error encoder that Nack and sleeps.
// One example is the SingleNackRequeueErrorEncoder.
// It is designed to be used by Subscribers.
func SetNackSleepDuration(duration time.Duration) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyNackSleepDuration, duration)
}
}
// SetConsumeAutoAck returns a RequestFunc that sets whether or not to autoAck
// messages when consuming.
// When set to false, the publisher will Ack the first message it receives with
// a matching correlationId.
// It is designed to be used by Publishers.
func SetConsumeAutoAck(autoAck bool) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyAutoAck, autoAck)
}
}
// SetConsumeArgs returns a RequestFunc that set the arguments for amqp Consume
// function.
// It is designed to be used by Publishers.
func SetConsumeArgs(args amqp.Table) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
return context.WithValue(ctx, ContextKeyConsumeArgs, args)
}
}
// SetContentType returns a RequestFunc that sets the ContentType field of
// an AMQP Publishing.
func SetContentType(contentType string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
pub.ContentType = contentType
return ctx
}
}
// SetContentEncoding returns a RequestFunc that sets the ContentEncoding field
// of an AMQP Publishing.
func SetContentEncoding(contentEncoding string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
pub.ContentEncoding = contentEncoding
return ctx
}
}
// SetCorrelationID returns a RequestFunc that sets the CorrelationId field
// of an AMQP Publishing.
func SetCorrelationID(cid string) RequestFunc {
return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
pub.CorrelationId = cid
return ctx
}
}
// SetAckAfterEndpoint returns a SubscriberResponseFunc that prompts the service
// to Ack the Delivery object after successfully evaluating the endpoint,
// and before it encodes the response.
// It is designed to be used by Subscribers.
func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc {
return func(ctx context.Context,
deliv *amqp.Delivery,
ch Channel,
pub *amqp.Publishing,
) context.Context {
deliv.Ack(multiple)
return ctx
}
}
func getPublishExchange(ctx context.Context) string {
if exchange := ctx.Value(ContextKeyExchange); exchange != nil {
return exchange.(string)
}
return ""
}
func getPublishKey(ctx context.Context) string {
if publishKey := ctx.Value(ContextKeyPublishKey); publishKey != nil {
return publishKey.(string)
}
return ""
}
func getNackSleepDuration(ctx context.Context) time.Duration {
if duration := ctx.Value(ContextKeyNackSleepDuration); duration != nil {
return duration.(time.Duration)
}
return 0
}
func getConsumeAutoAck(ctx context.Context) bool {
if autoAck := ctx.Value(ContextKeyAutoAck); autoAck != nil {
return autoAck.(bool)
}
return false
}
func getConsumeArgs(ctx context.Context) amqp.Table {
if args := ctx.Value(ContextKeyConsumeArgs); args != nil {
return args.(amqp.Table)
}
return nil
}
type contextKey int
const (
// ContextKeyExchange is the value of the reply Exchange in
// amqp.Publish.
ContextKeyExchange contextKey = iota
// ContextKeyPublishKey is the value of the ReplyTo field in
// amqp.Publish.
ContextKeyPublishKey
// ContextKeyNackSleepDuration is the duration to sleep for if the
// service Nack and requeues a message.
// This is to prevent sporadic send-resending of message
// when a message is constantly Nack'd and requeued.
ContextKeyNackSleepDuration
// ContextKeyAutoAck is the value of autoAck field when calling
// amqp.Channel.Consume.
ContextKeyAutoAck
// ContextKeyConsumeArgs is the value of consumeArgs field when calling
// amqp.Channel.Consume.
ContextKeyConsumeArgs
)
|