1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
// Copyright 2025 The Tessera Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package client provides support for streaming contiguous entries from logs.
package client
import (
"context"
"fmt"
"iter"
"github.com/transparency-dev/tessera/api/layout"
"k8s.io/klog/v2"
)
// TreeSizeFunc is a function which knows how to return the current tree size of a log.
//
// EntryBundles uses the returned size to bound the range of entry bundles it fetches;
// a non-nil error is surfaced to the caller through the returned iterator.
type TreeSizeFunc func(ctx context.Context) (uint64, error)
// Bundle represents an entry bundle in a log, along with some metadata about which parts of the bundle
// are relevant.
type Bundle struct {
	// RangeInfo describes which of the entries in this bundle are relevant.
	RangeInfo layout.RangeInfo
	// Data is the raw serialised bundle, as fetched from the log.
	//
	// For a tlog-tiles compliant log, this can be unmarshaled using api.EntryBundle.
	Data []byte
}
// EntryBundles produces an iterator which returns a stream of Bundle structs which cover the requested range of entries in their natural order in the log.
//
// If the adaptor encounters an error while reading an entry bundle, the encountered error will be returned via the iterator,
// after which iteration stops.
//
// This adaptor is optimised for the case where calling getBundle has some appreciable latency, and works
// around that by maintaining a read-ahead cache of subsequent bundles which is populated by a number of parallel
// requests to getBundle. The request parallelism is set by the value of the numWorkers parameter, which can be tuned
// to balance throughput against consumption of resources, but such balancing needs to be mindful of the nature of the
// source infrastructure, and how concurrent requests affect performance (e.g. GCS buckets vs. files on a single disk).
// A numWorkers value of 0 is treated as 1.
//
// No requests are made until the returned iterator is ranged over. Each range over the iterator performs a fresh
// fetch (including a new call to getSize). Stopping iteration early cancels the context passed to any in-flight
// getSize/getBundle calls, so that background work is abandoned promptly.
func EntryBundles(ctx context.Context, numWorkers uint, getSize TreeSizeFunc, getBundle EntryBundleFetcherFunc, fromEntry uint64, N uint64) iter.Seq2[Bundle, error] {
	// With zero tokens the fetch loop below could never start a request and the
	// iterator would block forever, so always allow at least one fetch in flight.
	numWorkers = max(numWorkers, 1)

	// bundleOrErr represents a fetched entry bundle and its params, or an error if we couldn't fetch it for
	// some reason.
	type bundleOrErr struct {
		b   Bundle
		err error
	}

	return func(yield func(Bundle, error) bool) {
		// All setup happens here rather than when EntryBundles is called. An iterator which is
		// never ranged over then leaks no goroutine, and ranging over it more than once is safe
		// because each range gets its own channels.
		ctx, span := tracer.Start(ctx, "tessera.storage.StreamAdaptor")
		defer span.End()

		// fetchCtx is cancelled once iteration ends, abandoning any fetches still in flight.
		fetchCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		// bundles will be filled with futures for in-order entry bundles by the worker
		// go routine below. It is drained by the loop at the bottom of this func, which
		// yields the bundles to the caller.
		bundles := make(chan func() bundleOrErr, numWorkers)
		// exit is closed when the consumer stops, telling the worker to schedule no more fetches.
		exit := make(chan struct{})
		defer close(exit)

		// Fetch entry bundle resources in parallel.
		// We use a limited number of tokens here to prevent this from
		// consuming an unbounded amount of resources.
		go func() {
			ctx, span := tracer.Start(fetchCtx, "tessera.storage.StreamAdaptorWorker")
			defer span.End()
			defer close(bundles)

			treeSize, err := getSize(ctx)
			if err != nil {
				bundles <- func() bundleOrErr { return bundleOrErr{err: err} }
				return
			}

			// We'll limit ourselves to numWorkers worth of on-going work using these tokens.
			// A token is only returned once its future has been resolved by the consumer, so at
			// most numWorkers futures are ever outstanding. Sends on bundles (capacity numWorkers)
			// therefore never block.
			tokens := make(chan struct{}, numWorkers)
			for range numWorkers {
				tokens <- struct{}{}
			}

			klog.V(1).Infof("stream.EntryBundles: streaming [%d, %d)", fromEntry, fromEntry+N)
			// For each bundle, pop a future into the bundles channel and kick off an async request
			// to resolve it.
			for ri := range layout.Range(fromEntry, fromEntry+N, treeSize) {
				select {
				case <-exit:
					return
				case <-tokens:
					// We'll return a token below, once the bundle is fetched _and_ is being yielded.
				}
				c := make(chan bundleOrErr, 1)
				go func(ri layout.RangeInfo) {
					b, err := getBundle(ctx, ri.Index, ri.Partial)
					c <- bundleOrErr{b: Bundle{RangeInfo: ri, Data: b}, err: err}
				}(ri)
				bundles <- func() bundleOrErr {
					b := <-c
					// We're about to yield a value, so we can now return the token and unblock another fetch.
					tokens <- struct{}{}
					return b
				}
			}
			klog.V(1).Infof("stream.EntryBundles: exiting")
		}()

		for f := range bundles {
			r := f()
			if !yield(r.b, r.err) {
				return
			}
			// For now, force the iterator to stop if we've just returned an error.
			// If there's a good reason to allow it to continue we can change this.
			if r.err != nil {
				return
			}
		}
		klog.V(1).Infof("stream.EntryBundles: iter done")
	}
}
// Entry represents a single leaf in a log, as yielded by the iterator returned from Entries.
type Entry[T any] struct {
	// Index is the index of the entry in the log.
	Index uint64
	// Entry is the entry from the log, in whatever form the unbundle function passed to Entries produced it.
	Entry T
}
// Entries consumes an iterator of Bundle structs and transforms it using the provided unbundle function, and returns an iterator over the transformed data.
//
// Different unbundle implementations can be provided to return raw entry bytes, parsed entry structs, or derivations of entries (e.g. hashes) as needed.
//
// Only the entries selected by each bundle's RangeInfo are yielded, each tagged with its index in the log.
// Iteration stops after the first error. That error may come from the bundles iterator, from unbundle, or from
// a bundle which holds fewer entries than its RangeInfo requires (which would otherwise leave a silent gap
// in the stream).
func Entries[T any](bundles iter.Seq2[Bundle, error], unbundle func([]byte) ([]T, error)) iter.Seq2[Entry[T], error] {
	return func(yield func(Entry[T], error) bool) {
		for b, err := range bundles {
			if err != nil {
				yield(Entry[T]{}, err)
				return
			}
			ri := b.RangeInfo
			es, err := unbundle(b.Data)
			if err != nil {
				yield(Entry[T]{}, fmt.Errorf("unbundling entry bundle %d: %w", ri.Index, err))
				return
			}
			first, n := int(ri.First), int(ri.N)
			if len(es) <= first {
				yield(Entry[T]{}, fmt.Errorf("logic error: First is %d but only %d entries", ri.First, len(es)))
				return
			}
			es = es[first:]
			// A bundle which is shorter than the requested range would otherwise be silently
			// truncated, dropping entries from what callers expect to be a contiguous stream.
			if len(es) < n {
				yield(Entry[T]{}, fmt.Errorf("entry bundle %d has %d entries from offset %d, want at least %d", ri.Index, len(es), ri.First, ri.N))
				return
			}
			es = es[:n]
			rIdx := ri.Index*layout.EntryBundleWidth + uint64(ri.First)
			for i, e := range es {
				if !yield(Entry[T]{Index: rIdx + uint64(i), Entry: e}, nil) {
					return
				}
			}
		}
	}
}
|