File: queue.go

package info (click to toggle)
gitlab-workhorse 0.8.5%2Bdebian-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 792 kB
  • ctags: 443
  • sloc: makefile: 48; python: 15
file content (76 lines) | stat: -rw-r--r-- 1,800 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package queueing

import (
	"errors"
	"time"
)

// Each sentinel error below has its own unexported wrapper type around an
// embedded error value.
type (
	errTooManyRequests  struct{ error }
	errQueueingTimedout struct{ error }
)

var (
	// ErrTooManyRequests is returned by Acquire when the queue has no room
	// left for another request.
	ErrTooManyRequests = &errTooManyRequests{error: errors.New("too many requests queued")}

	// ErrQueueingTimedout is returned by Acquire when the timeout expires
	// before the request is allowed to run.
	ErrQueueingTimedout = &errQueueingTimedout{error: errors.New("queueing timedout")}
)

// Queue limits how many requests run at once and how many may wait for a
// turn. Both limits are tracked by the occupancy of the channels below, as
// created by NewQueue.
type Queue struct {
	// busyCh holds one item per request that is currently allowed to run.
	busyCh chan struct{}

	// waitingCh holds one item per request admitted by Acquire, whether that
	// request is already running or still waiting for room in busyCh.
	waitingCh chan struct{}
}

// NewQueue builds a Queue.
//
// limit is the number of requests that may run concurrently. queueLimit is
// the number of additional requests that may wait once limit is reached.
func NewQueue(limit, queueLimit uint) *Queue {
	q := new(Queue)

	// One buffer position per concurrently running request.
	q.busyCh = make(chan struct{}, limit)

	// Admitted requests keep their position here while they run as well as
	// while they wait, so the capacity is the sum of both limits.
	q.waitingCh = make(chan struct{}, limit+queueLimit)

	return q
}

// Acquire claims a slot in the Queue and returns once the request may be
// processed.
//
// Up to (limit) requests may hold a processing slot at a time, and up to
// (queue-limit) further requests may wait for one. Acquire returns
// ErrTooManyRequests immediately when no place is left in the queue, and
// ErrQueueingTimedout when no processing slot becomes free within timeout.
// After a nil return the slot stays held until Release is called.
func (s *Queue) Acquire(timeout time.Duration) (err error) {
	slot := struct{}{}

	// Admission (non-blocking): claim a place among the running-plus-queued
	// requests, failing straight away if none is left.
	select {
	case s.waitingCh <- slot:
	default:
		return ErrTooManyRequests
	}

	// Any error from here on means the request never got to run, so give the
	// place claimed above back.
	defer func() {
		if err == nil {
			return
		}
		<-s.waitingCh
	}()

	// Fast path (non-blocking): take a processing slot if one is free right
	// now, without creating a timer.
	select {
	case s.busyCh <- slot:
		return nil
	default:
	}

	// Slow path (blocking): wait for a processing slot, but no longer than
	// timeout.
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		err = ErrQueueingTimedout
	case s.busyCh <- slot:
	}
	return err
}

// Release marks the end of processing for one request.
// It removes one item from waitingCh and then one from busyCh; freeing the
// busyCh position lets a request blocked in Acquire proceed, if there is one.
// Both receives block while their channel is empty, so Release is expected to
// be paired with an earlier successful Acquire.
func (s *Queue) Release() {
	// give up this request's place in the queue first, making room for a new
	// request to be admitted
	<-s.waitingCh
	// then free the processing slot so a waiting request can take it
	<-s.busyCh
}