LevelDB_Open

LevelDB源码分析

Intro

我们从LevelDB给定的示例入手对LevelDB的源代码进行分析,本篇仅分析其中的Open函数,该函数用于打开一个数据库。

1
2
3
4
5
6
7
8
9
leveldb::DB* db;  
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value);
if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1);
delete db;

Open

Open函数为DB类的一个静态函数。该函数负责打开一个数据库,由于DB仅为一个接口,其中定义了LevelDB所提供的接口,而实际的数据库实现放在了DBImpl类中,DBImpl由DB类继承而来。

首先需要介绍一下LevelDB的构成,LevelDB当前的状态由一个Version构成,每个Version都追踪着一系列用于构成数据库的文件,这些Version共同构成VersionSet,当某些文件被添加或者删除时就会触发Version之间的变换,Version之间的变换可以使用VersionEdit来记录不同Version之间的文件或其他元信息的差异。

接下来逐行分析Open这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr;

// 构建新的DBImpl对象
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
// VersionEdit用来记录LevelDB中Version之间的修改,包括Version之间有哪些
// 文件被添加/删除,当前Version的日志文件编号是多少等
VersionEdit edit;
// save_manifest用于记录是否需要更新MANIFEST文件
bool save_manifest = false;
// LevelDB从已有的文件中恢复信息
Status s = impl->Recover(&edit, &save_manifest);
// 如果LevelDB的信息恢复成功并且此时没有正在使用的日志文件,则创建日志文件和对应
// 的MemTable,其中MemTable使用引用计数来进行管理,该类会在之后进行介绍
if (s.ok() && impl->mem_ == nullptr) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
// 如果Version信息发生了变动,则利用VersionEdit将Version之间的变动应用到Version
// 使用LogAndApply会生成新的Version,并将该Version加入VersionSet中
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
// 删除无用的文件,该函数会在之后的博客中介绍
impl->DeleteObsoleteFiles();
// 进行Merge
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);
*dbptr = impl;
} else {
delete impl;
}
return s;
}

DBImpl::Recover

从已有的文件中恢复数据库的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
mutex_.AssertHeld();

// 该数据库对应的文件夹,如果该数据库已经存在则文件夹创建失败
env_->CreateDir(dbname_);
assert(db_lock_ == nullptr);
// 使用文件锁防止多个进程对同一个数据库的并发访问
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) {
return s;
}

// 根据是否存在CURRENT文件判断是否已经存在数据库,如果不存在调用NewDB创建
// 新的数据库,该函数会在之后介绍
if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
s = NewDB();
if (!s.ok()) {
return s;
}
} else {
return Status::InvalidArgument(
dbname_, "does not exist (create_if_missing is false)");
}
} else {
if (options_.error_if_exists) {
return Status::InvalidArgument(
dbname_, "exists (error_if_exists is true)");
}
}

// 从已有文件中恢复VersionSet的信息,该函数会在之后介绍VersionSet时介绍
s = versions_->Recover(save_manifest);
if (!s.ok()) {
return s;
}
SequenceNumber max_sequence(0);

// 由于部分操作可能已经记录进日志文件,但这些日志文件所记录的更改可能并未出现在各个Level中
// 从VersionSet中获取当前所有Version的所有Level中包含的文件。再从当前文件夹中
// 获取所有的日志文件。
...

std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
// 从日志文件中恢复信息,该函数会在之后介绍
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}

// 将这个日志文件的编号标记为已用
versions_->MarkFileNumberUsed(logs[i]);
}

if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}

return Status::OK();
}

NewDB()

创建新的数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Status DBImpl::NewDB() {
// 编号为1的为MANIFEST文件,0为当前的日志文件
// 将当前的信息编码后写入MANIFEST文件,并设置CURRENT文件指向该
// MANIFEST文件
VersionEdit new_db;
new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);

const std::string manifest = DescriptorFileName(dbname_, 1);
WritableFile* file;
Status s = env_->NewWritableFile(manifest, &file);
if (!s.ok()) {
return s;
}
{
log::Writer log(file);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok()) {
s = file->Close();
}
}
delete file;
if (s.ok()) {
s = SetCurrentFile(env_, dbname_, 1);
} else {
env_->DeleteFile(manifest);
}
return s;
}

DBImpl::RecoverLogFile

由日志文件中恢复未被写入各个Level文件的更新。save_manifest用来标识是否有新的文件被加入各个Level,如果有的话则需要更新当前Version,进而需要更新MANIFEST文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status;
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != nullptr && this->status->ok()) *this->status = s;
}
};

mutex_.AssertHeld();

// 打开log_number对应的日志文件
...

// 创建对应的Log Reader的Reporter
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : nullptr);
log::Reader reader(file, &reporter, true, 0);
Log(options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);

// 利用Log Reader从日志中读取记录并添加入MemTable中
std::string scratch;
Slice record;
WriteBatch batch;
int compactions = 0;
MemTable* mem = nullptr;
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);

if (mem == nullptr) {
mem = new MemTable(internal_comparator_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
}
// 更新写入时的SequenceNumber
const SequenceNumber last_seq =
WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}

// 如果MemTable的大小过大,将该MemTable生成Level0的Table
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++;
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
mem->Unref();
mem = nullptr;
if (!status.ok()) {
break;
}
}
}

delete file;

// 可能重复使用最后的日志文件
if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
assert(logfile_ == nullptr);
assert(log_ == nullptr);
assert(mem_ == nullptr);
uint64_t lfile_size;
if (env_->GetFileSize(fname, &lfile_size).ok() &&
env_->NewAppendableFile(fname, &logfile_).ok()) {
Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
log_ = new log::Writer(logfile_, lfile_size);
logfile_number_ = log_number;
if (mem != nullptr) {
mem_ = mem;
mem = nullptr;
} else {
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
}
}
}

if (mem != nullptr) {
// 将未写入Level0的Table的日志全部写入Level0的Table中
if (status.ok()) {
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
}
mem->Unref();
}

return status;
}

VersionEdit

VersionEdit用以修改当前Version的信息,包括文件编号、日志编号、新增/删除的文件等,该类提供了EncodeTo和DecodeFrom来与string进行相互转换。以下是VersionEdit提供的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class VersionEdit {
public:
VersionEdit();
~VersionEdit();

void Clear();
void SetComparatorName(const Slice& name);
void SetLogNumber(uint64_t num);
void SetPrevLogNumber(uint64_t num);
void SetNextFile(uint64_t num);
void SetLastSequence(SequenceNumber seq);
void SetCompactPointer(int level, const InternalKey& key);
void AddFile(int level, uint64_t file,
uint64_t file_size,
const InternalKey& smallest,
const InternalKey& largest);
void DeleteFile(int level, uint64_t file);
void EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);
std::string DebugString() const;
private:
...
};

Version

Version用于记录当前数据库所拥有的各个Level的文件以及一些有关Compaction的信息。该类还提供了接口用于在这些文件上进行查找。各个Version均使用引用计数来防止被过早的释放,当一个Version被加入VersionSet时会增加引用计数,当VersionSet的PickCompaction以及CompactRange选择一个Version作为输入时也会增加对应Version的引用计数,使用Builder来构建新的Version时也会增加相应的引用计数。

VersionSet

VersionSet用于管理当前数据库不同的Version,所有Version被串成一个双向链表。不同Version之间的变化可以通过VersionSet提供的LogAndApply来应用VersionEdit或者使用VersionSet内部提供的Builder来一次进行多个VersionEdit的变更。

接下来介绍一下DBImpl::Recover中使用的VersionSet::Recover函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
Status VersionSet::Recover(bool *save_manifest) {
struct LogReporter : public log::Reader::Reporter {
...
};

// 利用CURRENT文件读取当前的MANIFEST文件
...

bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_);

{
...
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (s.ok()) {
// 检测Comparator是否一致
}

if (s.ok()) {
builder.Apply(&edit);
}

if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}

if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}

if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}

if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}
delete file;
file = nullptr;

if (s.ok()) {
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}

if (!have_prev_log_number) {
prev_log_number = 0;
}

MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}

if (s.ok()) {
// 利用Builder来一次性应用所有MANIFEST文件中记录的Version变化情况
Version* v = new Version(this);
builder.SaveTo(v);
// 将Version添加到VersionSet中
Finalize(v);
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;

// 如果旧的MANIFEST文件并不是太大可以复用旧的MANIFEST文件
if (ReuseManifest(dscname, current)) {
// No need to save new manifest
} else {
*save_manifest = true;
}
}

return s;
}

总结

Open函数的流程可以概括为如下步骤:

  • 构造DBImpl对象
  • 从已有文件中恢复信息
    • 根据是否存在CURRENT文件判断是否是已有的数据库,若没有则创建新数据库
    • 从已有文件中恢复VersionSet信息
      • 利用CURRENT找到当前MANIFEST文件
      • 从MANIFEST文件中获取所有VersionEdit的信息
      • 利用所有的VersionEdit信息来生成当前Version的信息
      • 找到所有日志文件
      • 从所有日志文件中读取所有记录,插入一个MemTable中,该MemTable大小超过定值后会写入Level0的Table中
      • 最后一个日志文件可能被复用,其对应的MemTable会作为当前数据库的MemTable
      • MANIFEST文件可能也会复用,如果不复用则在第一次调用LogAndApply时会创建新的MANIFEST文件
  • 如果没有可以复用的日志文件及MemTable,则创建新的日志文件及MemTable
  • 如果恢复过程中生成了新的文件,即生成了新的Level0的Table文件或者需要新的MANIFEST文件,则将VersionEdit记录至VersionSet中
  • 删除无用文件
  • 调度Compaction