1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
package dstream
import "fmt"
type segmentedData struct {
xform
// Variables whose changing values define the segments
vars []string
// Positions of the variables in vars
vpos []int
stash []interface{}
// Beginning/end of the current segment
pos int
// true if there is no more data to read from the source; this
// does not necessarily mean that there is no more data to
// yield.
done bool
}
// Segment restructures the chunks of a Dstream so that chunk
// boundaries are determined by any change in the consecutive values
// of a specified set of variables.
func Segment(data Dstream, vars ...string) Dstream {
s := &segmentedData{
xform: xform{
source: data,
},
vars: vars,
}
s.init()
return s
}
func (sd *segmentedData) init() {
// Get the positions of the variables that define the
// segments.
sd.names = sd.source.Names()
mp := make(map[string]int)
for k, v := range sd.names {
mp[v] = k
}
var vpos []int
for _, v := range sd.vars {
vpos = append(vpos, mp[v])
}
sd.vpos = vpos
nvar := sd.source.NumVar()
sd.bdata = make([]interface{}, nvar)
sd.stash = make([]interface{}, nvar)
sd.source.Next()
sd.setb()
}
// setb sets the pointers in bdata to point to the current source data.
func (sd *segmentedData) setb() {
nvar := sd.source.NumVar()
for j := 0; j < nvar; j++ {
sd.bdata[j] = sd.source.GetPos(j)
}
}
func (sd *segmentedData) Get(na string) interface{} {
if sd.namepos == nil {
sd.setNamePos()
}
pos, ok := sd.namepos[na]
if !ok {
msg := fmt.Sprintf("Variable '%s' not found", na)
panic(msg)
}
return sd.GetPos(pos)
}
func (sd *segmentedData) Reset() {
sd.xform.Reset()
sd.pos = 0
sd.source.Next()
sd.setb()
sd.done = false
}
func (sd *segmentedData) Next() bool {
// Stash contains at most one group.
truncate(sd.stash)
if ilen(sd.bdata[0]) == 0 && sd.done {
return false
} else if sd.done {
truncate(sd.bdata)
}
// Cut off the previous group, and find the end of the current
// group.
sd.leftsliceb(sd.pos)
sd.pos = sd.findSegment(0)
// Found a complete group in the current chunk.
if sd.pos != -1 {
return true
}
sd.pos = 0
sd.setstash()
// Get a complete group in the stash, keep reading until we
// have somthing to return.
for {
f := sd.source.Next()
if !f {
sd.done = true
// No more data, whatever is left in bdata is
// the last segment, or we are done if adata
// is empty.
return ilen(sd.bdata[0]) > 0
}
sd.setb()
fd := sd.fixstash()
if fd {
return true
}
}
}
|