1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
#ifndef CERT_TRANS_LOG_ETCD_CONSISTENT_STORE_H_
#define CERT_TRANS_LOG_ETCD_CONSISTENT_STORE_H_
#include <stdint.h>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "base/macros.h"
#include "log/consistent_store.h"
#include "log/logged_entry.h"
#include "proto/ct.pb.h"
#include "util/etcd.h"
#include "util/libevent_wrapper.h"
#include "util/status.h"
#include "util/sync_task.h"
namespace cert_trans {
class MasterElection;
// ConsistentStore implementation built on top of an EtcdClient.
//
// Only declarations live in this header.  The contracts of the overridden
// public methods are those of ConsistentStore (log/consistent_store.h); the
// comments below only add what this class's own declarations show.
//
// The object is neither copyable nor assignable.
class EtcdConsistentStore : public ConsistentStore {
 public:
  // No change of ownership for |client|, |executor| must continue to be valid
  // at least as long as this object is, and should not be the libevent::Base
  // used by |client|.
  // |base| and |election| are likewise kept as non-owning pointers (see the
  // data members below).
  EtcdConsistentStore(libevent::Base* base, util::Executor* executor,
                      EtcdClient* client, const MasterElection* election,
                      const std::string& root, const std::string& node_id);

  // Spelled with |override| like every other overridden member of this class.
  ~EtcdConsistentStore() override;

  // --- Sequence numbers and serving STH ------------------------------------
  util::StatusOr<int64_t> NextAvailableSequenceNumber() const override;

  util::Status SetServingSTH(const ct::SignedTreeHead& new_sth) override;

  util::StatusOr<ct::SignedTreeHead> GetServingSTH() const override;

  // --- Pending (not yet sequenced) entries ---------------------------------
  util::Status AddPendingEntry(LoggedEntry* entry) override;

  util::Status GetPendingEntryForHash(
      const std::string& hash, EntryHandle<LoggedEntry>* entry) const override;

  util::Status GetPendingEntries(
      std::vector<EntryHandle<LoggedEntry>>* entries) const override;

  // --- Sequence mapping ----------------------------------------------------
  util::Status GetSequenceMapping(
      EntryHandle<ct::SequenceMapping>* entry) const override;

  util::Status UpdateSequenceMapping(
      EntryHandle<ct::SequenceMapping>* entry) override;

  // --- Cluster state and configuration -------------------------------------
  util::StatusOr<ct::ClusterNodeState> GetClusterNodeState() const override;

  util::Status SetClusterNodeState(const ct::ClusterNodeState& state) override;

  // --- Watches -------------------------------------------------------------
  // Each watch takes a callback plus the util::Task that the watch is
  // associated with.
  // NOTE(review): presumably the watch runs until |task| is cancelled --
  // confirm against ConsistentStore's documentation.
  void WatchServingSTH(const ConsistentStore::ServingSTHCallback& cb,
                       util::Task* task) override;

  void WatchClusterNodeStates(
      const ConsistentStore::ClusterNodeStateCallback& cb,
      util::Task* task) override;

  void WatchClusterConfig(const ConsistentStore::ClusterConfigCallback& cb,
                          util::Task* task) override;

  util::Status SetClusterConfig(const ct::ClusterConfig& config) override;

  // Removes sequenced entries with sequence numbers covered by the current
  // serving STH.
  util::StatusOr<int64_t> CleanupOldEntries() override;

 private:
  // Takes the caller's |lock| by pointer.
  // NOTE(review): presumably blocks on |serving_sth_cv_| until the locally
  // held serving STH reaches |version| -- confirm in the .cc file.
  // |version| is passed by value, so the declaration carries no top-level
  // const (it is not part of the function's type; the definition may still
  // declare its parameter const).
  void WaitForServingSTHVersion(std::unique_lock<std::mutex>* lock,
                                int version);

  // Low-level entry accessors.  |path| / |dir| are store keys; results are
  // returned through the EntryHandle out-parameters.
  template <class T>
  util::Status GetEntry(const std::string& path, EntryHandle<T>* entry) const;

  util::Status GetAllEntriesInDir(
      const std::string& dir,
      std::vector<EntryHandle<LoggedEntry>>* entries) const;

  // Low-level entry mutators operating on a type-erased EntryHandleBase.
  // NOTE(review): the exact create/update/overwrite semantics (e.g. whether
  // Update is version-checked) are defined in the .cc file, not here.
  util::Status UpdateEntry(EntryHandleBase* entry);

  util::Status CreateEntry(EntryHandleBase* entry);

  util::Status ForceSetEntry(EntryHandleBase* entry);

  util::Status ForceSetEntryWithTTL(const std::chrono::seconds& ttl,
                                    EntryHandleBase* entry);

  util::Status DeleteEntry(const EntryHandleBase& entry);

  // Key-construction helpers.
  // NOTE(review): presumably all resulting keys live under |root_| -- confirm
  // in the .cc file.
  std::string GetEntryPath(const LoggedEntry& entry) const;

  std::string GetEntryPath(const std::string& hash) const;

  std::string GetNodePath(const std::string& node_id) const;

  std::string GetFullPath(const std::string& key) const;

  // Returns void, so a violated check cannot be reported to the caller via a
  // status.
  // NOTE(review): presumably CHECK-fails on violation -- confirm.
  void CheckMappingIsContiguousWithServingTree(
      const ct::SequenceMapping& mapping) const;

  // The following 3 methods are static just so that they have friend access to
  // the private c'tor/setters of Update<>

  // Converts a single Node to an Update<T> (using
  // TypedUpdateFromNode() below), and calls |callback| with it.
  template <class T, class CB>
  static void ConvertSingleUpdate(
      const std::string& full_path, const CB& callback,
      const std::vector<EtcdClient::Node>& updates);

  // Converts a vector of Nodes to a vector<Update<T>> (using
  // TypedUpdateFromNode() below), and calls |callback| with it.
  template <class T, class CB>
  static void ConvertMultipleUpdate(
      const CB& callback, const std::vector<EtcdClient::Node>& updates);

  // Converts a generic Node to an Update<T>.
  // T must implement ParseFromString().
  template <class T>
  static Update<T> TypedUpdateFromNode(const EtcdClient::Node& node);

  // Takes a reference to the caller's |lock|.
  // NOTE(review): presumably as evidence that |mutex_| is already held while
  // |serving_sth_| is replaced -- confirm in the .cc file.
  void UpdateLocalServingSTH(const std::unique_lock<std::mutex>& lock,
                             const EntryHandle<ct::SignedTreeHead>& handle);

  // Handlers for updates to the serving STH and the cluster config.
  // NOTE(review): presumably driven by the internal watch tasks declared
  // below -- confirm.
  void OnEtcdServingSTHUpdated(const Update<ct::SignedTreeHead>& update);

  void OnClusterConfigUpdated(const Update<ct::ClusterConfig>& update);

  // Start / completion halves of an asynchronous etcd stats request.
  // NOTE(review): presumably this is what maintains |num_etcd_entries_| --
  // confirm in the .cc file.
  void StartEtcdStatsFetch();

  void EtcdStatsFetchDone(EtcdClient::StatsResponse* response,
                          util::Task* task);

  // NOTE(review): judging by the name, returns a non-OK status when an
  // operation described by |type| should be refused; the criteria are in the
  // .cc file.
  util::Status MaybeReject(const std::string& type) const;

  // Data members.  Declaration order is construction order, so do not reorder
  // these without checking the constructor's initializer list.
  EtcdClient* const client_;              // We don't own this.
  libevent::Base* base_;                  // We don't own this.
  util::Executor* const executor_;        // We don't own this.
  const MasterElection* const election_;  // We don't own this.
  const std::string root_;
  const std::string node_id_;
  std::condition_variable serving_sth_cv_;
  util::SyncTask serving_sth_watch_task_;
  util::SyncTask cluster_config_watch_task_;
  util::SyncTask etcd_stats_task_;
  // |mutable| so that const member functions can lock it.
  // NOTE(review): presumably guards the members declared after it -- confirm.
  mutable std::mutex mutex_;
  bool received_initial_sth_;
  std::unique_ptr<EntryHandle<ct::SignedTreeHead>> serving_sth_;
  std::unique_ptr<ct::ClusterConfig> cluster_config_;
  bool exiting_;
  int64_t num_etcd_entries_;

  // These classes are granted access to the private members above
  // (presumably test fixtures, going by their names).
  friend class EtcdConsistentStoreTest;
  template <class T>
  friend class TreeSignerTest;

  DISALLOW_COPY_AND_ASSIGN(EtcdConsistentStore);
};
} // namespace cert_trans
#endif // CERT_TRANS_LOG_ETCD_CONSISTENT_STORE_H_
|