1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
package internal
import (
"context"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestJobQueueRun exercises the parallel job queue and verifies that the peak
// number of jobs running at the same time matches the parallelism passed to
// NewJobProcessor.
func TestJobQueueRun(t *testing.T) {
	const (
		simulatedJobDuration = 5 * time.Millisecond
		jobsNumber           = 10
		runTimeout           = 1000 * time.Millisecond
	)

	tests := []struct {
		Description string
		Parallelism int
	}{{
		Description: "sequential jobs",
		Parallelism: 1,
	}, {
		Description: "parallel jobs",
		Parallelism: 3,
	}}

	for _, test := range tests {
		t.Run(test.Description, func(t *testing.T) {
			// lock guards both counters below; they are updated from the jobs.
			var lock sync.Mutex
			runningGoroutines := 0
			maxConcurrentGoroutines := 0

			// A mock processor function that records how many jobs are in
			// flight at once and remembers the highest value seen.
			processorFunc := func(_ context.Context) error {
				lock.Lock()
				runningGoroutines++
				if runningGoroutines > maxConcurrentGoroutines {
					maxConcurrentGoroutines = runningGoroutines
				}
				lock.Unlock()

				// Stay "running" for a while so that concurrent jobs overlap.
				time.Sleep(simulatedJobDuration)

				lock.Lock()
				runningGoroutines--
				lock.Unlock()
				return nil
			}

			ctx := context.Background()
			jobProcessor := NewJobProcessor(ctx, test.Parallelism)
			for range jobsNumber {
				jobProcessor.Add(processorFunc)
			}

			// Execute Run in a goroutine so that we can exit early if the test
			// hangs or takes too long to execute. The result is handed back
			// over a buffered channel instead of being reported from the
			// goroutine: calling t.Error after the test has returned (e.g.
			// following a timeout) would panic, and the buffer lets the
			// goroutine finish even when nobody is receiving anymore.
			errCh := make(chan error, 1)
			go func() {
				errCh <- jobProcessor.Run(ctx)
			}()

			// Wait for Run to complete or time out after a certain duration.
			select {
			case err := <-errCh:
				if err != nil {
					t.Error(err)
				}
			case <-time.After(runTimeout):
				// Abort here: jobs may still be running, so the counters are
				// not final and the assertion below would be meaningless.
				t.Fatal("Timeout waiting for function completion")
			}

			// Read the peak under the lock, as the jobs write it under the lock.
			lock.Lock()
			observedPeak := maxConcurrentGoroutines
			lock.Unlock()

			// The peak number of concurrently running jobs must match the
			// configured parallelism (expected value first, then actual).
			require.Equal(t, test.Parallelism, observedPeak)
		})
	}
}
// TestJobQueueStop stops a job queue from inside one of its own jobs and makes
// sure that the remaining queued jobs are not processed and that Run reports a
// "processing canceled" error.
func TestJobQueueStop(t *testing.T) {
	const (
		queuedJobs       = 10
		maxProcessedJobs = 5
		runTimeout       = 1000 * time.Millisecond
	)

	// NOTE(review): processedJobs is mutated by the jobs without a lock; this
	// presumably relies on a parallelism of 1 running jobs one at a time —
	// confirm against the JobProcessor implementation.
	processedJobs := 0

	ctx := context.Background()
	jobProcessor := NewJobProcessor(ctx, 1)

	// Each job counts itself; the job that reaches the limit stops the queue.
	processorFunc := func(_ context.Context) error {
		processedJobs++
		if processedJobs == maxProcessedJobs {
			jobProcessor.Stop()
		}
		return nil
	}
	for range queuedJobs {
		jobProcessor.Add(processorFunc)
	}

	// Execute Run in a goroutine so that we can exit early if the test
	// hangs or takes too long to execute. The error is sent back over a
	// buffered channel and checked on the test goroutine, so nothing is
	// reported through t after the test has already returned.
	errCh := make(chan error, 1)
	go func() {
		errCh <- jobProcessor.Run(ctx)
	}()

	// Wait for Run to complete or time out after a certain duration.
	select {
	case err := <-errCh:
		// The two checks are mutually exclusive: err.Error() must not be
		// called on a nil error.
		if err == nil {
			t.Error(`expected "processing canceled" error`)
		} else if !strings.Contains(err.Error(), "processing canceled") {
			t.Errorf(`expected "processing canceled" error, got %q`, err)
		}
	case <-time.After(runTimeout):
		// Abort here: Run is still active, so processedJobs is not final.
		t.Fatal("Timeout waiting for function completion")
	}

	// Expected value first, then actual.
	require.Equal(t, maxProcessedJobs, processedJobs)
}
|