File: mapdemo.go

package info (click to toggle)
trillian 1.7.2-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental, forky, sid
  • size: 6,600 kB
  • sloc: sh: 1,181; javascript: 474; sql: 330; makefile: 39
file content (158 lines) | stat: -rw-r--r-- 5,357 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright 2020 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// mapdemo is a simple example that shows how a verifiable map can be
// constructed in Beam.
package main

import (
	"context"
	"crypto"
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"k8s.io/klog/v2"

	"github.com/google/trillian/experimental/batchmap"
	"github.com/google/trillian/merkle/coniks"
	"github.com/google/trillian/merkle/smt/node"
)

// hash is the hash function used both to derive map keys from the generated
// integers and as the tree hasher passed to batchmap.Create.
const hash crypto.Hash = crypto.SHA512_256

// output is the directory into which the tile files are written.
var output = flag.String("output", "", "Output directory in which the tiles will be written.")

// Parameters controlling the content and shape of the generated map.
var (
	// valueSalt is mixed into every generated value before it is hashed.
	valueSalt = flag.String("value_salt", "v1", "Some string that will be smooshed in with the generated value before hashing. Allows generated values to be deterministic but variable.")

	// startKey is the first integer from which keys are generated.
	startKey = flag.Int64("start_key", 0, "Keys will be generated starting with this index.")

	// keyCount is how many consecutive integers (and therefore keys) are generated.
	keyCount = flag.Int64("key_count", 1<<5, "The number of keys that will be placed in the map.")

	// treeID identifies the tree and is used as a salt when hashing.
	treeID = flag.Int64("tree_id", 12345, "The ID of the tree. Used as a salt in hashing.")

	// prefixStrata is passed straight through to batchmap.Create.
	prefixStrata = flag.Int("prefix_strata", 1, "The number of strata of 8-bit strata before the final strata. 3 is optimal for trees up to 2^30. 10 is required to import into Trillian.")
)

func init() {
	beam.RegisterType(reflect.TypeOf((*mapEntryFn)(nil)).Elem())
	beam.RegisterType(reflect.TypeOf((*writeTileFn)(nil)).Elem())
}

// main parses the flags, builds a Beam pipeline that generates synthetic
// key/value entries, turns them into verifiable map tiles with
// batchmap.Create, writes each tile into the --output directory, and finally
// runs the pipeline. Invalid configuration terminates the process via klog.
func main() {
	klog.InitFlags(nil)
	flag.Parse()
	beam.Init()

	// Validate the raw flag: filepath.Clean("") returns ".", so an emptiness
	// check on the cleaned path would never trigger.
	if *output == "" {
		klog.Exitf("No output provided")
	}
	outDir := filepath.Clean(*output)

	// A negative count would otherwise panic inside createRange's make().
	if *keyCount < 0 {
		klog.Exitf("key_count must be non-negative, got %d", *keyCount)
	}

	// Create the directory (and any missing parents) if it doesn't exist.
	// MkdirAll returns nil when the directory already exists and an error
	// when the path exists but is not a directory, or cannot be inspected.
	if err := os.MkdirAll(outDir, 0o700); err != nil {
		klog.Fatalf("couldn't find or create directory %s, %v", outDir, err)
	}

	p, s := beam.NewPipelineWithRoot()

	// Get the collection of key/values that are to be committed to by the map.
	// Here we generate them from scratch, but a real application would likely want to commit to
	// data read from some data source.
	entries := beam.ParDo(s, &mapEntryFn{*valueSalt, *treeID}, createRange(s, *startKey, *keyCount))

	// Create the map, which will be returned as a collection of Tiles.
	allTiles, err := batchmap.Create(s, entries, *treeID, hash, *prefixStrata)
	if err != nil {
		klog.Fatalf("Failed to create pipeline: %v", err)
	}

	// Write this collection of tiles to the output directory.
	beam.ParDo0(s, &writeTileFn{outDir}, allTiles)

	// All of the above constructs the pipeline but doesn't run it. Now we run it.
	if err := beamx.Run(context.Background(), p); err != nil {
		klog.Fatalf("Failed to execute job: %v", err)
	}
	klog.Infof("Pipeline completed successfully! Output data is at %s", outDir)
}

// mapEntryFn is a Beam ParDo function that generates a key/value from an int64 input.
// In a real application this would be replaced with a function that generated the Entry
// objects from some domain objects (e.g. for Certificate Transparency this might take
// a Certificate as input and generate an Entry that represents it).
//
// NOTE(review): the fields are exported, presumably so that Beam can serialize
// this DoFn; keep their names and order stable.
type mapEntryFn struct {
	// Salt is embedded in each generated value as "[Salt]<i>" before hashing.
	Salt   string
	// TreeID is passed to the CONIKS leaf hasher when hashing each value.
	TreeID int64
}

// ProcessElement derives one map entry from the integer i.
//
// The entry's key is the hash of i written in decimal. Its value hash is the
// CONIKS leaf hash, under fn.TreeID and at the leaf addressed by that key, of
// the bytes "[<fn.Salt>]<i>".
func (fn *mapEntryFn) ProcessElement(i int64) *batchmap.Entry {
	hasher := hash.New()
	// Writing to a hash.Hash never returns an error, so it is discarded.
	_, _ = fmt.Fprintf(hasher, "%d", i)
	key := hasher.Sum(nil)

	value := []byte(fmt.Sprintf("[%s]%d", fn.Salt, i))
	leaf := node.NewID(string(key), uint(len(key)*8))

	return &batchmap.Entry{
		HashKey:   key,
		HashValue: coniks.Default.HashLeaf(fn.TreeID, leaf, value),
	}
}

// writeTileFn serializes the tile into the given directory, using the tile
// path to determine the file name.
// This is reasonable for a demo with a small number of tiles, but with large
// maps with multiple revisions, it is conceivable that one could run out of
// inodes on the filesystem, and thus using a database locally or storing the
// tile data in cloud storage are more likely to scale.
//
// NOTE(review): the field is exported, presumably so that Beam can serialize
// this DoFn; keep its name stable.
type writeTileFn struct {
	// Directory is the directory in which one file per tile is created.
	Directory string
}

// ProcessElement writes tile t, encoded as JSON, to the file
// "path_<hex of t.Path>" inside fn.Directory.
//
// The tile is marshaled before the file is opened, so an encoding failure does
// not leave an empty file behind. An error from closing the file is returned
// when no earlier error occurred: a failed Close may mean the data never
// reached storage, and silently reporting success would yield an incomplete
// map. If an earlier error is already being returned, the Close error is only
// logged.
func (fn *writeTileFn) ProcessElement(ctx context.Context, t *batchmap.Tile) (err error) {
	bs, err := json.Marshal(t)
	if err != nil {
		return fmt.Errorf("marshaling tile %x: %w", t.Path, err)
	}

	name := filepath.Join(fn.Directory, fmt.Sprintf("path_%x", t.Path))
	fs := local.New(ctx)
	w, err := fs.OpenWrite(ctx, name)
	if err != nil {
		return fmt.Errorf("opening %s: %w", name, err)
	}
	defer func() {
		cerr := w.Close()
		if cerr == nil {
			return
		}
		if err == nil {
			err = fmt.Errorf("closing %s: %w", name, cerr)
			return
		}
		klog.Errorf("Close(%s): %v", name, cerr)
	}()

	if _, err = w.Write(bs); err != nil {
		return fmt.Errorf("writing %s: %w", name, err)
	}
	return nil
}

// createRange builds the PCollection that seeds the demo pipeline. It holds
// the count consecutive int64 values start, start+1, ..., start+count-1. The
// values are materialized in memory here before being handed to
// beam.CreateList.
func createRange(s beam.Scope, start, count int64) beam.PCollection {
	// TODO(mhutchinson): make this parallel
	seeds := make([]int64, count)
	for idx := range seeds {
		seeds[idx] = start + int64(idx)
	}
	return beam.CreateList(s, seeds)
}