1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
package main
import (
"fmt"
"time"
)
// ErrScheduleTimeout is returned by Pool when a task could neither be queued
// nor handed to a newly started worker before the given timeout elapsed.
var ErrScheduleTimeout = fmt.Errorf("schedule error: timed out")
// Pool contains logic of goroutine reuse: tasks are either pushed onto a
// shared work queue consumed by already running workers, or given to a newly
// started worker while the worker limit has not been reached.
type Pool struct {
// sem limits the number of worker goroutines: a value is sent into it before
// each worker is started and received back when that worker returns.
sem chan struct{}
// work is the queue of pending tasks that running workers receive from.
work chan func()
}
// NewPool builds a goroutine pool that runs at most size workers at a time and
// buffers up to queue pending tasks. spawn workers are started right away.
//
// It panics when queue > 0 while spawn <= 0, and when spawn exceeds size.
func NewPool(size, queue, spawn int) *Pool {
	switch {
	case spawn <= 0 && queue > 0:
		panic("dead queue configuration detected")
	case spawn > size:
		panic("spawn > workers")
	}

	pool := &Pool{
		sem:  make(chan struct{}, size),
		work: make(chan func(), queue),
	}

	// Pre-start the requested workers; each one takes a semaphore slot and
	// begins with a no-op task before draining the work queue.
	for remaining := spawn; remaining > 0; remaining-- {
		pool.sem <- struct{}{}
		go pool.worker(func() {})
	}

	return pool
}
// Schedule hands task over to the pool's workers. It blocks until the task has
// either been placed on the work queue or given to a newly started worker.
func (p *Pool) Schedule(task func()) {
	// With a nil timeout channel the timeout branch can never fire, so the
	// returned error is always nil and is deliberately discarded.
	_ = p.schedule(task, nil)
}
// ScheduleTimeout schedules task to be executed over pool's workers.
// It returns ErrScheduleTimeout when the task could neither be queued nor
// handed to a new worker within the given timeout, and nil otherwise.
func (p *Pool) ScheduleTimeout(timeout time.Duration, task func()) error {
	// Use an explicit timer instead of time.After so that it is stopped as
	// soon as scheduling completes rather than lingering until it fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	return p.schedule(task, timer.C)
}
// schedule blocks until one of three things happens: task is accepted by the
// work queue, a semaphore slot is obtained and a new worker is started with
// task, or timeout delivers a value. Only the last case yields an error
// (ErrScheduleTimeout). A nil timeout channel disables the timeout branch.
func (p *Pool) schedule(task func(), timeout <-chan time.Time) error {
	select {
	case p.sem <- struct{}{}:
		// Worker limit not reached yet: start a new worker for this task.
		go p.worker(task)
		return nil
	case p.work <- task:
		// The task was taken by the work queue.
		return nil
	case <-timeout:
		return ErrScheduleTimeout
	}
}
// worker runs task and then keeps executing tasks received from the work
// queue until that channel is closed. On return it gives its semaphore slot
// back so another worker may be started.
func (p *Pool) worker(task func()) {
	defer func() {
		<-p.sem
	}()

	task()

	for {
		next, ok := <-p.work
		if !ok {
			return
		}
		next()
	}
}
|