// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"container/list"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math"
"math/rand"
"net/http"
"os"
"strconv"
"strings"
"time"
bigquery "google.golang.org/api/bigquery/v2"
storage "google.golang.org/api/storage/v1"
)
// GB is the number of bytes in a gigabyte; it is used to report loaded sizes.
const GB = 1 << 30

// Polling backoff parameters used while monitoring jobs. The delays are in
// milliseconds.
const (
	// MaxBackoff is the upper bound on the delay between polls.
	MaxBackoff = 30000
	// BaseBackoff is the initial delay between polls.
	BaseBackoff = 250
	// BackoffGrowthFactor multiplies the delay each time it is grown.
	BackoffGrowthFactor = 1.8
	// BackoffGrowthDamper bounds the random fraction subtracted from the
	// grown delay.
	BackoffGrowthDamper = 0.25
)

// String values exchanged with the BigQuery API.
const (
	// JobStatusDone is the job state that marks a job as finished.
	JobStatusDone = "DONE"
	// DatasetAlreadyExists is the text looked for in a dataset insert error
	// to recognise that the dataset is already present.
	DatasetAlreadyExists = "Already Exists: Dataset"
	// TableWriteEmptyDisposition is the write disposition used for loads.
	TableWriteEmptyDisposition = "WRITE_EMPTY"
)
// init registers the BigQuery demo under the name "bigquery", together with
// the space-separated OAuth scopes it requests.
func init() {
	const profileScope = "https://www.googleapis.com/auth/userinfo.profile"
	scopes := fmt.Sprintf("%s %s %s",
		bigquery.BigqueryScope, storage.DevstorageReadOnlyScope, profileScope)
	registerDemo("bigquery", scopes, bqMain)
}
// This example demonstrates loading objects from Google Cloud Storage into
// BigQuery. Objects are specified by their bucket and a name prefix. Each
// object will be loaded into a new table identified by the object name minus
// any file extension. All tables are added to the specified dataset (one will
// be created if necessary). Currently, tables will not be overwritten and an
// attempt to load an object into a dataset that already contains its table
// will emit an error message indicating the table already exists.
// A schema file must be provided and it will be applied to every object/table.
// Example usage:
// go-api-demo -clientid="my-clientid" -secret="my-secret" bq myProject
// myDataBucket datafile2013070 DataFiles2013
// ./datafile_schema.json 100
//
// This will load all objects (e.g. all data files from July 2013) from
// gs://myDataBucket into a (possibly new) BigQuery dataset named DataFiles2013
// using the schema file provided and allowing up to 100 bad records. Assuming
// each object is named like datafileYYYYMMDD.csv.gz and all of July's files are
// stored in the bucket, 9 tables will be created named like datafile201307DD
// where DD ranges from 01 to 09, inclusive.
// When the program completes, it will emit a results line similar to:
//
// 9 files loaded in 3m58s (18m2.708s). Size: 7.18GB Rows: 7130725
//
// The total elapsed time from the start of the first job to the end of the last
// job (effectively wall clock time) is shown. The value in parentheses is the
// aggregate time taken to load all tables.
//
// argv must hold exactly six values: project id, bucket, object name prefix,
// dataset id, path to a JSON schema file, and the maximum number of bad
// records tolerated per load. Problems are reported on stderr; failure to
// create the Storage service terminates the process.
func bqMain(client *http.Client, argv []string) {
	if len(argv) != 6 {
		fmt.Fprintln(os.Stderr,
			"Usage: bq project_id bucket prefix dataset schema max_bad_records")
		return
	}

	var (
		project    = argv[0]
		bucket     = argv[1]
		objPrefix  = argv[2]
		datasetId  = argv[3]
		schemaFile = argv[4]
	)

	badRecords, err := strconv.ParseInt(argv[5], 10, 64)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Seeds the random damping applied to the polling backoff.
	rand.Seed(time.Now().UnixNano())

	service, err := storage.New(client)
	if err != nil {
		log.Fatalf("Unable to create Storage service: %v", err)
	}

	// Get the list of objects in the bucket matching the specified prefix.
	// The listing is paginated, so keep requesting pages until the service
	// stops returning a continuation token; otherwise only the first page of
	// objects would be loaded.
	var objects []*storage.Object
	listCall := service.Objects.List(bucket).Prefix(objPrefix)
	for {
		page, err := listCall.Do()
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
		objects = append(objects, page.Items...)
		if page.NextPageToken == "" {
			break
		}
		listCall.PageToken(page.NextPageToken)
	}

	// Create the wrapper and insert the (new) dataset. An already existing
	// dataset is accepted.
	dataset, err := newBQDataset(client, project, datasetId)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	if err = dataset.insert(true); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	objectSource := &tableSource{
		maxBadRecords: badRecords,
		disposition:   TableWriteEmptyDisposition,
	}

	// Load the schema from disk.
	f, err := ioutil.ReadFile(schemaFile)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	if err = json.Unmarshal(f, &objectSource.schema); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Assumes all objects have .csv, .csv.gz (or no) extension.
	tableIdFromObject := func(name string) string {
		return strings.TrimSuffix(strings.TrimSuffix(name, ".gz"), ".csv")
	}

	// A jobset is a way to group a collection of jobs together for monitoring.
	// For this example, we just use the name of the bucket and object prefix.
	jobset := fmt.Sprintf("%s:%s", bucket, objPrefix)
	fmt.Fprintf(os.Stderr, "\nLoading %d objects.\n", len(objects))

	// Load each object into a table of the same name (minus any extension).
	// A successful load call queues the job under jobset for monitoring; a
	// failed one is reported and the remaining objects are still attempted.
	for _, o := range objects {
		objectSource.id = tableIdFromObject(o.Name)
		objectSource.uri = fmt.Sprintf("gs://%s/%s", o.Bucket, o.Name)
		if err = dataset.load(jobset, objectSource); err != nil {
			fmt.Fprintln(os.Stderr, err)
		}
	}

	dataset.monitor(jobset)
}
// bqDataset bundles a BigQuery service client with one target dataset and
// keeps track of the jobs started against it, grouped by jobset name.
type bqDataset struct {
	// Project and dataset identifiers.
	project, id string
	// Client used for every BigQuery API call.
	bq *bigquery.Service
	// Dataset resource sent when the dataset is inserted.
	dataset *bigquery.Dataset
	// Queues of *bigquery.Job values awaiting completion, keyed by jobset.
	jobsets map[string]*list.List
}
// newBQDataset builds a bqDataset wrapper for dataset dsId in project dsProj,
// using client for all BigQuery API calls. The dataset is not created
// remotely here; only the local description and an empty jobset map are
// prepared.
//
// It returns an error if the BigQuery service client cannot be constructed,
// leaving the decision on how to react to the caller instead of terminating
// the process.
func newBQDataset(client *http.Client, dsProj string, dsId string) (*bqDataset, error) {
	service, err := bigquery.New(client)
	if err != nil {
		return nil, fmt.Errorf("unable to create BigQuery service: %v", err)
	}

	return &bqDataset{
		project: dsProj,
		id:      dsId,
		bq:      service,
		dataset: &bigquery.Dataset{
			DatasetReference: &bigquery.DatasetReference{
				DatasetId: dsId,
				ProjectId: dsProj,
			},
		},
		jobsets: make(map[string]*list.List),
	}, nil
}
// insert asks BigQuery to create the wrapped dataset in the wrapper's project.
// When existsOK is true, an error whose text contains DatasetAlreadyExists is
// treated as success; every other error is returned unchanged.
func (ds *bqDataset) insert(existsOK bool) error {
	_, err := ds.bq.Datasets.Insert(ds.project, ds.dataset).Do()
	if err == nil {
		return nil
	}
	if existsOK && strings.Contains(err.Error(), DatasetAlreadyExists) {
		return nil
	}
	return err
}
// tableSource describes one load operation: where the data comes from, which
// table receives it, and the options applied to the load job.
type tableSource struct {
	// Destination table id, followed by the URI of the source data.
	id, uri string
	// Schema applied to the destination table.
	schema bigquery.TableSchema
	// Maximum number of bad records passed to the load job.
	maxBadRecords int64
	// Write disposition passed to the load job.
	disposition string
}
// load submits a BigQuery load job that reads source.uri into the table
// source.id of this dataset. On success the job returned by the API is
// appended to the queue kept for jobset, creating that queue on first use.
// If the insert call fails, its error is returned and nothing is queued.
func (ds *bqDataset) load(jobset string, source *tableSource) error {
	destination := &bigquery.TableReference{
		DatasetId: ds.dataset.DatasetReference.DatasetId,
		ProjectId: ds.project,
		TableId:   source.id,
	}
	loadConfig := &bigquery.JobConfigurationLoad{
		DestinationTable: destination,
		MaxBadRecords:    source.maxBadRecords,
		Schema:           &source.schema,
		SourceUris:       []string{source.uri},
		WriteDisposition: source.disposition,
	}
	request := &bigquery.Job{
		Configuration: &bigquery.JobConfiguration{Load: loadConfig},
	}

	submitted, err := ds.bq.Jobs.Insert(ds.project, request).Do()
	if err != nil {
		return err
	}

	queue, found := ds.jobsets[jobset]
	if !found {
		queue = list.New()
		ds.jobsets[jobset] = queue
	}
	queue.PushBack(submitted)
	return nil
}
// getJob fetches the job with the given id from this wrapper's project and
// returns whatever the API call yields.
func (ds *bqDataset) getJob(id string) (*bigquery.Job, error) {
	call := ds.bq.Jobs.Get(ds.project, id)
	return call.Do()
}
// monitor polls every job queued under jobset until each one has finished,
// either successfully or with an error, and then prints the aggregated load
// statistics to stderr. It returns immediately when no job was ever queued
// for jobset.
//
// Jobs are polled round-robin. The first job taken from the queue becomes the
// "head"; meeting the head again means a full pass over the pending jobs has
// completed, so the delay between polls is grown (with random damping, capped
// at MaxBackoff milliseconds). As soon as the head job leaves the queue a new
// head is chosen and the delay drops back to BaseBackoff.
func (ds *bqDataset) monitor(jobset string) {
	jobq, ok := ds.jobsets[jobset]
	if !ok {
		return
	}

	// Delay between two polls, in milliseconds.
	var backoff float64 = BaseBackoff
	pause := func(grow bool) {
		if grow {
			backoff *= BackoffGrowthFactor
			backoff -= (backoff * rand.Float64() * BackoffGrowthDamper)
			backoff = math.Min(backoff, MaxBackoff)
			// The job being checked was already popped, hence the +1.
			fmt.Fprintf(os.Stderr, "[%s] Checking remaining %d jobs...\n", jobset,
				1+jobq.Len())
		}
		time.Sleep(time.Duration(backoff) * time.Millisecond)
	}

	var stats jobStats

	// Track a 'head' pending job in queue for detecting cycling.
	head := ""

	// Loop until all jobs are done - with either success or error.
	for jobq.Len() > 0 {
		jel := jobq.Front()
		job := jel.Value.(*bigquery.Job)
		jobq.Remove(jel)
		jid := job.JobReference.JobId

		// Pick a new head if there is none; otherwise detect a full cycle.
		loop := false
		switch {
		case len(head) == 0:
			head = jid
		case jid == head:
			loop = true
		}

		// Retrieve the job's current status.
		pause(loop)
		j, err := ds.getJob(jid)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			if j == nil {
				// Treat this as a transient API error and keep the job.
				jobq.PushBack(job)
			} else if jid == head {
				// The head job is discarded, so the tracker must be reset
				// even on its very first poll; a stale head would never be
				// seen again and the backoff would stop growing.
				head = ""
				backoff = BaseBackoff
			}
			continue
		}

		// From here on work with the fresh job data returned by Get.
		job = j
		if job.Status.State != JobStatusDone {
			jobq.PushBack(job)
			continue
		}

		if res := job.Status.ErrorResult; res != nil {
			fmt.Fprintln(os.Stderr, res.Message)
		} else {
			stat := job.Statistics
			lstat := stat.Load
			stats.files += 1
			stats.bytesIn += lstat.InputFileBytes
			stats.bytesOut += lstat.OutputBytes
			stats.rows += lstat.OutputRows
			// StartTime and EndTime are used as milliseconds here.
			stats.elapsed +=
				time.Duration(stat.EndTime-stat.StartTime) * time.Millisecond

			// Keep the earliest start and the latest finish (whole seconds).
			started := time.Unix(stat.StartTime/1000, 0)
			if stats.start.IsZero() || started.Before(stats.start) {
				stats.start = started
			}
			ended := time.Unix(stat.EndTime/1000, 0)
			if stats.finish.IsZero() || ended.After(stats.finish) {
				stats.finish = ended
			}
		}

		// The finished job leaves the queue. If it was the head, reset the
		// tracker and the backoff since the loads run in BQ in parallel. This
		// must also happen when the head finishes on its first poll.
		if jid == head {
			head = ""
			backoff = BaseBackoff
		}
	}

	fmt.Fprintf(os.Stderr, "%#v\n", stats)
}
// jobStats accumulates totals over all successfully completed load jobs.
type jobStats struct {
	files    int64 // number of files (sources) loaded
	bytesIn  int64 // bytes read from the sources (possibly compressed)
	bytesOut int64 // bytes loaded into BigQuery (uncompressed)
	rows     int64 // rows loaded into BigQuery

	// Sum of the time each job took to load its source into a table.
	elapsed time.Duration

	// Earliest job start and latest job end seen so far.
	start, finish time.Time
}
// GoString renders the summary line printed when stats are formatted with
// %#v: number of files, the span between the earliest start and the latest
// finish, the summed load time in parentheses, the loaded size in gigabytes,
// and the row count.
func (s jobStats) GoString() string {
	wallClock := s.finish.Sub(s.start)
	sizeGB := float64(s.bytesOut) / GB
	return fmt.Sprintf("\n%d files loaded in %v (%v). Size: %.2fGB Rows: %d\n",
		s.files, wallClock, s.elapsed, sizeGB, s.rows)
}
|