File: queue.go

package info (click to toggle)
golang-github-evilsocket-islazy 1.11.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 292 kB
  • sloc: javascript: 8; makefile: 3
file content (95 lines) | stat: -rw-r--r-- 2,022 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package async

import (
	"runtime"
	"sync"
)

// Job is an arbitrary value pushed to the queue with Add and handed,
// unchanged, to the worker Logic. It is the empty interface, so a value of
// any type can be queued; the queue itself never inspects it.
type Job interface{}

// Logic is the function each worker executes for every job it receives.
// arg is the value that was passed to Add. When the queue runs more than
// one worker, Logic can be invoked from several goroutines at the same time.
type Logic func(arg Job)

// WorkQueue is an asynchronous jobs queue served by a fixed number of
// concurrent worker goroutines. Create one with NewQueue or
// NewBufferedQueue, feed it with Add and shut it down with Stop.
type WorkQueue struct {
	workers  int            // number of worker goroutines started for this queue
	jobChan  chan Job       // carries jobs from Add to the workers
	stopChan chan struct{}  // closed by Stop to tell the workers to return
	jobs     sync.WaitGroup // counts jobs that were added but not yet processed
	done     sync.WaitGroup // counts worker goroutines that have not returned yet
	logic    Logic          // function run by the workers on every job
}

// NewQueue creates a new job queue with a specific worker logic and an
// unbuffered jobs channel, and starts its workers.
// If workers is less than or equal to zero, the number of workers is
// set to the number of logical CPUs usable by the current process
// (runtime.NumCPU).
func NewQueue(workers int, logic Logic) *WorkQueue {
	return createQueue(workers, logic, 0)
}

// NewBufferedQueue creates a new job queue like NewQueue, but with a jobs
// channel that can hold up to size pending jobs, so Add only blocks once
// that many jobs are waiting for a worker. A size of zero gives the same
// unbuffered queue NewQueue creates.
func NewBufferedQueue(workers int, logic Logic, size int) *WorkQueue {
	return createQueue(workers, logic, size)
}

// createQueue builds a WorkQueue and starts its workers; it backs both
// NewQueue and NewBufferedQueue.
//
// workers is the number of worker goroutines to start; a value less than or
// equal to zero selects runtime.NumCPU(). logic is the function every worker
// runs on each job. bufferSize is the capacity of the jobs channel: zero
// gives an unbuffered channel, and a negative value is treated as zero
// because make panics at run time on a negative channel capacity.
func createQueue(workers int, logic Logic, bufferSize int) *WorkQueue {
	if workers <= 0 {
		workers = runtime.NumCPU()
	}
	if bufferSize < 0 {
		bufferSize = 0
	}

	// The jobs and done WaitGroups are usable at their zero value, so they
	// need no explicit initialization here.
	wq := &WorkQueue{
		workers:  workers,
		jobChan:  make(chan Job, bufferSize),
		stopChan: make(chan struct{}),
		logic:    logic,
	}

	// Account for all the workers up front; each one releases its slot when
	// its goroutine returns.
	wq.done.Add(workers)
	for i := 0; i < workers; i++ {
		go wq.worker(i)
	}

	return wq
}

// worker is the body of a single worker goroutine. It keeps pulling jobs
// from the jobs channel and running the queue logic on them until the stop
// channel is closed, then releases its slot in the done WaitGroup. id is the
// index assigned by createQueue; it is currently unused.
func (wq *WorkQueue) worker(id int) {
	defer wq.done.Done()

	// process runs the user logic on one job and only then marks the job
	// as completed.
	process := func(next Job) {
		wq.logic(next)
		wq.jobs.Done()
	}

	for {
		select {
		case next := <-wq.jobChan:
			process(next)

		case <-wq.stopChan:
			return
		}
	}
}

// Add pushes a new job to the queue. It blocks until the job has been
// accepted by the jobs channel: on an unbuffered queue that means until a
// worker receives it, on a buffered one until there is room in the buffer.
// Add must not be called after Stop, which closes the jobs channel; sending
// on a closed channel panics.
func (wq *WorkQueue) Add(job Job) {
	// Count the job before sending it, so a worker can never call
	// jobs.Done for a job that has not been accounted for yet.
	wq.jobs.Add(1)
	wq.jobChan <- job
}

// Wait blocks until all the workers have returned. Workers only return once
// the stop channel is closed, which Stop does, so Wait does not return
// before Stop has been called.
func (wq *WorkQueue) Wait() {
	wq.done.Wait()
}

// WaitDone blocks until all the jobs pushed with Add have been processed,
// that is until the worker logic has returned for each of them. The workers
// keep running, so more jobs can be added afterwards.
func (wq *WorkQueue) WaitDone() {
	wq.jobs.Wait()
}

// Stop stops the job queue and the workers. It blocks until every job
// already pushed with Add has been processed, then tells the workers to
// return, waits for them and finally closes the jobs channel.
//
// The queue cannot be used after Stop: a later Add panics because the jobs
// channel is closed, and a second Stop panics because the stop channel is
// already closed.
func (wq *WorkQueue) Stop() {
	// Let the workers drain the pending jobs BEFORE signaling them to stop.
	// A worker picks at random between a ready stop signal and a ready job,
	// so signaling first can make every worker return while jobs are still
	// sitting in a buffered channel; those jobs would never be marked done
	// and this wait would block forever.
	wq.jobs.Wait()

	close(wq.stopChan)
	wq.done.Wait()

	// No worker is receiving anymore, so closing cannot hand a zero-value
	// job to the logic.
	close(wq.jobChan)
}