package dissolve
import (
"errors"
"runtime"
)
// Dissolver allows to put function to in-memory queue and process
// it with workers until success. The order of execution is not maintained.
// Jobs will be lost after closing. Jobs not saved to persistent store so
// do not survive process restart.
//
// Centrifuge uses this for asynchronously unsubscribing node from channels
// in broker. As soon as process restarts all connections to broker get
// closed automatically so it's ok to lose jobs inside Dissolver queue.
type Dissolver struct {
	// queue is the in-memory job queue shared by all workers
	// (queue type is defined elsewhere in this package).
	queue queue
	// numWorkers is the number of worker goroutines started by Run.
	numWorkers int
}
// New creates new Dissolver with the given number of workers.
// Call Run to actually start processing submitted Jobs.
func New(numWorkers int) *Dissolver {
	d := Dissolver{
		queue:      newQueue(),
		numWorkers: numWorkers,
	}
	return &d
}
// Run launches workers to process Jobs from queue concurrently.
// It always returns nil; the error return is kept for API symmetry.
func (d *Dissolver) Run() error {
	remaining := d.numWorkers
	for ; remaining > 0; remaining-- {
		go d.runWorker()
	}
	return nil
}
// Close stops processing Jobs, no more Jobs can be submitted after closing.
// Jobs still sitting in the queue are dropped (see type doc: jobs are lost
// after closing).
func (d *Dissolver) Close() error {
	// Signal workers blocked in queue.Wait so their loops can exit.
	d.queue.Close()
	// Always nil; error return kept for API symmetry.
	return nil
}
// Submit Job to be reliably processed. Returns an error when the
// Dissolver has already been closed and the Job can not be accepted.
func (d *Dissolver) Submit(job Job) error {
	if d.queue.Add(job) {
		return nil
	}
	// queue.Add reports false only after the queue was closed.
	return errors.New("can not submit job to closed dissolver")
}
// runWorker is a single worker loop: it pulls Jobs from the queue and
// executes them, re-enqueueing any Job that returns an error so it is
// retried until it succeeds. The loop exits once the queue is closed.
func (d *Dissolver) runWorker() {
	for {
		job, ok := d.queue.Wait()
		if !ok {
			if d.queue.Closed() {
				return
			}
			// Spurious wakeup without a job — keep waiting.
			continue
		}
		if err := job(); err != nil {
			// Retry later: yield first so other goroutines get a
			// chance to run, then put the Job back into the queue.
			runtime.Gosched()
			d.queue.Add(job)
		}
	}
}