File: redis_store_handler.cc

package info (click to toggle)
pytorch 1.13.1%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (130 lines) | stat: -rw-r--r-- 3,954 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include "redis_store_handler.h"

#include <caffe2/core/logging.h>

#include <chrono>
#include <memory>
#include <thread>
#include <vector>

namespace caffe2 {

// Connects to the Redis server at `host`:`port` and remembers `prefix`,
// which compoundKey() prepends to every key name.
//
// The connection attempt uses a 5 second timeout. Enforce failures are
// raised if hiredis returns no context or if the returned context
// reports an error.
RedisStoreHandler::RedisStoreHandler(
    std::string& host,
    int port,
    std::string& prefix)
    : host_(host), port_(port), prefix_(prefix) {
  // Assigned member by member: designated initializers are not standard
  // C++ before C++20.
  struct timeval tv {};
  tv.tv_sec = 5;
  tv.tv_usec = 0;

  redis_ = redisConnectWithTimeout(host.c_str(), port, tv);
  CAFFE_ENFORCE_NE(redis_, static_cast<redisContext*>(nullptr));
  if (redis_->err != 0) {
    // The destructor does not run when a constructor throws, so copy the
    // diagnostics out and release the context here before reporting.
    const int err = redis_->err;
    const std::string errstr(redis_->errstr);
    redisFree(redis_);
    redis_ = nullptr;
    CAFFE_ENFORCE_EQ(err, 0, errstr);
  }
}

// Releases the hiredis context that the constructor stored in redis_.
RedisStoreHandler::~RedisStoreHandler() {
  redisFree(redis_);
}

// Builds the key actually sent to Redis for `name`: the handler's prefix
// followed directly by `name`, with no separator inserted.
std::string RedisStoreHandler::compoundKey(const std::string& name) {
  std::string key(prefix_);
  key.append(name);
  return key;
}

void RedisStoreHandler::set(const std::string& name, const std::string& data) {
  auto key = compoundKey(name);
  void* ptr = redisCommand(
      redis_,
      "SETNX %b %b",
      key.c_str(),
      (size_t)key.size(),
      data.c_str(),
      (size_t)data.size());
  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
  redisReply* reply = static_cast<redisReply*>(ptr);
  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
  CAFFE_ENFORCE_EQ(
      reply->integer,
      1,
      "Value at ",
      name,
      " was already set",
      " (perhaps you reused a run ID you have used before?)");
}

std::string RedisStoreHandler::get(
    const std::string& name,
    const std::chrono::milliseconds& timeout) {
  // Block until key is set
  wait({name}, timeout);

  auto key = compoundKey(name);
  void* ptr = redisCommand(redis_, "GET %b", key.c_str(), (size_t)key.size());
  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
  redisReply* reply = static_cast<redisReply*>(ptr);
  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_STRING);
  return std::string(reply->str, reply->len);
}

int64_t RedisStoreHandler::add(const std::string& name, int64_t value) {
  auto key = compoundKey(name);
  void* ptr = redisCommand(
      redis_, "INCRBY %b %ld", key.c_str(), (size_t)key.size(), value);
  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
  redisReply* reply = static_cast<redisReply*>(ptr);
  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
  return reply->integer;
}

// Not supported by this handler: the CHECK below fails unconditionally
// with a "not implemented" message.
int64_t RedisStoreHandler::getNumKeys() {
  CHECK(false) << "getNumKeys not implemented for RedisStoreHandler";
  // Only reached if the failed CHECK returns control; gives the function
  // a value on every path.
  return 0;
}

// Not supported by this handler: the key argument is ignored and the
// CHECK below fails unconditionally with a "not implemented" message.
bool RedisStoreHandler::deleteKey(const std::string& /* unused */) {
  CHECK(false) << "deleteKey not implemented for RedisStoreHandler";
  // Only reached if the failed CHECK returns control; gives the function
  // a value on every path.
  return false;
}

bool RedisStoreHandler::check(const std::vector<std::string>& names) {
  std::vector<std::string> args;
  args.push_back("EXISTS");
  for (const auto& name : names) {
    args.push_back(compoundKey(name));
  }

  std::vector<const char*> argv;
  std::vector<size_t> argvlen;
  for (const auto& arg : args) {
    argv.push_back(arg.c_str());
    argvlen.push_back(arg.length());
  }

  auto argc = argv.size();
  void* ptr = redisCommandArgv(redis_, argc, argv.data(), argvlen.data());
  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
  redisReply* reply = static_cast<redisReply*>(ptr);
  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
  return reply->integer == names.size();
}

// Blocks until check(names) returns true, polling every 10 milliseconds.
//
// If `timeout` is not kNoTimeout and more than `timeout` has elapsed
// since the call started, STORE_HANDLER_TIMEOUT is invoked with a message
// listing the names being waited on.
void RedisStoreHandler::wait(
    const std::vector<std::string>& names,
    const std::chrono::milliseconds& timeout) {
  // Simple approach: poll...
  // Complex approach: use pub/sub.
  // Polling is fine for the typical rendezvous use case, as it is
  // only done at initialization time and not at run time.
  const auto start = std::chrono::steady_clock::now();
  while (!check(names)) {
    // Measure in milliseconds, the same unit as `timeout`; truncating to
    // whole seconds would let sub-second timeouts overshoot by up to 1s.
    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start);
    if (timeout != kNoTimeout && elapsed > timeout) {
      STORE_HANDLER_TIMEOUT(
          "Wait timeout for name(s): ", c10::Join(" ", names));
    }
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}
} // namespace caffe2