LevelDB-Compaction

LevelDB源代码分析

Intro

这一篇主要介绍LevelDB的Compaction的过程。在上一篇我们看到LevelDB在打开数据库时会调用MaybeScheduleCompaction这个函数,该函数可能会在单独的线程调度Compaction。接下来我们首先介绍一下有哪些操作可以触发LevelDB的Compaction之后在介绍Compaction的流程。

在LevelDB中Compaction的触发可以大致分为两类,第一类是用户调用LevelDB的CompactRange接口来进行底层数据的压缩,第二类是用户调用Get/Write等操作时,随着LevelDB中某一个Level的文件大小或数量或者文件查找次数超过一定阈值来触发Compaction。当Compaction被触发后会统一调用MaybeScheduleCompaction,接下来逐行分析该函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// 已有线程正在进行Compaction
} else if (shutting_down_.Acquire_Load()) {
// 正在删除数据库
} else if (!bg_error_.ok()) {
// 过去Compaction的过程存在错误
} else if (imm_ == nullptr && manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
// 现在不需要进行Compaction
} else {
background_compaction_scheduled_ = true;
// 进行Compaction,最终调用的是BackgroundCall
env_->Schedule(&DBImpl::BGWork, this);
}
}

DBImpl::BackgroundCall

LevelDB会在单独的线程中调用该函数来完成Compaction,该函数会调用BackgroundCompaction来完成Compaction,之后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(background_compaction_scheduled_);
if (shutting_down_.Acquire_Load()) {
} else if (!bg_error_.ok()) {
} else {
BackgroundCompaction();
}

background_compaction_scheduled_ = false;

// 由于之前的BackgroundCompaction可能在某一层产生了很多文件,因此需要再次调度Compaction
MaybeScheduleCompaction();
// 唤醒正在等待Compaction的线程,例如某线程发现在进行mem_已满,而imm_被设置,因此需要等待
// imm_被写入level0中,然后将mem_替换为新的imm_
background_work_finished_signal_.SignalAll();
}

DBImpl::BackgroundCompaction

该函数负责完成手动触发和自动触发的所有Compaction工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();

// imm_是LevelDB中mem_的一个副本。当mem_占用空间过大后会将将imm_设置为mem_,然后
// 调度一次Compaction
if (imm_ != nullptr) {
// 将imm_合成入Level0中,该函数会稍后介绍
CompactMemTable();
return;
}

Compaction* c;
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
// Compaction可以有两种触发方式,一个是利用LevelDB提供的CompactRange函数手动进行Compaction
// 另一个是由用户的Get等操作触发Compaction
if (is_manual) {
ManualCompaction* m = manual_compaction_;
// 根据用户设定的范围来选择实际进行Compaction的范围,并设置需要进行Compaction所需要
// 的level和level+1的文件,该函数会在稍后介绍
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == nullptr);
if (c != nullptr) {
// 为了避免一次归并的范围过大,可能仅归并其中的一小部分范围,因此需要修改manual_end
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
} else {
// VersionSet负责挑选出最应当进行Compaction的范围,该函数会在稍后介绍
c = versions_->PickCompaction();
}

Status status;
if (c == nullptr) {
} else if (!is_manual && c->IsTrivialMove()) {
// 如果发现某次归并其在level的输入文件仅有一个,在level+1没有对应的输入文件
// 同时该次归并没有覆盖太多的level+2的文件,则该次归并仅需将文件从level移动到
// level+1
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
} else {
CompactionState* compact = new CompactionState(c);
// 进行归并,该函数会在之后进行介绍
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
}
delete c;

if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
}

if (is_manual) {
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = nullptr;
}
}

DBImpl::CompactMemTable()

该函数负责将imm_归并入level0中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != nullptr);

// 由于将imm_写入level0可能会在level0创建新的文件,因此需要VersionEdit来完成该工作。
VersionEdit edit;
Version* base = versions_->current();
base->Ref();
// 将imm_写入Level0中
Status s = WriteLevel0Table(imm_, &edit, base);
base->Unref();

if (s.ok() && shutting_down_.Acquire_Load()) {
s = Status::IOError("Deleting DB during memtable compaction");
}

// 将VersionEdit应用于当前的VersionSet,从而生成新的当前的Version
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_);
s = versions_->LogAndApply(&edit, &mutex_);
}

if (s.ok()) {
// 清除旧的imm_
imm_->Unref();
imm_ = nullptr;
has_imm_.Release_Store(nullptr);
DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
}
}

DBImpl::WriteLevel0Table

该函数会生成一个新的Table文件,然后将imm_中所有的内容都插入该Table文件,则也就导致了由于新生成的level0的文件与之前的level0的文件有重复范围的键值的情况,该情况会在之后的Compaction中被解决,因而在所有大于1的level中每个文件的键值范围都是不重叠的。

VersionSet::CompactRange

该函数负责根据用户的输入选择实际进行Compaction的范围。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Compaction* VersionSet::CompactRange(int level, const InternalKey* begin, const InternalKey* end) {
std::vector<FileMetaData*> inputs;
// 查找在level中所有与begin与end范围有重叠的文件。如果是level0的话,由于不同文件的键值范围可能有
// 重叠,会尽可能的扩大选择范围,一次选择多个文件。
current_->GetOverlappingInputs(level, begin, end, &inputs);
if (inputs.empty()) {
return nullptr;
}

// 避免一次Compaction的范围过大,这里根据文件的总大小来进行选择,仅保留部分文件
if (level > 0) {
const uint64_t limit = MaxFileSizeForLevel(options_, level);
uint64_t total = 0;
for (size_t i = 0; i < inputs.size(); i++) {
uint64_t s = inputs[i]->file_size;
total += s;
if (total >= limit) {
inputs.resize(i + 1);
break;
}
}
}

Compaction* c = new Compaction(options_, level);
c->input_version_ = current_;
c->input_version_->Ref();
c->inputs_[0] = inputs;
// 根据该level的输入文件选择level+1的输入文件,该函数会在稍后介绍
SetupOtherInputs(c);
return c;
}

VersionSet::SetupOtherInputs

该函数根据Compaction在level所选择的文件选择level+1所需要的文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
// 根据level所选择的文件获取最大与最小的键值。注意由于每次Compaction的输入与输出的
// 单位都是文件,因此这里的smallest、largest可能与用户最初选择的Compaction范围不一致
GetRange(c->inputs_[0], &smallest, &largest);

// 获取在level+1层有哪些文件的键值范围与smallest、largest重叠
current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);

// 获取level及level+1所有的输入文件的键值中的最大值和最小值
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

// 由于all_start与all_limit的范围必然大于等于level+1的文件的输入范围,且level+1的文件输入
// 是由level的文件输入范围获取的,因此利用all_start与all_limit在level进行查找时不会导致在
// level+1层引入更多的文件。这样可以保证在使用相同数量的level+1层的文件的情况下一次归并更多
// 的level层的文件,提升压缩效率。
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size <
ExpandedCompactionByteSizeLimit(options_)) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
&expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
level,
int(c->inputs_[0].size()),
int(c->inputs_[1].size()),
long(inputs0_size), long(inputs1_size),
int(expanded0.size()),
int(expanded1.size()),
long(expanded0_size), long(inputs1_size));
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}

// 查找所有在level+2层与该范围对应的文件,设置该值是为了确定在归并的过程中不会引起太多的
// 在level+2层的修改。
// (parent == level+1; grandparent == level+2)
if (level + 2 < config::kNumLevels) {
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}

// 更新Compaction pointer指向下一次需要开始归并的地址
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
}

VersionSet::PickCompaction

该函数是为了当自动触发了归并时,选择一个需要进行归并的范围。自动触发的归并又可分为两类,第一类是由于Recover或者添加新文件时导致某个level文件数量过多或者文件大小过大从而触发的size_compaction;第二类是由于在某个文件上进行了多次的查找,从而触发的seek_compaction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;

const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level+1 < config::kNumLevels);
c = new Compaction(options_, level);

// 在最需要进行Compaction的level中根据compaction_pointer_选择该次需要的输入文件
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return nullptr;
}

c->input_version_ = current_;
c->input_version_->Ref();

// 如果是在level0,在选择好需要进行Compaction的文件后会同时选择所有与该文件有重叠
// 键值范围的文件
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}

// 根据level的文件选择level+1的输入文件
SetupOtherInputs(c);

return c;
}

DBImpl::DoCompactionWork

该函数负责执行实际的归并工作,会从level与level+1的文件中读取输入,进行归并,删除不必要的条目然后在level+1生成新的Table文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
Status DBImpl::DoCompactionWork(CompactionState* compact) {
...

assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(compact->outfile == nullptr);
// 保留在该序号之后的值
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
}

mutex_.Unlock();

// 生成一个迭代器用于遍历所有的输入范围,即level和level+1的所有文件
Iterator* input = versions_->MakeInputIterator(compact->compaction);
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// 如果当前有imm_被生成则有限进行imm_的归并
if (has_imm_.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != nullptr) {
CompactMemTable();
// 唤醒正在等待imm_归并的线程
background_work_finished_signal_.SignalAll();
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}

Slice key = input->key();
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != nullptr) {
// 如果当前的归并以及覆盖了较多的level+2的文件,则停止继续归并
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}

bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key,
Slice(current_user_key)) != 0) {
// 归并过程中第一次见到该键值
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}

// 由于在Table中键值对按照键值递增,相同键值序号递减的方式排列,因此针对每个
// 键值需要至少保留一个最新的条目,其余的条目如果处在所有快照之前则可以删除
if (last_sequence_for_key <= compact->smallest_snapshot) {
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// 如果该键值对已经被标记为删除、该键值对不在某个快照之后、更高的level
// 没有该键值的条目(防止删除该条目之后由于更高level的键值导致删除的值重新出现)
// 之后的循环中有相同的键但是序号更小的值会由于仅保留最新的序号的键值对而被删除
drop = true;
}

last_sequence_for_key = ikey.sequence;
}

if (!drop) {
// 打开输出文件
if (compact->builder == nullptr) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());

// 输出文件大小达到一定程度后关闭该文件
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}

input->Next();
}

if (status.ok() && shutting_down_.Acquire_Load()) {
status = Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input);
}
if (status.ok()) {
status = input->status();
}
delete input;
input = nullptr;

...
}

总结

总结一下,LevelDB的Compaction的流程如下:

  • 用户手动调用CompactRange或由于某个level文件数过多等条件触发Compaction
  • Compaction的输入输出单位都是文件,因此会首先根据用户输入或compaction_pointer等信息确定归并范围
  • 归并过程中如果发现imm_被设置会优先将imm_写入level0中
  • 找到level及level+1所对应的与该次归并相关的文件
  • 将这些文件归并为新的Table文件置入level+1中,之后可能设置compaction_pointer指向下一次需要开始归并的位置