1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
package dstream
// maxchunksize is the Dstream returned by MaxChunkSize. It re-chunks its
// source so that every emitted chunk has at most size rows.
type maxchunksize struct {
	xform

	// size is the maximum number of rows in an emitted chunk; pos is the
	// index of the first row of the current source chunk that has not yet
	// been emitted.
	size, pos int

	// first reports whether source.Next must be called before the current
	// source chunk can be read.
	first bool
}
// MaxChunkSize splits the chunks of the input Dstream so that no
// chunk has more than size rows.
//
// The first chunk of data is read here, before the variable names and
// count are queried. The returned Dstream begins with that chunk on its
// first call to Next; it is not skipped.
//
// MaxChunkSize panics if size is less than 1, because a non-positive
// chunk size can never make progress through the data.
func MaxChunkSize(data Dstream, size int) Dstream {
	if size < 1 {
		panic("MaxChunkSize: size must be positive")
	}

	sc := &maxchunksize{
		xform: xform{
			source: data,
		},
		size: size,
	}

	// Read the first source chunk. If the read succeeded, that chunk is
	// already current, and Next must not advance the source again (doing
	// so would silently drop it). If the read failed, leave first set so
	// that Next retries source.Next and reports exhaustion instead of
	// reading from a source that has no current chunk.
	sc.first = !sc.source.Next()

	nvar := sc.source.NumVar()
	sc.names = sc.source.Names()
	sc.bdata = make([]interface{}, nvar)

	return sc
}
// Next advances the stream to its next chunk. That chunk holds the next
// run of at most sc.size rows taken from the current source chunk.
//
// When the current source chunk is used up, the source is advanced.
// Source chunks with no remaining rows are skipped rather than ending
// iteration. Next returns false once the source has no more chunks.
//
// Only []float64 and []string variables are supported; any other
// variable type causes a panic.
func (sc *maxchunksize) Next() bool {
	// The source has not yet been positioned on a chunk (for example,
	// after Reset).
	if sc.first {
		if !sc.source.Next() {
			return false
		}
		sc.first = false
	}

	// m is the number of unconsumed rows in the current source chunk, as
	// measured by ilen on the first variable. Keep pulling source chunks
	// until one has rows left, so that an empty chunk in the middle of the
	// source does not terminate the stream early.
	m := ilen(sc.source.GetPos(0)) - sc.pos
	for m <= 0 {
		if !sc.source.Next() {
			return false
		}
		sc.pos = 0
		m = ilen(sc.source.GetPos(0))
	}

	if m > sc.size {
		m = sc.size
	}

	// Expose rows [lo, hi) of every variable. These are subslices that
	// share the source chunk's backing arrays, not copies.
	lo, hi := sc.pos, sc.pos+m
	nvar := sc.NumVar()
	for j := 0; j < nvar; j++ {
		// TODO generic types
		switch x := sc.source.GetPos(j).(type) {
		case []float64:
			sc.bdata[j] = x[lo:hi]
		case []string:
			sc.bdata[j] = x[lo:hi]
		default:
			panic("unknown type")
		}
	}
	sc.pos = hi

	return true
}
// Reset resets the underlying source and this transform's position
// within it. The next call to Next advances the freshly reset source
// before reading any rows.
func (sc *maxchunksize) Reset() {
	sc.source.Reset()
	sc.pos, sc.first = 0, true
}
|