## Commit-Log based Trillian storage
*Status: Draft*
*Authors: al@google.com, drysdale@google.com, filippo@cloudflare.com*
*Last Updated: 2017-05-12*
## Objective
A design for an alternative Trillian storage layer which uses a distributed and
immutable *commit log* as the source of truth for a Trillian Log's contents and
sequence information, and one or more independent *"readonly"* databases built
from the commit log to serve queries.
This design allows for:
* flexibility in scaling Trillian deployments,
* easier recovery from corrupt/failed database deployments since in many
cases operators can simply delete the failed DB instance and allow it to be
rebuilt from the commit log, while the remaining instances continue to
serve.
Initially, this will be built using Apache Kafka for the commit log, with
datacentre-local Apache HBase instances for the serving databases, since this
is what Cloudflare has operational experience in running. Other distributed
commit-log and database engines may be substituted, however, and the model
should also work with instance-local database implementations such as RocksDB.
Having Trillian support a commit-log based storage system will also ensure
Trillian doesn't inadvertently tie itself exclusively to strong globally
consistent storage.
## Background
Trillian currently supports two storage technologies, MySQL and Spanner, which
provide strong global consistency.
The design presented here requires:
* A durable, ordered, and immutable commit log.
* A "local" storage mechanism which can support the operations required by
the Trillian {tree,log}_storage API.
## Design Overview

The log's contents and sequence information are held in two Kafka topics:
the `Leaves` topic is the canonical source of truth for the ordering of leaves
in a log, and the `STHs` topic is the list of all STHs for a given log.
Kafka topics are configured never to expire entries (this is a supported mode),
and Kafka is known to scale to multiple terabytes within a single partition.
HBase instances are assumed to be one-per-cluster, built from the contents of
the Kafka topics, and, consequently, are essentially disposable.
Queued leaves are sent by the Trillian frontends to the Kafka `Leaves` topic.
Since Kafka topics are append-only and immutable, this effectively sequences
the entries in the queue.
The signer nodes track the `Leaves` and `STHs` topics to bring their local
database instances up to date. The current master signer will additionally
incorporate new entries from the `Leaves` topic into its tree, ensuring that
the Kafka offset of each leaf matches its position in the Merkle tree, then
generate a new STH which it publishes to the `STHs` topic before updating its
local database.
Since the commit log is the source of truth for the log entry ordering and
committed STHs, everything else can be derived from it. This means that
updates to the serving HBase DBs can be made idempotent, which in turn means
that the transactional requirements of Trillian's LogStorage APIs can be
relaxed: writes to local storage can be buffered and flushed at `Commit` time,
and the only constraint on the implementation is that the final new/updated
STH must be written to the local storage only if all other buffered writes
have been successfully flushed.
The addition of this style of storage implementation requires that Trillian
not guarantee perfect deduplication of entries, even though it may be possible
to provide it with some storage implementations; i.e. personalities MUST
present `LeafIdentityHash`es, and Trillian MAY deduplicate.
## Detailed Design
#### Enqueuing leaves
RPC calls to the frontend `QueueLeaves` result in the leaves being
individually added to the Kafka `Leaves` topic. They need to be added
individually so that the Kafka topic sequencing can be the definitive source
of log sequence information.
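For illustration only, here is a minimal sketch of enqueuing one leaf per
message using the segmentio/kafka-go client; the client choice, broker
address, and `queueLeaf` helper are assumptions, not part of this design:
```golang
import (
	"context"

	"github.com/segmentio/kafka-go"
)

// newLeafWriter returns a producer for the `Leaves` topic. The broker
// address is a placeholder.
func newLeafWriter() *kafka.Writer {
	return &kafka.Writer{
		Addr:  kafka.TCP("kafka:9092"), // placeholder broker address
		Topic: "Leaves",
	}
}

// queueLeaf appends a single serialized leaf as its own message: the offset
// Kafka assigns to that message becomes the leaf's sequence number. Batching
// several leaves into one message would hide their individual offsets and
// break the sequencing scheme.
func queueLeaf(ctx context.Context, w *kafka.Writer, leafData []byte) error {
	return w.WriteMessages(ctx, kafka.Message{Value: leafData})
}
```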
Log frontends may attempt to de-duplicate incoming leaves by consulting the
local storage DB using the identity hash (and/or e.g. using a per-instance LRU
cache), but this will always be a "best effort" affair, so the Trillian APIs
must not assume that duplicates are impossible, even though in practice, with
other storage implementations, they may currently be so.
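Such a per-instance cache might look like the following sketch, assuming
hashicorp/golang-lru as the cache implementation (the helper name and cache
size are illustrative):
```golang
import lru "github.com/hashicorp/golang-lru/v2"

// seenLeaves is a per-instance, best-effort duplicate filter keyed by
// LeafIdentityHash. Entries evict under memory pressure and other instances
// have their own caches, so a miss here proves nothing - callers must still
// tolerate duplicates.
var seenLeaves, _ = lru.New[string, struct{}](1 << 20)

// maybeDuplicate reports whether this instance has recently seen the hash,
// recording it as a side effect.
func maybeDuplicate(leafIdentityHash []byte) bool {
	key := string(leafIdentityHash)
	if _, ok := seenLeaves.Get(key); ok {
		return true
	}
	seenLeaves.Add(key, struct{}{})
	return false
}
```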
#### Master election
Multiple sequencers may be run to provide resilience; if so, there must be a
mechanism for choosing a single master instance among the running sequencers.
The Trillian repo already provides an etcd-backed implementation of this.
A sequencer must only participate in, or remain, the election for master if
its local database state is at least as new as the latest message in the Kafka
`STHs` topic.
The current master sequencer will create new STHs and publish them to the
`STHs` topic; the remaining sequencers will run in a "mirror" mode to keep
their local database state up to date with the master.
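As a rough sketch of how such an election works, using etcd's concurrency
package directly (the key prefix and helper are illustrative; the Trillian
repo's own implementation may differ):
```golang
import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// campaign blocks until this sequencer becomes master for treeID. Per the
// rule above, the caller must first check that its local DB is at least as
// new as the latest entry in the `STHs` topic before campaigning.
func campaign(ctx context.Context, cli *clientv3.Client, instanceID, treeID string) error {
	// The session holds a keep-alive lease; if this process dies, the lease
	// expires and mastership is released automatically.
	session, err := concurrency.NewSession(cli)
	if err != nil {
		return err
	}
	election := concurrency.NewElection(session, "/trillian/master/"+treeID)
	// Campaign blocks until we win (or ctx is cancelled); mastership lasts
	// only while the session's lease stays alive.
	return election.Campaign(ctx, instanceID)
}
```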
#### Local DB storage
This does not *need* to be transactional, because writes should be idempotent,
but the implementation of the Trillian storage driver must buffer *all*
writes and only attempt to apply them to the local storage when `Commit` is
called.
The write of an updated STH to local storage needs slightly special attention,
in that it must be the last thing written by `Commit`, and must only be written
if all other buffered writes succeeded.
In the case of a partial commit failure, or a crash of the signer, the next
sequencing cycle will find that identical writes are re-attempted, thanks to
the signer process outlined below.
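To make the "STH last" rule concrete, here is a hypothetical buffered
transaction; all of the types and fields below are illustrative, not existing
Trillian APIs:
```golang
type kvPair struct{ key, value []byte }

// kvStore stands in for whatever local DB client is in use (HBase, RocksDB, ...).
type kvStore interface {
	Put(key, value []byte) error
}

type bufferedTX struct {
	store  kvStore
	puts   []kvPair // all non-STH writes, buffered until Commit
	newSTH []byte   // serialized STH; written only if every buffered put succeeds
	sthKey []byte
}

// Put records an intended (idempotent) write; no I/O happens here.
func (tx *bufferedTX) Put(key, value []byte) {
	tx.puts = append(tx.puts, kvPair{key, value})
}

// Commit flushes all buffered writes, then writes the STH strictly last.
func (tx *bufferedTX) Commit() error {
	// The buffered writes are idempotent, so a crash mid-flush is safe: the
	// next sequencing cycle will simply re-attempt identical writes.
	for _, p := range tx.puts {
		if err := tx.store.Put(p.key, p.value); err != nil {
			return err // the STH is NOT written; the DB stays at the old STH
		}
	}
	// Only once everything else has been flushed is the new STH exposed.
	return tx.store.Put(tx.sthKey, tx.newSTH)
}
```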
#### Sequencing
Assigning sequence numbers to queued leaves is performed implicitly by the
addition of entries to the Kafka `Leaves` topic: a leaf's sequence number is
its position in the topic (termed its *offset* in the Kafka documentation).
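Correspondingly, any leaf can be fetched by sequence number straight from the
topic; a sketch with the same assumed kafka-go client, relying on the
single-partition topics described in the overview:
```golang
import (
	"context"

	"github.com/segmentio/kafka-go"
)

// leafAt reads the leaf with the given sequence number directly from the
// `Leaves` topic: sequence number == Kafka offset, by construction.
func leafAt(ctx context.Context, seq int64) ([]byte, error) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"kafka:9092"}, // placeholder broker address
		Topic:   "Leaves",               // single partition, so offset is global
	})
	defer r.Close()
	if err := r.SetOffset(seq); err != nil {
		return nil, err
	}
	msg, err := r.ReadMessage(ctx)
	if err != nil {
		return nil, err
	}
	return msg.Value, nil
}
```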
##### Abstract Signer process
```golang
func SignerRun() {
	// If any of the operations below fail, just bail and retry.

	// Read `dbSTH` (containing `treeRevision` and `sthOffset`) from the local DB.
	dbSTH := tx.LatestSTH()

	// Sanity check that the STHs topic has what we already know.
	ourSTH := kafka.Read("STHs/<treeID>", dbSTH.sthOffset)
	if ourSTH == nil {
		klog.Errorf("should not happen - local DB has data ahead of STHs topic")
		return
	}
	if ourSTH.expectedOffset != dbSTH.sthOffset {
		klog.Errorf("should not happen - local DB committed to invalid STH from topic")
		return
	}
	if ourSTH.timestamp != dbSTH.timestamp || ourSTH.tree_size != dbSTH.tree_size {
		klog.Errorf("should not happen - local DB has different data than STHs topic")
		return
	}

	// Look to see if anyone else has already stored data just ahead of our STH.
	nextOffset := dbSTH.sthOffset
	var nextSTH *STH
	for {
		nextOffset++
		nextSTH = kafka.Read("STHs/<treeID>", nextOffset)
		if nextSTH == nil {
			break
		}
		if nextSTH.expectedOffset != nextOffset {
			// Someone's been writing STHs when they weren't supposed to be; skip
			// this one until we find another which is in-sync.
			klog.Warning("skipping unexpected STH")
			continue
		}
		if nextSTH.timestamp < ourSTH.timestamp || nextSTH.tree_size < ourSTH.tree_size {
			klog.Errorf("should not happen - earlier STH with later offset")
			return
		}
		// Found a valid STH just ahead of us.
		break
	}

	if nextSTH == nil {
		// We're up-to-date with the STHs topic (as of a moment ago) ...
		if !IsMaster() {
			// ... but we're not allowed to create fresh STHs.
			return
		}
		// ... and we're the master. Move the STHs topic along to encompass any
		// unincorporated leaves.
		offset := dbSTH.tree_size
		batch := kafka.Read("Leaves", offset, batchSize)
		for _, b := range batch {
			db.Put("<treeID>/leaves/<b.offset>", b.contents)
		}
		root := tx.UpdateMerkleTreeAndBufferNodes(batch, dbSTH.treeRevision+1)
		newSTH := STH{root, ...}
		newSTH.treeRevision = dbSTH.treeRevision + 1
		newSTH.expectedOffset = nextOffset
		actualOffset := kafka.Append("STHs/<treeID>", newSTH)
		if actualOffset != nextOffset {
			klog.Warning("someone else wrote an STH while we were master")
			tx.Abort()
			return
		}
		newSTH.sthOffset = actualOffset
		tx.BufferNewSTHForDB(newSTH)
		tx.Commit() // flush writes
	} else {
		// There is an STH one ahead of us that we're not caught up with yet.
		// Read the leaves between what we have in our DB and that STH...
		leafRange := InclusiveExclusive(dbSTH.tree_size, nextSTH.tree_size)
		batch := kafka.Read("Leaves", leafRange)
		// ... and store them in our local DB.
		for _, b := range batch {
			db.Put("<treeID>/leaves/<b.offset>", b.contents)
		}
		newRoot := tx.UpdateMerkleTreeAndBufferNodes(batch, dbSTH.treeRevision+1)
		if newRoot != nextSTH.root {
			klog.Warning("calculated root hash != expected root hash, corrupt DB?")
			tx.Abort()
			return
		}
		tx.BufferNewSTHForDB(nextSTH)
		tx.Commit() // flush writes
		// We may still not be caught up, but that's for the next time around.
	}
}
```
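Note the role of the `expectedOffset` field embedded in each published STH:
Kafka assigns the actual offset only at append time, so by comparing the two a
master that raced with another writer can detect the conflict after the fact
and abort, rather than committing divergent state.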
##### Fit with storage interfaces
LogStorage interfaces will need to be tweaked slightly, in particular:
- `UpdateSequencedLeaves` should be pulled out of `LeafDequeuer` and moved
into a `LeafSequencer` (or something) interface.
- It would be nice to introduce a roll-up interface which describes the
responsibilities of the "local DB" thing, so that we can compose
`commit-queue+local` storage implementations using existing DB impls
(or at least not tie this tightly to HBase).
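As a starting point, the roll-up might be factored something like the sketch
below; `CommitQueue` and `LocalStore` are placeholder names, not existing
Trillian types:
```golang
// CommitQueue abstracts the append-only commit log (the Kafka topics here,
// but potentially any durable ordered log). Placeholder interface.
type CommitQueue interface {
	// AppendSTH publishes an STH and returns the offset it landed at.
	AppendSTH(ctx context.Context, sth *trillian.SignedTreeHead) (offset int64, err error)
	// STHAt returns the STH at the given offset, or nil if none exists yet.
	STHAt(ctx context.Context, offset int64) (*trillian.SignedTreeHead, error)
	// ReadLeaves returns up to limit leaves starting at the given offset.
	ReadLeaves(ctx context.Context, first, limit int64) ([]*trillian.LogLeaf, error)
}

// LocalStore abstracts the disposable per-cluster database (HBase, RocksDB,
// ...); writes are buffered and applied idempotently at Commit time.
type LocalStore interface {
	Put(key, value []byte) // buffered; no I/O until Commit
	Commit() error
	Abort() error
}
```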
###### TX
```golang
type splitTX struct {
	treeID int64
	...
	dbTX    *storage.LogTX // something something handwavy
	cqTX    *storage.???   // something something handwavy
	dbSTH   *trillian.SignedTreeHead
	nextSTH *trillian.SignedTreeHead // actually something which contains this plus some metadata

	treeRevision int64
	sthOffset    int64
}
```
###### `Storage.Begin()`
Starts a Trillian transaction. This will do:
1. the read of `currentSTH`, `treeRevision`, and `sthOffset` from the DB
1. verification of these against the corresponding entry in Kafka
and return a `LogTX` struct containing these values as unexported fields.
**The HBase LogTX struct will buffer all writes locally until `Commit` is
called**, whereupon it'll attempt to action the writes as HBase `PUT` requests
(presumably it can be smart about batching where appropriate).
```golang
// Begin starts a Trillian transaction.
// This will get the latest known STH from the "local" DB, and verify
// that the corresponding STH in Kafka matches.
func (ls *CQComboStorage) Begin() (LogTX, error) {
	// Create db and cq "TX" objects.
	tx := &splitTX{...}

	// Read `dbSTH` (containing `treeRevision` and `sthOffset`) from the local DB.
	tx.dbSTH, tx.treeRevision, tx.sthOffset = tx.dbTX.latestSTH()

	// Sanity check that the STHs topic has what we already know.
	ourSTH := tx.cqTX.GetSTHAt(tx.sthOffset)
	if ourSTH == nil {
		return nil, fmt.Errorf("should not happen - local DB has data ahead of STHs topic")
	}
	if ourSTH.expectedOffset != tx.sthOffset {
		return nil, fmt.Errorf("should not happen - local DB committed to invalid STH from topic")
	}
	if ourSTH.timestamp != tx.dbSTH.timestamp || ourSTH.tree_size != tx.dbSTH.tree_size {
		return nil, fmt.Errorf("should not happen - local DB has different data than STHs topic")
	}
	...
	return tx, nil
}
```
###### `DequeueLeaves()`
Calls to this method ignore `limit` and `cutoff` when there exist newer STHs
in the `STHs` topic (because we are following in someone else's footsteps),
and return the `batch` of leaves outlined above.
*TODO(al): should this API be reworked?*
```golang
func (tx *splitTX) DequeueLeaves() (..., error) {
	// Look to see if anyone else has already stored data just ahead of our STH.
	nextOffset := tx.sthOffset
	var nextSTH *STH
	for {
		nextOffset++
		nextSTH = tx.cqTX.GetSTHAt(nextOffset)
		if nextSTH == nil {
			break
		}
		if nextSTH.expectedOffset != nextOffset {
			// Someone's been writing STHs when they weren't supposed to be; skip
			// this one until we find another which is in-sync.
			klog.Warning("skipping invalid STH")
			continue
		}
		if nextSTH.timestamp < tx.dbSTH.timestamp || nextSTH.tree_size < tx.dbSTH.tree_size {
			return nil, fmt.Errorf("should not happen - earlier STH with later offset")
		}
		// Found a valid STH just ahead of us.
		break
	}
	tx.nextSTH = nextSTH

	if nextSTH == nil {
		// We're up-to-date; return any unincorporated leaves from the Leaves topic.
		offset := tx.dbSTH.tree_size
		batch := tx.cqTX.ReadLeaves(offset, limit)
		return batch, nil
	}
	// There is an STH one ahead of us that we're not caught up with yet.
	// Read the leaves between what we have in our DB and that STH.
	leafRange := InclusiveExclusive(tx.dbSTH.tree_size, nextSTH.tree_size)
	batch := tx.cqTX.ReadLeaves(leafRange)
	return batch, nil
}
```
###### `UpdateSequencedLeaves()`
This method should be moved out from `LeafDequeuer` and into a new interface
`LeafWriter` implemented by dbTX.
**TODO(al): keep writing!**
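A minimal sketch of the proposed new interface (`LeafWriter` is the name
suggested above, not an existing Trillian interface):
```golang
// LeafWriter stores already-sequenced leaves in the local DB; the sequence
// numbers were assigned implicitly by the commit log, so this is a pure
// (idempotent, buffered) write path.
type LeafWriter interface {
	UpdateSequencedLeaves(ctx context.Context, leaves []*trillian.LogLeaf) error
}
```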