File: dissolve.go

package info (click to toggle)
golang-github-centrifugal-centrifuge 0.15.0%2Bgit20210306.f435ba2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 1,612 kB
  • sloc: javascript: 102; makefile: 2
file content (66 lines) | stat: -rw-r--r-- 1,486 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package dissolve

import (
	"errors"
	"runtime"
)

// Dissolver allows to put function to in-memory queue and process
// it with workers until success. The order of execution is not maintained.
// Jobs will be lost after closing. Jobs not saved to persistent store so
// do not survive process restart.
// Centrifuge uses this for asynchronously unsubscribing node from channels
// in broker. As soon as process restarts all connections to broker get
// closed automatically so it's ok to lose jobs inside Dissolver queue.
type Dissolver struct {
	queue      queue
	numWorkers int
}

// New creates new Dissolver.
func New(numWorkers int) *Dissolver {
	return &Dissolver{
		queue:      newQueue(),
		numWorkers: numWorkers,
	}
}

// Run launches workers to process Jobs from queue concurrently.
func (d *Dissolver) Run() error {
	for i := 0; i < d.numWorkers; i++ {
		go d.runWorker()
	}
	return nil
}

// Close stops processing Jobs, no more Jobs can be submitted after closing.
func (d *Dissolver) Close() error {
	d.queue.Close()
	return nil
}

// Submit Job to be reliably processed.
func (d *Dissolver) Submit(job Job) error {
	if !d.queue.Add(job) {
		return errors.New("can not submit job to closed dissolver")
	}
	return nil
}

func (d *Dissolver) runWorker() {
	for {
		job, ok := d.queue.Wait()
		if !ok {
			if d.queue.Closed() {
				break
			}
			continue
		}
		err := job()
		if err != nil {
			// Put to the end of queue.
			runtime.Gosched()
			d.queue.Add(job)
		}
	}
}