1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
|
package amqp
import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/debug"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
// Sender sends messages on a single AMQP link.
type Sender struct {
// the underlying link; holds the link's name, handles, credit,
// settlement modes and the done/close channels used below
l link
// created in attach; send hands transfer frames to mux over this channel
transfers chan transferEnvelope // sender uses to send transfer frames
mu sync.Mutex // protects buf and nextDeliveryTag
// scratch buffer a message is marshaled into before it is split into transfer frames
buf buffer.Buffer
// counter used to generate a delivery tag when the message does not supply one
nextDeliveryTag uint64
// used by send to tell mux to undo its delivery-count/link-credit update
// when writing the final frame of a message failed
rollback chan struct{}
}
// LinkName returns the name of the link used by this Sender.
func (s *Sender) LinkName() string {
	lnk := &s.l
	return lnk.key.name
}
// MaxMessageSize returns the maximum size of a single message on this link.
// Send compares the encoded message length against this value; zero means
// no limit is enforced.
func (s *Sender) MaxMessageSize() uint64 {
	limit := s.l.maxMessageSize
	return limit
}
// SendOptions contains any optional values for the Sender.Send method.
type SendOptions struct {
// Settled indicates the message is to be sent as settled when settlement mode is SenderSettleModeMixed.
// If the settlement mode is SenderSettleModeUnsettled and Settled is true, an error is returned.
// If the settlement mode is SenderSettleModeSettled the message is sent settled regardless of this value.
Settled bool
}
// Send sends a Message.
//
// The call blocks until the message has been sent or an error occurs. If the
// peer is configured for receiver settlement mode second, it additionally
// blocks until the peer confirms message settlement.
//
//   - ctx controls waiting for the message to be sent and possibly confirmed
//   - msg is the message to send
//   - opts contains optional values, pass nil to accept the defaults
//
// If the context's deadline expires or is cancelled before the operation
// completes, the message is in an unknown state of transmission.
//
// Send is safe for concurrent use. Because only a single message can be in
// the process of being sent on a link at any time, concurrency is most useful
// when settlement confirmation has been requested (receiver settle mode is
// second): other goroutines can send further messages while this one waits
// for its confirmation.
func (s *Sender) Send(ctx context.Context, msg *Message, opts *SendOptions) error {
	// bail out early on a dead link. s.send copes with a dead link as well,
	// but checking here avoids some allocations etc.
	select {
	case <-s.l.done:
		return s.l.doneErr
	default:
		// link is still active
	}

	confirmed, err := s.send(ctx, msg, opts)
	if err != nil {
		return err
	}

	// wait for transfer to be confirmed
	select {
	case outcome := <-confirmed:
		rejected, isRejected := outcome.(*encoding.StateRejected)
		if !isRejected {
			// any outcome other than a rejection counts as success
			return nil
		}
		if rejected.Error == nil {
			return errors.New("the peer rejected the message without specifying an error")
		}
		return rejected.Error
	case <-s.l.done:
		return s.l.doneErr
	case <-ctx.Done():
		// TODO: if the message is not settled and we never received a disposition, how can we consider the message as sent?
		return ctx.Err()
	}
}
// send marshals msg into the sender's buffer and hands it to the mux as one
// or more transfer frames, waiting for each frame to be processed before
// producing the next one. It is separated from Send so that the mutex unlock
// can be deferred without holding the lock during the transfer confirmation
// that happens in Send.
//
// On success it returns the Done channel of the final transfer frame; Send
// reads the delivery state from it.
//
// An error is returned when the delivery tag is longer than 32 bytes, msg
// fails to marshal, the encoded message exceeds the link's max message size
// (when one is set), opts requests a settled send on a link in unsettled
// mode, the link is done, ctx is done before a frame could be handed to the
// mux, or the frame context reports an error.
//
// NOTE(review): if msg marshals to zero bytes the loop below never runs and a
// nil channel is returned together with a nil error — confirm whether Marshal
// can produce an empty encoding.
func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (chan encoding.DeliveryState, error) {
const (
maxDeliveryTagLength = 32
maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
)
// reject oversized delivery tags before taking the lock
if len(msg.DeliveryTag) > maxDeliveryTagLength {
return nil, &Error{
Condition: ErrCondMessageSizeExceeded,
Description: fmt.Sprintf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag)),
}
}
// the lock is held until every frame of this message has been handed off,
// which serializes concurrent Send calls on the shared buffer
s.mu.Lock()
defer s.mu.Unlock()
s.buf.Reset()
err := msg.Marshal(&s.buf)
if err != nil {
return nil, err
}
// a max message size of zero means no limit is enforced
if s.l.maxMessageSize != 0 && uint64(s.buf.Len()) > s.l.maxMessageSize {
return nil, &Error{
Condition: ErrCondMessageSizeExceeded,
Description: fmt.Sprintf("encoded message size exceeds max of %d", s.l.maxMessageSize),
}
}
// settled by default only when the link's sender settle mode is settled;
// opts.Settled can turn it on for other modes except unsettled
senderSettled := senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeSettled
if opts != nil {
if opts.Settled && senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeUnsettled {
return nil, errors.New("can't send message as settled when sender settlement mode is unsettled")
} else if opts.Settled {
senderSettled = true
}
}
// payload bytes that fit in one frame once the transfer frame header is accounted for
var (
maxPayloadSize = int64(s.l.session.conn.peerMaxFrameSize) - maxTransferFrameHeader
)
deliveryTag := msg.DeliveryTag
if len(deliveryTag) == 0 {
// use uint64 encoded as []byte as deliveryTag
deliveryTag = make([]byte, 8)
binary.BigEndian.PutUint64(deliveryTag, s.nextDeliveryTag)
s.nextDeliveryTag++
}
// template for the first frame; DeliveryID, DeliveryTag and MessageFormat
// are cleared after the first iteration of the loop below
fr := frames.PerformTransfer{
Handle: s.l.outputHandle,
DeliveryID: &needsDeliveryID,
DeliveryTag: deliveryTag,
MessageFormat: &msg.Format,
More: s.buf.Len() > 0,
}
// each iteration sends one chunk of at most maxPayloadSize bytes
for fr.More {
buf, _ := s.buf.Next(maxPayloadSize)
// copy the chunk so the frame does not alias the sender's reusable buffer
fr.Payload = append([]byte(nil), buf...)
fr.More = s.buf.Len() > 0
if !fr.More {
// SSM=settled: overrides RSM; no acks.
// SSM=unsettled: sender should wait for receiver to ack
// RSM=first: receiver considers it settled immediately, but must still send ack (SSM=unsettled only)
// RSM=second: receiver sends ack and waits for return ack from sender (SSM=unsettled only)
// mark final transfer as settled when sender mode is settled
fr.Settled = senderSettled
// set done on last frame
fr.Done = make(chan encoding.DeliveryState, 1)
}
// NOTE: we MUST send a copy of fr here since we modify it post send
frameCtx := frameContext{
Ctx: ctx,
Done: make(chan struct{}),
}
// hand the frame to the mux; this blocks while the mux is not
// receiving from s.transfers (e.g. no link credit)
select {
case s.transfers <- transferEnvelope{FrameCtx: &frameCtx, InputHandle: s.l.inputHandle, Frame: fr}:
// frame was sent to our mux
case <-s.l.done:
return nil, s.l.doneErr
case <-ctx.Done():
return nil, &Error{Condition: ErrCondTransferLimitExceeded, Description: fmt.Sprintf("credit limit exceeded for sending link %s", s.l.key.name)}
}
// wait for the outcome of the frame write
select {
case <-frameCtx.Done:
if frameCtx.Err != nil {
// the mux only adjusts delivery count and credit for the
// final frame, so only that case needs a rollback
if !fr.More {
select {
case s.rollback <- struct{}{}:
// the write never happened so signal the mux to roll back the delivery count and link credit
case <-s.l.close:
// the link is going down
}
}
return nil, frameCtx.Err
}
// frame was written to the network
case <-s.l.done:
return nil, s.l.doneErr
}
// clear values that are only required on first message
fr.DeliveryID = nil
fr.DeliveryTag = nil
fr.MessageFormat = nil
}
return fr.Done, nil
}
// Address returns the link's target address, or the empty string when the
// link has no target.
func (s *Sender) Address() string {
	if target := s.l.target; target != nil {
		return target.Address
	}
	return ""
}
// Close closes the Sender and AMQP link.
//   - ctx controls waiting for the peer to acknowledge the close
//
// If the context's deadline expires or is cancelled before the operation
// completes, an error is returned. However, the operation will continue to
// execute in the background. Subsequent calls will return a *LinkError
// that contains the context's error message.
func (s *Sender) Close(ctx context.Context) error {
	err := s.l.closeLink(ctx)
	return err
}
// newSender creates a Sender for the given target address on session and
// applies opts to its link. A nil opts leaves the defaults in place. The link
// is not attached here.
//
// An error is returned when opts carries an out-of-range Durability,
// RequestedReceiverSettleMode or SettlementMode, an ExpiryPolicy rejected by
// encoding.ValidateExpiryPolicy, or a link property with an empty key.
func newSender(target string, session *Session, opts *SenderOptions) (*Sender, error) {
	s := &Sender{
		l:        newLink(session, encoding.RoleSender),
		rollback: make(chan struct{}),
	}
	// all configuration below goes through this alias of the embedded link
	lnk := &s.l
	lnk.target = &frames.Target{Address: target}
	lnk.source = new(frames.Source)

	if opts == nil {
		return s, nil
	}

	// source settings
	for _, capability := range opts.Capabilities {
		lnk.source.Capabilities = append(lnk.source.Capabilities, encoding.Symbol(capability))
	}
	if opts.Durability > DurabilityUnsettledState {
		return nil, fmt.Errorf("invalid Durability %d", opts.Durability)
	}
	lnk.source.Durable = opts.Durability

	// a dynamic address is assigned by the peer, so drop the requested one
	if opts.DynamicAddress {
		lnk.target.Address = ""
		lnk.dynamicAddr = opts.DynamicAddress
	}

	if opts.ExpiryPolicy != "" {
		err := encoding.ValidateExpiryPolicy(opts.ExpiryPolicy)
		if err != nil {
			return nil, err
		}
		lnk.source.ExpiryPolicy = opts.ExpiryPolicy
	}
	lnk.source.Timeout = opts.ExpiryTimeout

	if opts.Name != "" {
		lnk.key.name = opts.Name
	}

	if opts.Properties != nil {
		lnk.properties = make(map[encoding.Symbol]any)
		for name, value := range opts.Properties {
			if name == "" {
				return nil, errors.New("link property key must not be empty")
			}
			lnk.properties[encoding.Symbol(name)] = value
		}
	}

	// settlement modes
	if rsm := opts.RequestedReceiverSettleMode; rsm != nil {
		if *rsm > ReceiverSettleModeSecond {
			return nil, fmt.Errorf("invalid RequestedReceiverSettleMode %d", *rsm)
		}
		lnk.receiverSettleMode = rsm
	}
	if ssm := opts.SettlementMode; ssm != nil {
		if *ssm > SenderSettleModeMixed {
			return nil, fmt.Errorf("invalid SettlementMode %d", *ssm)
		}
		lnk.senderSettleMode = ssm
	}

	lnk.source.Address = opts.SourceAddress

	// target settings; only values that differ from the defaults are copied
	for _, capability := range opts.TargetCapabilities {
		lnk.target.Capabilities = append(lnk.target.Capabilities, encoding.Symbol(capability))
	}
	if opts.TargetDurability != DurabilityNone {
		lnk.target.Durable = opts.TargetDurability
	}
	if opts.TargetExpiryPolicy != ExpiryPolicySessionEnd {
		lnk.target.ExpiryPolicy = opts.TargetExpiryPolicy
	}
	if opts.TargetExpiryTimeout != 0 {
		lnk.target.Timeout = opts.TargetExpiryTimeout
	}

	return s, nil
}
// attach attaches the sender's link via link.attach and, on success, creates
// the channel over which send hands transfer frames to mux.
//
// ctx is passed through to link.attach; its error, if any, is returned as is.
func (s *Sender) attach(ctx context.Context) error {
	// first callback: marks the attach frame as coming from a sender and
	// records whether a dynamic target address is requested
	prepare := func(pa *frames.PerformAttach) {
		pa.Role = encoding.RoleSender
		if pa.Target == nil {
			pa.Target = new(frames.Target)
		}
		pa.Target.Dynamic = s.l.dynamicAddr
	}

	// second callback: presumably invoked with the peer's attach frame —
	// confirm in link.attach
	adopt := func(pa *frames.PerformAttach) {
		if s.l.target == nil {
			s.l.target = new(frames.Target)
		}
		// if dynamic address requested, copy assigned name to address
		if s.l.dynamicAddr && pa.Target != nil {
			s.l.target.Address = pa.Target.Address
		}
	}

	err := s.l.attach(ctx, prepare, adopt)
	if err != nil {
		return err
	}

	s.transfers = make(chan transferEnvelope)
	return nil
}
// senderTestHooks holds optional callbacks that mux invokes at fixed points
// of its loop. mux replaces any nil hook with nopHook.
type senderTestHooks struct {
// called at the top of every loop iteration, right before the select
MuxSelect func()
// called after a transfer envelope was received from send and before it
// is forwarded to the session
MuxTransfer func()
}
// mux is the sender's event loop. On every iteration it waits for one of:
// a frame received for the link, a transfer frame from send (only while
// link credit is available and no close is in progress), a client-side close
// request, termination of the session, or a rollback request from send.
//
// The loop ends when muxHandleFrame returns an error or the session is done;
// the cause is stored in s.l.doneErr and s.l.done is closed on return.
//
// hooks are test callbacks; nil entries are replaced with nopHook.
func (s *Sender) mux(hooks senderTestHooks) {
if hooks.MuxSelect == nil {
hooks.MuxSelect = nopHook
}
if hooks.MuxTransfer == nil {
hooks.MuxTransfer = nopHook
}
// closing done unblocks Send/send callers waiting on the link
defer func() {
close(s.l.done)
}()
Loop:
for {
// stays nil unless there is link credit; a nil channel never becomes
// ready in the select below, which pauses sending
var outgoingTransfers chan transferEnvelope
if s.l.linkCredit > 0 {
debug.Log(1, "TX (Sender %p) (enable): target: %q, link credit: %d, deliveryCount: %d", s, s.l.target.Address, s.l.linkCredit, s.l.deliveryCount)
outgoingTransfers = s.transfers
} else {
debug.Log(1, "TX (Sender %p) (pause): target: %q, link credit: %d, deliveryCount: %d", s, s.l.target.Address, s.l.linkCredit, s.l.deliveryCount)
}
closed := s.l.close
if s.l.closeInProgress {
// swap out channel so it no longer triggers
closed = nil
// disable sending once closing is in progress.
// this prevents races with mux shutdown and
// the peer sending disposition frames.
outgoingTransfers = nil
}
hooks.MuxSelect()
select {
// received frame
case q := <-s.l.rxQ.Wait():
// populated queue
fr := *q.Dequeue()
s.l.rxQ.Release(q)
// if muxHandleFrame returns an error it means the mux must terminate.
// note that in the case of a client-side close due to an error, nil
// is returned in order to keep the mux running to ack the detach frame.
if err := s.muxHandleFrame(fr); err != nil {
s.l.doneErr = err
return
}
// send data
case env := <-outgoingTransfers:
hooks.MuxTransfer()
// forward the frame to the session; if the link is closing or the
// session is done instead, the envelope is dropped and the outer
// loop deals with that condition on its next iteration
select {
case s.l.session.txTransfer <- env:
debug.Log(2, "TX (Sender %p): mux transfer to Session: %d, %s", s, s.l.session.channel, env.Frame)
// decrement link-credit after entire message transferred
if !env.Frame.More {
s.l.deliveryCount++
s.l.linkCredit--
// we are the sender and we keep track of the peer's link credit
debug.Log(3, "TX (Sender %p): link: %s, link credit: %d", s, s.l.key.name, s.l.linkCredit)
}
continue Loop
case <-s.l.close:
continue Loop
case <-s.l.session.done:
continue Loop
}
case <-closed:
if s.l.closeInProgress {
// a client-side close due to protocol error is in progress
continue
}
// sender is being closed by the client
s.l.closeInProgress = true
fr := &frames.PerformDetach{
Handle: s.l.outputHandle,
Closed: true,
}
s.l.txFrame(&frameContext{Ctx: context.Background()}, fr)
case <-s.l.session.done:
s.l.doneErr = s.l.session.doneErr
return
case <-s.rollback:
// send reported that the final frame of a message was not written;
// undo the accounting done when that frame was forwarded
s.l.deliveryCount--
s.l.linkCredit++
debug.Log(3, "TX (Sender %p): rollback link: %s, link credit: %d", s, s.l.key.name, s.l.linkCredit)
}
}
}
// muxHandleFrame processes a frame received for this link based on its type.
//
//   - *frames.PerformFlow updates the sender's view of the link credit and,
//     when the peer set Echo, replies with the sender's current flow state.
//   - *frames.PerformDisposition that is not yet settled is answered with a
//     settled disposition for the same delivery range (the peer is in
//     receiver settle mode second).
//   - every other frame type is delegated to the link's muxHandleFrame.
//
// Replies are sent through the session's tx channel. A non-nil error is
// returned when the session terminates while a reply is pending, or when the
// link-level handler returns one.
func (s *Sender) muxHandleFrame(fr frames.FrameBody) error {
	debug.Log(2, "RX (Sender %p): %s", s, fr)
	switch fr := fr.(type) {
	// flow control frame
	case *frames.PerformFlow:
		// LinkCredit is a pointer and is nil when the peer omitted the field.
		// Such a frame carries no new credit information, so the current
		// credit is kept; dereferencing it unconditionally would panic the
		// mux goroutine on peer-controlled input.
		linkCredit := s.l.linkCredit
		if fr.LinkCredit != nil {
			// the sender's link-credit variable MUST be set according to this formula when flow information is given by the receiver:
			// link-credit(snd) := delivery-count(rcv) + link-credit(rcv) - delivery-count(snd)
			linkCredit = *fr.LinkCredit - s.l.deliveryCount
			if fr.DeliveryCount != nil {
				// DeliveryCount can be nil if the receiver hasn't processed
				// the attach. That shouldn't be the case here, but it's
				// what ActiveMQ does.
				linkCredit += *fr.DeliveryCount
			}
			s.l.linkCredit = linkCredit
		}

		if !fr.Echo {
			return nil
		}

		var (
			// copy because sent by pointer below; prevent race
			deliveryCount = s.l.deliveryCount
		)

		// send flow
		resp := &frames.PerformFlow{
			Handle:        &s.l.outputHandle,
			DeliveryCount: &deliveryCount,
			LinkCredit:    &linkCredit, // max number of messages
		}

		select {
		case s.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: resp}:
			debug.Log(2, "TX (Sender %p): mux frame to Session (%p): %d, %s", s, s.l.session, s.l.session.channel, resp)
		case <-s.l.close:
			return nil
		case <-s.l.session.done:
			return s.l.session.doneErr
		}

	case *frames.PerformDisposition:
		// an already settled disposition needs no reply
		if fr.Settled {
			return nil
		}

		// peer is in mode second, so we must send confirmation of disposition.
		// NOTE: the ack must be sent through the session so it can close out
		// the in-flight disposition.
		dr := &frames.PerformDisposition{
			Role:    encoding.RoleSender,
			First:   fr.First,
			Last:    fr.Last,
			Settled: true,
		}

		select {
		case s.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: dr}:
			debug.Log(2, "TX (Sender %p): mux frame to Session (%p): %d, %s", s, s.l.session, s.l.session.channel, dr)
		case <-s.l.close:
			return nil
		case <-s.l.session.done:
			return s.l.session.doneErr
		}

		return nil

	default:
		return s.l.muxHandleFrame(fr)
	}

	return nil
}
|