Apache Doris原先的compaction策略是horizontal compaction, 也就是说在读取不同版本数据做merge的这一步是把整行数据全部读到内存中进行聚合后再写入到compaction output之中的, 这样的方式面对大宽表场景会很容易把内存打满导致失败. Doris在最近的版本中引入了vertical compaction, 从整行一起compaction变成了按照列组的方式对不同的组进行compaction, 大大降低了compaction的内存消耗. 本文主要是根据代码解析到底是如何做到的. 本文不会涉及rowset的选择策略, 也不会涉及merge时读取数据聚合的过程,主要关注的是如何写不同的segment.
调用路径 compaction的触发方式有两种, 手动http触发, bg线程轮询触发. 本文主要介绍后者,前者比较简单,直接从compaction_action.cpp
入手即可.
首先compaction任务是由_compaction_tasks_producer_thread周期性扫描磁盘上的tablets的情况后调用StorageEngine::_submit_compaction_task
产生的. 对于每一个被挑选的tablet,会调用Tablet::prepare_compaction_and_calculate_permits
计算其所需要的permits. 这一步具体的逻辑是各个compaction子类的prepare_compacte
.
prepare_compacte
主要的逻辑是从这个tablet下属的所有rowsets中挑选最合适的连续的一连串rowset(根据不同的compaction类型会有不同的策略)进行compaction. 刚刚提到的compaction需要的permits则是所有被挑选出来的rowsets的compaction score之和.
在顺利执行完上面的步骤后即可向不同的compaction线程池提交对应的任务, 任务主要的逻辑是调用对应tablet实例的execute_compaction
方法. 而这个方法的本质是调用execute_compact
方法进而调用不同compaction子类实现execute_compaction_impl()
.
这些execute_compaction_impl()
方法之中都是先上锁然后计算compaction需要的permits然后调用Compaction::do_compaction(int64_t permits)
方法. 其实这个方法的逻辑则是调用do_compaction_impl(int64_t permits)
(如果开启了checksum配置则还会在调用前后执行检查checksum).
do_compaction_impl(int64_t permits) 这个函数的逻辑很长, 一点一点来解析他干了什么
linenums 1 2 3 4 5 6 Status Compaction::do_compaction_impl (int64_t permits) { if (handle_ordered_data_compaction ()) { Compaction::do_compact_ordered_rowsets (); } }
如果被选取的rowset之中的数据彼此间是顺序的(比如读取segment里面的zonemap发现min max区间是无重复的)并且segment文件大小合适, 那就可以很方便地compaction起来甚至不需要读取每个segment的数据聚合起来重新写到一个新的大的segment文件中. 可以简单的将input rowset下的segment文件硬链接到output rowset下使用新的segment id即可.
1 2 3 4 5 6 7 8 9 10 11 12 Status Compaction::do_compact_ordered_rowsets () { construct_output_rowset_writer (); auto seg_id = 0 ; for (const auto & rowset: _input_rowsets) { rowset->link_files_to (_tablet->tablet_path (), _output_rs_writer->rowset_id (), seg_id); seg_id += rowset->num_segments (); } }
linenums 1 2 3 4 5 6 7 8 9 10 11 12 13 14 Status Compaction::do_compaction_impl (int64_t permits) { construct_input_rowset_reader (); construct_output_rowset_writer (); if (vertical_compaction) { Merger::vertical_merge_rowsets (_tablet, compaction_type (), _cur_tablet_schema, _input_rs_readers, _output_rs_writer.get (), get_avg_segment_rows (), &stats); } else { } }
接下来是vertical compaction的逻辑到底是怎么走的
vertical_merge_rowsets 第一步先看整个逻辑的入口也就是vertical_merge_rowsets
这个函数, 先分析下他的入参
1 2 3 4 5 Status Merger::vertical_merge_rowsets (TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema, const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output)
第一个tablet表示是哪个tablet被选出来做compaction, 因为会涉及到修改元数据所以需要拿到tablet并获取其中的锁. 第二个reader_type中记录的是不同的compaction的类型, tablet_schema记录表的schema属性(比如key类型, 列的数量)之后切分列组时也会用到. src_rowset_readers里记录的是之前prepare_compaction中选出的所有参与compaction的rowset, 这些rowset对应不同版本的数据. dst_rowset_writer是用来构造compaction的结果rowset的其表示的范围是src_rowset_reader中的最小值到最大值的区间, max_rows_per_segment是最后生成的output rowset下的每个segment文件之中最多有多少行数据, stats_output里面记录的是compaction的stats包括merge了多少列,结果是多少列等.
接下来的逻辑中第一步是划分列, 我们考虑下为什么需要划分. 在建表的时候用户可以指定dup key, uniq key, agg key等key列, 每一行数据在逻辑上都是通过key列来辨识的, key列不同的时候就一定不是同一行. 在进行compaction的时候为了保证数据不重不漏, 需要先将key列组进行compaction, 这样整行的数据都是按照key列在遍历就不会有遗漏. 而key列的顺序也表示了最终数据的顺序. 在拆分出key列之后僧下的value列按照config::vertical_compaction_num_columns_per_group
的大小为一组分成多个组.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void Merger::vertical_split_columns (TabletSchemaSPtr tablet_schema, std::vector<std::vector<uint32_t >>* column_groups) { uint32_t num_key_cols = tablet_schema->num_key_columns (); uint32_t total_cols = tablet_schema->num_columns (); std::vector<uint32_t > key_columns; for (auto i = 0 ; i < num_key_cols; ++i) { key_columns.emplace_back (i); } std::vector<uint32_t > value_columns; for (auto i = num_key_cols; i < total_cols; ++i) { if ((i - num_key_cols) % config::vertical_compaction_num_columns_per_group == 0 ) { column_groups->emplace_back (); } column_groups->back ().emplace_back (i); } }
接下来的一行代码vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),reader_type);
的作用目前还不明显, 我们可以暂时简单地理解为他是用来记录行的. 而这个类的实现可以概括如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 class RowSourcesBuffer {public : RowSourcesBuffer (int64_t tablet_id, const std::string& tablet_path, ReaderType reader_type) : _tablet_id(tablet_id), _tablet_path(tablet_path), _reader_type(reader_type), _buffer(ColumnUInt16::create ()) {} ~RowSourcesBuffer () { _reset_buffer(); if (_fd > 0 ) { ::close (_fd); } } Status append (const std::vector<RowSource>& row_sources) ; Status flush () ; RowSource current () { DCHECK (_buf_idx < _buffer->size ()); return RowSource (_buffer->get_element (_buf_idx)); } void advance (int32_t step = 1 ) { DCHECK (_buf_idx + step <= _buffer->size ()); _buf_idx += step; } private : Status _create_buffer_file(); Status _serialize(); Status _deserialize(); void _reset_buffer() { _buffer->clear (); _buf_idx = 0 ; } int64_t _tablet_id; std::string _tablet_path; ReaderType _reader_type = ReaderType::UNKNOWN; uint64_t _buf_idx = 0 ; int _fd = -1 ; ColumnUInt16::MutablePtr _buffer; uint64_t _total_size = 0 ; };
里面存储的是多个RowSource, 每个rowsource之中是一块连续内存(顾名思义保存row的), 每次调用append将多个RowSource写到MutablePtr _buffer
之中. 如果超过了_buffer的大小则会先序列化到磁盘并从_buffer头部开始继续写入.
接下来的逻辑是对于每一个column group进行一次merge, 也就是按照多列一起来merge. 这里也是和horizontal compaction的最大区别, 按照列组来进行merge显而易见的好处是内存压力会小很多, 假设列大小是均匀分布的, 平均每一行的内存消耗几乎只有最大列组数大小/列数(当然实际情况里肯定是非常不均的).
1 2 3 4 5 6 7 8 9 10 for (auto i = 0 ; i < column_groups.size (); ++i) { bool is_key = (i == 0 ); RETURN_IF_ERROR (vertical_compact_one_group ( tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf, src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output)); if (is_key) { RETURN_IF_ERROR (row_sources_buf.flush ()); } RETURN_IF_ERROR (row_sources_buf.seek_to_begin ()); }
深入到vertical_compact_one_group
之中, 传递进来的参数主要关注tablet_schema
, src_rowset_reader
, dst_rowset_writer
的使用.row_source_buf
作为将数据读到内存时的内存缓冲传递给VerticalBlockReader的构造函数之中, 之后VerticalBlockReader读取input_rowset并聚合出数据后按照堆排序排序存入row_source_buf
之中, 下面按照VerticalHeapMergeIterator::next_batch
的部分代码为例说明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 Status VerticalHeapMergeIterator::next_batch (Block* block) { size_t row_idx = 0 ; VerticalMergeIteratorContext* pre_ctx = nullptr ; std::vector<RowSource> tmp_row_sources; while (_get_size(block) < _block_row_max) { auto ctx = _merge_heap.top (); _merge_heap.pop (); if (ctx->is_same () && (_keys_type == KeysType::UNIQUE_KEYS || _keys_type == KeysType::AGG_KEYS)) { ++_merged_rows; } else { ctx->add_cur_batch (); if (pre_ctx != ctx) { if (pre_ctx) { RETURN_IF_ERROR (pre_ctx->copy_rows (block)); } pre_ctx = ctx; } if (ctx->is_cur_block_finished () || row_idx >= _block_row_max) { RETURN_IF_ERROR (ctx->copy_rows (block, false )); pre_ctx = nullptr ; } } RETURN_IF_ERROR (ctx->advance ()); if (ctx->valid ()) { _merge_heap.push (ctx); } else { } } RETURN_IF_ERROR (_row_sources_buf->append (tmp_row_sources)); if (!_merge_heap.empty ()) { return Status::OK (); } return Status::EndOfFile ("no more data in segment" ); }
现在我们回到vertical_compact_one_group
之中, 刚刚说了row_source_buf的用处,现在来说一下这个VerticalBlockReader, 这个reader实际上是用来读取从input rowset之中数据的,读取出来的数据都放到了刚刚提到的row_source_buf之中, 既然要读取数据这里就涉及到数据的schema的问题. 比如考虑这样的情况, 一开始有colA
, colB
然后插入数据(1,2), 之后delete where colB = 2
, 然后drop掉columnB, 现在add column columnB. 新的column B和之前的column B是不同的.
在往下有一行reader_params.return_columns = column_group;
, 因为vertical compaction的逻辑是按照列组进行compaction的也就是说读取数据的时候只需要读取某几列即可,具体读取哪些列的数据就是靠reader_params.return_columns
决定.
接下来是读取聚合后的数据并且写入output rowset的逻辑.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 size_t output_rows = 0 ;bool eof = false ;while (!eof && !StorageEngine::instance ()->stopped ()) { RETURN_NOT_OK_STATUS_WITH_WARN (reader.next_block_with_aggregation (&block, &eof), "failed to read next block when merging rowsets of tablet " + std::to_string (tablet->tablet_id ())); RETURN_NOT_OK_STATUS_WITH_WARN ( dst_rowset_writer->add_columns (&block, column_group, is_key, max_rows_per_segment), "failed to write block when merging rowsets of tablet " + std::to_string (tablet->tablet_id ())); output_rows += block.rows (); block.clear_column_data (); }
从刚刚的代码逻辑里可以看到这个while循环会将input rowset里对应column_group的所有数据全部读出来然后写进output rowset之中. 这里要注意下只读取了对应column_group并写入 . 这里也是为什么vertical compaction消耗的内存会低很多. 感兴趣的读者可以看一下Merger::vmerge_rowsets
的实现,基本的逻辑几乎一模一样,最大的区别是一个读取的数据是row的部分,一个是整个row全部会读取.
到目前为止我们从merger的角度大概梳理了代码的流程,接下来我们看一下vertical_beta_rowset_writer的实现来理解vertical compaction的segment文件的数据写入.
vertical_beta_rowset_writer vertical_beta_rowset_writer::add_columns 在vertical compaction中写入数据其实就是调用了这个add_columns接口, 我们从这个函数开始分析具体实现. add_columns需要判断是否某一个column group写满了一个segment的对应区域, 如果写满了需要创建一个新的segment或者切换到下一个segment,但是又因为一个segment里面需要写多个column group, 所以得把segment都保留着,在最后一个 column group写完后才全部下刷持久化产生可用的文件. 先写key column group, 同时也只有key column group可能会创建新的segment.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 Status VerticalBetaRowsetWriter::add_columns (const vectorized::Block* block, const std::vector<uint32_t >& col_ids, bool is_key, uint32_t max_rows_per_segment) {if (_segment_writers.empty ()) { RETURN_IF_ERROR (_create_segment_writer(col_ids, is_key, &writer)); _cur_writer_idx = 0 ; RETURN_IF_ERROR (_segment_writers[_cur_writer_idx]->append_block (block, 0 , num_rows)); } else if (is_key) { if (_segment_writers[_cur_writer_idx]->num_rows_written () > max_rows_per_segment) { RETURN_IF_ERROR (_flush_columns(&_segment_writers[_cur_writer_idx], true )); RETURN_IF_ERROR (_create_segment_writer(col_ids, is_key, &writer)); ++_cur_writer_idx; } RETURN_IF_ERROR (_segment_writers[_cur_writer_idx]->append_block (block, 0 , num_rows)); } else { if (_cur_writer_idx == 0 && num_rows_written == 0 ) { RETURN_IF_ERROR (_segment_writers[_cur_writer_idx]->init (col_ids, is_key)); } if (num_rows_written + num_rows >= num_rows_key_group && _cur_writer_idx < _segment_writers.size () - 1 ) { RETURN_IF_ERROR (_segment_writers[_cur_writer_idx]->append_block ( block, 0 , num_rows_key_group - num_rows_written)); RETURN_IF_ERROR (_flush_columns(&_segment_writers[_cur_writer_idx])); start_offset = num_rows_key_group - num_rows_written; limit = num_rows - start_offset; ++_cur_writer_idx; RETURN_IF_ERROR (_segment_writers[_cur_writer_idx]->init (col_ids, is_key)); num_rows_written = 0 ; num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count (); } if (limit > 0 ) { RETURN_IF_ERROR ( _segment_writers[_cur_writer_idx]->append_block (block, start_offset, limit)); DCHECK (_segment_writers[_cur_writer_idx]->num_rows_written () <= _segment_writers[_cur_writer_idx]->row_count ()); } } }
接下来简单说一下创建segment writer的逻辑, Doris中数据文件实际上是通过segment在表示的,一个segment上的数据是按照列存形式管理的,所以创建segment实际上需要创建一个对应的file以及file writer. 另外在vertical compaction中由于每次写segment是按照列组在写的,所以对于一个segment其每次写入表示的column的id也是有所不同的,所以存在多次init的情况. 其实不同的列对应的就是不同的column writer,每一次init根据传递进去的column id都会构造新的column writer.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Status VerticalBetaRowsetWriter::_create_segment_writer( const std::vector<uint32_t >& column_ids, bool is_key, std::unique_ptr<segment_v2::SegmentWriter>* writer) { auto path = BetaRowset::segment_file_path (_context.rowset_dir, _context.rowset_id, _num_segment++); auto fs = _rowset_meta->fs (); io::FileWriterPtr file_writer; fs->create_file (path, &file_writer); writer->reset (new segment_v2::SegmentWriter ( file_writer.get (), _num_segment, _context.tablet_schema, _context.tablet, _context.data_dir, _context.max_rows_per_segment, writer_options, nullptr )); { std::lock_guard<SpinLock> l (_lock) ; _file_writers.push_back (std::move (file_writer)); } auto s = (*writer)->init (column_ids, is_key); }
vertical_beta_rowset_writer::flush_columns 上文有说到,vertical compaction是按照column group在进行数据的读取和写入的,但是考虑到Doris的数据表示是通过segment进行的,一个segment上就应该有同一行的所有数据,自然而然会想到一个问题,假设segment数量有多个,那么按照column group行读取部分列的方式进行写一定会出现一个segment交替的过程. 比如说一共1000行数据,有2个column group(key group和value group), 分成2个segment. 那么先按照key group写segment1写了500行数据,之后写segment2写了500行数据,之后切换到value group从segment1开始写,这里一定有一个segment遍历的过程. 查看vertical_beta_rowset_writer的实现会发现一个std::vector<std::unique_ptr<segment_v2::SegmentWriter>> _segment_writers;
和size_t _cur_writer_idx = 0;
后者便是用来迭代遍历segment的下标.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 Status VerticalBetaRowsetWriter::flush_columns (bool is_key) { RETURN_IF_ERROR (_flush_columns(&_segment_writers[_cur_writer_idx], is_key)); _cur_writer_idx = 0 ; return Status::OK (); } Status VerticalBetaRowsetWriter::_flush_columns( std::unique_ptr<segment_v2::SegmentWriter>* segment_writer, bool is_key) { uint64_t index_size = 0 ; RETURN_IF_ERROR ((*segment_writer)->finalize_columns_data ()); RETURN_IF_ERROR ((*segment_writer)->finalize_columns_index (&index_size)); if (is_key) { _total_key_group_rows += (*segment_writer)->row_count (); KeyBoundsPB key_bounds; Slice min_key = (*segment_writer)->min_encoded_key (); Slice max_key = (*segment_writer)->max_encoded_key (); key_bounds.set_min_key (min_key.to_string ()); key_bounds.set_max_key (max_key.to_string ()); _segments_encoded_key_bounds.emplace_back (key_bounds); } _total_index_size += static_cast <int64_t >(index_size) + (*segment_writer)->get_inverted_index_file_size (); return Status::OK (); }
现在回头看一下vertical compaction的代码,可以发现在Merger::vertical_compact_one_group
中最后一行调用了VerticalBetaRowsetWriter::_flush_columns
方法,也就是在每一个column group结束时将segment从0开始. 用一个伪代码表示则是
1 2 3 4 5 6 7 8 9 10 11 12 13 Merger::vertical_merge_rowsets () { for (column_group: column_groups) { while (!eof) { reader.next_block_with_aggregation (); dst_rowset_writer->add_columns (column_group); } dst_rowset_writer->flush_columns (); } dst_rowset_writer->final_flush (); }