1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
// Package chans contains functions for manipulating channels.
package chans
import (
"context"
"reflect"
"github.com/bradenaw/juniper/xslices"
)
// SendContext sends item on channel c and returns nil, unless ctx expires in which case it returns
// ctx.Err().
func SendContext[T any](ctx context.Context, c chan<- T, item T) error {
select {
case <-ctx.Done():
return ctx.Err()
case c <- item:
return nil
}
}
// RecvContext attempts to receive from channel c. If c is closed before or during, returns (_,
// false, nil). If ctx expires before or during, returns (_, _, ctx.Err()).
func RecvContext[T any](ctx context.Context, c <-chan T) (T, bool, error) {
select {
case <-ctx.Done():
var zero T
return zero, false, ctx.Err()
case item, ok := <-c:
return item, ok, nil
}
}
// Merge sends all values from all in channels to out.
//
// Merge blocks until all ins have closed and all values have been sent. It does not close out.
func Merge[T any](out chan<- T, in ...<-chan T) {
if len(in) == 1 {
for item := range in[0] {
out <- item
}
return
} else if len(in) == 2 {
merge2(out, in[0], in[1])
return
} else if len(in) == 3 {
merge3(out, in[0], in[1], in[2])
return
}
selectCases := xslices.Map(in, func(x <-chan T) reflect.SelectCase {
return reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(x),
}
})
for {
if len(selectCases) == 0 {
return
}
chosen, item, ok := reflect.Select(selectCases)
if ok {
out <- item.Interface().(T)
} else {
selectCases = xslices.RemoveUnordered(selectCases, chosen, 1)
}
}
}
// Merge special-case with no reflection.
func merge2[T any](out chan<- T, in0, in1 <-chan T) {
nDone := 0
for {
select {
case item, ok := <-in0:
if ok {
out <- item
} else {
in0 = nil
nDone++
if nDone == 2 {
return
}
}
case item, ok := <-in1:
if ok {
out <- item
} else {
in1 = nil
nDone++
if nDone == 2 {
return
}
}
}
}
}
// Merge special-case with no reflection.
func merge3[T any](out chan<- T, in0, in1, in2 <-chan T) {
nDone := 0
for {
select {
case item, ok := <-in0:
if ok {
out <- item
} else {
in0 = nil
nDone++
if nDone == 3 {
return
}
}
case item, ok := <-in1:
if ok {
out <- item
} else {
in1 = nil
nDone++
if nDone == 3 {
return
}
}
case item, ok := <-in2:
if ok {
out <- item
} else {
in2 = nil
nDone++
if nDone == 3 {
return
}
}
}
}
}
// Replicate sends all values sent to src to every channel in dsts.
//
// Replicate blocks until src is closed and all values have been sent to all dsts. It does not close
// dsts.
func Replicate[T any](src <-chan T, dsts ...chan<- T) {
for item := range src {
for _, dst := range dsts {
dst <- item
}
}
}
|