File: bundler.go

package info (click to toggle)
golang-google-api 0.0~git20161128.3cc2e59-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 64,468 kB
  • ctags: 71,262
  • sloc: makefile: 15
file content (261 lines) | stat: -rw-r--r-- 8,994 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package bundler supports bundling (batching) of items. Bundling amortizes an
// action with fixed costs over multiple items. For example, if an API provides
// an RPC that accepts a list of items as input, but clients would prefer
// adding items one at a time, then a Bundler can accept individual items from
// the client and bundle many of them into a single RPC.
//
// This package is experimental and subject to change without notice.
package bundler

import (
	"errors"
	"reflect"
	"sync"
	"time"
)

// Defaults that NewBundler applies to a new Bundler's thresholds and limits.
const (
	// DefaultDelayThreshold is the default value of Bundler.DelayThreshold.
	DefaultDelayThreshold = 1 * time.Second
	// DefaultBundleCountThreshold is the default value of Bundler.BundleCountThreshold.
	DefaultBundleCountThreshold = 10
	// DefaultBundleByteThreshold is the default value of Bundler.BundleByteThreshold (1e6 bytes).
	DefaultBundleByteThreshold = 1e6
	// DefaultBufferedByteLimit is the default value of Bundler.BufferedByteLimit (1e9 bytes).
	DefaultBufferedByteLimit = 1e9
)

// Sentinel errors returned by Bundler.Add.
var (
	// ErrOverflow is returned by Add when accepting the item would make the
	// number of bytes the Bundler holds exceed its BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem is returned by Add when the item's size is larger than
	// the Bundler's BundleByteLimit, so the item could never fit in a bundle.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
)

// A Bundler collects items added to it into bundles and calls a user-provided
// handler function on each bundle once one of its thresholds (delay, item
// count, or byte size) is reached, or when Flush or Close is called.
//
// Create a Bundler with NewBundler; the zero value is not usable (for example,
// it has no timer and no background goroutine to run the handler).
type Bundler struct {
	// Starting from the time that the first item is added to a bundle, once
	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
	DelayThreshold time.Duration

	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. A non-positive value disables
	// this threshold. The default is DefaultBundleCountThreshold.
	BundleCountThreshold int

	// Once the number of bytes in current bundle reaches this threshold, handle
	// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
	// but does not cap the total size of a bundle.
	BundleByteThreshold int

	// The maximum size of a bundle, in bytes. Add rejects any single item
	// larger than this with ErrOversizedItem, and closes the current bundle
	// early rather than let it grow past this size. Zero (or any negative
	// value) means unlimited; NewBundler leaves it at zero.
	BundleByteLimit int

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. An item's bytes are counted from the time it is
	// added until the handler call for its bundle returns. The default is
	// DefaultBufferedByteLimit.
	BufferedByteLimit int

	// The fields below are set once by NewBundler.
	handler       func(interface{}) // called to handle a bundle; only ever from the background goroutine, one call at a time
	itemSliceZero reflect.Value     // nil (zero value) of []T, where T is the item type; each new bundle starts from it
	donec         chan struct{}     // closed by Close to tell the background goroutine to exit
	handlec       chan int          // capacity-1 wake-up signal for the background goroutine; sent to without blocking
	timer         *time.Timer       // implements DelayThreshold; re-armed by Add on each bundle's first item

	// mu guards the fields below, and is also held around timer.Reset and timer.Stop.
	mu            sync.Mutex
	bufferedSize  int           // total bytes added but not yet handled (see BufferedByteLimit)
	closedBundles []bundle      // bundles waiting to be handled, in the order they were closed
	curBundle     bundle        // incoming items added to this bundle
	calledc       chan struct{} // replaced each time background takes the closed bundles; the old one is closed once they are handled. Flush waits on it.
}

// bundle is one batch of items that will be passed to the handler in a single
// call, together with the total size that was reported for those items.
type bundle struct {
	size  int           // sum of the sizes passed to Add for these items, in bytes
	items reflect.Value // []T holding the items, in the order they were added
}

// NewBundler returns a Bundler that passes batches of items to handler.
// Release it with Close when it is no longer needed.
//
// itemExample only supplies the type T of the items that will be added; for
// example, to bundle values of type *Entry, pass &Entry{}. Each call to
// handler receives one bundle as a []T (wrapped in an interface{}). handler is
// called from a single background goroutine, so calls are sequential and
// never run in parallel.
//
// The returned Bundler uses the Default* thresholds and limits and has no
// BundleByteLimit; adjust its exported fields before adding items.
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
	// The nil []T; every bundle starts from this value and grows via reflect.Append.
	emptyItems := reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample)))

	b := &Bundler{
		DelayThreshold:       DefaultDelayThreshold,
		BundleCountThreshold: DefaultBundleCountThreshold,
		BundleByteThreshold:  DefaultBundleByteThreshold,
		BufferedByteLimit:    DefaultBufferedByteLimit,

		handler:       handler,
		itemSliceZero: emptyItems,
		donec:         make(chan struct{}),
		handlec:       make(chan int, 1),
		calledc:       make(chan struct{}),
		timer:         time.NewTimer(1000 * time.Hour), // distant placeholder; Add re-arms it on each bundle's first item
		curBundle:     bundle{items: emptyItems},
	}
	go b.background()
	return b
}

// Add adds item to the current bundle. It marks the bundle for handling and
// starts a new one if any of the thresholds or limits are exceeded.
//
// size is the size of item in bytes as reported by the caller; it is counted
// toward BundleByteThreshold, BundleByteLimit and BufferedByteLimit.
//
// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
// the item can never be handled. Add returns ErrOversizedItem in this case.
//
// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
// Add returns ErrOverflow.
//
// Add never waits for bundles to be handled: the handler runs on the
// Bundler's background goroutine, and Add only briefly holds the Bundler's lock.
func (b *Bundler) Add(item interface{}, size int) error {
	// An item larger than the bundle limit could never fit in any bundle.
	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
		return ErrOversizedItem
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	// bufferedSize counts every byte not yet handled: the open bundle, closed
	// bundles, and bundles the background goroutine is currently handling.
	if b.bufferedSize+size > b.BufferedByteLimit {
		return ErrOverflow
	}
	// If adding this item to the current bundle would cause it to exceed the
	// maximum bundle size, close the current bundle and start a new one.
	if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
		b.closeAndHandleBundle()
	}
	// Add the item.
	b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
	b.curBundle.size += size
	b.bufferedSize += size
	// The delay threshold is measured from the first item of each bundle.
	//
	// NOTE(review): before Go 1.23, Reset does not drain a timer that has
	// already fired, so an expiry for an earlier bundle that the background
	// goroutine has not yet received may close this bundle early.
	if b.curBundle.items.Len() == 1 {
		b.timer.Reset(b.DelayThreshold)
	}
	// Close the bundle once it reaches either threshold. Comparing with >=
	// (not ==) keeps BundleCountThreshold acting as a limit even if it is
	// lowered while a bundle is open; a non-positive value disables the count
	// check. Closing an already-empty bundle is a no-op, so one combined
	// check is enough.
	countReached := b.BundleCountThreshold > 0 && b.curBundle.items.Len() >= b.BundleCountThreshold
	if countReached || b.curBundle.size >= b.BundleByteThreshold {
		b.closeAndHandleBundle()
	}
	return nil
}

// Flush closes the current bundle and blocks until every bundle closed so far
// has been passed to the handler and the last of those handler calls has
// returned.
//
// Flush must not be called after Close: the background goroutine exits once
// the Bundler is closed, so Flush may then block forever.
func (b *Bundler) Flush() {
	// Under the lock: close the open bundle, wake the background goroutine and
	// pick up the calledc it will close after handling everything queued so
	// far. The wake-up is sent even when nothing is pending so that calledc is
	// closed regardless. calledc is read here because background replaces it.
	wait := func() chan struct{} {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.closeBundle()
		select {
		case b.handlec <- 1:
		default: // a wake-up is already pending; it will cover this bundle too
		}
		return b.calledc
	}()
	<-wait
}

// Close calls Flush, then shuts down the Bundler. Close should always be
// called on a Bundler when it is no longer needed. You must wait for all calls
// to Add to complete before calling Close. Calling Add concurrently with Close
// may result in the added items being ignored.
//
// Calling Close again after it has returned does nothing. Close must not be
// called concurrently with itself.
func (b *Bundler) Close() {
	// A closed donec means an earlier Close already flushed and signalled the
	// background goroutine to exit. Flushing again would wait on a goroutine
	// that is gone (and closing donec twice would panic), so return early.
	select {
	case <-b.donec:
		return
	default:
	}
	b.Flush()
	b.mu.Lock()
	b.timer.Stop()
	b.mu.Unlock()
	close(b.donec)
}

// closeAndHandleBundle closes the current bundle and, if that produced a new
// closed bundle, wakes the background goroutine so it gets handled.
//
// This should always be called with b.mu held.
func (b *Bundler) closeAndHandleBundle() {
	if !b.closeBundle() {
		// The current bundle was empty; there is nothing new to handle.
		return
	}
	select {
	case b.handlec <- 1:
	default:
		// A wake-up is already pending. The background goroutine takes all
		// closed bundles at once, so it will pick this one up as well.
	}
}

// closeBundle moves the current bundle, if it has any items, onto the end of
// the list of closed bundles and starts a fresh, empty current bundle. It
// reports whether a bundle was closed; an empty current bundle is left as is
// and false is returned.
//
// closeBundle does not signal the background goroutine. Callers that want the
// closed bundle handled promptly must arrange that themselves (for example,
// closeAndHandleBundle sends on handlec after calling it).
//
// This should always be called with b.mu held.
func (b *Bundler) closeBundle() bool {
	if b.curBundle.items.Len() == 0 {
		return false
	}
	b.closedBundles = append(b.closedBundles, b.curBundle)
	b.curBundle.items = b.itemSliceZero
	b.curBundle.size = 0
	return true
}

// background is the Bundler's worker goroutine, started by NewBundler.
//
// Each round it waits for one of three events:
//   - a wake-up on handlec;
//   - the closing of donec, by Close;
//   - the expiry of the delay timer, which also closes the current bundle.
//
// It then takes every closed bundle and passes each one, in order, to the
// handler. After that it closes the calledc that was current when the bundles
// were taken, releasing any Flush callers waiting on it. background returns
// at the end of the round triggered by donec.
func (b *Bundler) background() {
	for {
		var stopping, expired bool
		select {
		case <-b.handlec:
		case <-b.donec:
			stopping = true
		case <-b.timer.C:
			expired = true
		}

		b.mu.Lock()
		if expired {
			// DelayThreshold has passed for the current bundle; close it so it
			// is handled in this round.
			b.closeBundle()
		}
		pending := b.closedBundles
		b.closedBundles = nil
		// Swap in a fresh calledc before unlocking. Items added after this
		// point belong to a later round, and Flush calls made from now on must
		// wait for that later round rather than this one.
		finished := b.calledc
		b.calledc = make(chan struct{})
		b.mu.Unlock()

		for i := range pending {
			b.handler(pending[i].items.Interface())
			// Drop the reference to the items so they can be reclaimed
			// while the remaining bundles are handled.
			pending[i].items = reflect.Value{}
			// Release the bundle's bytes right away, so Adds made during this
			// loop can succeed sooner.
			b.mu.Lock()
			b.bufferedSize -= pending[i].size
			b.mu.Unlock()
		}
		// Every bundle taken in this round has been handled.
		close(finished)

		if stopping {
			return
		}
	}
}