// Copyright 2020 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// mapdemo is a simple example that shows how a verifiable map can be
// constructed in Beam.
package main

import (
	"context"
	"crypto"
	_ "crypto/sha512" // Links in SHA-512/256 so that crypto.SHA512_256.New() is available.
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/google/trillian/experimental/batchmap"
	"github.com/google/trillian/merkle/coniks"
	"github.com/google/trillian/merkle/smt/node"
	"k8s.io/klog/v2"
)

// hash is used to derive map keys and is passed to batchmap.Create to hash the tree.
const hash = crypto.SHA512_256

var (
	output       = flag.String("output", "", "Output directory in which the tiles will be written.")
	valueSalt    = flag.String("value_salt", "v1", "A string mixed into each generated value before hashing. Allows generated values to be deterministic but variable.")
	startKey     = flag.Int64("start_key", 0, "Keys will be generated starting with this index.")
	keyCount     = flag.Int64("key_count", 1<<5, "The number of keys that will be placed in the map.")
	treeID       = flag.Int64("tree_id", 12345, "The ID of the tree. Used as a salt in hashing.")
	prefixStrata = flag.Int("prefix_strata", 1, "The number of 8-bit strata before the final stratum. 3 is optimal for trees up to 2^30. 10 is required to import into Trillian.")
)
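
// strataDepths is an illustrative helper; it is not part of the batchmap API and
// is not used by the pipeline below. It is a sketch, assuming the layout that the
// -prefix_strata help text describes: prefixStrata strata of 8 levels each,
// followed by one final stratum covering whatever key bits remain. For the
// 32-byte SHA512_256 keys used here, prefixStrata=10 yields ten 8-level strata
// and a 176-level final stratum, which is consistent with the note that 10 is
// required to import into Trillian.
func strataDepths(hashSize, prefixStrata int) ([]int, error) {
	keyBits := hashSize * 8
	if prefixStrata < 0 || prefixStrata*8 >= keyBits {
		return nil, fmt.Errorf("prefix_strata must be in [0, %d), got %d", hashSize, prefixStrata)
	}
	depths := make([]int, 0, prefixStrata+1)
	for i := 0; i < prefixStrata; i++ {
		depths = append(depths, 8)
	}
	// The final stratum takes the depth that the prefix strata leave over.
	return append(depths, keyBits-prefixStrata*8), nil
}

// Usage (hypothetical): strataDepths(hash.Size(), *prefixStrata) with the
// default flag value of 1 gives one 8-level stratum and a 248-level final one.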

func init() {
	beam.RegisterType(reflect.TypeOf((*mapEntryFn)(nil)).Elem())
	beam.RegisterType(reflect.TypeOf((*writeTileFn)(nil)).Elem())
}

func main() {
	klog.InitFlags(nil)
	flag.Parse()
	beam.Init()

	// Check the raw flag value: filepath.Clean("") returns ".", so a check made
	// after cleaning would never detect a missing -output flag.
	if *output == "" {
		klog.Exitf("No output provided")
	}
	outDir := filepath.Clean(*output)

	// Create the directory if it doesn't exist.
	if _, err := os.Stat(outDir); os.IsNotExist(err) {
		if err = os.Mkdir(outDir, 0o700); err != nil {
			klog.Fatalf("couldn't find or create directory %s, %v", outDir, err)
		}
	}

	p, s := beam.NewPipelineWithRoot()

	// Get the collection of key/values that are to be committed to by the map.
	// Here we generate them from scratch, but a real application would likely want to commit to
	// data read from some data source.
	entries := beam.ParDo(s, &mapEntryFn{*valueSalt, *treeID}, createRange(s, *startKey, *keyCount))

	// Create the map, which will be returned as a collection of Tiles.
	allTiles, err := batchmap.Create(s, entries, *treeID, hash, *prefixStrata)
	if err != nil {
		klog.Fatalf("Failed to create pipeline: %v", err)
	}

	// Write this collection of tiles to the output directory.
	beam.ParDo0(s, &writeTileFn{outDir}, allTiles)

	// All of the above constructs the pipeline but doesn't run it. Now we run it.
	if err := beamx.Run(context.Background(), p); err != nil {
		klog.Fatalf("Failed to execute job: %v", err)
	}
	klog.Infof("Pipeline completed successfully! Output data is at %s", outDir)
}

// mapEntryFn is a Beam ParDo function that generates a key/value from an int64 input.
// In a real application this would be replaced with a function that generates the Entry
// objects from some domain objects (e.g. for Certificate Transparency this might take
// a Certificate as input and generate an Entry that represents it).
type mapEntryFn struct {
	Salt   string
	TreeID int64
}

func (fn *mapEntryFn) ProcessElement(i int64) *batchmap.Entry {
	// The map key is the hash of the decimal representation of i.
	h := hash.New()
	_, _ = fmt.Fprintf(h, "%d", i)
	kbs := h.Sum(nil)
	leafID := node.NewID(string(kbs), uint(len(kbs)*8))

	// The value commits to the salted index, hashed as a CONIKS leaf at leafID.
	data := []byte(fmt.Sprintf("[%s]%d", fn.Salt, i))
	return &batchmap.Entry{
		HashKey:   kbs,
		HashValue: coniks.Default.HashLeaf(fn.TreeID, leafID, data),
	}
}
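
// exampleRecord is a hypothetical domain object standing in for, e.g., a
// certificate; it is not used by the pipeline. It sketches how a real application
// might derive the map key and leaf data from its own objects instead of from an
// int64, assuming the record's ID uniquely identifies it within the map.
type exampleRecord struct {
	ID      string
	Payload []byte
}

// keyAndData returns the map key (the SHA512_256 hash of the record ID) and the
// leaf data to commit to. A ParDo built on this would then pass the key to
// node.NewID and the data to coniks.Default.HashLeaf, as mapEntryFn does.
func (r exampleRecord) keyAndData() (key, data []byte) {
	h := crypto.SHA512_256.New()
	_, _ = h.Write([]byte(r.ID))
	return h.Sum(nil), r.Payload
}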

// writeTileFn serializes the tile into the given directory, using the tile
// path to determine the file name.
// This is reasonable for a demo with a small number of tiles, but for large
// maps with multiple revisions one could conceivably run out of inodes on the
// filesystem; a local database or cloud storage is more likely to scale.
type writeTileFn struct {
	Directory string
}

func (fn *writeTileFn) ProcessElement(ctx context.Context, t *batchmap.Tile) error {
	bs, err := json.Marshal(t)
	if err != nil {
		return err
	}

	fs := local.New(ctx)
	w, err := fs.OpenWrite(ctx, filepath.Join(fn.Directory, fmt.Sprintf("path_%x", t.Path)))
	if err != nil {
		return err
	}
	if _, err := w.Write(bs); err != nil {
		if cerr := w.Close(); cerr != nil {
			klog.Errorf("Close(): %v", cerr)
		}
		return err
	}
	// Return the Close error instead of only logging it: a failed close can
	// mean the tile was not fully written.
	return w.Close()
}
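
// tileFileName is a hypothetical helper, not used by writeTileFn, that isolates
// the naming scheme writeTileFn uses: each tile is stored as
// "path_<hex-encoded tile path>" inside the output directory. Assuming the root
// tile has an empty path, it is written to a file named "path_".
func tileFileName(dir string, tilePath []byte) string {
	return filepath.Join(dir, fmt.Sprintf("path_%x", tilePath))
}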

// createRange simply generates a PCollection of int64 which is used to seed the demo
// pipeline.
func createRange(s beam.Scope, start, count int64) beam.PCollection {
	// TODO(mhutchinson): make this parallel
	values := make([]int64, count)
	for i := int64(0); i < count; i++ {
		values[i] = start + i
	}
	return beam.CreateList(s, values)
}
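
// splitRange is a hypothetical helper illustrating one possible approach to the
// TODO in createRange; it is not used by the pipeline. It partitions
// [start, start+count) into at most shards contiguous half-open sub-ranges whose
// sizes differ by at most one. Each sub-range could then be emitted as a small
// element and expanded into int64 keys by a ParDo, so that no single worker
// has to materialize the whole key list.
func splitRange(start, count int64, shards int) [][2]int64 {
	if count <= 0 || shards <= 0 {
		return nil
	}
	n := int64(shards)
	if n > count {
		n = count // never produce empty sub-ranges
	}
	base, rem := count/n, count%n
	ranges := make([][2]int64, 0, n)
	lo := start
	for i := int64(0); i < n; i++ {
		size := base
		if i < rem {
			size++ // the first rem sub-ranges absorb the remainder
		}
		ranges = append(ranges, [2]int64{lo, lo + size})
		lo += size
	}
	return ranges
}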