File: lagchunk.go

package info (click to toggle)
golang-github-kshedden-dstream 0.0~git20190512.c4c4106-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 596 kB
  • sloc: makefile: 30
file content (133 lines) | stat: -rw-r--r-- 2,469 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package dstream

// TODO make type generic

import (
	"fmt"
)

// lagChunk is the Dstream implementation returned by LagChunk.  Chunks of
// SourceData are re-emitted with their first maxlag rows dropped, and each
// lagged variable is expanded into additional shifted copies of itself.
type lagChunk struct {
	// xform supplies the names and bdata fields used by the methods in
	// this file; presumably it also provides the remaining Dstream
	// methods (e.g. Names, GetPos) -- TODO confirm against xform.
	xform

	// SourceData is the stream whose chunks are lagged.
	SourceData Dstream

	// Lags maps variable names to the number of lags to include
	// for the variable.  Variables not included in the map are
	// retained with no lags.
	Lags map[string]int

	// namespos maps each output column name to its position in names;
	// it is rebuilt by init.
	namespos map[string]int

	// Bookkeeping maintained by init and Next.
	nobs      int  // total sample size
	nobsKnown bool // indicates whether the sample size is available
	maxlag    int  // maximum of all requested lags
	doneInit  bool // init has run
}

// LagChunk wraps data so that the variables named in lags are emitted
// together with their lagged values.  A variable x with q lags is replaced
// by the columns x[0], x[-1], ..., x[-q]; variables absent from lags (or
// mapped to 0) pass through under their original names.
//
// Lags are formed within each chunk only, never across chunk boundaries,
// so the first m rows of every chunk are dropped, where m is the largest
// lag requested.  The lags map is retained by reference, not copied.
func LagChunk(data Dstream, lags map[string]int) Dstream {
	out := new(lagChunk)
	out.SourceData = data
	out.Lags = lags
	out.init() // compute maxlag and the output column names up front
	return out
}

// init derives the per-stream state from SourceData and Lags: the maximum
// requested lag, the list of output column names, and the name-to-position
// index.  A variable x with q > 0 lags contributes the columns x[0], x[-1],
// ..., x[-q]; a variable with no entry in Lags keeps its original name.
func (lc *lagChunk) init() {
	lc.maxlag = 0
	for _, q := range lc.Lags {
		if q > lc.maxlag {
			lc.maxlag = q
		}
	}

	var cols []string
	for _, name := range lc.SourceData.Names() {
		q := lc.Lags[name]
		if q == 0 {
			cols = append(cols, name)
			continue
		}
		for j := 0; j <= q; j++ {
			cols = append(cols, fmt.Sprintf("%s[%d]", name, -j))
		}
	}
	lc.names = cols

	index := make(map[string]int, len(cols))
	for i, name := range cols {
		index[name] = i
	}
	lc.namespos = index

	lc.doneInit = true
}

// NumObs returns the number of observations counted by Next, or -1 while
// the source has not yet been read to the end.
func (lc *lagChunk) NumObs() int {
	if !lc.nobsKnown {
		return -1
	}
	return lc.nobs
}

// Reset rewinds the stream to its first chunk.  SourceData is reset, and
// init is rerun on the next call to Next.
func (lc *lagChunk) Reset() {
	lc.SourceData.Reset()
	lc.doneInit = false

	// If the current pass was abandoned before the source was exhausted,
	// nobs holds a partial count that the next pass would add to.
	// Discard it so counting restarts from zero.
	if !lc.nobsKnown {
		lc.nobs = 0
	}
}

// Next advances to the next chunk of lagged data, returning false once
// SourceData is exhausted.
//
// For a source chunk of length n, every output column holds n - maxlag
// values.  An unlagged variable x yields x[maxlag:n].  A variable with q
// lags yields x[maxlag-k : n-k] for k = 0, ..., q, with lag 0 first.  The
// output slices share storage with the source chunk.
//
// Source chunks with n <= maxlag cannot provide any complete rows and are
// skipped entirely, so every chunk exposed by Next is fully populated.
// Columns must be []float64 or []string; any other type panics.
func (lc *lagChunk) Next() bool {

	if !lc.doneInit {
		lc.init()
	}

	maxlag := lc.maxlag

chunks:
	for {
		if !lc.SourceData.Next() {
			lc.nobsKnown = true
			return false
		}

		names := lc.SourceData.Names()

		// Vet the whole chunk before writing anything.  A short chunk is
		// then skipped as a unit, and bdata never exposes values left
		// over from an earlier chunk.
		nrows := 0
		for j := range names {
			n := ilen(lc.SourceData.GetPos(j))
			if n <= maxlag {
				continue chunks
			}
			if j == 0 {
				nrows = n - maxlag
			}
		}

		if lc.bdata == nil {
			lc.bdata = make([]interface{}, len(lc.names))
		}

		// jj walks the output columns in the same order init laid out names.
		jj := 0
		for j, oname := range names {
			q := lc.Lags[oname]
			switch v := lc.SourceData.GetPos(j).(type) {
			case []float64:
				n := len(v)
				for k := 0; k <= q; k++ {
					lc.bdata[jj] = v[maxlag-k : n-k]
					jj++
				}
			case []string:
				n := len(v)
				for k := 0; k <= q; k++ {
					lc.bdata[jj] = v[maxlag-k : n-k]
					jj++
				}
			default:
				panic("unknown data type")
			}
		}

		// Count each chunk's rows once, not once per column.  Stop
		// counting after the first full pass completes, so that
		// re-reading after Reset does not inflate the total.
		if !lc.nobsKnown {
			lc.nobs += nrows
		}
		return true
	}
}