File: join.go

package info (click to toggle)
golang-github-kshedden-dstream 0.0~git20190512.c4c4106-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 596 kB
  • sloc: makefile: 30
file content (129 lines) | stat: -rw-r--r-- 3,250 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package dstream

import (
	"fmt"
	"os"
)

// Join performs a streaming join on several Dstreams that have been
// segmented by id variables.  If join has type Join, then
// join.Data[i] is the i^th stream, positioned at its current chunk.
// All streams being joined must have been segmented by an id variable
// whose values are ascending.  The id variable must have type uint64.
//
// A call to the Next method always advances the first stream
// (Data[0]) by one chunk.  The other elements of Data are advanced
// until their id variable is equal to (if possible) or greater than
// the id variable of Data[0].  If equality is achieved, the
// corresponding element of join.Status is set to true.
// join.Status[0] is always false and has no meaning.
//
// The dstream values to be joined must be segmented so that the id
// variable is constant within chunks, and increases in numeric value
// with subsequent calls to the Next method.
type Join struct {

	// Data holds the segmented Dstreams to advance in unison.  Data[0]
	// is the index stream that drives the join.
	Data []Dstream

	// Status[j] reports whether the id variable of the current chunk
	// of Data[j] is equal to the id variable of the current chunk of
	// Data[0].  Status[0] is not used and is always false.
	Status []bool

	// inames[j] is the name of the id variable of Data[j].
	inames []string

	// ipos[j] is the position of inames[j] within Data[j].Names().
	ipos []int

	// id[j] holds the id column of the current chunk of Data[j].  It is
	// nil until Data[j] has produced at least one chunk.
	id [][]uint64
}

// NewJoin creates a Join of the given Dstreams, using the variable
// names in names as ids: names[k] is the id variable of data[k].  The
// Dstreams in data must already be segmented by their id variables
// before calling NewJoin.
//
// NewJoin panics if data and names have different lengths.  If
// names[k] is not among the variable names of data[k], an error
// message is written to standard error and the process exits with
// status 1.
func NewJoin(data []Dstream, names []string) *Join {

	if len(data) != len(names) {
		panic("NewJoin: data and names should have same length\n")
	}

	w := &Join{
		Data:   data,
		inames: names,
		ipos:   make([]int, 0, len(data)),
	}

	for k, da := range data {
		// Locate the first column of da whose name matches the id name.
		pos := -1
		for j, vname := range da.Names() {
			if vname == names[k] {
				pos = j
				break
			}
		}
		if pos < 0 {
			// Terminate the line so the message does not run into any
			// subsequent output on stderr.
			fmt.Fprintf(os.Stderr, "Can't find index variable %s in %dth dataset\n", names[k], k)
			os.Exit(1)
		}
		w.ipos = append(w.ipos, pos)
	}

	w.Status = make([]bool, len(data))
	w.id = make([][]uint64, len(data))

	return w
}

// needsadvance reports whether stream j has to be advanced to catch up
// with the index stream.  That is the case when no id chunk has been
// read for it yet, or when its current id is smaller than the current
// id of Data[0].
func (w *Join) needsadvance(j int) bool {
	cur := w.id[j]
	if len(cur) == 0 {
		return true
	}
	return cur[0] < w.id[0][0]
}

// clearstatus resets every entry of Status to false so the streams
// can be re-aligned from scratch.
func (w *Join) clearstatus() {
	for i := 0; i < len(w.Status); i++ {
		w.Status[i] = false
	}
}

// Next advances to the next chunk.  The first dstream, which is
// contained in join.Data[0], always advances to the next sequential
// value of its id variable.  The other dstreams (join.Data[j] for j >
// 0) advance until their id variables are equal to or greater than
// the id variable for the current chunk of join.Data[0], or until
// their Next method returns false.  The Status field (join.Status)
// indicates which dstreams in the join are currently on the same id
// value as the first dstream (join.Data[0]).
//
// Next returns false when join.Data[0] has no more chunks; the other
// streams are not advanced in that case.
func (w *Join) Next() bool {

	// Advance the index stream.
	if !w.Data[0].Next() {
		return false
	}
	w.id[0] = w.Data[0].GetPos(w.ipos[0]).([]uint64)

	// Advance the other streams.
	w.clearstatus()
	for j := 1; j < len(w.Data); j++ {

		// Keep advancing as long as the stream is behind the index
		// stream, or has not produced any chunk yet.
		for w.needsadvance(j) {
			if !w.Data[j].Next() {
				// No more chunks; keep the last id seen (if any).
				break
			}
			w.id[j] = w.Data[j].GetPos(w.ipos[j]).([]uint64)
		}

		// A stream that never produced a chunk has no id to compare.
		// Without the length check, w.id[j][0] would panic.
		w.Status[j] = len(w.id[j]) > 0 && w.id[j][0] == w.id[0][0]
	}

	return true
}