1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
#include "server/log_processes.h"
#include <gflags/gflags.h>
#include <iostream>
#include "monitoring/latency.h"
#include "monitoring/monitoring.h"
#include "server/metrics.h"
DEFINE_int32(tree_signing_frequency_seconds, 600,
"How often should we issue a new signed tree head. Approximate: "
"the signer process will kick off if in the beginning of the "
"server select loop, at least this period has elapsed since the "
"last signing. Set this well below the MMD to ensure we sign in "
"a timely manner. Must be greater than 0.");
DEFINE_int32(sequencing_frequency_seconds, 10,
"How often should new entries be sequenced. The sequencing runs "
"in parallel with the tree signing and cleanup.");
DEFINE_int32(cleanup_frequency_seconds, 10,
"How often should new entries be cleanedup. The cleanup runs in "
"in parallel with the tree signing and sequencing.");
using cert_trans::Counter;
using cert_trans::Gauge;
using cert_trans::Latency;
using google::RegisterFlagValidator;
using ct::SignedTreeHead;
using std::function;
using std::chrono::milliseconds;
using std::chrono::seconds;
using std::chrono::steady_clock;
namespace {
Gauge<>* latest_local_tree_size_gauge =
Gauge<>::New("latest_local_tree_size",
"Size of latest locally generated STH.");
Counter<bool>* sequencer_total_runs = Counter<bool>::New(
"sequencer_total_runs", "successful",
"Total number of sequencer runs broken out by success.");
Latency<milliseconds> sequencer_sequence_latency_ms(
"sequencer_sequence_latency_ms",
"Total time spent sequencing entries by sequencer");
Counter<bool>* signer_total_runs =
Counter<bool>::New("signer_total_runs", "successful",
"Total number of signer runs broken out by success.");
Latency<milliseconds> signer_run_latency_ms("signer_run_latency_ms",
"Total runtime of signer");
static bool ValidateIsPositive(const char* flagname, int value) {
if (value <= 0) {
std::cout << flagname << " must be greater than 0" << std::endl;
return false;
}
return true;
}
static const bool sign_dummy =
RegisterFlagValidator(&FLAGS_tree_signing_frequency_seconds,
&ValidateIsPositive);
}
namespace cert_trans {
void SignMerkleTree(TreeSigner* tree_signer, ConsistentStore* store,
ClusterStateController* controller) {
CHECK_NOTNULL(tree_signer);
CHECK_NOTNULL(store);
CHECK_NOTNULL(controller);
const steady_clock::duration period(
(seconds(FLAGS_tree_signing_frequency_seconds)));
steady_clock::time_point target_run_time(steady_clock::now());
while (true) {
{
ScopedLatency signer_run_latency(
signer_run_latency_ms.GetScopedLatency());
const TreeSigner::UpdateResult result(tree_signer->UpdateTree());
switch (result) {
case TreeSigner::OK: {
const SignedTreeHead latest_sth(tree_signer->LatestSTH());
latest_local_tree_size_gauge->Set(latest_sth.tree_size());
controller->NewTreeHead(latest_sth);
signer_total_runs->Increment(true /* successful */);
break;
}
case TreeSigner::INSUFFICIENT_DATA:
LOG(INFO) << "Can't update tree because we don't have all the "
<< "entries locally, will try again later.";
signer_total_runs->Increment(false /* successful */);
break;
default:
LOG(FATAL) << "Error updating tree: " << result;
}
}
const steady_clock::time_point now(steady_clock::now());
while (target_run_time <= now) {
target_run_time += period;
}
std::this_thread::sleep_for(target_run_time - now);
}
}
void CleanUpEntries(ConsistentStore* store,
const function<bool()>& is_master) {
CHECK_NOTNULL(store);
CHECK(is_master);
const steady_clock::duration period(
(seconds(FLAGS_cleanup_frequency_seconds)));
steady_clock::time_point target_run_time(steady_clock::now());
while (true) {
if (is_master()) {
// Keep cleaning up until there's no more work to do.
// This should help to keep the etcd contents size down during heavy
// load.
while (true) {
const util::StatusOr<int64_t> num_cleaned(store->CleanupOldEntries());
if (!num_cleaned.ok()) {
LOG(WARNING) << "Problem cleaning up old entries: "
<< num_cleaned.status();
break;
}
if (num_cleaned.ValueOrDie() == 0) {
break;
}
}
}
const steady_clock::time_point now(steady_clock::now());
while (target_run_time <= now) {
target_run_time += period;
}
std::this_thread::sleep_for(target_run_time - now);
}
}
void SequenceEntries(TreeSigner* tree_signer,
const function<bool()>& is_master) {
CHECK_NOTNULL(tree_signer);
CHECK(is_master);
const steady_clock::duration period(
(seconds(FLAGS_sequencing_frequency_seconds)));
steady_clock::time_point target_run_time(steady_clock::now());
while (true) {
if (is_master()) {
const ScopedLatency sequencer_sequence_latency(
sequencer_sequence_latency_ms.GetScopedLatency());
util::Status status(tree_signer->SequenceNewEntries());
if (!status.ok()) {
LOG(WARNING) << "Problem sequencing new entries: " << status;
}
sequencer_total_runs->Increment(status.ok());
}
const steady_clock::time_point now(steady_clock::now());
while (target_run_time <= now) {
target_run_time += period;
}
std::this_thread::sleep_for(target_run_time - now);
}
}
} // namespace cert_trans
|