File: file_store_handler.cc

package info (click to toggle)
pytorch 1.13.1%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (184 lines) | stat: -rw-r--r-- 5,169 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#include "file_store_handler_op.h"

// NOLINTNEXTLINE(modernize-deprecated-headers)
#include <errno.h>
#include <fcntl.h>
// NOLINTNEXTLINE(modernize-deprecated-headers)
#include <limits.h>
// NOLINTNEXTLINE(modernize-deprecated-headers)
#include <stdio.h>
// NOLINTNEXTLINE(modernize-deprecated-headers)
#include <stdlib.h>
#include <sys/stat.h>

#include <array>
#include <chrono>
#include <iostream>
#include <thread>

#if defined(_MSC_VER)
#include <direct.h> // for _mkdir
#endif

#include "c10/util/StringUtil.h"

#include "caffe2/utils/murmur_hash3.h"

namespace caffe2 {

// Maps an arbitrary key name to a fixed-length, filesystem-safe token: the
// 128-bit MurmurHash3 of `name` (seed 0xcafef00d) rendered as 32 lowercase
// hexadecimal digits.
static std::string encodeName(const std::string& name) {
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init)
  std::array<uint64_t, 2> out;
  MurmurHash3_x64_128(name.data(), name.size(), 0xcafef00d, out.data());

  // Walk the hash as raw *unsigned* bytes. Plain `char` is signed on many
  // platforms; a negative value is sign-extended when promoted for "%02x" and
  // prints as "ffffffXX", which (after truncation by snprintf) turned every
  // byte >= 0x80 into "ff" and made the encoding platform-dependent.
  const auto* bytes = reinterpret_cast<const unsigned char*>(out.data());
  constexpr size_t kHashBytes = 16;

  // Two hex digits per byte plus space for the final NUL.
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init)
  std::array<char, 2 * kHashBytes + 1> buf;
  for (size_t i = 0; i < kHashBytes; i++) {
    snprintf(
        &buf[i * 2],
        buf.size() - (i * 2),
        "%02x",
        static_cast<unsigned int>(bytes[i]));
  }

  // Return everything but the final NUL
  return std::string(buf.data(), buf.size() - 1);
}

// Sets up the directory that backs this store.
//
// `path` is resolved through realPath(); when `prefix` is non-empty, the
// encoded prefix is appended as a sub-directory. The resulting directory is
// created, and it is not an error if it already exists.
FileStoreHandler::FileStoreHandler(
    const std::string& path,
    const std::string& prefix) {
  basePath_ = realPath(path);
  if (!prefix.empty()) {
    basePath_ += "/" + encodeName(prefix);
  }

#if defined(_MSC_VER)
  const int mkdirStatus = _mkdir(basePath_.c_str());
#else
  const int mkdirStatus = mkdir(basePath_.c_str(), 0777);
#endif // defined(_MSC_VER)

  if (mkdirStatus != -1) {
    return;
  }

  // The only tolerated failure is "already exists".
  TORCH_CHECK_EQ(errno, EEXIST) << "mkdir: " << strerror(errno);
}

// Nothing to release explicitly; members clean up after themselves.
FileStoreHandler::~FileStoreHandler() = default;

std::string FileStoreHandler::realPath(const std::string& path) {
#if defined(_MSC_VER)
  std::array<char, _MAX_PATH> buf;
  auto ret = _fullpath(buf.data(), path.c_str(), buf.size());
#else
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init)
  std::array<char, PATH_MAX> buf;
  auto ret = realpath(path.c_str(), buf.data());
#endif
  TORCH_CHECK_EQ(buf.data(), ret) << "realpath: " << strerror(errno);
  return std::string(buf.data());
}

// Path of the temporary file for key `name`: the encoded name with a leading
// dot, inside the base directory.
std::string FileStoreHandler::tmpPath(const std::string& name) {
  std::string result = basePath_;
  result += "/.";
  result += encodeName(name);
  return result;
}

// Path of the file holding the value for key `name`: the encoded name inside
// the base directory.
std::string FileStoreHandler::objectPath(const std::string& name) {
  std::string result = basePath_;
  result += "/";
  result += encodeName(name);
  return result;
}

// Stores `data` under key `name`.
//
// The payload is first written to the key's temporary path and then renamed
// onto the final object path, so the object file is only put in place after
// its contents have been written out in full.
//
// Raises via CAFFE_ENFORCE if the temporary file cannot be created or
// written, or if the rename fails.
void FileStoreHandler::set(const std::string& name, const std::string& data) {
  auto tmp = tmpPath(name);
  auto path = objectPath(name);

  {
    // Binary mode: the payload is an opaque byte string and must be stored
    // verbatim (no newline translation).
    std::ofstream ofs(
        tmp.c_str(), std::ios::out | std::ios::trunc | std::ios::binary);
    if (!ofs.is_open()) {
      CAFFE_ENFORCE(
          false, "File cannot be created: ", tmp, " (", ofs.rdstate(), ")");
    }
    ofs.write(data.data(), static_cast<std::streamsize>(data.size()));

    // Close explicitly so that buffered-write and close errors (e.g. a full
    // disk) are observed here instead of being swallowed by the destructor;
    // otherwise a truncated file could be published by the rename below.
    ofs.close();
    if (ofs.fail()) {
      CAFFE_ENFORCE(
          false, "File cannot be written: ", tmp, " (", ofs.rdstate(), ")");
    }
  }

  // Atomically move result to final location
  auto rv = rename(tmp.c_str(), path.c_str());
  CAFFE_ENFORCE_EQ(rv, 0, "rename: ", strerror(errno));
}

// Returns the value stored under key `name`.
//
// Blocks in wait() until the key's file exists (subject to `timeout`), then
// reads the whole file into a string.
//
// Raises via CAFFE_ENFORCE if the file cannot be opened, its size cannot be
// determined, or its contents cannot be read in full.
std::string FileStoreHandler::get(
    const std::string& name,
    const std::chrono::milliseconds& timeout) {
  auto path = objectPath(name);

  // Block until key is set
  wait({name}, timeout);

  // Binary mode so that the size reported by tellg() matches the number of
  // bytes read() delivers and the payload comes back verbatim.
  std::ifstream ifs(path.c_str(), std::ios::in | std::ios::binary);
  if (!ifs) {
    CAFFE_ENFORCE(
        false, "File cannot be opened: ", path, " (", ifs.rdstate(), ")");
  }

  ifs.seekg(0, std::ios::end);
  // tellg() reports failure as -1; converting that straight to size_t would
  // request an absurdly large allocation.
  const std::streamoff size = ifs.tellg();
  if (size < 0) {
    CAFFE_ENFORCE(
        false,
        "File size cannot be determined: ",
        path,
        " (",
        ifs.rdstate(),
        ")");
  }

  std::string result(static_cast<size_t>(size), '\0');
  ifs.seekg(0);
  ifs.read(&result[0], static_cast<std::streamsize>(result.size()));
  if (ifs.fail()) {
    // A short read sets failbit; do not hand back a partially filled buffer.
    CAFFE_ENFORCE(
        false, "File cannot be read: ", path, " (", ifs.rdstate(), ")");
  }
  return result;
}

// Not supported by this handler: the CHECK below fails unconditionally.
// Both arguments are ignored.
int64_t FileStoreHandler::add(
    const std::string& /* unused */,
    int64_t /* unused */) {
  CHECK(false) << "add not implemented for FileStoreHandler";

  // Placeholder result that follows the unconditional CHECK failure.
  constexpr int64_t kPlaceholder = 0;
  return kPlaceholder;
}

// Not supported by this handler: the CHECK below fails unconditionally.
int64_t FileStoreHandler::getNumKeys() {
  CHECK(false) << "getNumKeys not implemented for FileStoreHandler";

  // Placeholder result that follows the unconditional CHECK failure.
  constexpr int64_t kPlaceholder = 0;
  return kPlaceholder;
}

// Not supported by this handler: the CHECK below fails unconditionally.
// The key argument is ignored.
bool FileStoreHandler::deleteKey(const std::string& /* unused */) {
  CHECK(false) << "deleteKey not implemented for FileStoreHandler";

  // Placeholder result that follows the unconditional CHECK failure.
  constexpr bool kPlaceholder = false;
  return kPlaceholder;
}

// Returns true when a file exists for every key in `names`, false as soon as
// one of them is missing.
//
// Existence is probed by opening the key's object path read-only. A failure
// to open for any reason other than ENOENT trips the TORCH_CHECK_EQ.
bool FileStoreHandler::check(const std::vector<std::string>& names) {
  for (const auto& name : names) {
    const std::string path = objectPath(name);
    const int fd = open(path.c_str(), O_RDONLY);
    if (fd != -1) {
      // Present; the descriptor was only needed for the probe.
      close(fd);
      continue;
    }

    // Only deal with files that don't exist.
    // Anything else is a problem.
    TORCH_CHECK_EQ(errno, ENOENT);

    // One of the paths doesn't exist; return early
    return false;
  }

  return true;
}

// Blocks until check(names) reports that every key in `names` is present,
// polling every 10 milliseconds.
//
// If `timeout` is not kNoTimeout and more than `timeout` has elapsed while
// keys are still missing, the failure is reported through
// STORE_HANDLER_TIMEOUT.
void FileStoreHandler::wait(
    const std::vector<std::string>& names,
    const std::chrono::milliseconds& timeout) {
  // Not using inotify because it doesn't work on many
  // shared filesystems (such as NFS).
  const auto start = std::chrono::steady_clock::now();
  while (!check(names)) {
    // Measure in milliseconds, the unit of `timeout`. Truncating to whole
    // seconds made sub-second timeouts round up to a full second and delayed
    // every timeout by up to one second.
    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start);
    if (timeout != kNoTimeout && elapsed > timeout) {
      STORE_HANDLER_TIMEOUT(
          "Wait timeout for name(s): ", c10::Join(" ", names));
    }
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}
} // namespace caffe2