1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
package amqp
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/stretchr/testify/require"
)
// TestCreditorIssueCredits checks that credits queued with IssueCredit are
// reported by the next FlowBits call on top of the current credits, and that
// a following FlowBits call reports neither a drain nor any credits.
func TestCreditorIssueCredits(t *testing.T) {
	r := newTestLink(t)

	require.NoError(t, r.creditor.IssueCredit(3))

	drain, credits := r.creditor.FlowBits(1)
	require.False(t, drain)
	require.EqualValues(t, 3+1, credits, "flow frame includes the pending credits and our current credits")

	// flow clears the previous data once it's been called, so this second
	// call is expected to report no drain and zero credits.
	drain, credits = r.creditor.FlowBits(4)
	require.False(t, drain)
	// The message describes the non-drain case being asserted here (drain is
	// false above), rather than talking about a drain flow frame.
	require.EqualValues(t, 0, credits, "pending credits are cleared by the previous FlowBits call")
}
// TestCreditorDrain starts two concurrent Drain calls on the same creditor and
// checks that FlowBits reports a drain with zero credits while one is active,
// that EndDrain clears the drained channel, and that exactly the losing Drain
// call fails with errAlreadyDraining.
func TestCreditorDrain(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
	defer cancel()

	r := newTestLink(t)
	require.NoError(t, r.creditor.IssueCredit(3))

	// only one drain allowed at a time.
	drainRoutines := sync.WaitGroup{}
	drainRoutines.Add(2)

	// err1/err2 are written by the goroutines and only read after
	// drainRoutines.Wait(), which orders the accesses.
	var err1, err2 error

	go func() {
		defer drainRoutines.Done()
		err1 = r.creditor.Drain(ctx, r)
	}()

	go func() {
		defer drainRoutines.Done()
		err2 = r.creditor.Drain(ctx, r)
	}()

	// one of the drain calls will have succeeded, the other one should still be blocking.
	time.Sleep(time.Second * 2)

	// the next time someone requests a flow frame it'll drain (this doesn't affect the blocked Drain() calls)
	drain, credits := r.creditor.FlowBits(101)
	require.True(t, drain)
	require.EqualValues(t, 0, credits, "Drain always drains with 0 credit")

	// unblock the last of the drainers
	r.creditor.EndDrain()
	require.Nil(t, r.creditor.drained, "drain completes and removes the drained channel")

	// wait for all the drain routines to end
	drainRoutines.Wait()

	// one of them should have failed (if both succeeded we've somehow let them both run)
	require.False(t, err1 == nil && err2 == nil)

	// Pick whichever call lost the race. require.Error would only check for a
	// non-nil error (its extra argument is just a failure message), so use
	// ErrorIs to verify the error really is errAlreadyDraining.
	rejected := err1
	if err1 == nil {
		rejected = err2
	}
	require.ErrorIs(t, rejected, errAlreadyDraining)
}
// TestCreditorIssueCreditsWhileDrainingFails checks that IssueCredit is
// rejected with errLinkDraining while a Drain call is in progress, and that
// the Drain call itself completes without error once EndDrain is called.
func TestCreditorIssueCreditsWhileDrainingFails(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
	defer cancel()

	r := newTestLink(t)
	require.NoError(t, r.creditor.IssueCredit(3))

	// Build the link on the test goroutine: helpers that take *testing.T may
	// call FailNow, which must not happen on a spawned goroutine.
	drainLink := newTestLink(t)

	// drainErr is written by the goroutine and only read after wg.Wait().
	var drainErr error
	wg := sync.WaitGroup{}
	wg.Add(1)

	go func() {
		defer wg.Done()
		drainErr = r.creditor.Drain(ctx, drainLink)
	}()

	// give the goroutine time to enter Drain.
	time.Sleep(time.Second * 2)

	// drain is still active, so issuing credit must be rejected. ErrorIs
	// verifies the specific error; require.Error would accept any error.
	require.ErrorIs(t, r.creditor.IssueCredit(1), errLinkDraining)

	r.creditor.EndDrain()
	wg.Wait()

	// Assert the goroutine's result here, on the test goroutine.
	require.NoError(t, drainErr)
}
// TestCreditorDrainRespectsContext checks that Drain returns context.Canceled
// when it is called with a context that has already been cancelled.
func TestCreditorDrainRespectsContext(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
	defer cancel()

	mc := creditor{}

	// cancel before calling Drain so the context is already done.
	cancel()

	// ErrorIs checks the error's identity; require.Error would only check
	// that some error was returned and treat the extra argument as a message.
	require.ErrorIs(t, mc.Drain(ctx, newTestLink(t)), context.Canceled)
}
func TestCreditorDrainReturnsProperError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()
errs := []*Error{
&encoding.Error{
Condition: ErrCondDecodeError,
},
nil,
}
for i, err := range errs {
t.Run(fmt.Sprintf("Error[%d]", i), func(t *testing.T) {
mc := creditor{}
link := newTestLink(t)
link.l.doneErr = err
close(link.l.done)
detachErr := mc.Drain(ctx, link)
require.Equal(t, err, detachErr)
})
}
}
|