1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
#include "caffe2/core/db.h"
#include "caffe2/core/logging.h"
#include "caffe2/core/flags.h"
#include "leveldb/db.h"
#include "leveldb/write_batch.h"
C10_DEFINE_int(
caffe2_leveldb_block_size,
65536,
"The caffe2 leveldb block size when writing a leveldb.");
namespace caffe2 {
namespace db {
class LevelDBCursor : public Cursor {
public:
explicit LevelDBCursor(leveldb::DB* db)
: iter_(db->NewIterator(leveldb::ReadOptions())) {
SeekToFirst();
}
~LevelDBCursor() override {}
void Seek(const string& key) override { iter_->Seek(key); }
bool SupportsSeek() override { return true; }
void SeekToFirst() override { iter_->SeekToFirst(); }
void Next() override { iter_->Next(); }
string key() override { return iter_->key().ToString(); }
string value() override { return iter_->value().ToString(); }
bool Valid() override { return iter_->Valid(); }
private:
std::unique_ptr<leveldb::Iterator> iter_;
};
class LevelDBTransaction : public Transaction {
public:
explicit LevelDBTransaction(leveldb::DB* db) : db_(db) {
CAFFE_ENFORCE(db_);
batch_.reset(new leveldb::WriteBatch());
}
~LevelDBTransaction() override {
Commit();
}
void Put(const string& key, const string& value) override {
batch_->Put(key, value);
}
void Commit() override {
leveldb::Status status = db_->Write(leveldb::WriteOptions(), batch_.get());
batch_.reset(new leveldb::WriteBatch());
CAFFE_ENFORCE(
status.ok(),
"Failed to write batch to leveldb. ", status.ToString());
}
private:
leveldb::DB* db_;
std::unique_ptr<leveldb::WriteBatch> batch_;
C10_DISABLE_COPY_AND_ASSIGN(LevelDBTransaction);
};
class LevelDB : public DB {
public:
LevelDB(const string& source, Mode mode) : DB(source, mode) {
leveldb::Options options;
options.block_size = FLAGS_caffe2_leveldb_block_size;
options.write_buffer_size = 268435456;
options.max_open_files = 100;
options.error_if_exists = mode == NEW;
options.create_if_missing = mode != READ;
leveldb::DB* db_temp;
leveldb::Status status = leveldb::DB::Open(options, source, &db_temp);
CAFFE_ENFORCE(
status.ok(),
"Failed to open leveldb ", source, ". ", status.ToString());
db_.reset(db_temp);
VLOG(1) << "Opened leveldb " << source;
}
void Close() override { db_.reset(); }
unique_ptr<Cursor> NewCursor() override {
return make_unique<LevelDBCursor>(db_.get());
}
unique_ptr<Transaction> NewTransaction() override {
return make_unique<LevelDBTransaction>(db_.get());
}
private:
std::unique_ptr<leveldb::DB> db_;
};
REGISTER_CAFFE2_DB(LevelDB, LevelDB);
// For lazy-minded, one can also call with lower-case name.
REGISTER_CAFFE2_DB(leveldb, LevelDB);
} // namespace db
} // namespace caffe2
|