File: uploader.go

package info (click to toggle)
golang-google-cloud 0.9.0-10
  • links: PTS, VCS
  • area: main
  • in suites: buster, buster-backports
  • size: 4,964 kB
  • sloc: sh: 98; makefile: 82; awk: 67
file content (177 lines) | stat: -rw-r--r-- 5,730 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"errors"
	"fmt"
	"reflect"

	"golang.org/x/net/context"
)

// An Uploader does streaming inserts into a BigQuery table.
// It is safe for concurrent use.
type Uploader struct {
	t *Table

	// SkipInvalidRows causes rows containing invalid data to be silently
	// ignored. The default value is false, which causes the entire request to
	// fail if there is an attempt to insert an invalid row.
	SkipInvalidRows bool

	// IgnoreUnknownValues causes values not matching the schema to be ignored.
	// The default value is false, which causes records containing such values
	// to be treated as invalid records.
	IgnoreUnknownValues bool

	// A TableTemplateSuffix allows Uploaders to create tables automatically.
	//
	// Experimental: this option is experimental and may be modified or removed in future versions,
	// regardless of any other documented package stability guarantees.
	//
	// When you specify a suffix, the table you upload data to
	// will be used as a template for creating a new table, with the same schema,
	// called <table> + <suffix>.
	//
	// More information is available at
	// https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
	TableTemplateSuffix string
}

// Uploader returns an Uploader that can be used to append rows to t.
// The returned Uploader may optionally be further configured before its Put method is called.
func (t *Table) Uploader() *Uploader {
	return &Uploader{t: t}
}

// Put uploads one or more rows to the BigQuery service.
//
// If src is ValueSaver, then its Save method is called to produce a row for uploading.
//
// If src is a struct or pointer to a struct, then a schema is inferred from it
// and used to create a StructSaver. The InsertID of the StructSaver will be
// empty.
//
// If src is a slice of ValueSavers, structs, or struct pointers, then each
// element of the slice is treated as above, and multiple rows are uploaded.
//
// Put returns a PutMultiError if one or more rows failed to be uploaded.
// The PutMultiError contains a RowInsertionError for each failed row.
//
// Put will retry on temporary errors (see
// https://cloud.google.com/bigquery/troubleshooting-errors). This can result
// in duplicate rows if you do not use insert IDs. Also, if the error persists,
// the call will run indefinitely. Pass a context with a timeout to prevent
// hanging calls.
func (u *Uploader) Put(ctx context.Context, src interface{}) error {
	savers, err := valueSavers(src)
	if err != nil {
		return err
	}
	return u.putMulti(ctx, savers)
}

func valueSavers(src interface{}) ([]ValueSaver, error) {
	saver, ok, err := toValueSaver(src)
	if err != nil {
		return nil, err
	}
	if ok {
		return []ValueSaver{saver}, nil
	}
	srcVal := reflect.ValueOf(src)
	if srcVal.Kind() != reflect.Slice {
		return nil, fmt.Errorf("%T is not a ValueSaver, struct, struct pointer, or slice", src)

	}
	var savers []ValueSaver
	for i := 0; i < srcVal.Len(); i++ {
		s := srcVal.Index(i).Interface()
		saver, ok, err := toValueSaver(s)
		if err != nil {
			return nil, err
		}
		if !ok {
			return nil, fmt.Errorf("src[%d] has type %T, which is not a ValueSaver, struct or struct pointer", i, s)
		}
		savers = append(savers, saver)
	}
	return savers, nil
}

// Make a ValueSaver from x, which must implement ValueSaver already
// or be a struct or pointer to struct.
func toValueSaver(x interface{}) (ValueSaver, bool, error) {
	if _, ok := x.(StructSaver); ok {
		return nil, false, errors.New("bigquery: use &StructSaver, not StructSaver")
	}
	var insertID string
	// Handle StructSavers specially so we can infer the schema if necessary.
	if ss, ok := x.(*StructSaver); ok && ss.Schema == nil {
		x = ss.Struct
		insertID = ss.InsertID
		// Fall through so we can infer the schema.
	}
	if saver, ok := x.(ValueSaver); ok {
		return saver, ok, nil
	}
	v := reflect.ValueOf(x)
	// Support Put with []interface{}
	if v.Kind() == reflect.Interface {
		v = v.Elem()
	}
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return nil, false, nil
	}
	schema, err := inferSchemaReflectCached(v.Type())
	if err != nil {
		return nil, false, err
	}
	return &StructSaver{
		Struct:   x,
		InsertID: insertID,
		Schema:   schema,
	}, true, nil
}

func (u *Uploader) putMulti(ctx context.Context, src []ValueSaver) error {
	var rows []*insertionRow
	for _, saver := range src {
		row, insertID, err := saver.Save()
		if err != nil {
			return err
		}
		rows = append(rows, &insertionRow{InsertID: insertID, Row: row})
	}

	return u.t.c.service.insertRows(ctx, u.t.ProjectID, u.t.DatasetID, u.t.TableID, rows, &insertRowsConf{
		skipInvalidRows:     u.SkipInvalidRows,
		ignoreUnknownValues: u.IgnoreUnknownValues,
		templateSuffix:      u.TableTemplateSuffix,
	})
}

// An insertionRow represents a row of data to be inserted into a table.
type insertionRow struct {
	// If InsertID is non-empty, BigQuery will use it to de-duplicate insertions of
	// this row on a best-effort basis.
	InsertID string
	// The data to be inserted, represented as a map from field name to Value.
	Row map[string]Value
}