File: channel.go

package info (click to toggle)
golang-github-jkeiser-iter 0.0~git20200628.c8aa0ae-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-backports, experimental, forky, sid, trixie
  • size: 72 kB
  • sloc: makefile: 2
file content (125 lines) | stat: -rw-r--r-- 4,542 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package iter

import "log"

// ChannelItem is a single result received from an iterator.Go() channel.
// It carries either the next value of the iteration or the error that
// ended it.
type ChannelItem struct {
	// Value is the next value of the iteration; it is set on items sent for
	// successfully produced values.
	Value interface{}
	// Error, when non-nil, reports that iteration stopped with an error.
	Error error
}

// Go starts the iterator on a new goroutine and returns the pair of channels
// used to consume it.
//
// The items channel delivers ChannelItems.  An item whose Error is non-nil
// means iteration failed and is about to end; otherwise item.Value holds the
// next value of the iteration.  The items channel is closed when iteration
// ends.
//
// Always close the done channel once you are finished, including when you stop
// reading early: closing it tells the background goroutine to stop iterating
// so that it can be cleaned up.
//
//   channel, done := iterator.Go()
//   defer close(done) // runs on early return or panic too, releasing the goroutine
//   for item := range channel {
//     if item.Error != nil {
//       // Iteration failed; handle the error and leave the loop
//       ...
//     }
//     value := item.Value
//     ...
//   }
func (iterator Iterator) Go() (items <-chan ChannelItem, done chan<- bool) {
	var (
		out  = make(chan ChannelItem) // unbuffered: the producer advances in step with the consumer
		stop = make(chan bool)
	)
	go iterator.IterateToChannel(out, stop)
	return out, stop
}

// IterateToChannel runs the iteration on the calling goroutine, sending every
// value to items wrapped in a ChannelItem.  Use
// "go iterator.IterateToChannel(items, done)" to run it in the background.
//
// If iteration ends with an error, one final ChannelItem carrying that error
// is sent.  The items channel is always closed before this method returns.
//
// Closing done asks the iteration to stop.  Every send on items, including the
// send of the final error, is abandoned once done is closed, so this method
// does not block forever on a consumer that has stopped reading.
func (iterator Iterator) IterateToChannel(items chan<- ChannelItem, done <-chan bool) {
	defer close(items)
	err := iterator.EachWithError(func(result interface{}) error {
		select {
		case items <- ChannelItem{Value: result}:
			return nil
		case <-done:
			// If we are told we're done early, we finish quietly.
			return FINISHED
		}
	})
	if err == nil {
		return
	}
	// Report the failure, but only while someone may still be listening: an
	// unconditional send here would leak this goroutine if the consumer has
	// already closed done and walked away.
	select {
	case items <- ChannelItem{Error: err}:
	case <-done:
	}
}

// EachFromChannel drains the channels produced by Go(), invoking processor on
// each received value.
//
// It returns the first error it meets, whether that is an item's Error or an
// error returned by processor, and returns nil once the items channel is
// closed.  The done channel is closed when this function exits, which covers
// errors, early termination and panics in processor.
func EachFromChannel(items <-chan ChannelItem, done chan<- bool, processor func(interface{}) error) error {
	// Closing done on every exit path lets the producing goroutine stop.
	defer close(done)
	for {
		next, open := <-items
		if !open {
			// Producer closed the channel: iteration completed.
			return nil
		}
		if next.Error != nil {
			return next.Error
		}
		if failure := processor(next.Value); failure != nil {
			return failure
		}
	}
}

// BackgroundEach runs the iteration on a background goroutine while processor
// consumes the values on the calling goroutine.  This is useful when the
// iterator or the processor does blocking work.
//
// bufferSize is the capacity of the channel between the two goroutines, i.e.
// how many items the background goroutine may get ahead of processor.
//
// The returned error is whatever EachFromChannel reports.
//
//   err := iterator.BackgroundEach(100, func(item interface{}) error { ...; return nil })
func (iterator Iterator) BackgroundEach(bufferSize int, processor func(interface{}) error) error {
	var (
		buffered = make(chan ChannelItem, bufferSize)
		stop     = make(chan bool)
	)
	go iterator.IterateToChannel(buffered, stop)
	return EachFromChannel(buffered, stop, processor)
}

// GoSimple iterates to a channel in the background and returns that channel.
//
//   for value := range iterator.GoSimple() {
//     ...
//   }
//
// This convenience comes with two drawbacks:
// - if the iteration stops early due to an error, you cannot handle it: the
//   background goroutine reports it with log.Fatalf, which ends the program.
// - if the caller panics or stops early without receiving every value from the
//   channel, the goroutine stays blocked forever and leaks.
//
// Go() lets you deal with both problems at a small cost in caller complexity,
// and BackgroundEach() offers a simple way to use the same mechanism.
//
// If you can guarantee that neither situation arises, or do not care, this
// method keeps calling code easy to read.
func (iterator Iterator) GoSimple() (values <-chan interface{}) {
	out := make(chan interface{}) // unbuffered
	go iterator.IterateToChannelSimple(out)
	return out
}

// IterateToChannelSimple runs the iteration on the calling goroutine, sending
// each item to values.  Use "go iterator.IterateToChannelSimple(values)" to
// run it in the background.
//
// The values channel is closed when iteration completes.  If the iteration
// reports an error, it is passed to log.Fatalf.  See the warnings on GoSimple()
// about how this differs from Go().
func (iterator Iterator) IterateToChannelSimple(values chan<- interface{}) {
	defer close(values)
	// Each send blocks until the receiver takes the item or buffer space is free.
	forward := func(item interface{}) { values <- item }
	if err := iterator.Each(forward); err != nil {
		log.Fatalf("Iterator returned an error in GoSimple: %v", err)
	}
}