File: processor.go

package info (click to toggle)
golang-github-biogo-biogo 1.0.4-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid, trixie
  • size: 5,332 kB
  • sloc: sh: 282; makefile: 2
file content (116 lines) | stat: -rw-r--r-- 2,482 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright ©2011-2012 The bíogo Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package concurrent

import (
	"fmt"
	"runtime"
	"sync"
)

// Operator is implemented by any value that can carry out a unit of work.
// Operation performs that work and returns its result together with any
// error encountered.
type Operator interface {
	Operation() (interface{}, error)
}

// Processor manages a fixed number of worker goroutines that run Operators
// taken from an input queue and publish their outcomes as Results.
type Processor struct {
	in      chan Operator   // input queue supplied to NewProcessor; workers receive Operators from it
	out     chan Result     // results channel; workers send one Result per completed operation
	stop    chan struct{}   // closed by Stop to tell workers to exit
	work    chan struct{}   // token pool: a worker takes a token when it starts and returns it when it exits
	threads int             // number of worker goroutines started by NewProcessor
	wg      *sync.WaitGroup // tracks the worker goroutines so that Wait can block until they exit
}

// NewProcessor returns a new Processor that runs the Operation method of each
// Operator received from queue on a pool of worker goroutines and places each
// outcome on a results channel created with capacity buffer.
//
// threads is limited by GOMAXPROCS: if threads is greater than GOMAXPROCS or
// less than 1, GOMAXPROCS workers are started instead.
//
// A worker exits when queue is closed and drained, when it observes that Stop
// has been called, or when an Operation panics. The stop signal is only
// checked after an operation completes, so a worker that is blocked waiting
// on an empty queue does not notice it. A panic is recovered and reported as
// a Result with a nil Value and a non-nil Err. The results channel is closed
// exactly once, by the last worker to exit.
func NewProcessor(queue chan Operator, buffer int, threads int) (p *Processor) {
	if available := runtime.GOMAXPROCS(0); threads > available || threads < 1 {
		threads = available
	}

	p = &Processor{
		in:      queue,
		out:     make(chan Result, buffer),
		stop:    make(chan struct{}),
		work:    make(chan struct{}, threads),
		threads: threads,
		wg:      &sync.WaitGroup{},
	}
	// Fill the token pool; each worker removes one token while it is alive,
	// which is what Working reports.
	for i := 0; i < threads; i++ {
		p.work <- struct{}{}
	}

	// live counts the workers that have not yet exited. Deciding who closes
	// p.out from len(p.work) is racy: several exiting workers can observe a
	// full pool at once (double close), and a full pool can also be observed
	// before a late worker has even started. An explicit counter guarded by a
	// mutex lets exactly one worker, the last, close the channel.
	var (
		mu   sync.Mutex
		live = threads
	)

	p.wg.Add(threads)
	for i := 0; i < threads; i++ {
		go func() {
			<-p.work
			defer func() {
				if err := recover(); err != nil {
					p.out <- Result{Value: nil, Err: fmt.Errorf("concurrent: processor panic: %v", err)}
				}
				p.work <- struct{}{}

				mu.Lock()
				live--
				last := live == 0
				mu.Unlock()
				if last {
					// No other worker can send any more, so closing is safe.
					close(p.out)
				}
				p.wg.Done()
			}()

			for input := range p.in {
				v, e := input.Operation()
				p.out <- Result{Value: v, Err: e}

				// Non-blocking check for a stop request between operations.
				select {
				case <-p.stop:
					return
				default:
				}
			}
		}()
	}

	return p
}

// Process submits the given values to the input queue in the order supplied.
// Each send blocks until the queue can accept the value.
func (p *Processor) Process(value ...Operator) {
	for i := range value {
		p.in <- value[i]
	}
}

// Result returns the value and error of the next available result, blocking
// until one is ready. Once the results channel has been closed and drained,
// the receive yields the zero Result and so Result returns (nil, nil).
func (p *Processor) Result() (interface{}, error) {
	next := <-p.out
	return next.Value, next.Err
}

// Close closes the input queue. Workers finish once they have drained
// whatever is still queued; sending to the queue afterwards panics.
func (p *Processor) Close() {
	queue := p.in
	close(queue)
}

// Working returns the number of worker goroutines currently holding a token
// from the work pool, i.e. workers that have started and not yet exited.
func (p *Processor) Working() int {
	idle := len(p.work)
	return p.threads - idle
}

// Stop signals the worker goroutines to terminate by closing the stop
// channel. Workers check for the signal only after completing an operation,
// so a worker blocked waiting for input exits only once it has processed one
// more item or the queue has been closed.
//
// Calling Stop again after it has returned is a no-op. Stop is not
// synchronized against concurrent calls to itself.
func (p *Processor) Stop() {
	select {
	case <-p.stop:
		// Already stopped; closing a closed channel would panic.
	default:
		close(p.stop)
	}
}

// Wait blocks until every worker goroutine has exited.
func (p *Processor) Wait() {
	workers := p.wg
	workers.Wait()
}

// Result holds the outcome of a single Operation call. A panic recovered
// from a worker is also reported as a Result, with a nil Value and the panic
// described in Err.
type Result struct {
	Value interface{} // value returned by Operation
	Err   error       // error returned by Operation, or a description of a recovered panic
}