File: link.go

package info (click to toggle)
golang-github-azure-go-amqp 1.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,192 kB
  • sloc: makefile: 22
file content (393 lines) | stat: -rw-r--r-- 13,207 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
package amqp

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/Azure/go-amqp/internal/debug"
	"github.com/Azure/go-amqp/internal/encoding"
	"github.com/Azure/go-amqp/internal/frames"
	"github.com/Azure/go-amqp/internal/queue"
	"github.com/Azure/go-amqp/internal/shared"
)

// linkKey uniquely identifies a link on a connection by name and direction.
//
// A link can be identified uniquely by the ordered tuple
//
//	(source-container-id, target-container-id, name)
//
// On a single connection the container ID pairs can be abbreviated
// to a boolean flag indicating the direction of the link.
type linkKey struct {
	// link name; sent as the Name field of the Attach performative
	name string
	role encoding.Role // Local role: sender/receiver
}

// link contains the common state and methods for sending and receiving links
type link struct {
	key linkKey // Name and direction

	// NOTE: outputHandle and inputHandle might not have the same value

	// our handle
	outputHandle uint32

	// remote's handle
	inputHandle uint32

	// frames destined for this link are added to this queue by Session.muxFrameToLink
	rxQ *queue.Holder[frames.FrameBody]

	// used for gracefully closing link
	close     chan struct{} // signals a link's mux to shut down; DO NOT use this to check if a link has terminated, use done instead
	closeOnce *sync.Once    // closeOnce protects close from being closed multiple times

	done     chan struct{} // closed when the link has terminated (mux exited); DO NOT wait on this from within a link's mux() as it will never trigger!
	doneErr  error         // contains the mux error state; ONLY written to by the mux and MUST only be read from after done is closed!
	closeErr error         // contains the error state returned from closeLink(); ONLY closeLink() reads/writes this!

	session    *Session                // parent session
	source     *frames.Source          // used for Receiver links
	target     *frames.Target          // used for Sender links
	properties map[encoding.Symbol]any // additional properties sent upon link attach

	// "The delivery-count is initialized by the sender when a link endpoint is created,
	// and is incremented whenever a message is sent. Only the sender MAY independently
	// modify this field. The receiver's value is calculated based on the last known
	// value from the sender and any subsequent messages received on the link. Note that,
	// despite its name, the delivery-count is not a count but a sequence number
	// initialized at an arbitrary point by the sender."
	deliveryCount uint32

	// The current maximum number of messages that can be handled at the receiver endpoint of the link. Only the receiver endpoint
	// can independently set this value. The sender endpoint sets this to the last known value seen from the receiver.
	linkCredit uint32

	// settlement modes and max message size sent in the Attach performative.
	// after a successful attach the settlement modes hold the values derived from
	// the peer's response (see setSettleModes), and maxMessageSize holds the peer's
	// value if it was zero locally or the peer's value is smaller.
	senderSettleMode   *SenderSettleMode
	receiverSettleMode *ReceiverSettleMode
	maxMessageSize     uint64

	closeInProgress bool // indicates that the detach performative has been sent
	dynamicAddr     bool // request a dynamic link address from the server
}

// newLink returns a link with local role r that belongs to session s.
// the link is named via shared.RandString(40) and gets fresh close/done channels.
// its receive queue is created with a segment size equal to the session's
// incoming window for receivers and the outgoing window otherwise.
func newLink(s *Session, r encoding.Role) link {
	// set the segment size relative to respective window
	var segmentSize int
	switch r {
	case encoding.RoleReceiver:
		segmentSize = int(s.incomingWindow)
	default:
		segmentSize = int(s.outgoingWindow)
	}

	return link{
		key:       linkKey{name: shared.RandString(40), role: r},
		session:   s,
		rxQ:       queue.NewHolder(queue.New[frames.FrameBody](segmentSize)),
		close:     make(chan struct{}),
		closeOnce: &sync.Once{},
		done:      make(chan struct{}),
	}
}

// waitForFrame blocks until a frame has been queued on rxQ and returns it.
// it returns early with a nil frame and an error when
//   - ctx is done, returning ctx.Err()
//   - the parent session has terminated, returning session.doneErr
//
// not meant for consumption outside of link.go.
func (l *link) waitForFrame(ctx context.Context) (frames.FrameBody, error) {
	select {
	case pending := <-l.rxQ.Wait():
		// frame received; take it off the queue and release the queue back to rxQ
		next := pending.Dequeue()
		l.rxQ.Release(pending)
		return *next, nil
	case <-l.session.done:
		// session has terminated, no need to deallocate in this case
		return nil, l.session.doneErr
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// attach sends the Attach performative to establish the link with its parent session.
// this is automatically called by the new*Link constructors.
//   - ctx bounds every blocking step (cleanup of abandoned links, handle allocation, send, waiting for responses)
//   - beforeAttach is invoked with the outgoing Attach before it's sent so the caller can customize it
//   - afterAttach is invoked with the peer's Attach response once it has been accepted
//
// on success nil is returned and maxMessageSize and the settlement modes reflect the peer's response.
// if the peer rejects the attach (Attach with no Source or Target followed by a Detach) the
// Detach's error is returned. an unexpected response frame closes the connection and returns a *ConnError.
func (l *link) attach(ctx context.Context, beforeAttach func(*frames.PerformAttach), afterAttach func(*frames.PerformAttach)) error {
	if err := l.session.freeAbandonedLinks(ctx); err != nil {
		return err
	}

	// once the abandoned links have been cleaned up we can create our link
	if err := l.session.allocateHandle(ctx, l); err != nil {
		return err
	}

	attach := &frames.PerformAttach{
		Name:               l.key.name,
		Handle:             l.outputHandle,
		ReceiverSettleMode: l.receiverSettleMode,
		SenderSettleMode:   l.senderSettleMode,
		MaxMessageSize:     l.maxMessageSize,
		Source:             l.source,
		Target:             l.target,
		Properties:         l.properties,
	}

	// link-specific configuration of the attach frame
	beforeAttach(attach)

	// NOTE(review): the handle allocated above is not released or abandoned when
	// sending the attach fails — confirm this is intended.
	if err := l.txFrameAndWait(ctx, attach); err != nil {
		return err
	}

	// wait for response
	fr, err := l.waitForFrame(ctx)
	if err != nil {
		// the attach was sent but no response arrived; mark the link as abandoned
		// so the session can clean it up later (see freeAbandonedLinks above).
		l.session.abandonLink(l)
		return err
	}

	resp, ok := fr.(*frames.PerformAttach)
	if !ok {
		// anything other than an Attach here is treated as fatal for the whole connection.
		// if closing the connection fails, that error is returned instead of the ConnError.
		debug.Log(1, "RX (link %p): unexpected attach response frame %T", l, fr)
		if err := l.session.conn.Close(); err != nil {
			return err
		}
		return &ConnError{inner: fmt.Errorf("unexpected attach response: %#v", fr)}
	}

	// If the remote encounters an error during the attach it returns an Attach
	// with no Source or Target. The remote then sends a Detach with an error.
	//
	//   Note that if the application chooses not to create a terminus, the session
	//   endpoint will still create a link endpoint and issue an attach indicating
	//   that the link endpoint has no associated local terminus. In this case, the
	//   session endpoint MUST immediately detach the newly created link endpoint.
	//
	// http://docs.oasis-open.org/amqp/core/v1.0/csprd01/amqp-core-transport-v1.0-csprd01.html#doc-idp386144
	if resp.Source == nil && resp.Target == nil {
		// wait for detach
		fr, err := l.waitForFrame(ctx)
		if err != nil {
			// we timed out waiting for the peer to close the link, this really isn't an abandoned link.
			// however, we still need to send the detach performative to ack the peer.
			l.session.abandonLink(l)
			return err
		}

		detach, ok := fr.(*frames.PerformDetach)
		if !ok {
			// same handling as an unexpected attach response: close the connection
			if err := l.session.conn.Close(); err != nil {
				return err
			}
			return &ConnError{inner: fmt.Errorf("unexpected frame while waiting for detach: %#v", fr)}
		}

		// send return detach
		fr = &frames.PerformDetach{
			Handle: l.outputHandle,
			Closed: true,
		}
		if err := l.txFrameAndWait(ctx, fr); err != nil {
			return err
		}

		// the attach failed either way; surface the peer's reason when it provided one
		if detach.Error == nil {
			return fmt.Errorf("received detach with no error specified")
		}
		return detach.Error
	}

	// adopt the peer's max message size when we have none (zero) or the peer's is smaller
	if l.maxMessageSize == 0 || resp.MaxMessageSize < l.maxMessageSize {
		l.maxMessageSize = resp.MaxMessageSize
	}

	// link-specific configuration post attach
	afterAttach(resp)

	if err := l.setSettleModes(resp); err != nil {
		// close the link as there's a mismatch on requested/supported settlement modes
		dr := &frames.PerformDetach{
			Handle: l.outputHandle,
			Closed: true,
		}
		if err := l.txFrameAndWait(ctx, dr); err != nil {
			// sending the detach failed; this err shadows the settlement mode error
			return err
		}
		// detach sent; report the settlement mode mismatch
		return err
	}

	return nil
}

// setSettleModes reconciles the link's settlement modes with the ones in resp,
// the frames.PerformAttach received from the server.
//
// A mode that was explicitly set locally (non-nil) must match the value derived
// from resp, otherwise an error is returned. When there is no mismatch the link
// adopts the value derived from resp. The receiver mode is handled before the
// sender mode, so a sender mismatch is reported after the receiver mode has
// already been updated.
func (l *link) setSettleModes(resp *frames.PerformAttach) error {
	// receiver settlement mode
	wantRecv := receiverSettleModeValue(l.receiverSettleMode)
	gotRecv := receiverSettleModeValue(resp.ReceiverSettleMode)
	if requested := l.receiverSettleMode; requested != nil && wantRecv != gotRecv {
		return fmt.Errorf("amqp: receiver settlement mode %q requested, received %q from server", requested, &gotRecv)
	}
	l.receiverSettleMode = &gotRecv

	// sender settlement mode
	wantSend := senderSettleModeValue(l.senderSettleMode)
	gotSend := senderSettleModeValue(resp.SenderSettleMode)
	if requested := l.senderSettleMode; requested != nil && wantSend != gotSend {
		return fmt.Errorf("amqp: sender settlement mode %q requested, received %q from server", requested, &gotSend)
	}
	l.senderSettleMode = &gotSend

	return nil
}

// muxHandleFrame processes fr based on type.
// only *frames.PerformDetach is handled here; any other frame type starts
// closing the link with ErrCondInternalError.
//
// a nil return means closing was initiated via closeWithError (unexpected frame
// or non-closing detach). a non-nil return is
//   - l.doneErr, or an empty *LinkError if l.doneErr is nil, when the detach acks a close that's already in progress
//   - a *LinkError carrying the peer's error when the peer closed the link
func (l *link) muxHandleFrame(fr frames.FrameBody) error {
	detach, isDetach := fr.(*frames.PerformDetach)
	if !isDetach {
		debug.Log(1, "RX (link %p): unexpected frame: %s", l, fr)
		l.closeWithError(ErrCondInternalError, fmt.Sprintf("link received unexpected frame %T", fr))
		return nil
	}

	if !detach.Closed {
		l.closeWithError(ErrCondNotImplemented, fmt.Sprintf("non-closing detach not supported: %+v", detach))
		return nil
	}

	// a closing detach is either the ack to a client-side Close()
	// or the peer closing the link, in which case we must ack.

	if l.closeInProgress {
		// when the client-side close was started by l.closeWithError, l.doneErr
		// has already been populated so report that. otherwise report an empty
		// LinkError which indicates a clean client-side close.
		if l.doneErr != nil {
			return l.doneErr
		}
		return &LinkError{}
	}

	// peer-initiated close: ack with our own closing detach
	l.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformDetach{
		Handle: l.outputHandle,
		Closed: true,
	})
	return &LinkError{RemoteErr: detach.Error}
}

// closeLink signals the link's mux to shut down and waits for the link to terminate.
// only the first call performs the close; it blocks until l.done is closed or ctx is done.
//   - if ctx is done first, that call returns ctx.Err() and subsequent calls
//     return a *LinkError whose inner error is that context error
//   - if the link terminated with an empty *LinkError (clean close by the caller), nil is returned
//   - otherwise the error recorded when the link terminated is returned
func (l *link) closeLink(ctx context.Context) error {
	var ctxErr error
	l.closeOnce.Do(func() {
		close(l.close)

		// the mux exits once it has received the ack'ing detach performative.
		// exiting deletes the link and closes l.done.
		select {
		case <-ctx.Done():
			// the close timed out/was cancelled. tell the caller, and remember it
			// so that subsequent calls to closeLink() return it too.
			// the mux keeps running and terminates once the ack is received.
			ctxErr = ctx.Err()
			debug.Log(1, "TX (link %p) closing %s: %v", l, l.key.name, ctxErr)
			l.closeErr = &LinkError{inner: ctxErr}
		case <-l.done:
			l.closeErr = l.doneErr
		}
	})

	if ctxErr != nil {
		return ctxErr
	}

	// an empty LinkError means the link was cleanly closed by the caller
	var linkErr *LinkError
	cleanClose := errors.As(l.closeErr, &linkErr) && linkErr.RemoteErr == nil && linkErr.inner == nil
	if cleanClose {
		return nil
	}
	return l.closeErr
}

// closeWithError starts closing the link by sending a closing detach that carries an AMQP error.
// the mux must continue to run until the ack'ing detach is received.
// if a close is already in progress the error is logged and discarded.
// otherwise l.doneErr is set to a &LinkError{} whose inner error is built from the specified values
//   - cnd is the AMQP error condition
//   - desc is the error description
func (l *link) closeWithError(cnd ErrCond, desc string) {
	detachErr := &Error{Condition: cnd, Description: desc}
	if l.closeInProgress {
		debug.Log(3, "TX (link %p) close error already pending, discarding %v", l, detachErr)
		return
	}

	// record the close and its cause, then send the detach
	l.closeInProgress = true
	l.doneErr = &LinkError{inner: fmt.Errorf("%s: %s", cnd, desc)}
	l.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformDetach{
		Handle: l.outputHandle,
		Closed: true,
		Error:  detachErr,
	})
}

// txFrame hands the specified frame to the link's session for sending.
// it does not wait for the frame to be written and reports no error.
// you MUST call this instead of session.txFrame() to ensure
// that frames are not sent during session shutdown.
func (l *link) txFrame(frameCtx *frameContext, fr frames.FrameBody) {
	env := frameBodyEnvelope{FrameCtx: frameCtx, FrameBody: fr}

	// NOTE: selecting on l.done isn't required. this is only called
	// from a link's mux or before the mux has even started.
	select {
	case l.session.tx <- env:
		debug.Log(2, "TX (link %p): mux frame to Session (%p): %s", l, l.session, fr)
	case <-l.session.endSent:
		// swallowed so that the link's mux doesn't terminate.
		// this is temporary as l.session.done will soon close.
	case <-l.session.done:
		// the link's session has terminated, let that propagate to the link's mux
	}
}

// txFrameAndWait hands the specified frame to the link's session for sending
// and then waits for the frame's Done channel to be closed, returning frameCtx.Err.
// ctx travels with the frame; this method does not select on ctx.Done() itself.
// it returns session.doneErr if the session terminates first, and nil
// (without sending) if the session's end has already been sent.
// you MUST call this instead of session.txFrame() to ensure
// that frames are not sent during session shutdown.
func (l *link) txFrameAndWait(ctx context.Context, fr frames.FrameBody) error {
	frameCtx := &frameContext{
		Ctx:  ctx,
		Done: make(chan struct{}),
	}

	// NOTE: selecting on l.done isn't required. this is only called
	// from a link's mux or before the mux has even started.

	// step 1: queue the frame with the session
	select {
	case l.session.tx <- frameBodyEnvelope{FrameCtx: frameCtx, FrameBody: fr}:
		debug.Log(2, "TX (link %p): mux frame to Session (%p): %s", l, l.session, fr)
	case <-l.session.endSent:
		// swallowed so that the link's mux doesn't terminate.
		// this is temporary as l.session.done will soon close.
		return nil
	case <-l.session.done:
		return l.session.doneErr
	}

	// step 2: wait for the frame's outcome or for the session to terminate
	select {
	case <-l.session.done:
		return l.session.doneErr
	case <-frameCtx.Done:
		return frameCtx.Err
	}
}