1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
|
package amqp
import (
"context"
"errors"
"sync"
)
type creditor struct {
mu sync.Mutex
// future values for the next flow frame.
pendingDrain bool
creditsToAdd uint32
// drained is set when a drain is active and we're waiting
// for the corresponding flow from the remote.
drained chan struct{}
}
var (
errLinkDraining = errors.New("link is currently draining, no credits can be added")
errAlreadyDraining = errors.New("drain already in process")
)
// EndDrain ends the current drain, unblocking any active Drain calls.
func (mc *creditor) EndDrain() {
mc.mu.Lock()
defer mc.mu.Unlock()
if mc.drained != nil {
close(mc.drained)
mc.drained = nil
}
}
// FlowBits gets gets the proper values for the next flow frame
// and resets the internal state.
// Returns:
//
// (drain: true, credits: 0) if a flow is needed (drain)
// (drain: false, credits > 0) if a flow is needed (issue credit)
// (drain: false, credits == 0) if no flow needed.
func (mc *creditor) FlowBits(currentCredits uint32) (bool, uint32) {
mc.mu.Lock()
defer mc.mu.Unlock()
drain := mc.pendingDrain
var credits uint32
if mc.pendingDrain {
// only send one drain request
mc.pendingDrain = false
}
// either:
// drain is true (ie, we're going to send a drain frame, and the credits for it should be 0)
// mc.creditsToAdd == 0 (no flow frame needed, no new credits are being issued)
if drain || mc.creditsToAdd == 0 {
credits = 0
} else {
credits = mc.creditsToAdd + currentCredits
}
mc.creditsToAdd = 0
return drain, credits
}
// Drain initiates a drain and blocks until EndDrain is called.
// If the context's deadline expires or is cancelled before the operation
// completes, the drain might not have happened.
func (mc *creditor) Drain(ctx context.Context, r *Receiver) error {
mc.mu.Lock()
if mc.drained != nil {
mc.mu.Unlock()
return errAlreadyDraining
}
mc.drained = make(chan struct{})
// use a local copy to avoid racing with EndDrain()
drained := mc.drained
mc.pendingDrain = true
mc.mu.Unlock()
// cause mux() to check our flow conditions.
select {
case r.receiverReady <- struct{}{}:
default:
}
// send drain, wait for responding flow frame
select {
case <-drained:
return nil
case <-r.l.done:
return r.l.doneErr
case <-ctx.Done():
return ctx.Err()
}
}
// IssueCredit queues up additional credits to be requested at the next
// call of FlowBits()
func (mc *creditor) IssueCredit(credits uint32) error {
mc.mu.Lock()
defer mc.mu.Unlock()
if mc.drained != nil {
return errLinkDraining
}
mc.creditsToAdd += credits
return nil
}
|