1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Example code demonstrating how to use the CompactFiles, EventListener,
// and GetColumnFamilyMetaData APIs to implement a custom compaction algorithm.
#include <cassert>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

using ROCKSDB_NAMESPACE::ColumnFamilyMetaData;
using ROCKSDB_NAMESPACE::CompactionOptions;
using ROCKSDB_NAMESPACE::DB;
using ROCKSDB_NAMESPACE::EventListener;
using ROCKSDB_NAMESPACE::FlushJobInfo;
using ROCKSDB_NAMESPACE::Options;
using ROCKSDB_NAMESPACE::ReadOptions;
using ROCKSDB_NAMESPACE::Status;
using ROCKSDB_NAMESPACE::WriteOptions;
// Location of the example database. main() destroys whatever is there and
// reopens it with create_if_missing, so each run starts from an empty DB.
std::string kDBPath =
#ifdef OS_WIN
    "C:\\Windows\\TEMP\\rocksdb_compact_files_example";
#else
    "/tmp/rocksdb_compact_files_example";
#endif
struct CompactionTask;

// Interface for a compaction algorithm implemented outside core RocksDB.
// Implementations observe DB events through EventListener and drive
// compactions themselves using the pluggable compaction APIs that RocksDB
// exposes (for example DB::CompactFiles).
class Compactor : public EventListener {
 public:
  // Chooses a compaction for column family `cf_name` of `db`.
  // Returns a heap-allocated task that the caller must destroy, or nullptr
  // when no suitable compaction can be found.
  virtual CompactionTask* PickCompaction(
      DB* db, const std::string& cf_name) = 0;

  // Schedules `task` and runs it in the background.
  virtual void ScheduleCompaction(CompactionTask* task) = 0;
};
// Example structure that describes a compaction task: the files to compact,
// the level to write the output to, and the options to compact with.
struct CompactionTask {
  CompactionTask(DB* _db, Compactor* _compactor,
                 const std::string& _column_family_name,
                 const std::vector<std::string>& _input_file_names,
                 const int _output_level,
                 const CompactionOptions& _compact_options, bool _retry_on_fail)
      : db(_db),
        compactor(_compactor),
        column_family_name(_column_family_name),
        input_file_names(_input_file_names),
        output_level(_output_level),
        compact_options(_compact_options),
        retry_on_fail(_retry_on_fail) {}
  // Database to compact (not owned by the task).
  DB* db;
  // Algorithm that created this task; FullCompactor uses it to pick and
  // schedule a follow-up task on retry (not owned by the task).
  Compactor* compactor;
  // Stored by value on purpose: tasks execute on a background thread after
  // the string they were created from (e.g. FlushJobInfo::cf_name inside an
  // OnFlushCompleted callback) may already be destroyed, so a reference
  // member here would dangle.
  std::string column_family_name;
  // Names of the SST files to compact.
  std::vector<std::string> input_file_names;
  // Level the compaction output is written to.
  int output_level;
  CompactionOptions compact_options;
  // Whether a failed compaction should be followed by a freshly picked one
  // (FullCompactor does not retry after IO errors).
  bool retry_on_fail;
};
// A simple compaction algorithm that always compacts everything
// to the highest level whenever possible.
class FullCompactor : public Compactor {
 public:
  // Keeps a copy of `options`. Compaction output uses its compression type,
  // and its target_file_size_base as the output file size limit.
  explicit FullCompactor(const Options& options) : options_(options) {
    compact_options_.compression = options_.compression;
    compact_options_.output_file_size_limit = options_.target_file_size_base;
  }

  // When a flush completes, tries to pick a compaction and schedules it.
  // If the flush triggered a write stop, the task's retry flag is set so
  // that a failed compaction is followed by another attempt.
  void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
    CompactionTask* task = PickCompaction(db, info.cf_name);
    if (task == nullptr) {
      return;
    }
    if (info.triggered_writes_stop) {
      task->retry_on_fail = true;
    }
    // Schedule compaction in a different thread.
    ScheduleCompaction(task);
  }

  // Picks a compaction that includes every live file, targeting the last
  // level. Returns nullptr if any file is already being compacted, or if
  // there are no files at all (nothing to do).
  CompactionTask* PickCompaction(DB* db, const std::string& cf_name) override {
    ColumnFamilyMetaData cf_meta;
    // NOTE(review): this single-argument overload reads the metadata of the
    // default column family; `cf_name` is only recorded in the task.
    db->GetColumnFamilyMetaData(&cf_meta);
    std::vector<std::string> input_file_names;
    // Bind by const reference: each LevelMetaData holds a vector of
    // SstFileMetaData, which would otherwise be copied on every iteration.
    for (const auto& level : cf_meta.levels) {
      for (const auto& file : level.files) {
        if (file.being_compacted) {
          return nullptr;
        }
        input_file_names.push_back(file.name);
      }
    }
    if (input_file_names.empty()) {
      // No files to compact; don't schedule a no-op job.
      return nullptr;
    }
    return new CompactionTask(db, this, cf_name, input_file_names,
                              options_.num_levels - 1, compact_options_, false);
  }

  // Hands `task` to the Env's thread pool. Ownership of the task passes to
  // CompactFiles(), which deletes it when done.
  void ScheduleCompaction(CompactionTask* task) override {
    options_.env->Schedule(&FullCompactor::CompactFiles, task);
  }

  // Background entry point: runs the compaction described by `arg` (a
  // CompactionTask*, which this function takes ownership of). If it fails
  // with a non-IO error and the task asked for retries, picks and schedules
  // a new task.
  static void CompactFiles(void* arg) {
    std::unique_ptr<CompactionTask> task(static_cast<CompactionTask*>(arg));
    assert(task);
    assert(task->db);
    Status s = task->db->CompactFiles(
        task->compact_options, task->input_file_names, task->output_level);
    printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
    if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
      // If a compaction task with its retry_on_fail=true failed,
      // try to schedule another compaction in case the reason
      // is not an IO error.
      CompactionTask* new_task =
          task->compactor->PickCompaction(task->db, task->column_family_name);
      // PickCompaction returns nullptr when no compaction is possible;
      // scheduling it would hand a null task to CompactFiles().
      if (new_task != nullptr) {
        task->compactor->ScheduleCompaction(new_task);
      }
    }
  }

 private:
  Options options_;
  CompactionOptions compact_options_;
};
int main() {
Options options;
options.create_if_missing = true;
// Disable RocksDB background compaction.
options.compaction_style = ROCKSDB_NAMESPACE::kCompactionStyleNone;
// Small write buffer size for generating more sst files in level 0.
options.write_buffer_size = 4 << 20;
// Small slowdown and stop trigger for experimental purpose.
options.level0_slowdown_writes_trigger = 3;
options.level0_stop_writes_trigger = 5;
options.IncreaseParallelism(5);
options.listeners.emplace_back(new FullCompactor(options));
DB* db = nullptr;
ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options);
Status s = DB::Open(options, kDBPath, &db);
assert(s.ok());
assert(db);
// if background compaction is not working, write will stall
// because of options.level0_stop_writes_trigger
for (int i = 1000; i < 99999; ++i) {
db->Put(WriteOptions(), std::to_string(i),
std::string(500, 'a' + (i % 26)));
}
// verify the values are still there
std::string value;
for (int i = 1000; i < 99999; ++i) {
db->Get(ReadOptions(), std::to_string(i), &value);
assert(value == std::string(500, 'a' + (i % 26)));
}
// close the db.
delete db;
return 0;
}
|