// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package firestore
import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"time"
vkit "cloud.google.com/go/firestore/apiv1"
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
"github.com/golang/protobuf/ptypes"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/transport"
pb "google.golang.org/genproto/googleapis/firestore/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// resourcePrefixHeader is the name of the metadata header used to indicate
// the resource being operated on.
const resourcePrefixHeader = "google-cloud-resource-prefix"
// DetectProjectID is a sentinel value that instructs NewClient to detect the
// project ID. It is given in place of the projectID argument. NewClient will
// use the project ID from the given credentials or the default credentials
// (https://developers.google.com/accounts/docs/application-default-credentials)
// if no credentials were provided. When providing credentials, not all
// options will allow NewClient to extract the project ID. Specifically, a JWT
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"
// A Client provides access to the Firestore service.
type Client struct {
c *vkit.Client
projectID string
databaseID string // A client is tied to a single database.
}
// NewClient creates a new Firestore client that uses the given project.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
var o []option.ClientOption
// If this environment variable is defined, configure the client to talk to the emulator.
if addr := os.Getenv("FIRESTORE_EMULATOR_HOST"); addr != "" {
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithPerRPCCredentials(emulatorCreds{}))
if err != nil {
return nil, fmt.Errorf("firestore: dialing address from env var FIRESTORE_EMULATOR_HOST: %s", err)
}
o = []option.ClientOption{option.WithGRPCConn(conn)}
}
o = append(o, opts...)
if projectID == DetectProjectID {
creds, err := transport.Creds(ctx, o...)
if err != nil {
return nil, fmt.Errorf("firestore: fetching creds: %v", err)
}
if creds.ProjectID == "" {
return nil, errors.New("firestore: could not detect project ID from credentials; see the docs on DetectProjectID")
}
projectID = creds.ProjectID
}
vc, err := vkit.NewClient(ctx, o...)
if err != nil {
return nil, err
}
vc.SetGoogleClientInfo("gccl", version.Repo)
c := &Client{
c: vc,
projectID: projectID,
databaseID: "(default)", // always "(default)", for now
}
return c, nil
}
// Close closes any resources held by the client.
//
// Close need not be called at program exit.
func (c *Client) Close() error {
return c.c.Close()
}
// path returns the resource name of the client's database.
func (c *Client) path() string {
return fmt.Sprintf("projects/%s/databases/%s", c.projectID, c.databaseID)
}
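// withResourceHeader returns a context derived from ctx whose outgoing gRPC
// metadata has the resource prefix header set to resource.
func withResourceHeader(ctx context.Context, resource string) context.Context {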
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
md[resourcePrefixHeader] = []string{resource}
return metadata.NewOutgoingContext(ctx, md)
}
// Collection creates a reference to a collection with the given path.
// A path is a sequence of IDs separated by slashes.
//
// Collection returns nil if path contains an even number of IDs or any ID is empty.
func (c *Client) Collection(path string) *CollectionRef {
coll, _ := c.idsToRef(strings.Split(path, "/"), c.path())
return coll
}
// Doc creates a reference to a document with the given path.
// A path is a sequence of IDs separated by slashes.
//
// Doc returns nil if path contains an odd number of IDs or any ID is empty.
func (c *Client) Doc(path string) *DocumentRef {
_, doc := c.idsToRef(strings.Split(path, "/"), c.path())
return doc
}
// CollectionGroup creates a reference to a group of collections that include
// the given ID, regardless of parent document.
//
// For example, consider:
//
//	France/Cities/Paris = {population: 100}
//	Canada/Cities/Montreal = {population: 90}
//
// CollectionGroup can be used to query across all "Cities" collections
// regardless of their parent documents. See ExampleCollectionGroup for a
// complete example.
func (c *Client) CollectionGroup(collectionID string) *CollectionGroupRef {
return newCollectionGroupRef(c, c.path(), collectionID)
}
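// idsToRef converts a sequence of alternating collection and document IDs
// into a reference rooted at dbPath. It returns a CollectionRef if IDs has odd
// length and a DocumentRef if IDs has even length; the other return value is
// nil. Both are nil if IDs is empty or contains an empty ID.
func (c *Client) idsToRef(IDs []string, dbPath string) (*CollectionRef, *DocumentRef) {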
if len(IDs) == 0 {
return nil, nil
}
for _, id := range IDs {
if id == "" {
return nil, nil
}
}
coll := newTopLevelCollRef(c, dbPath, IDs[0])
i := 1
for i < len(IDs) {
doc := newDocRef(coll, IDs[i])
i++
if i == len(IDs) {
return nil, doc
}
coll = newCollRefWithParent(c, doc, IDs[i])
i++
}
return coll, nil
}
// GetAll retrieves multiple documents with a single call. The
// DocumentSnapshots are returned in the order of the given DocumentRefs.
// The return value will always contain the same number of DocumentSnapshots
// as the number of DocumentRefs in the input.
//
// If the same DocumentRef is specified multiple times in the input, the return
// value will contain one DocumentSnapshot for each occurrence, all referencing
// the same document.
//
// If a document is not present, the corresponding DocumentSnapshot's Exists
// method will return false.
func (c *Client) GetAll(ctx context.Context, docRefs []*DocumentRef) (_ []*DocumentSnapshot, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.GetAll")
defer func() { trace.EndSpan(ctx, err) }()
return c.getAll(ctx, docRefs, nil)
}
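// getAll is the implementation of GetAll. If tid is non-nil, the documents are
// read within the transaction with that ID.
func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) ([]*DocumentSnapshot, error) {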
var docNames []string
docIndices := map[string][]int{} // doc name to positions in docRefs
for i, dr := range docRefs {
if dr == nil {
return nil, errNilDocRef
}
docNames = append(docNames, dr.Path)
docIndices[dr.Path] = append(docIndices[dr.Path], i)
}
req := &pb.BatchGetDocumentsRequest{
Database: c.path(),
Documents: docNames,
}
if tid != nil {
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_Transaction{Transaction: tid}
}
streamClient, err := c.c.BatchGetDocuments(withResourceHeader(ctx, req.Database), req)
if err != nil {
return nil, err
}
// Read and remember all results from the stream.
var resps []*pb.BatchGetDocumentsResponse
for {
resp, err := streamClient.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
resps = append(resps, resp)
}
// Results may arrive out of order. Put each at the right indices.
docs := make([]*DocumentSnapshot, len(docNames))
for _, resp := range resps {
var (
indices []int
doc *pb.Document
err error
)
switch r := resp.Result.(type) {
case *pb.BatchGetDocumentsResponse_Found:
indices = docIndices[r.Found.Name]
doc = r.Found
case *pb.BatchGetDocumentsResponse_Missing:
indices = docIndices[r.Missing]
doc = nil
default:
return nil, errors.New("firestore: unknown BatchGetDocumentsResponse result type")
}
for _, index := range indices {
if docs[index] != nil {
return nil, fmt.Errorf("firestore: %q seen twice", docRefs[index].Path)
}
docs[index], err = newDocumentSnapshot(docRefs[index], doc, c, resp.ReadTime)
if err != nil {
return nil, err
}
}
}
return docs, nil
}
// Collections returns an iterator over the top-level collections.
func (c *Client) Collections(ctx context.Context) *CollectionIterator {
it := &CollectionIterator{
client: c,
it: c.c.ListCollectionIds(
withResourceHeader(ctx, c.path()),
&pb.ListCollectionIdsRequest{Parent: c.path() + "/documents"}),
}
it.pageInfo, it.nextFunc = iterator.NewPageInfo(
it.fetch,
func() int { return len(it.items) },
func() interface{} { b := it.items; it.items = nil; return b })
return it
}
// Batch returns a WriteBatch.
func (c *Client) Batch() *WriteBatch {
return &WriteBatch{c: c}
}
// commit calls the Commit RPC outside of a transaction.
func (c *Client) commit(ctx context.Context, ws []*pb.Write) ([]*WriteResult, error) {
req := &pb.CommitRequest{
Database: c.path(),
Writes: ws,
}
res, err := c.c.Commit(withResourceHeader(ctx, req.Database), req)
if err != nil {
return nil, err
}
if len(res.WriteResults) == 0 {
return nil, errors.New("firestore: missing WriteResult")
}
var wrs []*WriteResult
for _, pwr := range res.WriteResults {
wr, err := writeResultFromProto(pwr)
if err != nil {
return nil, err
}
wrs = append(wrs, wr)
}
return wrs, nil
}
// commit1 calls commit and returns only the first WriteResult.
func (c *Client) commit1(ctx context.Context, ws []*pb.Write) (*WriteResult, error) {
wrs, err := c.commit(ctx, ws)
if err != nil {
return nil, err
}
return wrs[0], nil
}
// A WriteResult is returned by methods that write documents.
type WriteResult struct {
// The time at which the document was updated, or created if it did not
// previously exist. Writes that do not actually change the document do
// not change the update time.
UpdateTime time.Time
}
func writeResultFromProto(wr *pb.WriteResult) (*WriteResult, error) {
t, err := ptypes.Timestamp(wr.UpdateTime)
if err != nil {
t = time.Time{}
// TODO(jba): Follow up if Delete is supposed to return a nil timestamp.
}
return &WriteResult{UpdateTime: t}, nil
}
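// sleep pauses for dur or until ctx is done, whichever comes first. Context
// errors are translated into the equivalent gRPC status errors.
func sleep(ctx context.Context, dur time.Duration) error {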
switch err := gax.Sleep(ctx, dur); err {
case context.Canceled:
return status.Error(codes.Canceled, "context canceled")
case context.DeadlineExceeded:
return status.Error(codes.DeadlineExceeded, "context deadline exceeded")
default:
return err
}
}
// emulatorCreds is an instance of grpc.PerRPCCredentials that will configure a
// client to act as an admin for the Firestore emulator. It always hardcodes
// the "authorization" metadata field to contain "Bearer owner", which the
// Firestore emulator accepts as valid admin credentials.
type emulatorCreds struct{}
func (ec emulatorCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{"authorization": "Bearer owner"}, nil
}
func (ec emulatorCreds) RequireTransportSecurity() bool {
return false
}