File: job_processor_test.go

Package: geoipupdate 7.1.1-1

package internal

import (
	"context"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestJobQueueRun exercises the parallel job queue and ensures that the
// number of concurrently running goroutines never exceeds the configured
// parallelism.
func TestJobQueueRun(t *testing.T) {
	simulatedJobDuration := 5 * time.Millisecond
	jobsNumber := 10

	tests := []struct {
		Description string
		Parallelism int
	}{{
		Description: "sequential jobs",
		Parallelism: 1,
	}, {
		Description: "parallel jobs",
		Parallelism: 3,
	}}

	for _, test := range tests {
		t.Run(test.Description, func(t *testing.T) {
			doneCh := make(chan struct{})
			var lock sync.Mutex
			runningGoroutines := 0
			maxConcurrentGoroutines := 0

			// A mock processor function that records how many
			// goroutines are running at the same time.
			processorFunc := func(_ context.Context) error {
				lock.Lock()
				runningGoroutines++
				if runningGoroutines > maxConcurrentGoroutines {
					maxConcurrentGoroutines = runningGoroutines
				}
				lock.Unlock()

				time.Sleep(simulatedJobDuration)

				lock.Lock()
				runningGoroutines--
				lock.Unlock()
				return nil
			}

			ctx := context.Background()
			jobProcessor := NewJobProcessor(ctx, test.Parallelism)
			for range jobsNumber {
				jobProcessor.Add(processorFunc)
			}

			// Execute run in a goroutine so that we can exit early if the test
			// hangs or takes too long to execute.
			go func() {
				if err := jobProcessor.Run(ctx); err != nil {
					t.Error(err)
				}
				close(doneCh)
			}()

			// Wait for Run to complete, or time out after one second.
			select {
			case <-doneCh:
			case <-time.After(1000 * time.Millisecond):
				t.Error("Timeout waiting for function completion")
			}

			// The maximum number of concurrently running jobs should match
			// the parallelism configured for the processor.
			require.Equal(t, test.Parallelism, maxConcurrentGoroutines)
		})
	}
}
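
// TestJobQueueRun above treats JobProcessor as a black box: it only relies on
// NewJobProcessor, Add, and Run. Purely as an illustration, the sketch below
// shows one way a bounded-parallelism job queue could be built with a
// buffered-channel semaphore and a sync.WaitGroup. The sketch* names are
// hypothetical, and the real JobProcessor in this package may be implemented
// quite differently.
type sketchJob func(context.Context) error

type sketchProcessor struct {
	jobs   []sketchJob
	sem    chan struct{} // capacity equals the configured parallelism
	ctx    context.Context
	cancel context.CancelFunc
}

func newSketchProcessor(ctx context.Context, parallelism int) *sketchProcessor {
	ctx, cancel := context.WithCancel(ctx)
	return &sketchProcessor{
		sem:    make(chan struct{}, parallelism),
		ctx:    ctx,
		cancel: cancel,
	}
}

// add queues a job; nothing runs until run is called.
func (p *sketchProcessor) add(job sketchJob) {
	p.jobs = append(p.jobs, job)
}

// run starts the queued jobs, using the buffered channel as a semaphore so
// that at most cap(p.sem) of them execute at once. It returns the
// cancellation error if stop was called, otherwise the first job error.
func (p *sketchProcessor) run(ctx context.Context) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, job := range p.jobs {
		p.sem <- struct{}{} // acquire a slot; blocks while the limit is reached

		// Once the queue has been stopped, skip every job not yet started.
		if p.ctx.Err() != nil {
			<-p.sem
			break
		}

		wg.Add(1)
		go func(job sketchJob) {
			defer wg.Done()
			defer func() { <-p.sem }() // release the slot
			if err := job(ctx); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}(job)
	}
	wg.Wait()

	if err := p.ctx.Err(); err != nil {
		// stop was called; the real implementation evidently wraps this in a
		// "processing canceled" error, while the sketch just propagates it.
		return err
	}
	return firstErr
}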

// TestJobQueueStop cancels a job queue and makes sure queued
// jobs are not processed.
func TestJobQueueStop(t *testing.T) {
	doneCh := make(chan struct{})
	processedJobs := 0
	maxProcessedJobs := 5

	ctx := context.Background()
	jobProcessor := NewJobProcessor(ctx, 1)

	processorFunc := func(_ context.Context) error {
		processedJobs++
		if processedJobs == maxProcessedJobs {
			jobProcessor.Stop()
		}
		return nil
	}

	for range 10 {
		jobProcessor.Add(processorFunc)
	}

	// Execute run in a goroutine so that we can exit early if the test
	// hangs or takes too long to execute.
	go func() {
		err := jobProcessor.Run(ctx)
		if err == nil {
			t.Error(`expected "processing canceled" error`)
		} else if !strings.Contains(err.Error(), "processing canceled") {
			t.Errorf(`expected "processing canceled" error, got %q`, err)
		}
		close(doneCh)
	}()

	// Wait for Run to complete, or time out after one second.
	select {
	case <-doneCh:
	case <-time.After(1000 * time.Millisecond):
		t.Error("Timeout waiting for function completion")
	}

	require.Equal(t, maxProcessedJobs, processedJobs)
}
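
// TestJobQueueStop above depends on two things: Stop makes Run return an
// error mentioning "processing canceled", and jobs still queued when Stop is
// called are never executed. In the hypothetical sketchProcessor this maps to
// cancelling the internal context, after which run refuses to start any job
// it has not yet picked up. Again, this is a sketch of the idea, not the
// package's actual implementation.
func (p *sketchProcessor) stop() {
	p.cancel()
}

// sketchUsage mirrors the flow the tests exercise: queue more jobs than we
// intend to run, then stop the queue from inside a job once enough work has
// been done.
func sketchUsage(ctx context.Context) error {
	p := newSketchProcessor(ctx, 1)
	done := 0
	for range 10 {
		p.add(func(context.Context) error {
			done++ // no lock needed here because parallelism is 1
			if done == 5 {
				p.stop() // the remaining queued jobs will be skipped
			}
			return nil
		})
	}
	return p.run(ctx) // returns the cancellation error
}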