1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
package dstream
// TODO make type generic
import (
"fmt"
)
// lagChunk is the Dstream implementation returned by LagChunk. It wraps a
// source Dstream and exposes, for selected variables, the current value
// together with lagged copies computed within each source chunk.
type lagChunk struct {
// xform is the only embedded field, so it supplies the names and bdata
// fields that this type's methods assign (the output variable names and
// the current chunk's column data).
xform
// SourceData is the wrapped stream that chunks are read from.
SourceData Dstream
// Lags maps variable names to the number of lags to include
// for the variable. Variables not included in the map are
// retained with no lags.
Lags map[string]int
// namespos maps each output variable name to its column position.
namespos map[string]int
nobs int // total sample size, accumulated by Next as chunks are read
nobsKnown bool // set by Next once the source is exhausted; NumObs reports nobs only after that
maxlag int // maximum of all requested lags; also the number of leading rows dropped from each chunk
doneInit bool // init has run
}
// LagChunk returns a new Dstream in which the variables named in lags are
// replaced by their current and lagged values. A variable v with lag q > 0
// becomes the q+1 columns "v[0]", "v[-1]", ..., "v[-q]"; variables not in
// lags are retained with no lags.
//
// Lagged values are only computed within a chunk, not across chunk
// boundaries, and the first m values of each chunk are omitted, where m is
// the maximum lag value. The lags map is copied, so changes the caller
// makes to it afterwards do not affect the returned stream.
func LagChunk(data Dstream, lags map[string]int) Dstream {
	// Copy lags: init is re-run after every Reset, while the output buffer
	// is sized only once, so a caller mutating the shared map later could
	// otherwise change the column layout mid-stream.
	own := make(map[string]int, len(lags))
	for name, q := range lags {
		own[name] = q
	}
	lc := &lagChunk{
		SourceData: data,
		Lags:       own,
	}
	lc.init()
	return lc
}
// init computes the derived layout of the stream: the largest requested
// lag, the output variable names, and an index from each output name to
// its column position.
//
// A source variable with lag q > 0 expands to the q+1 names "v[0]",
// "v[-1]", ..., "v[-q]"; a variable whose lag is zero (including one that
// is absent from Lags) keeps its original name, and a variable with a
// negative lag contributes no names.
func (lc *lagChunk) init() {
	largest := 0
	for _, lag := range lc.Lags {
		if lag > largest {
			largest = lag
		}
	}
	lc.maxlag = largest

	var outNames []string
	for _, name := range lc.SourceData.Names() {
		lag := lc.Lags[name]
		if lag == 0 {
			outNames = append(outNames, name)
			continue
		}
		for shift := 0; shift <= lag; shift++ {
			outNames = append(outNames, fmt.Sprintf("%s[%d]", name, -shift))
		}
	}
	lc.names = outNames

	positions := make(map[string]int)
	for i, name := range outNames {
		positions[name] = i
	}
	lc.namespos = positions
	lc.doneInit = true
}
// NumObs returns the total number of rows produced by the stream. The count
// only becomes available once Next has exhausted the source data; until
// then NumObs returns -1.
func (lc *lagChunk) NumObs() int {
	if !lc.nobsKnown {
		return -1
	}
	return lc.nobs
}
// Reset rewinds the stream to the start of the source data; the layout is
// recomputed by init on the next call to Next.
//
// Next re-accumulates the sample size on every pass over the data, so the
// running count and its "known" flag are cleared here. Without this, a
// second pass would report a growing (and finally doubled) NumObs.
func (lc *lagChunk) Reset() {
	lc.SourceData.Reset()
	lc.doneInit = false
	lc.nobs = 0
	lc.nobsKnown = false
}
// Next advances to the next source chunk that is long enough to yield
// lagged values and fills bdata with the lagged columns. It returns false
// once the source stream is exhausted, at which point the total sample
// size becomes known.
//
// For a chunk of n rows, the lag-k column of a variable is the subslice
// v[maxlag-k : n-k]: every output column has n-maxlag rows, and row i of
// the lag-k column holds the value k positions before row i of the lag-0
// column. The slices share memory with the source chunk.
//
// A chunk in which any column has maxlag or fewer values cannot form a
// single complete row and is skipped entirely, so callers never see stale
// or misaligned columns left over from a previous chunk.
func (lc *lagChunk) Next() bool {
	if !lc.doneInit {
		lc.init()
	}
	maxlag := lc.maxlag

	// Advance past chunks that are too short to produce any lagged rows.
	var nrows int
	for {
		if !lc.SourceData.Next() {
			lc.nobsKnown = true
			return false
		}
		var ok bool
		if nrows, ok = lc.usableRows(maxlag); ok {
			break
		}
	}

	if lc.bdata == nil {
		lc.bdata = make([]interface{}, len(lc.names))
	}

	// The sample size grows by the number of output rows in this chunk.
	// It is counted once per chunk, not once per variable.
	if nrows > maxlag {
		lc.nobs += nrows - maxlag
	}

	// Emit lags 0..q for each source column, in source order, which is the
	// same order in which init builds lc.names.
	jj := 0
	for j, oname := range lc.SourceData.Names() {
		q := lc.Lags[oname]
		switch v := lc.SourceData.GetPos(j).(type) {
		case []float64:
			n := len(v)
			for k := 0; k <= q; k++ {
				lc.bdata[jj] = v[maxlag-k : n-k]
				jj++
			}
		case []string:
			n := len(v)
			for k := 0; k <= q; k++ {
				lc.bdata[jj] = v[maxlag-k : n-k]
				jj++
			}
		default:
			panic("unknown data type")
		}
	}
	return true
}

// usableRows reports the row count of the current source chunk (taken from
// its first column) and whether every column holds more than maxlag values,
// which is required to form at least one lagged row.
func (lc *lagChunk) usableRows(maxlag int) (int, bool) {
	nrows := 0
	for j := range lc.SourceData.Names() {
		m := ilen(lc.SourceData.GetPos(j))
		if m <= maxlag {
			return 0, false
		}
		if j == 0 {
			nrows = m
		}
	}
	return nrows, true
}
|