File: segment.go

package info (click to toggle)
golang-github-kshedden-dstream 0.0~git20190512.c4c4106-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 596 kB
  • sloc: makefile: 30
file content (138 lines) | stat: -rw-r--r-- 2,534 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package dstream

import "fmt"

// segmentedData is the Dstream returned by Segment. It wraps a source
// Dstream (through the embedded xform) and re-cuts its chunks so that each
// chunk yielded by Next is one segment: a run of consecutive rows over
// which the variables listed in vars do not change.
type segmentedData struct {
	xform

	// Variables whose changing values define the segments
	vars []string

	// Positions of the variables in vars
	vpos []int

	// stash holds, per variable (it is allocated with one entry per
	// source variable), the rows of a group that may continue into the
	// next source chunk. Next truncates it on entry, so it holds at most
	// one group.
	stash []interface{}

	// pos is the end of the current segment within bdata (as returned by
	// findSegment). On the following call to Next, bdata is left-sliced
	// at this position to drop the segment that was just returned.
	pos int

	// true if there is no more data to read from the source; this
	// does not necessarily mean that there is no more data to
	// yield.
	done bool
}

// Segment returns a Dstream that yields the rows of data re-chunked so
// that a new chunk starts wherever any of the named variables takes a
// different value than it had in the preceding row.
func Segment(data Dstream, vars ...string) Dstream {
	sd := new(segmentedData)
	sd.source = data
	sd.vars = vars
	sd.init()
	return sd
}

// init prepares a freshly constructed segmentedData for use. It:
//   - records the source's variable names in sd.names,
//   - resolves each segmenting variable in sd.vars to its column position
//     (stored in sd.vpos),
//   - allocates bdata and stash with one entry per source variable,
//   - advances the source once and loads its current data into bdata.
//
// It panics if a name in sd.vars is not a variable of the source, using
// the same message as Get.
func (sd *segmentedData) init() {

	// Get the positions of the variables that define the
	// segments.
	sd.names = sd.source.Names()
	mp := make(map[string]int, len(sd.names))
	for k, v := range sd.names {
		mp[v] = k
	}
	var vpos []int
	for _, v := range sd.vars {
		pos, ok := mp[v]
		if !ok {
			// Without this check a missing name would resolve to the
			// map's zero value, silently segmenting on the first
			// variable instead of reporting the mistake.
			panic(fmt.Sprintf("Variable '%s' not found", v))
		}
		vpos = append(vpos, pos)
	}
	sd.vpos = vpos

	nvar := sd.source.NumVar()
	sd.bdata = make([]interface{}, nvar)
	sd.stash = make([]interface{}, nvar)

	// NOTE(review): the result of Next is not checked; for an initially
	// empty source bdata is filled from GetPos on a stream that did not
	// advance. Confirm that GetPos is well-defined in that state.
	sd.source.Next()
	sd.setb()
}

// setb refreshes bdata so that entry j refers to the source's current
// data for variable j, for every variable the source reports.
func (sd *segmentedData) setb() {
	src := sd.source
	for col, ncol := 0, src.NumVar(); col < ncol; col++ {
		sd.bdata[col] = src.GetPos(col)
	}
}

// Get returns the data of the variable named na, by looking up its
// position in namepos (built lazily via setNamePos) and delegating to
// GetPos. It panics if na is not the name of a variable.
func (sd *segmentedData) Get(na string) interface{} {
	if sd.namepos == nil {
		sd.setNamePos()
	}

	if idx, found := sd.namepos[na]; found {
		return sd.GetPos(idx)
	}

	panic(fmt.Sprintf("Variable '%s' not found", na))
}

// Reset returns the segmented stream to its starting state. It resets the
// embedded xform, clears the segment position and the end-of-source flag,
// then advances the source once and reloads bdata from it, as init does.
func (sd *segmentedData) Reset() {
	sd.xform.Reset()

	// Forget any progress through the previous pass.
	sd.pos, sd.done = 0, false

	// Load the first source chunk into bdata.
	sd.source.Next()
	sd.setb()
}

// Next advances to the next segment. It returns true when a segment is
// available and false once the source has been exhausted and no buffered
// rows remain.
//
// Only bdata[0] is inspected to decide whether rows remain, so all
// variables are assumed to hold the same number of rows.
// NOTE(review): indexing bdata[0] panics if the source has no variables.
func (sd *segmentedData) Next() bool {

	// Stash contains at most one group.
	truncate(sd.stash)

	// After the source is exhausted, an empty bdata means every segment
	// has been yielded. A non-empty bdata was already returned as the
	// last segment (see the exhausted-source branch below), so drop it.
	if ilen(sd.bdata[0]) == 0 && sd.done {
		return false
	} else if sd.done {
		truncate(sd.bdata)
	}

	// Cut off the previous group, and find the end of the current
	// group. findSegment yields -1 when no group boundary is found in
	// the buffered rows.
	sd.leftsliceb(sd.pos)
	sd.pos = sd.findSegment(0)

	// Found a complete group in the current chunk.
	if sd.pos != -1 {
		return true
	}

	// No boundary was found, so the current group may continue into the
	// next source chunk. Reset pos and move the partial group aside;
	// setstash presumably copies it from bdata into the stash.
	sd.pos = 0
	sd.setstash()

	// Get a complete group in the stash, keep reading until we
	// have something to return.
	for {
		f := sd.source.Next()

		if !f {
			sd.done = true

			// No more data, whatever is left in bdata is
			// the last segment, or we are done if bdata
			// is empty.
			return ilen(sd.bdata[0]) > 0
		}
		sd.setb()

		// fixstash presumably reports whether a complete group is
		// now available; if not, read another chunk.
		fd := sd.fixstash()

		if fd {
			return true
		}
	}
}