File: commit_log_based_storage_design.md

package info (click to toggle)
trillian 1.7.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,600 kB
  • sloc: sh: 1,181; javascript: 474; sql: 330; makefile: 39
file content (357 lines) | stat: -rw-r--r-- 13,224 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
## Commit-Log based Trillian storage

*Status: Draft*

*Authors: al@google.com, drysdale@google.com, filippo@cloudflare.com*

*Last Updated: 2017-05-12*

## Objective

A design for an alternative Trillian storage layer which uses a distributed and
immutable *commit log* as the source of truth for a Trillian Log's contents and
sequence information, and one or more independent *"readonly"* databases built
from the commit log to serve queries.

This design allows for:

*   flexibility in scaling Trillian deployments,
*   easier recovery from corrupt/failed database deployments since in many
    cases operators can simply delete the failed DB instance and allow it to be
    rebuilt from the commit log, while the remaining instances continue to
    serve.

Initially, this will be built using Apache Kafka for the commit log, with
datacentre-local Apache HBase instances for the serving databases, since this
is what Cloudflare has operational experience in running, but other distributed
commit-log and database engines may be available - this model should also work
with instance-local database implementations such as RocksDB etc. too.

Having Trillian support a commit-log based storage system will also ensure
Trillian doesn't inadvertently tie itself exclusively to strong globally
consistent storage.

## Background

Trillian currently supports two storage technologies, MySQL and Spanner, which
provide strong global consistency.

The design presented here requires:

*   A durable, ordered, and immutable commit log.
*   A "local" storage mechanism which can support the operations required by
    the Trillian {tree,log}_storage API.



## Design Overview

![overview diagram](commit_log_based_storage_design_overview.png "Overview")

The `leaves` topic is the canonical source of truth for the ordering of leaves
in a log.

The `STHs` topic is a list of all STHs for a given log.

Kafka topics are configured never to expire entries (this is a supported mode),
and Kafka is known to scale to multiple terabytes within a single partition.

HBase instances are assumed to be one-per-cluster, built from the contents of
the Kafka topics, and, consequently, are essentially disposable.

Queued leaves are sent by the Trillian frontends to the Kafka `Leaves` topic.
Since Kafka topics are append-only and immutable, this effectively sequences
the entries in the queue.
The signer nodes track the leaves and STHs topics to bring their local database
instances up-to-date.  The current master signer will additionally incorporate
new entries in the leaves topic into its tree, ensuring the Kafka offset number
of each leaf matches its position in the Merkle tree, then generate a new
STH which it publishes to the STH topic before updating its local database.

Since the commit log forms the source of truth for the log entry ordering and
committed STHs, everything else can be derived from that. This means that
updates to the serving HBase DBs can be made to be idempotent, which means that
the transactional requirements of Trillian's LogStorage APIs can be relaxed:
writes to local storage can be buffered and flushed at `Commit` time, and the
only constraint on the implementation is that the final new/updated STH must
only be written to the local storage iff all other buffered writes have been
successfully flushed.

The addition of this style of storage implementation requires that Trillian
does not guarantee the perfect deduplication of entries, even though it may be
possible to do so with some storage implementations.  i.e. personalities MUST
present LeafIdentityHashes, and Trillian MAY deduplicate.

## Detailed Design

#### Enqueuing leaves

RPC calls to frontend `QueueLeaves` results in the leaves being individually
added to the Kafka topic `Leaves`.  They need to be added individually to allow
the Kafka topic sequencing to be the definitive source of log sequence
information.

Log frontends may attempt to de-duplicate incoming leaves by consulting the
local storage DB using the identity hash (and/or e.g. using a per-instance LRU
cache), but this will always be a "best effort" affair, so the Trillian APIs
must not assume that duplicates are impossible, even though in practice, when
using other storage implementations, they may well be so currently.

#### Master election

Multiple sequencers may be running to provide resilience, if this is the case
there must be a mechanism for choosing a single master instance among the
running sequencers. The Trillian repo provides an etcd-backed implementation
of this already.

A sequencer must only participate/remain the master if its local database state
is at least as new at the latest message in the Kafka `STHs` topic.

The current master sequencer will create new STHs and publish them to the
`STHs` topic, the remaining sequencers will run in a "mirror" mode to keep
their local database state up-to-date with the master.

#### Local DB storage

This does not *need* to be transactional, because writes should be idempotent,
but the implementation of the Trillian storage driver must buffer *all*
writes and only attempt to apply them to the local storage when `Commit` is
called.

The write of an updated STH to local storage needs slightly special attention,
in that it must be the last thing written by `Commit`, and must only be written
if all other buffered writes succeeded.

In the case of a partial commit failure, or crash of the signer, the next
sequencing cycle should find that identical writes are re-attempted due to the
signer process outlined below.

#### Sequencing

Assigning sequence numbers to queued leaves is implicitly performed by the
addition of entries to the Kafka `Leaves` topic (this is termed *offset* in
Kafka documentation).


##### Abstract Signer process

```golang
func SignerRun() {
  // if any of the below operations fail, just bail and retry

  // read `dbSTH` (containing `treeRevision` and `sthOffset`) from local DB
  dbSTH.treeRevision, dbSTH.sthOffset = tx.LatestSTH()

  // Sanity check that the STH table has what we already know.
  ourSTH := kafka.Read("STHs/<treeID>", dbSTH.sthOffset)
  if ourSTH == nil {
    klog.Errorf("should not happen - local DB has data ahead of STHs topic")
    return
  }
  if ourSTH.expectedOffset != dbSTH.sthOffset {
    klog.Errorf("should not happen - local DB committed to invalid STH from topic")
    return
  }
  if ourSTH.timestamp != dbSTH.timestamp || ourSTH.tree_size != dbSTH.tree_size {
    klog.Errorf("should not happen - local DB has different data than STHs topic")
    return
  }

  // Look to see if anyone else has already stored data just ahead of our STH.
  nextOffset := dbSTH.sthOffset
  nextSTH := nil
  for {
    nextOffset++
    nextSTH = kafka.Read("STHs/<treeID>", nextOffset)
    if nextSTH == nil {
      break
    }
    if nextSTH.expectedOffset != nextOffset {
      // Someone's been writing STHs when they weren't supposed to be, skip
      // this one until we find another which is in-sync.
      klog.Warning("skipping unexpected STH")
      continue
    }
    if nextSTH.timestamp < ourSTH.timestamp || nextSTH.tree_size < ourSTH.tree_size {
      klog.Fatal("should not happen - earlier STH with later offset")
      return
    }
  }

  if nextSTH == nil {
    // We're up-to-date with the STHs topic (as of a moment ago) ...
    if !IsMaster() {
      // ... but we're not allowed to create fresh STHs.
      return
    }
    // ... and we're the master. Move the STHs topic along to encompass any unincorporated leaves.
    offset := dbSTH.tree_size
    batch := kafka.Read("Leaves", offset, batchSize)
    for b := range batch {
      db.Put("/<treeID>/leaves/<b.offset>", b.contents)
    }

    root := UpdateMerkleTreeAndBufferNodes(batch, treeRevision+1)
    newSTH := STH{root, ...}
    newSTH.treeRevision = dbSTH.treeRevision + 1
    newSTH.expectedOffset = nextOffset
    actualOffset := kafka.Append("STHs/<treeID>", newSTH)
    if actualOffset != nextOffset {
      klog.Warning("someone else wrote an STH while we were master")
      tx.Abort()
      return
    }
    newSTH.sthOffset = actualOffset
    tx.BufferNewSTHForDB(newSTH)
    tx.Commit() // flush writes
  } else {
    // There is an STH one ahead of us that we're not caught up with yet.
    // Read the leaves between what we have in our DB, and that STH...
    leafRange := InclusiveExclusive(dbSTH.tree_size, nextSTH.tree_size)
    batch := kafka.Read("Leaves", leafRange)
    // ... and store them in our local DB
    for b := range batch {
      db.Put("<treeID>/leaves/<b.offset>", b.contents)
    }
    newRoot := tx.UpdateMerkleTreeAndBufferNodes(batch, treeRevision+1)
    if newRoot != nextSTH.root {
      klog.Warning("calculated root hash != expected root hash, corrupt DB?")
      tx.Abort()
      return
    }
    tx.BufferNewSTHForDB(nextSTH)
    tx.Commit() // flush writes
    // We may still not be caught up, but that's for the next time around.
  }
}
```

##### Fit with storage interfaces

LogStorage interfaces will need to be tweaked slightly, in particular:
 - `UpdateSequencedLeaves` should be pulled out of `LeafDequeuer` and moved
   into a `LeafSequencer` (or something) interface.
 - It would be nice to introduce a roll-up interface which describes the
   responsibilities of the "local DB" thing, so that we can compose
   `commit-queue+local` storage implementations using existing DB impls
   (or at least not tie this tightly to HBase).

###### TX
```golang

type splitTX struct {
   treeID       int64
   ...

   dbTX         *storage.LogTX // something something handwavy
   cqTX         *storage.???   // something something handwavy

   dbSTH        *trillian.SignedTreeHead
   nextSTH      *trillian.SignedTreeHead  // actually something which contains this plus some metadata
   treeRevision int64
   sthOffset    int64
}
```

###### `Storage.Begin()`

Starts a Trillian transaction, this will do:
   1. the read of `currentSTH`, `treeRevision`, and `sthOffset` from the DB
   1. verification of that against its corresponding entry in Kafka

and return a `LogTX` struct containing these values as unexported fields.
**The HBase LogTX struct will buffer all writes locally until `Commit` is
called**, whereupon it'll attempt to action the writes as HBase `PUT` requests
(presumably it can be smart about batching where appropriate).

```golang
// Begin starts a Trillian transaction.
// This will get the latest known STH from the "local" DB, and verify
// that the corresponding STH in Kafka matches.
func (ls *CQComboStorage) Begin() (LogTX, error) {
  // create db and cq "TX" objects

  tx := &splitTX{...}

  // read `dbSTH` (containing `treeRevision` and `sthOffset`) from local DB
  tx.dbSTH, tx.treeRevision, tx.stdOffset := dbTX.latestSTH()

  // Sanity check that the STH table has what we already know.
  ourSTH := cqTX.GetSTHAt(tx.sthOffset)

  if ourSTH == nil {
    return nil, fmt.Errorf("should not happen - local DB has data ahead of STHs topic")
  }
  if ourSTH.expectedOffset != dbSTH.sthOffset {
    return nil, fmt.Errorf("should not happen - local DB committed to invalid STH from topic")
  }
  if ourSTH.timestamp != dbSTH.timestamp || ourSTH.tree_size != dbSTH.tree_size {
    return nil, fmt.Errorf("should not happen - local DB has different data than STHs topic")
  }

  ...

  return tx, nil
}
```


###### `DequeueLeaves()`
Calls to this method ignore `limit` and `cutoff` when there exist newer STHs in
the Kafka queue (because we're following someone else's footsteps), and return
the `batch` of leaves outlined above.

*TODO(al): should this API be reworked?*

```golang
func (tx *splitTX) DequeueLeaves() (..., error) {
  // Look to see if anyone else has already stored data just ahead of our STH.
  nextOffset := tx.sthOffset
  nextSTH := nil
  for {
      nextOffset++
      tx.nextSTH = tx.cqTX.GetSTHAt(nextOffset)
      if nextSTH == nil {
        break
      }
      if nextSTH.expectedOffset != nextOffset {
        // Someone's been writing STHs when they weren't supposed to be, skip
        // this one until we find another which is in-sync.
        klog.Warning("skipping invalid STH")
        continue
      }
      if nextSTH.timestamp < ourSTH.timestamp || nextSTH.tree_size < ourSTH.tree_size {
        return nil, fmt.Errorf("should not happen - earlier STH with later offset")
    }
  }

  if nextSTH == nil {
    offset := tx.dbSTH.tree_size
    batch := tx.cqTX.ReadLeaves(offset, limit)
    return batch, nil
  } else {
    // There is an STH one ahead of us that we're not caught up with yet.
    for {
      nextOffset++
      nextSTH = tx.cqTX.ReadSTH(nextOffset)
      if nextSTH.timestamp < dbSTH.timestamp || nextSTH.tree_size < dbSTH.tree_size {
        return nil, fmt.Errorf("should not happen - earlier STH with later offset")
      }
    }
    // Read the leaves between what we have in our DB, and that STH...
    leafRange := InclusiveExclusive(dbSTH.tree_size, nextSTH.tree_size)
    batch := tx.cqTX.ReadLeaves(leafRange)
    return nil, batch
  }
}

```

###### `UpdateSequencedLeaves()`

This method should be moved out from `LeafDequeuer` and into a new interface
`LeafWriter` implemented by dbTX.

**TODO(al): keep writing!**