1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
|
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "table/compaction_merging_iterator.h"
namespace ROCKSDB_NAMESPACE {
// Forward-only merging iterator used by compaction.
//
// It merges two kinds of entries through one heap (minHeap_):
//   * point keys coming from the `n` child iterators (HeapItem::ITERATOR), and
//   * start keys of range tombstones from the matching sorted runs
//     (HeapItem::DELETE_RANGE_START), which are surfaced with an empty value.
//
// Only SeekToFirst(), Seek() and Next() may be used for positioning; the
// remaining positioning/pinning entry points assert(false).
class CompactionMergingIterator : public InternalIterator {
 public:
  // comparator       - orders the internal keys held in the heap.
  // children         - array of `n` child iterators; child i becomes level i.
  //                    They are released in the destructor through
  //                    IteratorWrapper::DeleteIter(is_arena_mode).
  // n                - number of children; range_tombstones must have the
  //                    same number of entries (asserted).
  // is_arena_mode    - forwarded to DeleteIter() when children are released.
  // range_tombstones - per-level pair. `.first` is moved into this object
  //                    (it may be null). When `.second` is non-null, the
  //                    address of this object's slot for that level is written
  //                    through it (the original code notes this is for
  //                    LevelIterator).
  CompactionMergingIterator(
      const InternalKeyComparator* comparator, InternalIterator** children,
      int n, bool is_arena_mode,
      std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                            std::unique_ptr<TruncatedRangeDelIterator>**>>&
          range_tombstones)
      : is_arena_mode_(is_arena_mode),
        comparator_(comparator),
        current_(nullptr),
        minHeap_(CompactionHeapItemComparator(comparator_)),
        pinned_iters_mgr_(nullptr) {
    children_.resize(n);
    for (int i = 0; i < n; i++) {
      children_[i].level = i;
      children_[i].iter.Set(children[i]);
      assert(children_[i].type == HeapItem::ITERATOR);
    }
    assert(range_tombstones.size() == static_cast<size_t>(n));
    // The final size is known up front, so allocate once instead of growing
    // the vector incrementally.
    range_tombstone_iters_.reserve(range_tombstones.size());
    for (auto& p : range_tombstones) {
      range_tombstone_iters_.push_back(std::move(p.first));
    }
    pinned_heap_item_.resize(n);
    // Slot addresses are handed out only after every tombstone iterator has
    // been stored, so the pointers below refer to the vector's final storage.
    for (int i = 0; i < n; ++i) {
      if (range_tombstones[i].second) {
        // for LevelIterator
        *range_tombstones[i].second = &range_tombstone_iters_[i];
      }
      pinned_heap_item_[i].level = i;
      pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
    }
  }

  // Remembers `s` if it is an error and no error has been recorded yet; an
  // already recorded error is never overwritten.
  void considerStatus(const Status& s) {
    if (!s.ok() && status_.ok()) {
      status_ = s;
    }
  }

  // Destroys the range tombstone iterators first, then releases every child
  // iterator according to is_arena_mode_.
  ~CompactionMergingIterator() override {
    range_tombstone_iters_.clear();
    for (auto& child : children_) {
      child.iter.DeleteIter(is_arena_mode_);
    }
    // The caller may never have looked at status_; explicitly allow that.
    status_.PermitUncheckedError();
  }

  // Valid only while there is a current heap entry and no error was recorded.
  bool Valid() const override { return current_ != nullptr && status_.ok(); }

  // Returns the recorded status (see considerStatus()).
  Status status() const override { return status_; }

  void SeekToFirst() override;
  void Seek(const Slice& target) override;
  void Next() override;

  // Key of the current entry: the child iterator's key for a point key, or
  // the encoded tombstone start key for a range tombstone entry.
  Slice key() const override {
    assert(Valid());
    return current_->key();
  }

  // Value of the current entry. Range tombstone start keys carry no value, so
  // the (empty) dummy_tombstone_val is returned for them.
  Slice value() const override {
    assert(Valid());
    if (LIKELY(current_->type == HeapItem::ITERATOR)) {
      return current_->iter.value();
    } else {
      return dummy_tombstone_val;
    }
  }

  // The bound checks are relayed from the current child iterator. A range
  // tombstone start key has no child iterator to ask, so the conservative
  // answer is returned for it: "may be out of lower bound" / kUnknown.
  bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START ||
           current_->iter.MayBeOutOfLowerBound();
  }

  IterBoundCheck UpperBoundCheckResult() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START
               ? IterBoundCheck::kUnknown
               : current_->iter.UpperBoundCheckResult();
  }

  // Stores the manager and forwards it to every child iterator.
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    for (auto& child : children_) {
      child.iter.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  // True when the current entry is a range tombstone start key rather than a
  // point key.
  bool IsDeleteRangeSentinelKey() const override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START;
  }

  // Compaction uses the above subset of InternalIterator interface. The
  // methods below are not supported and assert(false) when called.
  void SeekToLast() override { assert(false); }
  void SeekForPrev(const Slice&) override { assert(false); }
  void Prev() override { assert(false); }
  bool NextAndGetResult(IterateResult*) override {
    assert(false);
    return false;
  }
  bool IsKeyPinned() const override {
    assert(false);
    return false;
  }
  bool IsValuePinned() const override {
    assert(false);
    return false;
  }
  bool PrepareValue() override {
    assert(false);
    return false;
  }

 private:
  // One heap entry. Depending on `type` it stands either for a child point
  // iterator (`iter` is used) or for the current range tombstone start key of
  // a level (`tombstone_str` is used).
  struct HeapItem {
    HeapItem() = default;

    IteratorWrapper iter;
    size_t level = 0;
    // Encoded internal key of the tombstone start key; meaningful only when
    // type == DELETE_RANGE_START.
    std::string tombstone_str;
    enum Type { ITERATOR, DELETE_RANGE_START };
    Type type = ITERATOR;

    explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
        : level(_level), type(Type::ITERATOR) {
      iter.Set(_iter);
    }

    // Re-encodes `pik` into tombstone_str, replacing the previous content.
    // Taken by const reference: the key is only read, and a const reference
    // accepts the temporaries returned by start_key() as well as lvalues.
    void SetTombstoneForCompaction(const ParsedInternalKey& pik) {
      tombstone_str.clear();
      AppendInternalKey(&tombstone_str, pik);
    }

    // Key this entry currently represents.
    [[nodiscard]] Slice key() const {
      return type == ITERATOR ? iter.key() : tombstone_str;
    }
  };

  // Heap ordering functor: reports whether `a` sorts after `b` under the
  // internal key comparator.
  class CompactionHeapItemComparator {
   public:
    explicit CompactionHeapItemComparator(
        const InternalKeyComparator* comparator)
        : comparator_(comparator) {}

    bool operator()(HeapItem* a, HeapItem* b) const {
      int r = comparator_->Compare(a->key(), b->key());
      // For each file, we assume all range tombstone start keys come before
      // its file boundary sentinel key (file's meta.largest key).
      // In the case when meta.smallest = meta.largest and range tombstone start
      // key is truncated at meta.smallest, the start key will have op_type =
      // kMaxValid to make it smaller (see TruncatedRangeDelIterator
      // constructor). The following assertion validates this assumption.
      assert(a->type == b->type || r != 0);
      return r > 0;
    }

   private:
    const InternalKeyComparator* comparator_;
  };

  using CompactionMinHeap = BinaryHeap<HeapItem*, CompactionHeapItemComparator>;

  // Passed to DeleteIter() when the children are released.
  bool is_arena_mode_;
  const InternalKeyComparator* comparator_;
  // HeapItem for all child point iterators.
  std::vector<HeapItem> children_;
  // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the
  // current range tombstone from range_tombstone_iters_[i].
  std::vector<HeapItem> pinned_heap_item_;
  // range_tombstone_iters_[i] contains range tombstones in the sorted run that
  // corresponds to children_[i]. range_tombstone_iters_[i] ==
  // nullptr means the sorted run of children_[i] does not have range
  // tombstones (or the current SSTable does not have range tombstones in the
  // case of LevelIterator).
  std::vector<std::unique_ptr<TruncatedRangeDelIterator>>
      range_tombstone_iters_;
  // Used as value for range tombstone keys
  std::string dummy_tombstone_val{};

  // Skip file boundary sentinel keys.
  void FindNextVisibleKey();

  // top of minHeap_
  HeapItem* current_;
  // If any of the children have non-ok status, this is one of them.
  Status status_;
  CompactionMinHeap minHeap_;
  PinnedIteratorsManager* pinned_iters_mgr_;

  // Process a child that is not in the min heap.
  // If valid, add to the min heap. Otherwise, check status.
  void AddToMinHeapOrCheckStatus(HeapItem*);

  // Top of the heap, or nullptr when the heap is empty.
  HeapItem* CurrentForward() const {
    return !minHeap_.empty() ? minHeap_.top() : nullptr;
  }

  // If the tombstone iterator of `level` is positioned on a tombstone, encode
  // its start key into the level's pinned heap item and push that item.
  // Precondition: range_tombstone_iters_[level] is non-null (it is
  // dereferenced unconditionally; callers check before calling).
  void InsertRangeTombstoneAtLevel(size_t level) {
    if (range_tombstone_iters_[level]->Valid()) {
      pinned_heap_item_[level].SetTombstoneForCompaction(
          range_tombstone_iters_[level]->start_key());
      minHeap_.push(&pinned_heap_item_[level]);
    }
  }
};
// Rewinds every child iterator and every present range tombstone iterator,
// rebuilds the heap from scratch and selects the new current entry.
void CompactionMergingIterator::SeekToFirst() {
  // Forget the heap content and status left by any earlier positioning call.
  minHeap_.clear();
  status_ = Status::OK();

  // Point keys: rewind each child, then either enqueue it or record its error.
  for (size_t idx = 0; idx < children_.size(); ++idx) {
    auto* point_item = &children_[idx];
    point_item->iter.SeekToFirst();
    AddToMinHeapOrCheckStatus(point_item);
  }

  // Range tombstones: levels without a tombstone iterator are skipped.
  for (size_t level = 0; level < range_tombstone_iters_.size(); ++level) {
    auto& tombstone_iter = range_tombstone_iters_[level];
    if (!tombstone_iter) {
      continue;
    }
    tombstone_iter->SeekToFirst();
    InsertRangeTombstoneAtLevel(level);
  }

  // Step over file boundary sentinel keys before exposing the heap top.
  FindNextVisibleKey();
  current_ = CurrentForward();
}
void CompactionMergingIterator::Seek(const Slice& target) {
minHeap_.clear();
status_ = Status::OK();
for (auto& child : children_) {
child.iter.Seek(target);
AddToMinHeapOrCheckStatus(&child);
}
ParsedInternalKey pik;
ParseInternalKey(target, &pik, false /* log_err_key */)
.PermitUncheckedError();
for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
if (range_tombstone_iters_[i]) {
range_tombstone_iters_[i]->Seek(pik.user_key);
// For compaction, output keys should all be after seek target.
while (range_tombstone_iters_[i]->Valid() &&
comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <
0) {
range_tombstone_iters_[i]->Next();
}
InsertRangeTombstoneAtLevel(i);
}
}
FindNextVisibleKey();
current_ = CurrentForward();
}
void CompactionMergingIterator::Next() {
assert(Valid());
// For the heap modifications below to be correct, current_ must be the
// current top of the heap.
assert(current_ == CurrentForward());
// as the current points to the current record. move the iterator forward.
if (current_->type == HeapItem::ITERATOR) {
current_->iter.Next();
if (current_->iter.Valid()) {
// current is still valid after the Next() call above. Call
// replace_top() to restore the heap property. When the same child
// iterator yields a sequence of keys, this is cheap.
assert(current_->iter.status().ok());
minHeap_.replace_top(current_);
} else {
// current stopped being valid, remove it from the heap.
considerStatus(current_->iter.status());
minHeap_.pop();
}
} else {
assert(current_->type == HeapItem::DELETE_RANGE_START);
size_t level = current_->level;
assert(range_tombstone_iters_[level]);
range_tombstone_iters_[level]->Next();
if (range_tombstone_iters_[level]->Valid()) {
pinned_heap_item_[level].SetTombstoneForCompaction(
range_tombstone_iters_[level]->start_key());
minHeap_.replace_top(&pinned_heap_item_[level]);
} else {
minHeap_.pop();
}
}
FindNextVisibleKey();
current_ = CurrentForward();
}
void CompactionMergingIterator::FindNextVisibleKey() {
while (!minHeap_.empty()) {
HeapItem* current = minHeap_.top();
// IsDeleteRangeSentinelKey() here means file boundary sentinel keys.
if (current->type != HeapItem::ITERATOR ||
!current->iter.IsDeleteRangeSentinelKey()) {
return;
}
// range tombstone start keys from the same SSTable should have been
// exhausted
assert(!range_tombstone_iters_[current->level] ||
!range_tombstone_iters_[current->level]->Valid());
// current->iter is a LevelIterator, and it enters a new SST file in the
// Next() call here.
current->iter.Next();
if (current->iter.Valid()) {
assert(current->iter.status().ok());
minHeap_.replace_top(current);
} else {
considerStatus(current->iter.status());
minHeap_.pop();
}
if (range_tombstone_iters_[current->level]) {
InsertRangeTombstoneAtLevel(current->level);
}
}
}
// Handles a child that is currently not in the heap: a valid child is pushed
// onto the heap, an invalid one only contributes its status.
void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) {
  if (!child->iter.Valid()) {
    considerStatus(child->iter.status());
    return;
  }
  assert(child->iter.status().ok());
  minHeap_.push(child);
}
// Factory for the compaction merging iterator.
//   - n == 0: returns an empty internal iterator created with `arena`.
//   - arena == nullptr: the iterator is allocated with plain `new`.
//   - otherwise: the iterator is placement-constructed in memory obtained
//     from `arena` and created in arena mode.
// `n` must not be negative (asserted).
InternalIterator* NewCompactionMergingIterator(
    const InternalKeyComparator* comparator, InternalIterator** children, int n,
    std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                          std::unique_ptr<TruncatedRangeDelIterator>**>>&
        range_tombstone_iters,
    Arena* arena) {
  assert(n >= 0);
  if (n == 0) {
    return NewEmptyInternalIterator<Slice>(arena);
  }

  if (arena != nullptr) {
    auto storage = arena->AllocateAligned(sizeof(CompactionMergingIterator));
    return new (storage)
        CompactionMergingIterator(comparator, children, n,
                                  true /* is_arena_mode */,
                                  range_tombstone_iters);
  }

  return new CompactionMergingIterator(comparator, children, n,
                                       false /* is_arena_mode */,
                                       range_tombstone_iters);
}
} // namespace ROCKSDB_NAMESPACE
|