File: window.go

package info (click to toggle)
golang-github-kurin-blazer 0.5.3-2
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 580 kB
  • sloc: makefile: 4
file content (162 lines) | stat: -rw-r--r-- 4,286 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright 2018, the Blazer authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package window provides a type for efficiently recording events as they
// occur over a given span of time.  Events added to the window will remain
// until the time expires.
package window

import (
	"sync"
	"time"
)

// A Window efficiently records events that have occurred over a span of time
// extending from some fixed interval ago to now.  Events that pass beyond this
// horizon are discarded.
//
// Time is divided into buckets of width res.  Events landing in the same
// bucket are merged with the reducer, so memory use depends on the number of
// buckets rather than on the number of events (unless the reducer itself
// accumulates).
type Window struct {
	// mu is held by insertAt and reducedAt for the whole time they read or
	// modify the fields below.
	mu      sync.Mutex
	// events holds one reduced value per bucket; nil marks a bucket that is
	// empty or has been expired by sweep.  New leaves it unallocated for
	// "forever" windows.
	events  []interface{}
	// res is the span of time covered by a single bucket.
	res     time.Duration
	// last is the time that was passed to the most recent sweep.
	last    time.Time
	// reduce merges a new event into a stored value, and folds the stored
	// values together when the window is read.
	reduce  Reducer
	// forever is set by New when the requested size is not positive.  Such a
	// window never expires anything and bypasses events entirely.
	forever bool
	// e is the single running value kept by a "forever" window.
	e       interface{}
}

// A Reducer takes two values from the window and combines them into a third
// value that will be stored in the window.  Either i or j may be nil: empty
// buckets are nil, and reduction starts from a nil accumulator.  The
// underlying types of both arguments and of the result should be identical.
//
// If the values produced by the reducer are any kind of slice or list that
// grows with each event, then data usage will grow linearly with the number
// of events added to the window.
//
// The reducer is called on its own output: Reducer(Reducer(x, y), z).
type Reducer func(i, j interface{}) interface{}

// New returns an initialized window for events over the given duration at the
// given resolution.  Windows with tight resolution (i.e., small values for
// that argument) will be more accurate, at the cost of some memory.
//
// The window is divided into size/resolution buckets (integer division, so a
// size that is not a multiple of resolution is rounded down to the nearest
// whole number of buckets).  A resolution that is not positive, or that is
// larger than size, cannot produce any bucket; in that case the window uses a
// single bucket spanning the whole size instead of panicking.
//
// A size of 0 (or any non-positive size) means "forever"; old events will
// never be removed and resolution is ignored.
//
// r is used both to merge events into a bucket and to combine buckets when
// the window is read.
func New(size, resolution time.Duration, r Reducer) *Window {
	if size <= 0 {
		return &Window{
			forever: true,
			reduce:  r,
		}
	}
	// Guard the division below against a zero divisor and guarantee at least
	// one bucket, since bucket lookups take a remainder by the bucket count.
	if resolution <= 0 || resolution > size {
		resolution = size
	}
	return &Window{
		res:    resolution,
		events: make([]interface{}, size/resolution),
		reduce: r,
	}
}

// bucket returns the index into w.events of the bucket that covers the given
// time.  The result is always in the range [0, len(w.events)).
func (w *Window) bucket(now time.Time) int {
	return w.slot(now.UnixNano() / int64(w.res))
}

// slot maps an absolute bucket number (nanoseconds since the Unix epoch
// divided by the resolution) onto an index into w.events.  The remainder is
// taken in 64 bits before narrowing to int, so the bucket number is not
// truncated where int is 32 bits wide, and a negative remainder (from a
// pre-epoch time) is wrapped so that the result is always a valid index.
func (w *Window) slot(abs int64) int {
	n := int64(len(w.events))
	i := abs % n
	if i < 0 {
		i += n
	}
	return int(i)
}

// sweep keeps the window valid by expiring every bucket that has gone stale
// since the previous sweep.  It needs to be called from every method that
// views or updates the window, and the caller needs to hold the mutex.
func (w *Window) sweep(now time.Time) {
	if w.forever {
		return
	}

	// Sub compares the monotonic clock readings of now and w.last when both
	// carry one.  w.last is not read again below, so it can be advanced here.
	diff := now.Sub(w.last)
	w.last = now

	// Either time went backwards somehow, or more time has passed than the
	// whole window measures (this is also the case on the first sweep, when
	// w.last is still the zero time).  Nothing stored is valid any more, so
	// zero the thing and have done.
	span := w.res * time.Duration(len(w.events))
	if diff < 0 || diff > span {
		for i := range w.events {
			w.events[i] = nil
		}
		return
	}

	// prev is the instant of the previous sweep expressed relative to now,
	// using the elapsed time computed above rather than the stored value.
	prev := now.Add(-diff)

	if diff <= w.res && w.bucket(now) == w.bucket(prev) {
		// We're in the same bucket as the previous sweep, so all buckets are
		// valid.
		return
	}

	// Expire all invalid buckets.  This means buckets not seen between the
	// previous sweep and now, including the current bucket but not including
	// the previous bucket.
	first := prev.UnixNano()/int64(w.res) + 1
	final := now.UnixNano() / int64(w.res)
	for abs := first; abs <= final; abs++ {
		w.events[w.slot(abs)] = nil
	}
}

// Insert records the event e in the window, stamped with the current time.
func (w *Window) Insert(e interface{}) {
	now := time.Now()
	w.insertAt(now, e)
}

// insertAt merges the event e into the window as if it occurred at time t.
// It takes the mutex for the duration of the call.
func (w *Window) insertAt(t time.Time, e interface{}) {
	w.mu.Lock()
	defer w.mu.Unlock()

	if !w.forever {
		// Expire stale buckets first so the event is never merged into a
		// value left over from an earlier trip around the ring.
		w.sweep(t)
		slot := w.bucket(t)
		w.events[slot] = w.reduce(w.events[slot], e)
		return
	}

	// A "forever" window keeps one running value and never expires it.
	w.e = w.reduce(w.e, e)
}

// Reduce folds every value that is still valid at the current time through
// the window's reducer and returns the result.
func (w *Window) Reduce() interface{} {
	now := time.Now()
	return w.reducedAt(now)
}

// reducedAt returns the reduction of the window's contents as seen at time t.
// It takes the mutex for the duration of the call.
func (w *Window) reducedAt(t time.Time) interface{} {
	w.mu.Lock()
	defer w.mu.Unlock()

	if !w.forever {
		// Drop expired buckets, then fold the remainder starting from a nil
		// accumulator.  Empty buckets are passed to the reducer as nil.
		w.sweep(t)
		var acc interface{}
		for _, ev := range w.events {
			acc = w.reduce(acc, ev)
		}
		return acc
	}

	// A "forever" window already holds its fully reduced value.
	return w.e
}