1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
#ifndef CAFFE2_OPERATORS_INDEX_OPS_H_
#define CAFFE2_OPERATORS_INDEX_OPS_H_
#include <atomic>
#include <cstdint>
#include <limits>
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>
#include "caffe2/core/blob_serialization.h"
#include "caffe2/core/operator.h"
#include "caffe2/core/tensor.h"
#include "c10/util/irange.h"
namespace caffe2 {
namespace {
// List of element types (32-bit int, 64-bit int, string) declared for index
// keys. NOTE(review): not referenced anywhere in this header; presumably used
// by the index operators for type dispatch -- confirm against the callers.
using IndexKeyTypes = TensorTypes<int32_t, int64_t, std::string>;
// Integer type used for the ids an index hands out (the mapped value type of
// the dictionary in Index<T>).
using int64_tValue = int64_t;
} // namespace
/**
 * Type-erased base of an index: holds the state that does not depend on the
 * key type (element limit, key TypeMeta, next id, frozen flag) together with
 * the mutex that derived classes use to guard their dictionary.
 *
 * nextId_ starts at 1. `frozen_` is atomic so it can be read without taking
 * dictMutex_; nextId_ must only be touched while holding dictMutex_.
 */
struct IndexBase {
  /**
   * @param maxElements limit stored in maxElements_ and reported by
   *                    maxElements(); derived classes enforce it.
   * @param type        TypeMeta describing the key type, reported by Type().
   */
  IndexBase(int64_tValue maxElements, const TypeMeta type)
      : maxElements_{maxElements}, meta_(type) {}

  virtual ~IndexBase() = default;

  /// Marks the index as frozen. There is no way to un-freeze it.
  void Freeze() {
    frozen_ = true;
  }

  /// @return true once Freeze() has been called.
  bool isFrozen() const {
    return frozen_;
  }

  /// @return the element limit given at construction.
  int64_t maxElements() const {
    return maxElements_;
  }

  /// @return the TypeMeta of the key type given at construction.
  const TypeMeta Type() const {
    return meta_;
  }

  /**
   * @return the current value of nextId_, read under dictMutex_. Because
   *         nextId_ starts at 1, this is one more than the number of ids
   *         handed out when ids are assigned sequentially.
   */
  int64_tValue Size() const {
    std::lock_guard<std::mutex> guard(dictMutex_);
    return nextId_;
  }

 protected:
  int64_t maxElements_;
  TypeMeta meta_;
  int64_tValue nextId_{1}; // guarded by dictMutex_
  std::atomic<bool> frozen_{false};
  // mutable so that read-only accessors such as Size() can lock it.
  mutable std::mutex dictMutex_;
};
template <typename T>
struct Index : IndexBase {
explicit Index(int64_tValue maxElements)
: IndexBase(maxElements, TypeMeta::Make<T>()) {}
void Get(const T* keys, int64_tValue* values, size_t numKeys) {
if (frozen_) {
FrozenGet(keys, values, numKeys);
return;
}
std::lock_guard<std::mutex> lock(dictMutex_);
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (const auto i : c10::irange(numKeys)) {
auto it = dict_.find(keys[i]);
if (it != dict_.end()) {
values[i] = it->second;
} else if (nextId_ < maxElements_) {
auto newValue = nextId_++;
dict_.insert({keys[i], newValue});
values[i] = newValue;
} else {
CAFFE_THROW("Dict max size reached");
}
}
}
bool Load(const T* keys, size_t numKeys) {
CAFFE_ENFORCE(
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
numKeys <= maxElements_,
"Cannot load index: Tensor is larger than max_elements.");
decltype(dict_) dict;
for (const auto i : c10::irange(0U, numKeys)) {
CAFFE_ENFORCE(
dict.insert({keys[i], i + 1}).second,
"Repeated elements found: cannot load into dictionary.");
}
// assume no `get` is inflight while this happens
{
std::lock_guard<std::mutex> lock(dictMutex_);
// let the old dict get destructed outside of the lock
dict_.swap(dict);
nextId_ = numKeys + 1;
}
return true;
}
bool Store(Tensor* out) {
std::lock_guard<std::mutex> lock(dictMutex_);
out->Resize(nextId_ - 1);
auto outData = out->template mutable_data<T>();
for (const auto& entry : dict_) {
outData[entry.second - 1] = entry.first;
}
return true;
}
private:
void FrozenGet(const T* keys, int64_tValue* values, size_t numKeys) {
for (const auto i : c10::irange(0U, numKeys)) {
auto it = dict_.find(keys[i]);
values[i] = it != dict_.end() ? it->second : 0;
}
}
std::unordered_map<T, int64_tValue> dict_;
};
} // namespace caffe2
#endif // CAFFE2_OPERATORS_INDEX_OPS_H_
|