1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
|
package amqp
import (
"context"
"time"
"github.com/go-kit/kit/endpoint"
amqp "github.com/rabbitmq/amqp091-go"
)
// The golang AMQP implementation requires the []byte representation of
// correlation id strings to have a maximum length of 255 bytes.
const maxCorrelationIdLength = 255
// Publisher wraps an AMQP channel and queue, and provides a method that
// implements endpoint.Endpoint.
type Publisher struct {
ch Channel
q *amqp.Queue
enc EncodeRequestFunc
dec DecodeResponseFunc
before []RequestFunc
after []PublisherResponseFunc
deliverer Deliverer
timeout time.Duration
}
// NewPublisher constructs a usable Publisher for a single remote method.
func NewPublisher(
ch Channel,
q *amqp.Queue,
enc EncodeRequestFunc,
dec DecodeResponseFunc,
options ...PublisherOption,
) *Publisher {
p := &Publisher{
ch: ch,
q: q,
enc: enc,
dec: dec,
deliverer: DefaultDeliverer,
timeout: 10 * time.Second,
}
for _, option := range options {
option(p)
}
return p
}
// PublisherOption sets an optional parameter for clients.
type PublisherOption func(*Publisher)
// PublisherBefore sets the RequestFuncs that are applied to the outgoing AMQP
// request before it's invoked.
func PublisherBefore(before ...RequestFunc) PublisherOption {
return func(p *Publisher) { p.before = append(p.before, before...) }
}
// PublisherAfter sets the ClientResponseFuncs applied to the incoming AMQP
// request prior to it being decoded. This is useful for obtaining anything off
// of the response and adding onto the context prior to decoding.
func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
return func(p *Publisher) { p.after = append(p.after, after...) }
}
// PublisherDeliverer sets the deliverer function that the Publisher invokes.
func PublisherDeliverer(deliverer Deliverer) PublisherOption {
return func(p *Publisher) { p.deliverer = deliverer }
}
// PublisherTimeout sets the available timeout for an AMQP request.
func PublisherTimeout(timeout time.Duration) PublisherOption {
return func(p *Publisher) { p.timeout = timeout }
}
// Endpoint returns a usable endpoint that invokes the remote endpoint.
func (p Publisher) Endpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()
pub := amqp.Publishing{
ReplyTo: p.q.Name,
CorrelationId: randomString(randInt(5, maxCorrelationIdLength)),
}
if err := p.enc(ctx, &pub, request); err != nil {
return nil, err
}
for _, f := range p.before {
// Affect only amqp.Publishing
ctx = f(ctx, &pub, nil)
}
deliv, err := p.deliverer(ctx, p, &pub)
if err != nil {
return nil, err
}
for _, f := range p.after {
ctx = f(ctx, deliv)
}
response, err := p.dec(ctx, deliv)
if err != nil {
return nil, err
}
return response, nil
}
}
// Deliverer is invoked by the Publisher to publish the specified Publishing, and to
// retrieve the appropriate response Delivery object.
type Deliverer func(
context.Context,
Publisher,
*amqp.Publishing,
) (*amqp.Delivery, error)
// DefaultDeliverer is a deliverer that publishes the specified Publishing
// and returns the first Delivery object with the matching correlationId.
// If the context times out while waiting for a reply, an error will be returned.
func DefaultDeliverer(
ctx context.Context,
p Publisher,
pub *amqp.Publishing,
) (*amqp.Delivery, error) {
err := p.ch.Publish(
getPublishExchange(ctx),
getPublishKey(ctx),
false, //mandatory
false, //immediate
*pub,
)
if err != nil {
return nil, err
}
autoAck := getConsumeAutoAck(ctx)
msg, err := p.ch.Consume(
p.q.Name,
"", //consumer
autoAck,
false, //exclusive
false, //noLocal
false, //noWait
getConsumeArgs(ctx),
)
if err != nil {
return nil, err
}
for {
select {
case d := <-msg:
if d.CorrelationId == pub.CorrelationId {
if !autoAck {
d.Ack(false) //multiple
}
return &d, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// SendAndForgetDeliverer delivers the supplied publishing and
// returns a nil response.
// When using this deliverer please ensure that the supplied DecodeResponseFunc and
// PublisherResponseFunc are able to handle nil-type responses.
func SendAndForgetDeliverer(
ctx context.Context,
p Publisher,
pub *amqp.Publishing,
) (*amqp.Delivery, error) {
err := p.ch.Publish(
getPublishExchange(ctx),
getPublishKey(ctx),
false, //mandatory
false, //immediate
*pub,
)
return nil, err
}
|