#ifndef CERT_TRANS_UTIL_ETCD_H_
#define CERT_TRANS_UTIL_ETCD_H_
#include <stdint.h>
#include <chrono>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
#include "base/macros.h"
#include "net/url_fetcher.h"
#include "util/status.h"
#include "util/sync_task.h"
#include "util/task.h"
#include "util/util.h"
class JsonObject;
namespace cert_trans {
class EtcdClient {
public:
typedef std::pair<std::string, uint16_t> HostPortPair;
struct Node {
static const Node& InvalidNode();
Node() : Node(InvalidNode()) {
}
Node(int64_t created_index, int64_t modified_index, const std::string& key,
bool is_dir, const std::string& value, std::vector<Node>&& nodes,
bool deleted);
bool HasExpiry() const;
std::string ToString() const;
int64_t created_index_;
int64_t modified_index_;
std::string key_;
bool is_dir_;
std::string value_;
std::vector<Node> nodes_;
std::chrono::system_clock::time_point expires_;
bool deleted_;
};
struct Request {
Request(const std::string& thekey)
: key(thekey), recursive(false), wait_index(0) {
}
std::string key;
bool recursive;
int64_t wait_index;
};
struct Response {
Response() : etcd_index(-1) {
}
int64_t etcd_index;
};
struct GetResponse : public Response {
Node node;
};
struct GenericResponse : public Response {
std::shared_ptr<JsonObject> json_body;
};
struct StatsResponse : public Response {
std::map<std::string, int64_t> stats;
};
typedef std::function<void(const std::vector<Node>& updates)> WatchCallback;
EtcdClient(util::Executor* executor, UrlFetcher* fetcher,
const std::string& host, uint16_t port);
EtcdClient(util::Executor* executor, UrlFetcher* fetcher,
const std::list<HostPortPair>& etcds);
virtual ~EtcdClient();
virtual void Get(const Request& req, GetResponse* resp, util::Task* task);
virtual void Create(const std::string& key, const std::string& value,
Response* resp, util::Task* task);
virtual void CreateWithTTL(const std::string& key, const std::string& value,
const std::chrono::seconds& ttl, Response* resp,
util::Task* task);
virtual void Update(const std::string& key, const std::string& value,
const int64_t previous_index, Response* resp,
util::Task* task);
virtual void UpdateWithTTL(const std::string& key, const std::string& value,
const std::chrono::seconds& ttl,
const int64_t previous_index, Response* resp,
util::Task* task);
virtual void ForceSet(const std::string& key, const std::string& value,
Response* resp, util::Task* task);
virtual void ForceSetWithTTL(const std::string& key,
const std::string& value,
const std::chrono::seconds& ttl, Response* resp,
util::Task* task);
virtual void Delete(const std::string& key, const int64_t current_index,
util::Task* task);
virtual void ForceDelete(const std::string& key, util::Task* task);
virtual void GetStoreStats(StatsResponse* resp, util::Task* task);
// The "cb" is called on the executor of "task". For a given call to
// Watch(), at most one callback invocation is sent to the executor at
// a time, so that updates are received in order. This limit applies
// per call to this method, not across all watches.
virtual void Watch(const std::string& key, const WatchCallback& cb,
util::Task* task);
protected:
// For testing only.
EtcdClient();
private:
struct RequestState;
struct WatchState;
HostPortPair ChooseNextServer();
HostPortPair GetEndpoint() const;
HostPortPair UpdateEndpoint(HostPortPair&& new_endpoint);
void FetchDone(RequestState* etcd_req, util::Task* task);
void Generic(const std::string& key, const std::string& key_space,
const std::map<std::string, std::string>& params,
UrlFetcher::Verb verb, GenericResponse* resp, util::Task* task);
void WatchInitialGetDone(WatchState* state, GetResponse* resp,
util::Task* task);
void SendWatchUpdates(WatchState* state, const std::vector<Node>& updates);
void StartWatchRequest(WatchState* state);
void WatchRequestDone(WatchState* state, GetResponse* gen_resp,
util::Task* child_task);
void MaybeLogEtcdVersion();
util::Executor* const executor_;
std::unique_ptr<util::SyncTask> log_version_task_;
UrlFetcher* const fetcher_;
mutable std::mutex lock_;
std::list<HostPortPair> etcds_;
bool logged_version_;
DISALLOW_COPY_AND_ASSIGN(EtcdClient);
};
// Splits a string of the form "host:port,host:port,..." into a list of
// HostPortPairs.
std::list<EtcdClient::HostPortPair> SplitHosts(
const std::string& hosts_string);
} // namespace cert_trans
#endif // CERT_TRANS_UTIL_ETCD_H_