1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
//go:build integration
// +build integration
package kinesis_test
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
// TestInteg_SubscribeToShard subscribes to every shard of the test stream and
// checks that the total number of records received across all shards equals
// len(records).
//
// It looks up the stream by streamName and the consumer by consumerName, then
// starts one goroutine per shard. Each goroutine opens a SubscribeToShard
// event stream positioned at startingTimestamp and counts the records it
// receives. All subscriptions share a single context with a 60 second timeout.
//
// svc, streamName, consumerName, startingTimestamp and records are
// package-level fixtures defined outside this function.
func TestInteg_SubscribeToShard(t *testing.T) {
	desc, err := svc.DescribeStream(&kinesis.DescribeStreamInput{
		StreamName: &streamName,
	})
	if err != nil {
		t.Fatalf("expect no error, %v", err)
	}

	cons, err := svc.DescribeStreamConsumer(
		&kinesis.DescribeStreamConsumerInput{
			StreamARN:    desc.StreamDescription.StreamARN,
			ConsumerName: &consumerName,
		})
	if err != nil {
		t.Fatalf("expect no error, %v", err)
	}

	ctx, cancelFn := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancelFn()

	// Counters shared by all shard goroutines; updated only through atomic
	// operations while the goroutines are running.
	var recordsRx int32    // total records received across all shards
	var ignoredCount int32 // events that carried no records
	var goodCount int32    // events that carried at least one record

	var wg sync.WaitGroup
	wg.Add(len(desc.StreamDescription.Shards))
	for _, shard := range desc.StreamDescription.Shards {
		// The shard is passed as an argument so each goroutine works on its
		// own copy of the loop variable.
		go func(s *kinesis.Shard) {
			defer wg.Done()

			params := &kinesis.SubscribeToShardInput{
				ConsumerARN: cons.ConsumerDescription.ConsumerARN,
				StartingPosition: &kinesis.StartingPosition{
					Type:      aws.String(kinesis.ShardIteratorTypeAtTimestamp),
					Timestamp: &startingTimestamp,
				},
				ShardId: s.ShardId,
			}

			sub, err := svc.SubscribeToShardWithContext(ctx, params)
			if err != nil {
				// Errorf rather than Fatalf: FailNow must only be called from
				// the goroutine running the test function. Return so the
				// failed subscription is never closed or read from.
				t.Errorf("expect no error, %v, %v", err, *s.ShardId)
				return
			}
			defer sub.EventStream.Close()

		Loop:
			for event := range sub.EventStream.Events() {
				switch e := event.(type) {
				case *kinesis.SubscribeToShardEvent:
					if len(e.Records) == 0 {
						atomic.AddInt32(&ignoredCount, 1)
					} else {
						atomic.AddInt32(&goodCount, 1)
						for _, r := range e.Records {
							if len(r.Data) == 0 {
								t.Errorf("expect data in record, got none")
							}
						}
					}

					// Adding zero for an empty event still yields the current
					// total, so this goroutine stops on its next event once
					// the combined count has reached the target, even if the
					// final records arrived on a different shard.
					newCount := atomic.AddInt32(&recordsRx, int32(len(e.Records)))
					if int(newCount) >= len(records) {
						break Loop
					}
				}
			}

			if err := sub.EventStream.Err(); err != nil {
				t.Errorf("expect no error, %v, %v", err, *s.ShardId)
			}
		}(shard)
	}

	// After Wait returns every goroutine has finished, so the counters can be
	// read directly.
	wg.Wait()

	if e, a := len(records), int(recordsRx); e != a {
		t.Errorf("expected to read %v records, got %v", e, a)
	}
	t.Log("Ignored", ignoredCount, "empty events, non-empty", goodCount)
}
|