1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
|
package dstream
import (
"bufio"
"compress/gzip"
"fmt"
"io"
"os"
"path"
"sort"
"github.com/golang/snappy"
)
type bcols struct {
bpath string
dtypes map[string]string
// Data types for all variables in the source directory, including
// those not included in the Dstream when reading.
dtypesAll map[string]string
names []string
bdata []interface{}
rdrs []*bufio.Reader
// io.Readers, etc. that need closing
toclose []io.Closer
chunksize int
namepos map[string]int
done bool
doneInit bool
nobsKnown bool
nobs int
// Names to include or exclude. If include is empty it is ignored.
include []string
exclude []string
}
// NewBCols takes data stored in a column-wise compressed format under
// the given directory path, and returns it as a Dstream. 'path' is
// the directory containing the data, 'chunksize' is the number of
// values included in each chunk, 'include' and 'exclude' are lists of
// variable names to include (exclude) respectively.
//
// The underlying bcols format is simple. Each column of data is
// stored in its own file, in binary native format, compressed using
// either gzip or snappy compression. A file called 'dtypes.json'
// contains a dictionary mapping variable names to data types.
func NewBCols(path string, chunksize int) *bcols {
b := &bcols{
bpath: path,
chunksize: chunksize,
}
return b
}
// Include specifies variables that are included when reading the data from BCols
// storage into a Dstream. If Include is not called, all variables not listed in
// a call to Exclude are read.
func (bc *bcols) Include(vars ...string) *bcols {
bc.include = vars
return bc
}
// Exclude specifies variables that are not read from the Bcols storage.
func (bc *bcols) Exclude(vars ...string) *bcols {
bc.exclude = vars
return bc
}
// Done is called to signal the conclusion of configuring the BCols reader or
// writer.
func (bc *bcols) Done() Dstream {
bc.init()
return bc
}
// usenames returns the variable names to include in the Dstream.
func (bc *bcols) usenames() []string {
inc := make(map[string]bool)
for _, v := range bc.include {
inc[v] = true
_, ok := bc.dtypesAll[v]
if !ok {
msg := fmt.Sprintf("Variable '%s' does not exist\n", v)
panic(msg)
}
}
// If no variables are included, default is to set include to
// equal all variable names.
if len(bc.include) == 0 {
for k := range bc.dtypesAll {
inc[k] = true
}
}
exc := make(map[string]bool)
for _, v := range bc.exclude {
exc[v] = true
_, ok := bc.dtypesAll[v]
if !ok {
msg := fmt.Sprintf("Variable '%s' does not exist\n", v)
panic(msg)
}
}
var use []string
for k := range inc {
if !exc[k] {
use = append(use, k)
}
}
sort.StringSlice(use).Sort()
return use
}
func (bc *bcols) Names() []string {
return bc.names
}
func (bc *bcols) Close() {
for _, x := range bc.toclose {
x.Close()
}
}
func (bc *bcols) Reset() {
bc.Close()
bc.toclose = bc.toclose[0:0]
bc.rdrs = bc.rdrs[0:0]
bc.init()
bc.nobsKnown = false
bc.nobs = 0
bc.done = false
}
func (bc *bcols) NumVar() int {
return len(bc.names)
}
func (bc *bcols) NumObs() int {
if !bc.nobsKnown {
return -1
}
return bc.nobs
}
func (bc *bcols) GetPos(j int) interface{} {
return bc.bdata[j]
}
func (bc *bcols) Get(na string) interface{} {
pos, ok := bc.namepos[na]
if !ok {
msg := fmt.Sprintf("Variable '%s' not found", na)
panic(msg)
}
return bc.bdata[pos]
}
// BColsWriter writes a dstream to disk in bcols format.
type BColsWriter struct {
// The source data to be written
stream Dstream
// The directory where the results will be written
path string
// either "sz" (snappy), "gz" (gzip), or empty string
cmpr string
// A writer for each variable
wtrs []io.Writer
// All io values needing closing
needsClosing []io.Closer
}
// NewBColsWriter creates a new BColsWriter that writes the given
// dstream.
func NewBColsWriter(d Dstream) *BColsWriter {
return &BColsWriter{
stream: d,
}
}
// Path sets the location (a directory path) to which the data are
// written.
func (bw *BColsWriter) Path(p string) *BColsWriter {
bw.path = p
err := os.MkdirAll(p, 0770)
if err != nil {
panic(err)
}
return bw
}
func (bw *BColsWriter) init() {
if bw.path == "" {
msg := "Path value not set"
panic(msg)
}
// Default compression type
if bw.cmpr == "" {
bw.cmpr = "gz"
}
names := bw.stream.Names()
bw.writeDtypes()
for _, na := range names {
na += ".bin"
if bw.cmpr == "gz" {
na += ".gz"
} else if bw.cmpr == "sz" {
na += ".sz"
} else {
msg := fmt.Sprintf("Compression type %s not recognized", bw.cmpr)
panic(msg)
}
// Create the file
fn := path.Join(bw.path, na)
f, err := os.Create(fn)
if err != nil {
panic(err)
}
bw.needsClosing = append(bw.needsClosing, f)
// Wrap it in a compressor if needed
switch bw.cmpr {
case "gz":
g := gzip.NewWriter(f)
bw.needsClosing = append(bw.needsClosing, g)
bw.wtrs = append(bw.wtrs, g)
case "sz":
g := snappy.NewBufferedWriter(f)
bw.needsClosing = append(bw.needsClosing, g)
bw.wtrs = append(bw.wtrs, g)
default:
bw.wtrs = append(bw.wtrs, f)
}
}
}
// Done flushes the data to disk.
func (bw *BColsWriter) Done() {
bw.init()
bw.write()
// Need to process in reverse order so that nested writers are
// closed inside-out.
for j := len(bw.needsClosing) - 1; j >= 0; j-- {
f := bw.needsClosing[j]
f.Close()
}
}
|