LevelDB-Interface

LevelDB源码分析

Intro

前两篇博客主要注重于LevelDB内部结构及Compaction的机制的分析,该篇博客主要介绍LevelDB对外提供的数个接口,及这些接口的实现,包括这些接口是如何与前述截至相互作用的。

DBImpl::Put

DBImpl::Put直接使用了DB::Put的默认实现,即直接构建一个仅有一个Put操作的WriteBatch,然后利用Write来完成操作。

DBImpl::Delete

DBImpl::Delete的实现与DBImpl::Put类似,直接构建一个仅有一个删除操作的WriteBatch,然后让Write完成其余工作。

DBImpl::Write

DBImpl::Write作为Put和Delete的实际实现,保证了WriteBatch中操作的原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;

// 构建一个写入的请求之后将该请求置入请求队列中,在等待Condition Variable时会解锁
// mutex_
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
// 由于写入时writer可能自动合并为一个,因此该请求可能被其他的线程一并处理
if (w.done) {
return w.status;
}

// 为该请求在mem_中保留足够的空间,该函数会在稍后介绍
Status status = MakeRoomForWrite(my_batch == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
// my_batch 如果为nullptr则表示该次写入是为了触发Compaction
if (status.ok() && my_batch != nullptr) {
// 为减少写入次数,一次可能会将多个writer合并为一个
WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);

// 将对应的条目写入日志,然后插入mem_中
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
if (sync_error) {
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}

// 唤醒所有在该次写入中被处理的写入请求
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// 唤醒下一个等待的writer
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;
}

DBImpl::MakeRoomForWrite

该函数负责在mem_中有足够的空间来处理该写入请求,如果mem_已满则会创建新的mem_。如果该次请求是用户显式要求的,即由Compaction而来,则该请求需要立刻完成。其参数force表示是否需要将mem_写入level0,哪怕在mem_已有足够的空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
s = bg_error_;
break;
} else if (
allow_delay &&
versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
// 当level0的文件数量较多时会放缓当前写入的处理速度,从而让compaction的线程
// 有机会来进行compaction,从而减少level0的文件数目。因而避免下述versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger
// 触发的次数,因为如果那个条件触发会导致该请求一直等待知道Compaction线程
// 完成工作,这可能为某一个请求带来较高的延迟
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// 当前mem_中仍有足够空间
break;
} else if (imm_ != nullptr) {
// 之前的imm_仍然存在,即上次的Table尚未写入level0
background_work_finished_signal_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// level0文件过多,需要等待Compaction线程完成其工作
background_work_finished_signal_.Wait();
} else {
// 创建新的mem_,并将已有的mem_赋值给imm_,从而让compaction线程将imm_写入
// level0,同时创建新的日志文件
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false;
MaybeScheduleCompaction();
}
}
return s;
}

DBImpl::BuildBatchGroup

为了减少多次写入,LevelDB会将多个writer合并为一个,然后一次性写入,从而提升写入的效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
mutex_.AssertHeld();
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != nullptr);

size_t size = WriteBatchInternal::ByteSize(first->batch);

// 将多个writer合并为一个时会限制该writer不会增长的过大
size_t max_size = 1 << 20;
if (size <= (128<<10)) {
max_size = size + (128<<10);
}

*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter;
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// 不会在异步处理的写操作中包含一个同步的写操作
break;
}

if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// 确保writer的group不会增长的过大
break;
}

if (result == first->batch) {
// 将所有的更新放入tmp_batch_
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}

DBImpl::Get

Get会从当前的数据库中查找某个键对应的值,查找过程中可能使用快照来限制查找的范围。查找时会首先查找mem_与imm_,这两个表代表了所有尚未写入不同level的信息,仅当这两张表中未查找到对应的值后才会进入文件进行查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
// 确定该次查找所指定的序号,查找时会返回小于该序号且值最大的那个结果
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}

MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();

bool have_stat_update = false;
Version::GetStats stats;

{
mutex_.Unlock();
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// 查找mem_
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// 查找imm_
} else {
// 在当前Version所包含的各个level的文件中进行查找
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}

// Get可能触发seek compaction
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}

DBImpl::GetSnapshot

创建一个新的快照仅会在LevelDB中记录该快照对应的序号,从而保证在这个序号之后的修改不会被合并和丢弃。

1
2
3
4
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
}

DBImpl::ReleaseSnapshot

删除一个快照并不会直接触发所有因快照而产生的重复的值的删除,删除的过程会在之后的归并过程中进行。

1
2
3
4
void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
MutexLock l(&mutex_);
snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
}

DBImpl::CompactRange

该函数用于手动触发一次Compaction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
// 在max_level_with_files之后的各个level没有文件的键值范围与该范围重叠
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
Version* base = versions_->current();
for (int level = 1; level < config::kNumLevels; level++) {
if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
}
}
// 将mem_写入level0
TEST_CompactMemTable();
for (int level = 0; level < max_level_with_files; level++) {
// 将每层的与该键值范围相关的文件归并到下一层,期望仅在最高的一层有该键值
TEST_CompactRange(level, begin, end);
}
}

DBImpl::TEST_CompactMemTable

调度compaction将mem_写入level0中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Status DBImpl::TEST_CompactMemTable() {
// nullptr batch means just wait for earlier writes to be done
// 等待所有已有的writer完成写入,之后利用MakeRoomForWrite将mem写入level0的table中
Status s = Write(WriteOptions(), nullptr);
if (s.ok()) {
// 等待Compaction的完成
MutexLock l(&mutex_);
while (imm_ != nullptr && bg_error_.ok()) {
background_work_finished_signal_.Wait();
}
if (imm_ != nullptr) {
s = bg_error_;
}
}
return s;
}

DBImpl::TEST_CompactRange

该函数用于手动触发一次归并,尽量使某个键值范围内仅在一个level存在。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);

InternalKey begin_storage, end_storage;

// 设置归并范围
ManualCompaction manual;
manual.level = level;
manual.done = false;
if (begin == nullptr) {
manual.begin = nullptr;
} else {
begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
manual.begin = &begin_storage;
}
if (end == nullptr) {
manual.end = nullptr;
} else {
end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
manual.end = &end_storage;
}

MutexLock l(&mutex_);
while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
if (manual_compaction_ == nullptr) {
// 如果当前没有其他的手动归并的任务,则将该Compaction设置未当前任务,然后
// 调度Compaction
// 注意,在DBImpl::BackgroundCompaction中可能一次仅进行部分归并,而不是用户
// 指定的整个范围的归并,此时manual.done仍会设为false,因此会继续进行Compaction
// 的调度
manual_compaction_ = &manual;
MaybeScheduleCompaction();
} else {
// 如果有其他的归并任务则等待其完成
background_work_finished_signal_.Wait();
}
}
if (manual_compaction_ == &manual) {
manual_compaction_ = nullptr;
}
}

总结

这三篇博客主要介绍了LevelDB的内部结构、Compaction的实现以及各个接口的实现,但未深入到文件结构等细节。这也是我第一次写博客,还是有很多地方做的不好,希望这几篇博客可以帮助到希望了解或者打算阅读LevelDB源代码的人。我过几天会找一些时间重新整理一下这几篇博客。