// Copyright 2024 The Tessera authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tessera
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
f_log "github.com/transparency-dev/formats/log"
"github.com/transparency-dev/merkle/rfc6962"
"github.com/transparency-dev/tessera/api/layout"
"github.com/transparency-dev/tessera/internal/otel"
"github.com/transparency-dev/tessera/internal/parse"
"github.com/transparency-dev/tessera/internal/witness"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"golang.org/x/mod/sumdb/note"
"k8s.io/klog/v2"
)
const (
// DefaultBatchMaxSize is used by storage implementations if no WithBatching option is provided when instantiating it.
DefaultBatchMaxSize = 256
// DefaultBatchMaxAge is used by storage implementations if no WithBatching option is provided when instantiating it.
DefaultBatchMaxAge = 250 * time.Millisecond
// DefaultCheckpointInterval is used by storage implementations if no WithCheckpointInterval option is provided when instantiating it.
DefaultCheckpointInterval = 10 * time.Second
// DefaultCheckpointRepublishInterval is used by storage implementations if no WithCheckpointRepublishInterval option is provided when instantiating it.
DefaultCheckpointRepublishInterval = 10 * time.Minute
// DefaultPushbackMaxOutstanding is used by storage implementations if no WithPushback option is provided when instantiating it.
DefaultPushbackMaxOutstanding = 4096
// DefaultGarbageCollectionInterval is the default value used if no WithGarbageCollectionInterval option is provided.
DefaultGarbageCollectionInterval = time.Minute
// DefaultAntispamInMemorySize is the recommended default limit on the number of entries in the in-memory antispam cache.
// The amount of data stored for each entry is small (32 bytes of hash + 8 bytes of index), so in the general case it should be fine
// to have a very large cache.
DefaultAntispamInMemorySize = 256 << 10
// DefaultWitnessTimeout is the default maximum time to wait for responses from configured witnesses.
DefaultWitnessTimeout = 5 * time.Second
)
var (
appenderAddsTotal metric.Int64Counter
appenderAddHistogram metric.Int64Histogram
appenderHighestIndex metric.Int64Gauge
appenderIntegratedSize metric.Int64Gauge
appenderIntegrateLatency metric.Int64Histogram
appenderDeadlineRemaining metric.Int64Histogram
appenderNextIndex metric.Int64Gauge
appenderSignedSize metric.Int64Gauge
appenderWitnessedSize metric.Int64Gauge
appenderWitnessRequests metric.Int64Counter
appenderWitnessHistogram metric.Int64Histogram
followerEntriesProcessed metric.Int64Gauge
followerLag metric.Int64Gauge
// Custom histogram buckets as we're still interested in details in the 1-2s area.
histogramBuckets = []float64{0, 10, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000}
)
func init() {
var err error
appenderAddsTotal, err = meter.Int64Counter(
"tessera.appender.add.calls",
metric.WithDescription("Number of calls to the appender lifecycle Add function"),
metric.WithUnit("{call}"))
if err != nil {
klog.Exitf("Failed to create appenderAddsTotal metric: %v", err)
}
appenderAddHistogram, err = meter.Int64Histogram(
"tessera.appender.add.duration",
metric.WithDescription("Duration of calls to the appender lifecycle Add function"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(histogramBuckets...))
if err != nil {
klog.Exitf("Failed to create appenderAddDuration metric: %v", err)
}
appenderHighestIndex, err = meter.Int64Gauge(
"tessera.appender.index",
metric.WithDescription("Highest index assigned by appender lifecycle Add function"))
if err != nil {
klog.Exitf("Failed to create appenderHighestIndex metric: %v", err)
}
appenderIntegratedSize, err = meter.Int64Gauge(
"tessera.appender.integrated.size",
metric.WithDescription("Size of the integrated (but not necessarily published) tree"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create appenderIntegratedSize metric: %v", err)
}
appenderIntegrateLatency, err = meter.Int64Histogram(
"tessera.appender.integrate.latency",
metric.WithDescription("Duration between an index being assigned by Add, and that index being integrated in the tree"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(histogramBuckets...))
if err != nil {
klog.Exitf("Failed to create appenderIntegrateLatency metric: %v", err)
}
appenderDeadlineRemaining, err = meter.Int64Histogram(
"tessera.appender.deadline.remaining",
metric.WithDescription("Duration remaining before context cancellation when appender is invoked (only set for contexts with deadline)"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(histogramBuckets...))
if err != nil {
klog.Exitf("Failed to create appenderDeadlineRemaining metric: %v", err)
}
appenderNextIndex, err = meter.Int64Gauge(
"tessera.appender.next_index",
metric.WithDescription("The next available index to be assigned to entries"))
if err != nil {
klog.Exitf("Failed to create appenderNextIndex metric: %v", err)
}
appenderSignedSize, err = meter.Int64Gauge(
"tessera.appender.signed.size",
metric.WithDescription("Size of the latest signed checkpoint"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create appenderSignedSize metric: %v", err)
}
appenderWitnessedSize, err = meter.Int64Gauge(
"tessera.appender.witnessed.size",
metric.WithDescription("Size of the latest successfully witnessed checkpoint"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create appenderWitnessedSize metric: %v", err)
}
followerEntriesProcessed, err = meter.Int64Gauge(
"tessera.follower.processed",
metric.WithDescription("Number of entries processed"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create followerEntriesProcessed metric: %v", err)
}
followerLag, err = meter.Int64Gauge(
"tessera.follower.lag",
metric.WithDescription("Number of unprocessed entries in the current integrated tree"),
metric.WithUnit("{entry}"))
if err != nil {
klog.Exitf("Failed to create followerLag metric: %v", err)
}
appenderWitnessRequests, err = meter.Int64Counter(
"tessera.appender.witness.requests",
metric.WithDescription("Number of attempts to witness a log checkpoint"),
metric.WithUnit("{call}"))
if err != nil {
klog.Exitf("Failed to create appenderWitnessRequests metric: %v", err)
}
appenderWitnessHistogram, err = meter.Int64Histogram(
"tessera.appender.witness.duration",
metric.WithDescription("Duration of calls to the configured witness group"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(histogramBuckets...))
if err != nil {
klog.Exitf("Failed to create appenderWitnessHistogram metric: %v", err)
}
}
// AddFn adds a new entry to be sequenced by the storage implementation.
//
// This method should quickly return an IndexFuture, which can be called to resolve to the
// index **durably** assigned to the new entry (or an error).
//
// Implementations MUST NOT allow the future to resolve to an index value unless/until it has
// been durably committed by the storage.
//
// Callers MUST NOT assume that an entry has been accepted or durably stored until they have
// successfully resolved the future.
//
// Once the future resolves and returns an index, the entry can be considered to have been
// durably sequenced and will be preserved even in the event that the process terminates.
//
// Once an entry is sequenced, the storage implementation MUST integrate it into the tree soon
// (how long this is expected to take is left unspecified, but as a guideline it should happen
// within single digit seconds). Until the entry is integrated and published, clients of the log
// will not be able to verifiably access this value.
//
// Personalities which require blocking until the entry is integrated (e.g. because they wish
// to return an inclusion proof) may use the PublicationAwaiter to wrap the call to this method.
type AddFn func(ctx context.Context, entry *Entry) IndexFuture
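// The sketch below is illustrative only and not part of this package: it shows how a
// personality might submit an entry and later resolve the returned future. The
// "appender" and "data" names are placeholders assumed to come from the caller's code.
//
//	future := appender.Add(ctx, tessera.NewEntry(data))
//	// ... other work ...
//	idx, err := future()
//	if err != nil {
//		// The entry cannot be assumed to have been durably stored.
//		return err
//	}
//	if idx.IsDup {
//		// An identical entry was previously sequenced at idx.Index.
//	}
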
// IndexFuture is the signature of a function which can return an assigned index or error.
//
// Implementations of this func are likely to be "futures", or a promise to return this data at
// some point in the future, and as such will block when called if the data isn't yet available.
type IndexFuture func() (Index, error)
// Index represents a durably assigned index for some entry.
type Index struct {
// Index is the location in the log to which a particular entry has been assigned.
Index uint64
// IsDup is true if Index represents a previously assigned index for an identical entry.
IsDup bool
}
// Appender allows personalities access to the lifecycle methods associated with logs
// in sequencing mode. This only has a single method, but other methods are likely to be added
// such as a Shutdown method for #341.
type Appender struct {
Add AddFn
}
// NewAppender returns an Appender, which allows a personality to incrementally append new
// leaves to the log and to read from it.
//
// The return values are the Appender for adding new entries, a shutdown function, a log reader,
// and an error if any of the objects couldn't be constructed.
//
// Shutdown ensures that all calls to Add that have returned a value will be resolved. Any
// futures returned by _this appender_ which resolve to an index will be integrated and have
// a checkpoint that commits to them published if this returns successfully. After this returns,
// any calls to Add will fail.
//
// The context passed into this function will be referenced by any background tasks that are started
// in the Appender. The correct process for shutting down an Appender cleanly is to first call the
// shutdown function that is returned, and then cancel the context. Cancelling the context without calling
// shutdown first may mean that some entries added by this appender aren't in the log when the process
// exits.
func NewAppender(ctx context.Context, d Driver, opts *AppendOptions) (*Appender, func(ctx context.Context) error, LogReader, error) {
type appendLifecycle interface {
Appender(context.Context, *AppendOptions) (*Appender, LogReader, error)
}
lc, ok := d.(appendLifecycle)
if !ok {
return nil, nil, nil, fmt.Errorf("driver %T does not implement Appender lifecycle", d)
}
if opts == nil {
return nil, nil, nil, errors.New("opts cannot be nil")
}
if err := opts.valid(); err != nil {
return nil, nil, nil, err
}
a, r, err := lc.Appender(ctx, opts)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to init appender lifecycle: %v", err)
}
for i := len(opts.addDecorators) - 1; i >= 0; i-- {
a.Add = opts.addDecorators[i](a.Add)
}
sd := &integrationStats{}
a.Add = sd.statsDecorator(a.Add)
for _, f := range opts.followers {
go f.Follow(ctx, r)
go followerStats(ctx, f, r.IntegratedSize)
}
go sd.updateStats(ctx, r)
t := terminator{
delegate: a.Add,
readCheckpoint: r.ReadCheckpoint,
}
// TODO(mhutchinson): move this into the decorators
a.Add = func(ctx context.Context, entry *Entry) IndexFuture {
if deadline, ok := ctx.Deadline(); ok {
appenderDeadlineRemaining.Record(ctx, time.Until(deadline).Milliseconds())
}
ctx, span := tracer.Start(ctx, "tessera.Appender.Add")
defer span.End()
// NOTE: We memoize the returned value here so that repeated calls to the returned
// future don't result in unexpected side-effects from inner AddFn functions
// being called multiple times.
// Currently this is the outermost wrapping of Add so we do the memoization
// here, if this changes, ensure that we move the memoization call so that
// this remains true.
return memoizeFuture(t.Add(ctx, entry))
}
return a, t.Shutdown, r, nil
}
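// Illustrative lifecycle sketch, not prescriptive: the "driver" and "signer" values below
// are placeholders for a storage driver and note.Signer constructed elsewhere by the
// personality.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	opts := tessera.NewAppendOptions().WithCheckpointSigner(signer)
//	appender, shutdown, reader, err := tessera.NewAppender(ctx, driver, opts)
//	if err != nil {
//		// handle error
//	}
//	_ = reader // e.g. serve read traffic
//	// ... call appender.Add as entries arrive ...
//	// Shut down cleanly: drain via the returned shutdown func first, then cancel the
//	// context used to start the background tasks.
//	if err := shutdown(ctx); err != nil {
//		// handle error
//	}
//	cancel()
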
// memoizeFuture wraps an AddFn delegate with logic to ensure that the delegate is called at most
// once.
func memoizeFuture(delegate IndexFuture) IndexFuture {
f := sync.OnceValues(func() (Index, error) {
return delegate()
})
return f
}
func followerStats(ctx context.Context, f Follower, size func(context.Context) (uint64, error)) {
name := f.Name()
t := time.NewTicker(200 * time.Millisecond)
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
n, err := f.EntriesProcessed(ctx)
if err != nil {
klog.Errorf("followerStats: follower %q EntriesProcessed(): %v", name, err)
continue
}
		s, err := size(ctx)
		if err != nil {
			klog.Errorf("followerStats: follower %q size(): %v", name, err)
			continue
		}
attrs := metric.WithAttributes(followerNameKey.String(name))
followerEntriesProcessed.Record(ctx, otel.Clamp64(n), attrs)
followerLag.Record(ctx, otel.Clamp64(s-n), attrs)
}
}
// idxAt represents an index first seen at a particular time.
type idxAt struct {
idx uint64
at time.Time
}
// integrationStats knows how to track and populate metrics related to integration performance.
//
// Currently, this tracks integration latency only.
// The integration latency tracking works via a "sample & consume" mechanism, whereby an Add decorator
// will record an assigned index along with the time it was assigned. An asynchronous process will
// periodically compare the sample with the current integrated tree size, and if the sampled index is
// found to be covered by the tree the elapsed period is recorded and the sample "consumed".
//
// Only one sample may be held at a time.
type integrationStats struct {
// indexSample points to a sampled indexAt, or nil if there has been no sample made _or_ the sample was consumed.
indexSample atomic.Pointer[idxAt]
}
// sample creates a new sample with the provided index if no sample is already held.
func (i *integrationStats) sample(idx uint64) {
i.indexSample.CompareAndSwap(nil, &idxAt{idx: idx, at: time.Now()})
}
// latency will check whether the provided tree size is larger than the currently sampled index (if one exists),
// and, if so, "consume" the sample and return the elapsed interval since the sample was taken.
//
// The returned bool is true if a sample exists whose index is lower than the provided tree size, and
// false otherwise.
func (i *integrationStats) latency(size uint64) (time.Duration, bool) {
ia := i.indexSample.Load()
// If there _is_ a sample...
	if ia != nil {
		// and the sampled index is lower than the tree size
		if ia.idx < size {
			// then reset the sample store here so that we're able to accept a future sample,
			i.indexSample.Store(nil)
			// and report the elapsed time since the sample was taken.
			return time.Since(ia.at), true
		}
	}
	return 0, false
}
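// Illustrative sequence (hypothetical sizes and timings) showing the sample & consume flow:
//
//	var st integrationStats
//	st.sample(100)           // an Add future resolved to index 100 at time t0
//	st.sample(200)           // no-op: a sample is already held
//	_, ok := st.latency(90)  // tree size 90: 100 >= 90, sample retained, ok == false
//	d, ok := st.latency(101) // tree size 101: 100 < 101, sample consumed, ok == true
//	_, _ = d, ok             // d is roughly the elapsed time since t0
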
// updateStats periodically checks the current integrated tree size and attempts to
// consume any held sample, updating the metric if possible.
//
// This is a long-running function, exiting only when the provided context is done.
func (i *integrationStats) updateStats(ctx context.Context, r LogReader) {
if r == nil {
klog.Warning("updateStates: nil logreader provided, not updating stats")
return
}
t := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
s, err := r.IntegratedSize(ctx)
if err != nil {
klog.Errorf("IntegratedSize: %v", err)
continue
}
appenderIntegratedSize.Record(ctx, otel.Clamp64(s))
if d, ok := i.latency(s); ok {
appenderIntegrateLatency.Record(ctx, d.Milliseconds())
}
		ni, err := r.NextIndex(ctx)
		if err != nil {
			klog.Errorf("NextIndex: %v", err)
			continue
		}
		appenderNextIndex.Record(ctx, otel.Clamp64(ni))
}
}
// statsDecorator wraps a delegate AddFn with code to calculate/update
// metric stats.
func (i *integrationStats) statsDecorator(delegate AddFn) AddFn {
return func(ctx context.Context, entry *Entry) IndexFuture {
start := time.Now()
f := delegate(ctx, entry)
return func() (Index, error) {
idx, err := f()
attr := []attribute.KeyValue{}
if err != nil {
switch {
// Record the fact there was pushback, if any.
case errors.Is(err, ErrPushbackAntispam):
attr = append(attr, attribute.String("tessera.pushback", "antispam"))
case errors.Is(err, ErrPushbackIntegration):
attr = append(attr, attribute.String("tessera.pushback", "integration"))
case errors.Is(err, ErrPushback):
attr = append(attr, attribute.String("tessera.pushback", "other"))
default:
// If it's not a pushback, just flag that it's an errored request to avoid high cardinality of attribute values.
// TODO(al): We might want to bucket errors into OTel status codes in the future, though.
attr = append(attr, attribute.String("tessera.error.type", "_OTHER"))
}
}
attr = append(attr, attribute.Bool("tessera.duplicate", idx.IsDup))
appenderAddsTotal.Add(ctx, 1, metric.WithAttributes(attr...))
d := time.Since(start)
appenderAddHistogram.Record(ctx, d.Milliseconds(), metric.WithAttributes(attr...))
if !idx.IsDup {
i.sample(idx.Index)
}
return idx, err
}
}
}
type terminator struct {
delegate AddFn
readCheckpoint func(ctx context.Context) ([]byte, error)
	// This mutex guards the stopped state. We use this instead of an atomic.Bool
// to get the property that no readers of this state can have the lock when the
// write gets it. This means that no in-flight Add operations will be occurring on
// Shutdown.
mu sync.RWMutex
stopped bool
// largestIssued tracks the largest index allocated by this appender.
largestIssued atomic.Uint64
}
func (t *terminator) Add(ctx context.Context, entry *Entry) IndexFuture {
t.mu.RLock()
defer t.mu.RUnlock()
if t.stopped {
return func() (Index, error) {
return Index{}, errors.New("appender has been shut down")
}
}
res := t.delegate(ctx, entry)
return func() (Index, error) {
i, err := res()
if err != nil {
return i, err
}
// https://github.com/golang/go/issues/63999 - atomically set largest issued index
old := t.largestIssued.Load()
for old < i.Index && !t.largestIssued.CompareAndSwap(old, i.Index) {
old = t.largestIssued.Load()
}
appenderHighestIndex.Record(ctx, otel.Clamp64(t.largestIssued.Load()))
return i, err
}
}
// Shutdown ensures that all calls to Add that have returned a value will be resolved. Any
// futures returned by _this appender_ which resolve to an index will be integrated and have
// a checkpoint that commits to them published if this returns successfully.
//
// After this returns, any calls to Add will fail.
func (t *terminator) Shutdown(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
t.stopped = true
maxIndex := t.largestIssued.Load()
if maxIndex == 0 {
// special case no work done
return nil
}
sleepTime := 0 * time.Millisecond
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
time.Sleep(sleepTime)
}
sleepTime = 100 * time.Millisecond // after the first time, ensure we sleep in any other loops
cp, err := t.readCheckpoint(ctx)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
continue
}
_, size, _, err := parse.CheckpointUnsafe(cp)
if err != nil {
return err
}
klog.V(1).Infof("Shutting down, waiting for checkpoint committing to size %d (current checkpoint is %d)", maxIndex, size)
if size > maxIndex {
return nil
}
}
}
// NewAppendOptions creates a new options struct for configuring appender lifecycle instances.
//
// These options are configured through the use of the various `With.*` function calls on the returned
// instance.
func NewAppendOptions() *AppendOptions {
return &AppendOptions{
batchMaxSize: DefaultBatchMaxSize,
batchMaxAge: DefaultBatchMaxAge,
entriesPath: layout.EntriesPath,
bundleIDHasher: defaultIDHasher,
checkpointInterval: DefaultCheckpointInterval,
checkpointRepublishInterval: DefaultCheckpointRepublishInterval,
addDecorators: make([]func(AddFn) AddFn, 0),
pushbackMaxOutstanding: DefaultPushbackMaxOutstanding,
garbageCollectionInterval: DefaultGarbageCollectionInterval,
}
}
// AppendOptions holds settings for all storage implementations.
type AppendOptions struct {
// newCP knows how to format and sign checkpoints.
newCP func(ctx context.Context, size uint64, hash []byte) ([]byte, error)
batchMaxAge time.Duration
batchMaxSize uint
pushbackMaxOutstanding uint
	// entriesPath knows how to format entry bundle paths.
entriesPath func(n uint64, p uint8) string
// bundleIDHasher knows how to create antispam leaf identities for entries in a serialised bundle.
bundleIDHasher func([]byte) ([][]byte, error)
checkpointInterval time.Duration
checkpointRepublishInterval time.Duration
witnesses WitnessGroup
witnessOpts WitnessOptions
addDecorators []func(AddFn) AddFn
followers []Follower
// garbageCollectionInterval of zero should be interpreted as requesting garbage collection to be disabled.
garbageCollectionInterval time.Duration
}
// valid returns an error if an invalid combination of options has been set, or nil otherwise.
func (o AppendOptions) valid() error {
if o.newCP == nil {
return errors.New("invalid AppendOptions: WithCheckpointSigner must be set")
}
if o.checkpointRepublishInterval > 0 && o.checkpointRepublishInterval < o.checkpointInterval {
return fmt.Errorf("invalid AppendOptions: WithCheckpointRepublishInterval (%d) is smaller than WithCheckpointInterval (%d)", o.checkpointRepublishInterval, o.checkpointInterval)
}
return nil
}
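// Illustrative constraint check (hypothetical values): a republish interval shorter than
// the checkpoint interval is rejected, since unchanged checkpoints would then be due for
// republication more often than new checkpoints are produced. The "signer" below is a
// placeholder for a note.Signer constructed elsewhere.
//
//	opts := tessera.NewAppendOptions().
//		WithCheckpointSigner(signer).
//		WithCheckpointInterval(10 * time.Second).
//		WithCheckpointRepublishInterval(5 * time.Second)
//	// NewAppender(ctx, driver, opts) would fail with an invalid AppendOptions error.
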
// WithAntispam configures the appender to use the antispam mechanism to reduce the number of duplicates which
// can be added to the log.
//
// As a starting point, the minimum size of the in-memory cache should be set to the configured PushbackThreshold
// of the provided antispam implementation, multiplied by the number of concurrent front-end instances which
// are accepting write-traffic. Data stored in the in-memory cache is relatively small (32 bytes hash, 8 bytes index),
// so we recommend erring on the larger side as there is little downside to over-sizing the cache; consider using
// the DefaultAntispamInMemorySize as the value here.
//
// For more details on how the antispam mechanism works, including tuning guidance, see docs/design/antispam.md.
func (o *AppendOptions) WithAntispam(inMemEntries uint, as Antispam) *AppendOptions {
o.addDecorators = append(o.addDecorators, newInMemoryDedup(inMemEntries))
if as != nil {
o.addDecorators = append(o.addDecorators, as.Decorator())
o.followers = append(o.followers, as.Follower(o.bundleIDHasher))
}
return o
}
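// Worked sizing example (hypothetical figures): with an antispam PushbackThreshold of
// 4096 and 8 front-end instances accepting writes, the suggested minimum in-memory cache
// size is 4096 * 8 = 32768 entries. At roughly 40 bytes per entry that is only ~1.3 MiB,
// so rounding up to DefaultAntispamInMemorySize (262144 entries, ~10 MiB) remains cheap.
// The "signer" and "persistentAntispam" values below are placeholders for a note.Signer
// and a storage-provided Antispam implementation.
//
//	opts := tessera.NewAppendOptions().
//		WithCheckpointSigner(signer).
//		WithAntispam(tessera.DefaultAntispamInMemorySize, persistentAntispam)
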
// CheckpointPublisher returns a function which should be used to create, sign, and potentially witness a new checkpoint.
func (o AppendOptions) CheckpointPublisher(lr LogReader, httpClient *http.Client) func(context.Context, uint64, []byte) ([]byte, error) {
return func(ctx context.Context, size uint64, root []byte) ([]byte, error) {
ctx, span := tracer.Start(ctx, "tessera.CheckpointPublisher")
defer span.End()
cp, err := o.newCP(ctx, size, root)
if err != nil {
return nil, fmt.Errorf("newCP: %v", err)
}
appenderSignedSize.Record(ctx, otel.Clamp64(size))
// Handle witnessing
{
// Figure out the likely size the witnesses are aware of, but don't fail hard if we're unable
// to do so:
// a) it could be that this is the first checkpoint we're publishing
// b) the witnessing protocol has a fallback path in case we get it wrong, anyway.
var oldSize uint64
oldCP, err := lr.ReadCheckpoint(ctx)
if err != nil {
klog.Infof("Failed to fetch old checkpoint: %v", err)
} else {
_, oldSize, _, err = parse.CheckpointUnsafe(oldCP)
if err != nil {
return nil, fmt.Errorf("failed to parse old checkpoint: %v", err)
}
}
wg := witness.NewWitnessGateway(o.witnesses, httpClient, oldSize, lr.ReadTile)
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, o.witnessOpts.Timeout)
defer cancel()
witAttr := []attribute.KeyValue{}
cp, err = wg.Witness(ctx, cp)
if err != nil {
if !o.witnessOpts.FailOpen {
appenderWitnessRequests.Add(ctx, 1, metric.WithAttributes(attribute.String("error.type", "failed")))
return nil, err
}
klog.Warningf("WitnessGateway: failing-open despite error: %v", err)
witAttr = append(witAttr, attribute.String("error.type", "failed_open"))
}
appenderWitnessRequests.Add(ctx, 1, metric.WithAttributes(witAttr...))
appenderWitnessedSize.Record(ctx, otel.Clamp64(size))
d := time.Since(start)
appenderWitnessHistogram.Record(ctx, d.Milliseconds(), metric.WithAttributes(witAttr...))
}
return cp, nil
}
}
func (o AppendOptions) BatchMaxAge() time.Duration {
return o.batchMaxAge
}
func (o AppendOptions) BatchMaxSize() uint {
return o.batchMaxSize
}
func (o AppendOptions) PushbackMaxOutstanding() uint {
return o.pushbackMaxOutstanding
}
func (o AppendOptions) EntriesPath() func(uint64, uint8) string {
return o.entriesPath
}
func (o AppendOptions) CheckpointInterval() time.Duration {
return o.checkpointInterval
}
func (o AppendOptions) CheckpointRepublishInterval() time.Duration {
return o.checkpointRepublishInterval
}
func (o AppendOptions) GarbageCollectionInterval() time.Duration {
return o.garbageCollectionInterval
}
// WithCheckpointSigner is an option for setting the note signer and verifier to use when creating and parsing checkpoints.
// This option is mandatory for creating logs where the checkpoint is signed locally, e.g. in
// the Appender mode. This does not need to be provided where the storage will be used to mirror
// other logs.
//
// A primary signer must be provided:
// - the primary signer is the "canonical" signing identity which should be used when creating new checkpoints.
//
// Zero or more additional signers may also be provided.
// This enables cases like:
// - a rolling key rotation, where checkpoints are signed by both the old and new keys for some period of time,
// - using different signature schemes for different audiences, etc.
//
// When providing additional signers, their names MUST be identical to the primary signer name, and this name will be used
// as the checkpoint Origin line.
//
// Checkpoints signed by these signer(s) will be standard checkpoints as defined by https://c2sp.org/tlog-checkpoint.
func (o *AppendOptions) WithCheckpointSigner(s note.Signer, additionalSigners ...note.Signer) *AppendOptions {
origin := s.Name()
for _, signer := range additionalSigners {
if origin != signer.Name() {
klog.Exitf("WithCheckpointSigner: additional signer name (%q) does not match primary signer name (%q)", signer.Name(), origin)
}
}
o.newCP = func(ctx context.Context, size uint64, hash []byte) ([]byte, error) {
_, span := tracer.Start(ctx, "tessera.SignCheckpoint")
defer span.End()
// If we're signing a zero-sized tree, the tlog-checkpoint spec says (via RFC6962) that
// the root must be SHA256 of the empty string, so we'll enforce that here:
if size == 0 {
emptyRoot := rfc6962.DefaultHasher.EmptyRoot()
hash = emptyRoot[:]
}
cpRaw := f_log.Checkpoint{
Origin: origin,
Size: size,
Hash: hash,
}.Marshal()
		n, err := note.Sign(&note.Note{Text: string(cpRaw)}, append([]note.Signer{s}, additionalSigners...)...)
if err != nil {
return nil, fmt.Errorf("note.Sign: %w", err)
}
return n, nil
}
return o
}
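// Illustrative sketch of generating a signing key and configuring the signer. The origin
// name "example.com/log" is a placeholder; rand and note refer to crypto/rand and
// golang.org/x/mod/sumdb/note respectively.
//
//	skey, vkey, err := note.GenerateKey(rand.Reader, "example.com/log")
//	if err != nil {
//		// handle error
//	}
//	signer, err := note.NewSigner(skey)
//	if err != nil {
//		// handle error
//	}
//	opts := tessera.NewAppendOptions().WithCheckpointSigner(signer)
//	_ = vkey // the verifier key is published so clients can check checkpoint signatures
//	_ = opts
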
// WithBatching configures the batching behaviour of leaves being sequenced.
// A batch will be allowed to grow in memory until either:
// - the number of entries in the batch reaches maxSize
// - the first entry in the batch has reached maxAge
//
// At this point the batch will be sent to the sequencer.
//
// Configuring these parameters allows the personality to tune to get the desired
// balance of sequencing latency with cost. In general, larger batches allow for
// lower cost of operation, where more frequent batches reduce the amount of time
// required for entries to be included in the log.
//
// If this option isn't provided, storage implementations will use the DefaultBatchMaxSize and DefaultBatchMaxAge consts above.
func (o *AppendOptions) WithBatching(maxSize uint, maxAge time.Duration) *AppendOptions {
o.batchMaxSize = maxSize
o.batchMaxAge = maxAge
return o
}
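// Illustrative trade-off (hypothetical values): larger, less frequent batches tend to
// lower operating cost, while smaller, more frequent batches reduce sequencing latency.
//
//	opts = opts.WithBatching(1024, 500*time.Millisecond) // favour throughput/cost
//	opts = opts.WithBatching(64, 50*time.Millisecond)    // favour sequencing latency
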
// WithPushback allows configuration of when the storage should start pushing back on add requests.
//
// maxOutstanding is the number of "in-flight" add requests - i.e. the number of entries with sequence numbers
// assigned, but which are not yet integrated into the log.
func (o *AppendOptions) WithPushback(maxOutstanding uint) *AppendOptions {
o.pushbackMaxOutstanding = maxOutstanding
return o
}
// WithCheckpointInterval configures the frequency at which Tessera will attempt to create & publish
// new checkpoints.
//
// Well behaved clients of the log will only "see" newly sequenced entries once a new checkpoint is published,
// so it's important to set that value such that it works well with your ecosystem.
//
// Regularly publishing new checkpoints:
// - helps show that the log is "live", even if no entries are being added.
// - enables clients of the log to reason about how frequently they need to have their
// view of the log refreshed, which in turn helps reduce work/load across the ecosystem.
//
// Note that this option probably only makes sense for long-lived applications (e.g. HTTP servers).
//
// If this option isn't provided, storage implementations will use the DefaultCheckpointInterval const above.
func (o *AppendOptions) WithCheckpointInterval(interval time.Duration) *AppendOptions {
o.checkpointInterval = interval
return o
}
// WithCheckpointRepublishInterval configures the frequency at which Tessera will allow re-publishing
// checkpoints where the log hasn't grown since the last checkpoint was published.
//
// Setting this less than or equal to zero will disable republication of unchanged checkpoints.
func (o *AppendOptions) WithCheckpointRepublishInterval(interval time.Duration) *AppendOptions {
o.checkpointRepublishInterval = interval
return o
}
// WithWitnesses configures the set of witnesses that Tessera will contact in order to counter-sign
// a checkpoint before publishing it. A request will be sent to every witness referenced by the group
// using the URLs method. The checkpoint will be accepted for publishing when a sufficient number of
// witnesses to Satisfy the group have responded.
//
// If this method is not called, then the default empty WitnessGroup will be used, which contacts zero
// witnesses and requires zero witnesses in order to publish.
func (o *AppendOptions) WithWitnesses(witnesses WitnessGroup, opts *WitnessOptions) *AppendOptions {
if opts == nil {
opts = &WitnessOptions{}
}
if opts.Timeout == 0 {
opts.Timeout = DefaultWitnessTimeout
}
o.witnesses = witnesses
o.witnessOpts = *opts
return o
}
// WitnessOptions contains extra optional configuration for how Tessera should use/interact with
// a user-provided WitnessGroup policy.
type WitnessOptions struct {
// Timeout is the maximum time to wait while attempting to satisfy the configured witness policy.
//
// If the policy has not already been satisfied at the point this duration has passed, Tessera
// will stop waiting for more responses. The FailOpen option below controls whether or not the
// checkpoint will be published in this case.
//
// If unset, uses DefaultWitnessTimeout.
Timeout time.Duration
// FailOpen controls whether a checkpoint, for which the witness policy was unable to be met,
// should still be published.
//
// This setting is intended only for facilitating early "non-blocking" adoption of witnessing,
// and will be disabled and/or removed in the future.
FailOpen bool
}
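// Illustrative sketch of configuring witnessing with an explicit timeout and fail-open
// behaviour. The "group" value is a placeholder for a WitnessGroup policy constructed
// elsewhere.
//
//	opts = opts.WithWitnesses(group, &tessera.WitnessOptions{
//		Timeout:  10 * time.Second,
//		FailOpen: true, // publish even if the policy can't be met; early-adoption only
//	})
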
// WithGarbageCollectionInterval sets the interval between scans which remove obsolete partial
// tiles and entry bundles.
//
// Setting to zero disables garbage collection.
func (o *AppendOptions) WithGarbageCollectionInterval(interval time.Duration) *AppendOptions {
o.garbageCollectionInterval = interval
return o
}