0%

Doris 如何实现vertical compaction

Apache Doris原先的compaction策略是horizontal compaction, 也就是说在读取不同版本数据做merge的这一步是把整行数据全部读到内存中进行聚合后再写入到compaction output之中的, 这样的方式面对大宽表场景会很容易把内存打满导致失败. Doris在最近的版本中引入了vertical compaction, 从整行一起compaction变成了按照列组的方式对不同的组进行compaction, 大大降低了compaction的内存消耗. 本文主要是根据代码解析到底是如何做到的. 本文不会涉及rowset的选择策略, 也不会涉及merge时读取数据聚合的过程,主要关注的是如何写不同的segment.

调用路径

compaction的触发方式有两种, 手动http触发, bg线程轮询触发. 本文主要介绍后者,前者比较简单,直接从compaction_action.cpp入手即可.

首先compaction任务是由_compaction_tasks_producer_thread周期性扫描磁盘上的tablets的情况后调用StorageEngine::_submit_compaction_task产生的. 对于每一个被挑选的tablet,会调用Tablet::prepare_compaction_and_calculate_permits计算其所需要的permits. 这一步具体的逻辑是各个compaction子类的prepare_compacte.

prepare_compacte主要的逻辑是从这个tablet下属的所有rowsets中挑选最合适的连续的一连串rowset(根据不同的compaction类型会有不同的策略)进行compaction. 刚刚提到的compaction需要的permits则是所有被挑选出来的rowsets的compaction score之和.

在顺利执行完上面的步骤后即可向不同的compaction线程池提交对应的任务, 任务主要的逻辑是调用对应tablet实例的execute_compaction方法. 而这个方法的本质是调用execute_compact方法进而调用不同compaction子类实现execute_compaction_impl().

这些execute_compaction_impl()方法之中都是先上锁然后计算compaction需要的permits然后调用Compaction::do_compaction(int64_t permits)方法. 其实这个方法的逻辑则是调用do_compaction_impl(int64_t permits)(如果开启了checksum配置则还会在调用前后执行检查checksum).

do_compaction_impl(int64_t permits)

这个函数的逻辑很长, 一点一点来解析他干了什么

linenums
1
2
3
4
5
6
Status Compaction::do_compaction_impl(int64_t permits) {
if (handle_ordered_data_compaction()) {
// 检查segment间是否无重叠
Compaction::do_compact_ordered_rowsets();
}
}

如果被选取的rowset之中的数据彼此间是顺序的(比如读取segment里面的zonemap发现min max区间是无重复的)并且segment文件大小合适, 那就可以很方便地compaction起来甚至不需要读取每个segment的数据聚合起来重新写到一个新的大的segment文件中. 可以简单的将input rowset下的segment文件硬链接到output rowset下使用新的segment id即可.

1
2
3
4
5
6
7
8
9
10
11
12
Status Compaction::do_compact_ordered_rowsets() {
construct_output_rowset_writer();
auto seg_id = 0; // 新的目录下seg文件的新名字
for (const auto& rowset: _input_rowsets) {
rowset->link_files_to(_tablet->tablet_path(), _output_rs_writer->rowset_id(), seg_id);
seg_id += rowset->num_segments();
// 记录rowset的key bounds
}
// 向output rowset的meta信息里写数据
// 包括刚刚记录的key bounds
// 数据大小等
}
linenums
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Status Compaction::do_compaction_impl(int64_t permits) {
// 处理ordered compaction

construct_input_rowset_reader(); // 从所有input rowset上构建reader, 之后需要读所有rs的数据聚合才是output的数据
construct_output_rowset_writer();

if (vertical_compaction) {
Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(),
get_avg_segment_rows(), &stats);
} else {
// 非vertical compaction的逻辑就不梳理了, 比较直观(当然代码不一定直观==)
}
}

接下来是vertical compaction的逻辑到底是怎么走的

vertical_merge_rowsets

第一步先看整个逻辑的入口也就是vertical_merge_rowsets这个函数, 先分析下他的入参

1
2
3
4
5
Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
TabletSchemaSPtr tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output)

第一个tablet表示是哪个tablet被选出来做compaction, 因为会涉及到修改元数据所以需要拿到tablet并获取其中的锁. 第二个reader_type中记录的是不同的compaction的类型, tablet_schema记录表的schema属性(比如key类型, 列的数量)之后切分列组时也会用到. src_rowset_readers里记录的是之前prepare_compaction中选出的所有参与compaction的rowset, 这些rowset对应不同版本的数据. dst_rowset_writer是用来构造compaction的结果rowset的其表示的范围是src_rowset_reader中的最小值到最大值的区间, max_rows_per_segment是最后生成的output rowset下的每个segment文件之中最多有多少行数据, stats_output里面记录的是compaction的stats包括merge了多少列,结果是多少列等.

接下来的逻辑中第一步是划分列, 我们考虑下为什么需要划分. 在建表的时候用户可以指定dup key, uniq key, agg key等key列, 每一行数据在逻辑上都是通过key列来辨识的, key列不同的时候就一定不是同一行. 在进行compaction的时候为了保证数据不重不漏, 需要先将key列组进行compaction, 这样整行的数据都是按照key列在遍历就不会有遗漏. 而key列的顺序也表示了最终数据的顺序. 在拆分出key列之后僧下的value列按照config::vertical_compaction_num_columns_per_group的大小为一组分成多个组.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// tablet_schema中拿到表的key列和列数等属性, column_groups中记录的是不同列组的下标集合
void Merger::vertical_split_columns(TabletSchemaSPtr tablet_schema,
std::vector<std::vector<uint32_t>>* column_groups) {
uint32_t num_key_cols = tablet_schema->num_key_columns();
uint32_t total_cols = tablet_schema->num_columns();
std::vector<uint32_t> key_columns;
// 前面几列都是key
for (auto i = 0; i < num_key_cols; ++i) {
key_columns.emplace_back(i);
}

// 根据不同key类型进行不同的处理

std::vector<uint32_t> value_columns;
// 按照个数拆分个多个group
for (auto i = num_key_cols; i < total_cols; ++i) {
if ((i - num_key_cols) % config::vertical_compaction_num_columns_per_group == 0) {
column_groups->emplace_back();
}
column_groups->back().emplace_back(i);
}
}

接下来的一行代码vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),reader_type);的作用目前还不明显, 我们可以暂时简单地理解为他是用来记录行的. 而这个类的实现可以概括如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class RowSourcesBuffer {
public:
RowSourcesBuffer(int64_t tablet_id, const std::string& tablet_path, ReaderType reader_type)
: _tablet_id(tablet_id),
_tablet_path(tablet_path),
_reader_type(reader_type),
_buffer(ColumnUInt16::create()) {}

~RowSourcesBuffer() {
_reset_buffer();
if (_fd > 0) {
::close(_fd);
}
}

// write batch row source
Status append(const std::vector<RowSource>& row_sources);
Status flush();

RowSource current() {
DCHECK(_buf_idx < _buffer->size());
return RowSource(_buffer->get_element(_buf_idx));
}
void advance(int32_t step = 1) {
DCHECK(_buf_idx + step <= _buffer->size());
_buf_idx += step;
}

private:
Status _create_buffer_file();
Status _serialize();
Status _deserialize();
void _reset_buffer() {
_buffer->clear();
_buf_idx = 0;
}

int64_t _tablet_id;
std::string _tablet_path;
ReaderType _reader_type = ReaderType::UNKNOWN;
uint64_t _buf_idx = 0;
int _fd = -1;
ColumnUInt16::MutablePtr _buffer;
uint64_t _total_size = 0;
};

里面存储的是多个RowSource, 每个rowsource之中是一块连续内存(顾名思义保存row的), 每次调用append将多个RowSource写到MutablePtr _buffer之中. 如果超过了_buffer的大小则会先序列化到磁盘并从_buffer头部开始继续写入.

接下来的逻辑是对于每一个column group进行一次merge, 也就是按照多列一起来merge. 这里也是和horizontal compaction的最大区别, 按照列组来进行merge显而易见的好处是内存压力会小很多, 假设列大小是均匀分布的, 平均每一行的内存消耗几乎只有最大列组数大小/列数(当然实际情况里肯定是非常不均的).

1
2
3
4
5
6
7
8
9
10
for (auto i = 0; i < column_groups.size(); ++i) {
bool is_key = (i == 0);
RETURN_IF_ERROR(vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output));
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
RETURN_IF_ERROR(row_sources_buf.seek_to_begin());
}

深入到vertical_compact_one_group之中, 传递进来的参数主要关注tablet_schema, src_rowset_reader, dst_rowset_writer的使用.
row_source_buf作为将数据读到内存时的内存缓冲传递给VerticalBlockReader的构造函数之中, 之后VerticalBlockReader读取input_rowset并聚合出数据后按照堆排序排序存入row_source_buf之中, 下面按照VerticalHeapMergeIterator::next_batch的部分代码为例说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Status VerticalHeapMergeIterator::next_batch(Block* block) {
size_t row_idx = 0;
VerticalMergeIteratorContext* pre_ctx = nullptr;
std::vector<RowSource> tmp_row_sources; // 读取的每一行数据
// 下面是按照堆排序读取row
while (_get_size(block) < _block_row_max) {
auto ctx = _merge_heap.top();
_merge_heap.pop();
// 先收集多个row
if (ctx->is_same() &&
(_keys_type == KeysType::UNIQUE_KEYS || _keys_type == KeysType::AGG_KEYS)) {
// skip cur row, copy pre ctx
++_merged_rows;
} else {
ctx->add_cur_batch();
if (pre_ctx != ctx) {
if (pre_ctx) {
RETURN_IF_ERROR(pre_ctx->copy_rows(block));
}
pre_ctx = ctx;
}
if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
// current block finished, ctx not advance
// so copy start_idx = (_index_in_block - _cur_batch_num + 1)
RETURN_IF_ERROR(ctx->copy_rows(block, false));
pre_ctx = nullptr;
}
}

RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
_merge_heap.push(ctx);
} else {
// push next iterator in same rowset into heap
}
}
// 这里就是将读取出来的row都放进`RowSourceBuf`之中
RETURN_IF_ERROR(_row_sources_buf->append(tmp_row_sources));
if (!_merge_heap.empty()) {
return Status::OK();
}
return Status::EndOfFile("no more data in segment");
}

现在我们回到vertical_compact_one_group之中, 刚刚说了row_source_buf的用处,现在来说一下这个VerticalBlockReader, 这个reader实际上是用来读取从input rowset之中数据的,读取出来的数据都放到了刚刚提到的row_source_buf之中, 既然要读取数据这里就涉及到数据的schema的问题. 比如考虑这样的情况, 一开始有colA, colB然后插入数据(1,2), 之后delete where colB = 2, 然后drop掉columnB, 现在add column columnB. 新的column B和之前的column B是不同的.

在往下有一行reader_params.return_columns = column_group;, 因为vertical compaction的逻辑是按照列组进行compaction的也就是说读取数据的时候只需要读取某几列即可,具体读取哪些列的数据就是靠reader_params.return_columns决定.

接下来是读取聚合后的数据并且写入output rowset的逻辑.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
size_t output_rows = 0;
bool eof = false;
while (!eof && !StorageEngine::instance()->stopped()) {
// Read one block from block reader
// reader通过reader_params.set_read_source拿到了所有input rowset
// 然后根据不同的key类型选择不同的聚合方式, 每次读取一个block的数据
RETURN_NOT_OK_STATUS_WITH_WARN(reader.next_block_with_aggregation(&block, &eof),
"failed to read next block when merging rowsets of tablet " +
std::to_string(tablet->tablet_id()));
// 这个函数名字叫add_columns, 但其实可能叫add_columns_block_data更直观一点
// 主要作用是将这个block写入segment文件中
RETURN_NOT_OK_STATUS_WITH_WARN(
dst_rowset_writer->add_columns(&block, column_group, is_key, max_rows_per_segment),
"failed to write block when merging rowsets of tablet " +
std::to_string(tablet->tablet_id()));

output_rows += block.rows();
block.clear_column_data();
}

从刚刚的代码逻辑里可以看到这个while循环会将input rowset里对应column_group的所有数据全部读出来然后写进output rowset之中. 这里要注意下只读取了对应column_group并写入. 这里也是为什么vertical compaction消耗的内存会低很多. 感兴趣的读者可以看一下Merger::vmerge_rowsets的实现,基本的逻辑几乎一模一样,最大的区别是一个读取的数据是row的部分,一个是整个row全部会读取.

到目前为止我们从merger的角度大概梳理了代码的流程,接下来我们看一下vertical_beta_rowset_writer的实现来理解vertical compaction的segment文件的数据写入.

vertical_beta_rowset_writer

vertical_beta_rowset_writer::add_columns

在vertical compaction中写入数据其实就是调用了这个add_columns接口, 我们从这个函数开始分析具体实现.
add_columns需要判断是否某一个column group写满了一个segment的对应区域, 如果写满了需要创建一个新的segment或者切换到下一个segment,但是又因为一个segment里面需要写多个column group, 所以得把segment都保留着,在最后一个 column group写完后才全部下刷持久化产生可用的文件.
先写key column group, 同时也只有key column group可能会创建新的segment.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block,
const std::vector<uint32_t>& col_ids, bool is_key,
uint32_t max_rows_per_segment) {
// 检查rows数以及更新每个segment最多有多少rows

if (_segment_writers.empty()) {
// 如果一个segment 都没有就创建一个新的segment并写入数据
RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer));
_cur_writer_idx = 0;
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
} else if (is_key) {
// 如果当前的segment写过的行数已经满了
if (_segment_writers[_cur_writer_idx]->num_rows_written() > max_rows_per_segment) {
// segment is full, need flush columns and create new segment writer
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx], true));
// 创建一个新的segment并跳到对应的新segment
RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer));
++_cur_writer_idx;
}
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
} else {
if (_cur_writer_idx == 0 && num_rows_written == 0) {
// init的作用是更新segment中对应的column id(其实就是上一轮column group写完了需要换到下一个column group了)
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
}
// 如果会将当前segment写满(也就是rows超过限制),那为了防止溢出智只会写一部分数据
// 并且写完后切换到下一个segment继续写
if (num_rows_written + num_rows >= num_rows_key_group &&
_cur_writer_idx < _segment_writers.size() - 1) {
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(
block, 0, num_rows_key_group - num_rows_written));
// 切换前需要flush写将当前segment的写flush了来
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx]));
start_offset = num_rows_key_group - num_rows_written;
limit = num_rows - start_offset;
++_cur_writer_idx;
// switch to next writer
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
num_rows_written = 0;
num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
}
// 这里既可能是刚切换完segment也可能是这次写入不会写满segment
if (limit > 0) {
RETURN_IF_ERROR(
_segment_writers[_cur_writer_idx]->append_block(block, start_offset, limit));
DCHECK(_segment_writers[_cur_writer_idx]->num_rows_written() <=
_segment_writers[_cur_writer_idx]->row_count());
}
}

}

接下来简单说一下创建segment writer的逻辑, Doris中数据文件实际上是通过segment在表示的,一个segment上的数据是按照列存形式管理的,所以创建segment实际上需要创建一个对应的file以及file writer. 另外在vertical compaction中由于每次写segment是按照列组在写的,所以对于一个segment其每次写入表示的column的id也是有所不同的,所以存在多次init的情况. 其实不同的列对应的就是不同的column writer,每一次init根据传递进去的column id都会构造新的column writer.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Status VerticalBetaRowsetWriter::_create_segment_writer(
const std::vector<uint32_t>& column_ids, bool is_key,
std::unique_ptr<segment_v2::SegmentWriter>* writer) {
// segment 文件的路径
auto path =
BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, _num_segment++);
auto fs = _rowset_meta->fs();
io::FileWriterPtr file_writer;
fs->create_file(path, &file_writer);
writer->reset(new segment_v2::SegmentWriter(
file_writer.get(), _num_segment, _context.tablet_schema, _context.tablet,
_context.data_dir, _context.max_rows_per_segment, writer_options, nullptr));
// 因为一个rowset可能有多个segment, 所以都通过_file_writer管理
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
}

auto s = (*writer)->init(column_ids, is_key);
}

vertical_beta_rowset_writer::flush_columns

上文有说到,vertical compaction是按照column group在进行数据的读取和写入的,但是考虑到Doris的数据表示是通过segment进行的,一个segment上就应该有同一行的所有数据,自然而然会想到一个问题,假设segment数量有多个,那么按照column group行读取部分列的方式进行写一定会出现一个segment交替的过程. 比如说一共1000行数据,有2个column group(key group和value group), 分成2个segment. 那么先按照key group写segment1写了500行数据,之后写segment2写了500行数据,之后切换到value group从segment1开始写,这里一定有一个segment遍历的过程. 查看vertical_beta_rowset_writer的实现会发现一个std::vector<std::unique_ptr<segment_v2::SegmentWriter>> _segment_writers;size_t _cur_writer_idx = 0;后者便是用来迭代遍历segment的下标.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Status VerticalBetaRowsetWriter::flush_columns(bool is_key) {
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx], is_key));
// 下一次从第一个segment开始写入
_cur_writer_idx = 0;
return Status::OK();
}
Status VerticalBetaRowsetWriter::_flush_columns(
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer, bool is_key) {
uint64_t index_size = 0;
RETURN_IF_ERROR((*segment_writer)->finalize_columns_data());
RETURN_IF_ERROR((*segment_writer)->finalize_columns_index(&index_size));
if (is_key) {
_total_key_group_rows += (*segment_writer)->row_count();
// record segment key bound, 计算zonemap的值
KeyBoundsPB key_bounds;
Slice min_key = (*segment_writer)->min_encoded_key();
Slice max_key = (*segment_writer)->max_encoded_key();
key_bounds.set_min_key(min_key.to_string());
key_bounds.set_max_key(max_key.to_string());
_segments_encoded_key_bounds.emplace_back(key_bounds);
}
_total_index_size +=
static_cast<int64_t>(index_size) + (*segment_writer)->get_inverted_index_file_size();
return Status::OK();
}

现在回头看一下vertical compaction的代码,可以发现在Merger::vertical_compact_one_group中最后一行调用了VerticalBetaRowsetWriter::_flush_columns方法,也就是在每一个column group结束时将segment从0开始. 用一个伪代码表示则是

1
2
3
4
5
6
7
8
9
10
11
12
13
Merger::vertical_merge_rowsets() {
for (column_group: column_groups) {
while (!eof) {
reader.next_block_with_aggregation();
dst_rowset_writer->add_columns(column_group);
}
dst_rowset_writer->flush_columns();
}
dst_rowset_writer->final_flush();
}

// final_flush中对每个segment调用finalize footer写入每个column的元数据信息, 在rowset_writer->build()时
// 持久所有数据