File: creditor.go

package info (click to toggle)
golang-github-azure-go-amqp 1.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,192 kB
  • sloc: makefile: 22
file content (117 lines) | stat: -rw-r--r-- 2,604 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package amqp

import (
	"context"
	"errors"
	"sync"
)

// creditor accumulates the drain request and the credits that should be
// carried by the next flow frame, and tracks whether a drain is in progress.
// Its methods take mu before touching any of the other fields.
type creditor struct {
	mu sync.Mutex

	// future values for the next flow frame.
	// pendingDrain is set by Drain and consumed (reset) by FlowBits.
	pendingDrain bool
	// creditsToAdd is increased by IssueCredit and consumed (reset) by FlowBits.
	creditsToAdd uint32

	// drained is set when a drain is active and we're waiting
	// for the corresponding flow from the remote.
	// It is nil when no drain is active; EndDrain closes it and sets it back to nil.
	drained chan struct{}
}

// Errors reported by creditor while a drain is in progress:
//   - errLinkDraining is returned by IssueCredit;
//   - errAlreadyDraining is returned by Drain.
var (
	errLinkDraining    = errors.New("link is currently draining, no credits can be added")
	errAlreadyDraining = errors.New("drain already in process")
)

// EndDrain finishes the drain that is currently in progress, if any.
// Closing the drained channel wakes up a Drain call waiting on it.
// It is a no-op when no drain is active, so it is safe to call repeatedly.
func (mc *creditor) EndDrain() {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	// nothing to end.
	if mc.drained == nil {
		return
	}

	close(mc.drained)
	// a nil channel marks "no drain active", which lets a new Drain start.
	mc.drained = nil
}

// FlowBits gets the proper values for the next flow frame
// and resets the internal state (pending drain and queued credits).
//
// currentCredits is added to the queued credits to form the returned
// credit value. The sum saturates at the maximum uint32 value instead of
// wrapping around, so queued credits are never silently turned into a
// smaller (or zero) value.
//
// Returns:
//
//	(drain: true, credits: 0) if a flow is needed (drain)
//	(drain: false, credits > 0) if a flow is needed (issue credit)
//	(drain: false, credits == 0) if no flow needed.
func (mc *creditor) FlowBits(currentCredits uint32) (bool, uint32) {
	const maxCredits = ^uint32(0)

	mc.mu.Lock()
	defer mc.mu.Unlock()

	drain := mc.pendingDrain
	toAdd := mc.creditsToAdd

	// both pending values are consumed by this call: only one drain
	// request is sent, and queued credits are handed out exactly once.
	mc.pendingDrain = false
	mc.creditsToAdd = 0

	// either:
	// drain is true (ie, we're going to send a drain frame, and the credits for it should be 0)
	// toAdd == 0 (no flow frame needed, no new credits are being issued)
	if drain || toAdd == 0 {
		return drain, 0
	}

	// clamp rather than overflow: a wrapped sum could come out as 0,
	// which callers interpret as "no flow needed".
	if toAdd > maxCredits-currentCredits {
		return false, maxCredits
	}

	return false, toAdd + currentCredits
}

// Drain starts a drain and then waits for it to be completed.
//
// It returns errAlreadyDraining if a drain is already active. Otherwise it
// blocks until one of the following happens:
//   - EndDrain is called: returns nil.
//   - r.l.done is closed: returns r.l.doneErr.
//   - ctx is cancelled or its deadline expires: returns ctx.Err().
//
// If ctx ends before the operation completes, the drain might not have
// happened.
func (mc *creditor) Drain(ctx context.Context, r *Receiver) error {
	// the channel is captured locally so that waiting on it below does not
	// race with EndDrain(), which sets mc.drained back to nil.
	var drained chan struct{}

	started := func() bool {
		mc.mu.Lock()
		defer mc.mu.Unlock()

		if mc.drained != nil {
			return false
		}

		drained = make(chan struct{})
		mc.drained = drained
		mc.pendingDrain = true
		return true
	}()
	if !started {
		return errAlreadyDraining
	}

	// cause mux() to check our flow conditions.
	// the send is non-blocking: it's skipped if it can't proceed immediately.
	select {
	case r.receiverReady <- struct{}{}:
	default:
	}

	// send drain, wait for responding flow frame
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-r.l.done:
		return r.l.doneErr
	case <-drained:
		return nil
	}
}

// IssueCredit queues up additional credits to be requested at the next
// call of FlowBits().
//
// It returns errLinkDraining, leaving the queued amount untouched, if a
// drain is currently active. The queued total saturates at the maximum
// uint32 value rather than wrapping around on overflow.
func (mc *creditor) IssueCredit(credits uint32) error {
	const maxCredits = ^uint32(0)

	mc.mu.Lock()
	defer mc.mu.Unlock()

	if mc.drained != nil {
		return errLinkDraining
	}

	// clamp rather than overflow: a wrapped total would silently
	// discard credits that were already queued.
	if credits > maxCredits-mc.creditsToAdd {
		mc.creditsToAdd = maxCredits
		return nil
	}

	mc.creditsToAdd += credits
	return nil
}