0%

Abstract

过去因为网络带宽限制,对于analytic 大表的场景local storage还是不可替代的,但是随着带宽升级网络访问和本地nvme的带宽越来越接近所以现在对象存储作为底座也越来越有吸引力。 本篇文章在研究后提出了一个成本和性能最佳的检索配置. 也开源了一个叫AnyBlob的downloader library。 后文会展示在umbra上集成这个downloader的威力.

Introduction

21年的时候cloud dbms的收入就已经达到了本地部署的系统的收入,到了23年肯定更高了. 云上最吸引的人就是弹性和资源供给的灵活度.
各家云厂商提供的对象存储服务允许将计算资源和存储资源分离。 他们同时也提供了很强的持久性保证,几乎无限的容量以及可拓展的访问带宽.
在18年AWS提供给每个实例的网络就是100Gbit/s(约12GB/s) 这极大地减轻了对网络访问和本地内存访问带宽差异的担忧(甚至比访问hdd都高).
过去的研究主要是关注于如何通过cache避免拉去远程的数据. 另一部分研究则是关注于cloud上的OLTP系统[27]. 没有多少对于对象存储上的AP系统的研究和结论.
另外作者认为有3大挑战

  1. 如何有效利用机器带宽打满网卡
  2. 如何应对网络请求带来的CPU负载
  3. 如何提供良好的多云支持

本文作者研究了不同云的对象存储总结了一套价格性能都优化过的配置. 另外他们通过自己实现了一个异步的对象存储downloader来减轻cpu消耗. 并且他们无缝地将对象下载和数据库的scan算子结合了起来. 作者自称umbra搭载了这些优化后,在不开启cache的情况下性能都和别的开启了local cache的stoa 数仓一致(跑的是TPCH,跟snowflake和spark对比). 另外他们的这套模式在切换实例的时候还不会带来性能的抖动(这里意思比较绕,大概是说在没有warm cache的情况下切换不同配置的实例的拓展性良好).

贡献

  1. 定义了一个能够最小化成本同时打满带宽的request size range.
  2. 开源了Anyblob这个库
  3. 将扫描对象和数据库系统集成的蓝图

Cloud Storage Characteristics

Object Storage Architecture

对象存储提供过用户的接口是各种HTTP接口,用户发送过来的请求在经过负载均衡后会落到对应的Api server上. 而云厂商的服务器上一份文件的数据通常会切成多个chunk存储在不同的Object storage机器上, 这些机器本身也是跨 AZ的,所以数据的持久性保证很强.

另外因为数据拆成了多个chunk那么之后在读取的时候也需要有对应的索引或者记录chunk位置的header信息(小文件的情况甚至索引上就可以顺带记录着文件数据减少后续的网络IO). 这部分元信息也会存储在对应的metadata storage服务器上(听说有的厂这里metadata storage甚至就是mysql).

figure1

Object Storage Cost

table1

IO操作是要收费的,不管是读/写/跨region网络传输都要收费. 但是在相同region内部,下载的费用和下载的数据的大小是无关的,只和下载请求的次数有关. 另外对象存储是持久化的,只有写到了S3/GCP上就会被复制冗余到多个跨region的多AZ副本上.
除了对象存储以外也有别的存储选择,比如EBS等, 他们的价格会更贵, 而云上的HDD价格可以更便宜但是带宽又不如对象存储. 另外比如EBS只能attach到一台节点上,也不能做到存算分离上的共享存储. 除此以外他们也无法提供S3的持久性.

Latency

对象存储的延迟比SSD会更高.基于不同的request size测量了对应的latency. 这里用来区分总耗时和时延的是第一个字节被读取到的时候. 取第一次冷读和连续20次的热读.

figure2

对于小的请求来说第一个字节的延迟基本决定了整个时延. 这也说明round-trip时延限制了总的吞吐. 对于大的请求来说带宽则是限制因素(可以理解成要读的数据越大的时候读取性能基本没变化,这个时候的时间基本上是size/带宽).(笔者注: 这里first byte分析可能也不是特别有意义,如上文所说在cloud vendor机器上数据也是拆分的,另外还有读取元数据的部分)

另外作者还测试发现工作日晚上以及周日性能会更强(因为没人来卷了xs).

Throughput

对于AP系统来讲,查询的时候下载的数据的大小可能是非常大的,所以first byte arrival latency的重要性没有吞吐大.

作者接着测试发现不同地区吞吐有差别,以及冷数据还可能有更大的吞吐.

此外小机型的机器存储一段时间的性能喷射的情况(还有论文研究如何靠这个薅云厂商羊毛, 另外也有利用这个做cache的).

Optimal Request Size

在请求的request这里就有个tradeoff,如果请求的size越大,获取同样大小的数据IO次数相对来说就更少,花的钱就少. 当然吞吐可能不能打满.
但是如果size越小,并行的请求就可以越多,吞吐能打得更高,但是相对的IO次数多了花的钱就多了. 作者这里测试出来对于OLAP系统来说8MB-16MB是价格吞吐都友好的。

Encryption

比较无聊,跳过

Tail Latency & Request Hedging

这里的长尾延时情况是有不到5%的16MB request会超过600ms还没下载完成. 也有不到5%的first byte latency超过了200ms. 也就是超过600ms的情况重新发一个request是挺有效的. 可以减轻tail部分.

Model for Cloud Storage Retrieval

对于给定的期望throughput,需要的并发request请求数量是:

formula

AnyBlob

普通的云厂商SDK比如AWS的每一个http请求都开一个线程,如果要达到上文中打满带宽的需求不知道会有多少个线程,这带来的context switch开销也无法忽视。所以本文提出了一套基于io uring的异步download库,和查询引擎共享线程资源.
AnyBlob Design

  1. 每个线程异步发送多个request
  2. 使用io_uring减低CPU负载
  3. 状态机驱动的message: 基于HTTP请求实现了多阶段的状态机,一直到对象被完全下载下来(其实理解起来就是协程呗).
  4. 异步系统调用. 尽量不卡住线程. 避免影响吞吐. 通过Loop来定期从io_uring拿到complete entry
  5. Task-based send-receiver调度器
  6. 多个send-receiver 组

figure11

整体来看除了io_uring这一层就是一个很典型的网络库应用, 在http request完整以前都通过非阻塞IO不停地读取/写入,拿到一条完整的request就执行,执行的结果可能是继续产生新的send/receive, 也可能是执行完了就执行上层的回调(估计是scan算子在拿到完整的数据后push到下游的算子继续进行计算). 他这里状态机这一套感觉用coroutine来实现挺好的.
这里AnyBlob的example代码里也没写怎么和Scan Operator整合在一起的. 不过从后文的描述来看其实就是scan算子执行的过程里包括了异步下载远端的对象存储数据而已,这里AnyBlob作为一个库也就是直接被调用而已,没啥好说的感觉.

Authentication & Security

感觉没啥好说的

Domain Name Resolver Strategires

  1. 解析endpoint也是一种开销,所以会把endpoint ip cache住
  2. 记录了不同的endpoint ip的throughput,如果某个endpoint的吞吐不太行就替换
  3. MTU相关的 太细了== 可以参考netapp的 MTU调参(同一家云的机房里通过调整MTU可以省掉很多header的开销)

Cloud Storage Intergration

前文提到现在download和query engine是共享线程了,一种可能的实现是scan算子需要读取数据的时候会注册一个回调到AnyBlob上,下载到具体的数据后在某次event loop里通过回调的方式继续执行scan上层的逻辑.接下来大概是他们怎么做这个scheduler的逻辑. 我理解里他们这个scheduler是一段在定制点代码处调用的schedule逻辑(见https://15721.courses.cs.cmu.edu/spring2023/papers/07-scheduling/wagner-sigmod21.pdf). 同时这个Object Manager估计也是.
先贴一张他们的大概的架构图

figure13

Database Engine Design

Umbra中的worker thread可以干三种事情

  1. 执行查询
  2. 准备新的对象存储request
  3. 作为网络线程
    这设计大概率是一个EventLoop,每次通过拿到的不同的Job实现切换任务就是了. 这里我比较好奇的是Umbra有没有做任务的interrupt,可能也不好做,估计粒度上还是得做完一次Job. 以及从他的描述中来看这个Object Manager应该不是一个单独的线程,估计是类似内核的调度一样每次执行这段逻辑就帮你计算下. 不是类似SAP HANA一样的有个Watch Dog线程.
    Umbra的调度器可以动态切换thread的角色

任务自适应性 :
为了避免长查询一直占着资源把系统响应度给拉低,有的系统把查询拆成task(morsel-driven),这些task可以被挂起,或者会通过分配自适应数量的tuples控制执行时间. 这么做的目的是让系统能够快速应对不同的workload. Umbra的这个模式下如果是在执行的时候需要查数据可以做到在worker上切换task(不过感觉没说他们到底能不能挂起之类的 感觉自适应这个还是看这篇https://15721.courses.cs.cmu.edu/spring2023/papers/07-scheduling/wagner-sigmod21.pdf).

Columnar format

Raw data是按照列存切成多个chunk的block丢到cloud上的。每个block的header里面是column types,offsets等.
DB的schema也是丢在cloud上的(所以启动的时候需要先fetch).

Table Metadata retrieval

在图13里scan operator一开始会去拿到这些相关的table的元数据(比如有哪些blocks) 都下载完了才去进行数据的retrieval.

Worker thread scheduling

初始化完scan后会选出多个worker thread来执行操作. Umbra这里没有选择把worker thread拆分成下载线程和处理现场(因为他们觉得这样会在执行query的时候去适应, 这句话没明白啥意思但是我猜是比如下载线程和执行线程一起执行的时候如果下载慢了执行的部分要去等待,比如下载的线程结束了是不是要转换成执行线程之类的调度会比较难做). 所以他们的实现是引入了一个object manager. 每一个scanner线程询问scheduler应该干什么job. 如果已经有足够的数据了就进行查询处理(如4A), 否则就进行下载准备block. 这里umbra认为执行jobs的时间很短,所以这里的ask scheduler的决策是可以很快调整的.

Download preparation

为了打满带宽所以需要有比较多的download线程和downloadrequest请求持续发给cloud. 在4B里面preparation worker thread创建请求的时候不会打断retrieval 线程执行event loop. Object Manager中保存着表的元数据以及block和对应的chunk 数据.

Table Scan Operator

Umbra会尽量利用上所有的thread来给operator加速. Umbra将tasks拆分成morsels, 每个morsel里面都是task的一小块数据.
Table scan在初始化完后worker线程会调用pickMorsel方法将chunk分配给worker线程. 在本文的修改版本中,这些worker线程不仅需要处理数据还会需要准备新的block以及从对象上下载数据. Object Scheduelr会基于统计信息和过去的处理来决定某个worker thread的应该做什么工作. 这里如果worker thread拿到了morsel那么他会去处理,如果不是的话他就去执行prepare或者retrieve的逻辑.

figure14

图里就是3种任务,4个线程在processing,一个在prepare,3个在下载.
这里下载线程的callback干的事是标记这个block为ready. Download线程结束任务后还可以被复用为processing或者preparing线程.
Processing线程只会去处理ready的block上的morsel.

Object Scheduler

目标是达成下载和处理性能的平衡. 这个scheduler会根据统计信息采取对应的算法更新不同的线程应该有的数量

这里会有一系列动态的调整,首先这里的统计信息会使用lock-free的方式进行维护. 之后在执行的时候会尽量保证在当前的retrieval线程的数量下带宽能打满. 这里大概的意思是,记录有多少线程用于下载,并且控制该类线程的数量. 同时通过记录outstanding request(正在下载或者await中的prepared HTTP请求)的数量可以计算出带宽的上界. Scheduler会尽量调度出能够匹配retrieval threads数量的prepare job.
同时也会根据查询性能和下载速度动态调整下载线程和处理线程.

Relation & Storage Format

列格式:每个column分成多个chunk 然后在元数据里记录min max zonemap 每个block使用byte-level encoding的方式压缩(frame-of-reference & dictionary)

每一个block中的tuple数量: 前文有提过他们的实验跑下来每一个Get请求的content size在8M-16MB是最优的所以这里在对象上的每个block的chunk也希望尽量是16MB. 查询引擎在执行的时候的数据处理粒度也是block. 同时这里有要求一个block中的所有列的tuple数量相同. 对于不同的数据类型来说这就会导致chunk的大小很难相同. 对于非变长类型的列而言在encode过后每个tuple的size大小分布在1到16字节. 所以在存储的时候需要对于不同列的chunk的tuple 数量做一个平衡. 在构建block的时候就会自适应地计算一个平均tuple count, 尽量让没编码的列小于2MB. 对于某些定长和变长类型(比如varchar(65535)或者更长的奇奇怪怪的类型),则会采取在下载的时候拆成多个更小的range request的方式.

Zero user-space copies:其实比较好理解,Buffer Pool在分配内存的时候预留http header,然后将blocks对齐到buffer pool的page大小. 将Buffer丢给io_uring下载了直接拿出来取offset用就行.

Transparent paging:这里用了匿名pages拓展这个buffer manager,相当于对于同一个page的请求就被缓冲起来,请求缓存命中就直接返回,没命中就再下载一次.

Structure of metadata: 其实比较直白,大概来讲就是db_prefix/table_prefix/. 目录下是header的list,headers,data blocks. 和iceberg的manifest files很像https://iceberg.apache.org/terms/ . 相当于db_prefix/table_prefix/list这个文件下载拿到的是所有的header部分的内容(至于具体是啥内容貌似没说清楚,但如果是为了多版本考虑的话可以理解为是一个snapshot,里面保留的是某个版本下的数据的所有的header). 在对象存储上每次更新这个db_prefix/table_prefix/list文件都是原子的. 可以参考下Iceberg的这篇文章. 不过这里没说怎么决定当前是哪个snapshot,其实我理解这里不管是Iceberg还是这个umbra的实现以及vertica的eon mode(cluster_info.json)都是类似的,会在一个文件里面写着.

下面是iceberg

iceberg

下面是umbra

15

Scan optimizations: 其实就是通过header的min max过滤. 除此以外Umbra把数据的decode和process拆成了两部分. 相当于可以重复decode一部分然后塞到chunk里丢给process.

Cloud DBMS: 这里就大概讲了下Redshift使用了Aqua来在计算层加速. 另外一直到最近,由于带宽的限制,caching都是不可或缺的. 但是由于网络带宽和NVME带宽越来越接近所以cloud storage越来越有吸引力。 像Amazon基于Presto的Athena系统就是直接在remote data上执行的.77比较了这些系统的架构. 16,29,30,81的OLTP系统是专门用来处理cloud era的. 2,25,33,58,59,67,79,87则是OLAP

Spot instances:这里大概意思是想说有些关于如何利用竞价实例的研究74,76

Cloud storage for DBMS: Apache Iceberg和Delta Lake都像Umbra这样把元数据丢在了S3上并且基于此来提供一致性快照服务. 也有一些对于cache的研究37,46,85,89. Cache的解决方案从在本地节点使用语义化的cache37拓展到了利用竞价实例来作为cache89.

Memory disaggregation: 讨论了一下CPU和内存分离的内容50, 83, 90, 91

Abstract

在过去每个团队都可能维护一套自己的特化的cache. 这样的方式无视了不同cache系统共有的问题, 极大增加了部署,维护以及扩容的成本.
本文主要介绍CacheLib——Facebook内部已经广泛应用在CDN,存储和应用数据cache上. 坐着阐述了各个生产环节workload的特质以及FB内部的应用场景如何影响着决策. 描述了FB内部cache如何演化的, 也讨论了对未来的cache研究和设计能带来哪些暗示.

Introduction

FB内部架构里大量的系统都使用了cache,包括CDB caches, key-value application caches, social-graph caches, storage cache, database cache, media caches等. 这些cache系统最初都是单独设计实现并维护的. 最初的思路是单独设计针对不同场景优化以此满足复杂的一致性协议,更好地利用定制化数据结构,并且对特定的硬件平台优化.

尽管各种cache有一些不同点,但他们也有很多相同的点: 都需要百万级的qps, cache set大到需要flash和DRAM都使用, 同时必须容忍系统更新引起的频繁重启(FB在Fail at Scale里提过他们更新部署是很频繁的). 随着FB的发展,这些cache系统也越来越多, 维护这么多系统的代价也是很高的. 很多重复的问题在不同的系统上被重复地解决多次. 同时这样多系统的维护也让很多系统上的系统调优知识无法有效共享. 因此FB在考虑了泛用性和特化的tradeoff之后选择了泛用性. 虽然这会在某些特殊的系统上丢失一部分特殊领域的优化,但是减轻了代码开发的负担也增加了系统协调性.

最后FB开启了CacheLib的开发. CacheLib作为一个C++库提供了cache功能所必需的核心功能, 包括: 高效的cache索引实现, 驱逐策略, 为DRAM和flash caches的稳定优化. 同时提供给用户简单,线程安全的接口.

Lessons Learned from CacheLib

**特化的cache系统可以同时也应该构建于通用的cache系统之上.**一方面一套统一的代码更利于维护和开发,另一个方面业务部门可以通过在core系统外围开启不同功能选择不同配置以定制化需求. 同时统一的系统的运维经验,调优经验,方便整理的文档也利于开发者分享(搞不好还利于减轻oncall的人的压力).

生产场景的workload需要大规模饿环境.以往的benchmark采用的是参数为.9的Zipf popularity model. 基于这个模型会认为DRAM-based缓存已经可以应对大多数场景. FB的场景里面cache系统需要能够应对massive request. 所以采用了flash.

Motivation: Caching Use Cases

这一章以6个FB内部的生产系统为例介绍不同的业务对cache的需求是什么

Hierarchical and geo-distributed caches

FB的CDN服务专注于为请求静态资源的HTTP请求服务. 使用CDN的目标之一是减少因为cache miss引起的跨区域网络传输(比如在亚洲要是需要请求到北美的服务器那延迟肯定很低). 在FB内部每台CDN都是使用local cache,同时包含flash和DRAM.

Application look-aside caches

web应用的cache需求非常广泛. 可能是某条db的查询结果(比如查询库存), 查询用户数据等. 这种需求一般是应用通过RPC访问一系列的共享cache服务. 每一个cache服务都由一个庞大的分布式cache系统组成(其实就很类似互联网应用中各种各样的cache, 不过一般可能都是使用的redis的服务, 可以看下FB关于memcached的论文).

in-process caches

也有很多应用无法忍受remote cache的网络开销. 这个时候cachelib也可以发挥类似caffeine的作用,以进程内cache的方式发挥作用.

Machine learning-model

面向用户的ml应用经常基于用户和推荐内容的互动来作为训练的输入. 用户和内容的互动会先cache住以方便ml程序快速做出预测(类似于将用户的行为记录counter缓存在cache中,用户的counter的更新也能通过write through cache的方式更新,之后训练程序直接读取cache肯定比去storage service里load更快). 另外相同的输入往往输出也相同,所以可以将相同输入对应的预测结果缓存在cache中.

Storage-backend cache

持久化数据会按照blocks的方式存储在FB内部的集群中的spinning disks上. 即使在block storage server前使用了多层存储还是存在部分热点block的请求次数超过目标磁盘的IOPS的情况. Storage server会使用flash来缓存热点block. 为了支持byte-range请求和append操作,这些flash cache会和storage system stack紧密整合.

Database page buffer

数据结构和小对象被存储在各种各样的数据库系统中. DB会利用page cache来提升吞吐并降低延迟, 为了保证一致性和事务性, page cache会和数据库logic紧密整合.

目前除了database page buffer以外上诉其他场景cachelib都在fb内部使用了.

Shared Challenges Across Caching Systems at Facebook

这一章介绍构建cache系统时普遍面临的问题.

Massive Working Sets

working set的定义是在一个workload中能够从caching里面收益的对象的集合. 如果要达到相同的hit ratio, working set越大则需要越大的cache. 为了测量working sets, 必须同时考虑随着时间变化被看到的popular data(Popularity)和随着时间变化数据的欢迎度改变的程度(Churn).

Popularity

表示每一个key在一定时间内在系统内所有objects间的相对受欢迎度.

TODO: 这里贴图

不恰当地说, 在Zipf分布里最受欢迎的20%的object接受了80%的请求. 但Zipf 分布的公式里第i受欢迎的object的相对频率是通过一个参数可以计算出来的. 相对的这个参数越低意味着更多的请求会发给popularity 分布的尾部,这也就需要更大的working set. (Lookaside系统的参数接近1, SocialGraph是0.55 CDN是0.7)

Churn

Churn表示随着新key的进入,working set的变化以及已经存在的key的popularity变化. churn会影响caching policy的设计。 高churn就增加了temporal locality的重要性. 也让caching police’s更难通过过去的access patten估计object的popularity.

这些发现也限制了设计一套通用cache系统时的设计空间. 许多现有的系统一个cacheline最多存储一个单独的object(64B). 对于SocialGraph系统(一般是10B到20B)这就会很浪费. 另一个挑战是经常用于作为flash里面系统的in-memory index. 每一个object在索引上的负担在现有系统上也是不同的(8B-100B都有。如LookAside系统, 这意味着需要80GB-1TB的DRAM来索引1TB的flash上的数据).

Size大小多变

Storage 和 CDN需求里常见的是64KB和128Kb的chunk,所以会把大的object拆分成chunk. 而Lookaside和SocialGraph里大小变化超过7个数量级.

Bursty Traffic

用户的请求数量存在激增的情况. 通常在system领域会采用Poisson分布来描述请求, 但FB内部的采集数据显示和实际上请求到达的速率还是有很大的区别的. Lookaside场景相对Poisson分布波动较小. SocialGraph和Storage场景会有20%到30%做有的波动, CDN场景则存在很尖锐的波动. 请求到达速率的多变性让cache系统在高压导入场景很难有效提供资源给cache服务.

Resource Management

cache系统需要注意不能将系统可用资源全部消耗,尤其是DRAM-based的cache系统. 以in-process场景的cache为例, 因为需要cache的对象的size也是多变的,所以cache的内存消耗也是很难预测的. 如果消耗了太多内存很容易导致系统OOM程序直接退出.

Computationally Costly Query for Empty Results

在追踪用户关系的数据库查询中经常会出现query返回了空的情况. 这种查询很浪费数据库资源. 在SocialGraph场景中FB发现有55.6%的查询都是这种空结果查询. 剩余的44.4%的请求都是有效的对象,对应的cache命中率是86.5%. 如果不能cache住空结果, 对应的cache命中率会显著降低.

Updating Cached Data Structures

Cache应该能够有效支持结构化数据. 特别是对in-process场景. 应用程序经常会希望能够在不反序列化整个cached数据的情况下更新其中的特定fields.

Frequent Restarts

最后,生产环境中的cache系统应该能过在频繁重启(可能是bug修复或者软件更新)的情况下稳定工作. 75%的Lookaside场景和95%的CDN场景的更新时间都小于7天. 即使是Storage和SocialGraph场景也会因为每月的维护需要重启cache进程. 大部分的cache系统是透明无感的,也就是说重启后就丢失了cache中的数据. 这对于大型的cache系统是很不友好的,可能需要很长的时间cache系统的命中率才能恢复到正常状态. 有的系统会采取warmup来缓解.

笔者吐槽: 其实cachelib在程序非正常退出的情况下 cache也全部会丢失… 只有优雅退出的场景能保留cache内容.

Design and Implementation

FB认为要解决前两章的问题则cache系统应该有如下的feature:

  • Thread-safe cache primitives: 以此简化应对bursty traffic的场景. 同时也简化了一致性和cache invalidation协议的实现.
  • Transparent hybrid caching: 为了能够满足large working sets的需求,cachelib支持混合使用DRAM和flash。 Hybrid cache能够在每台节点提供TB级的cache能力. cachelib提供给programmer的统一的byte-address的抽象,让programmer无需担心底层的存储介质.
  • Low resource overhead: Cachelib能够在占用低CPU和memory的情况下提供强劲的吞吐. 这使得其能够在和application混布的场景发挥作用(cache和application共享资源). 也让其在small objects很多的场景也能够顺利工作(不会出现前文中需要很大的资源才能维护cache index之类的情况).
  • Structured items: Cachelib提供了原生的array和hashmap的实现. 可以在不发起serialization的情况下高效cache和修改.
  • Dynamic resource monitoring, allocation, and OOM protection: cachelib会监控整个系统的内存使用.
  • Warm restarts: 在重启程序时依旧保留cache的状态.

API设计

所有的api设计都围绕着一个叫Item的概念, 用来表示被cache的对象的抽象. Item能够用byte-address的方式访问cache中的object, 无需关心是cache在DRAM还是在flash中. ItemHandle是Item的使用接口,每产生一个handle就会对Item的引用计数+1,销毁时则-1. 除非Item的引用为0,否则不会被evict. 如果某个引用计数不为0的item被删除或者超时expires了,现存的他的itemhandle还是有效的,但是不会有新的itemhandle产生了.

1
2
3
4
5
6
ItemHandle allocate(PoolId id, Key key, uint32_t size, uint32_t ttlSecs = 0);
bool insertOrReplace(const Itemhandle& handle);
ItemHandle find(Key key);
void* Item::getMemory();
void* Item::markNvmUnclean();
bool remove(Key key);

调用allocate时如果没有空间会先根据eviction policy来驱逐一个引用计数为0的Item. 新插入的Item可以设置TTL. 同时根据PoolId可以可以选择内存池以提供隔离性配置. 任何一个新的Item会在对对应的ItemHandle完成insertOrReplcae操作后才可见.

要访问Item需要通过find方法根据Key拿到ItemHandle. 之后可以通过getMemory方法以非同步地方式零拷贝访问Item相关的内存. 如果需要原子性地更新一个Item, 可以先使用allocate分配对应的ItemHandle,然后通过调用insertOrReplace让更新可见。 CacheLib会忠实地执行用户的markNvmUnclean方法指示任何的修改. 最后,remove会删除key指定的object,也会invalidation cache或者删除对应的底层的object.

1
2
3
4
5
struct MyType {
int foo;
char bar[10];
}
TypedHandleImpl<Item, MyType> typedHandle{cache->find(..)};

用户也可以如上面的代码一样自定义类型并使用CacheLib将他们cache起来. 也支持变长类型比如hashmap等.

Architecture Overview

CacheLib的设计目标是能够有足够的拓展性以及能够应对各种长度的size的object. 为了实现较低的per-object负载, 一个单独的CacheLib cache由多个子系统组成,每一个都对应一个特别的storage介质和object的size. CacheLib由一个DRAM cache和一个flash cache组成. Flash cache由LargeObjectCache(LOC, 为Item大于等于2KB的object服务)和SmallObjectSize(SOC, 为小于2KB的object服务).

allocate函数的作用是从DRAM空间分配内存,对应的就可能会被DRAM中的存在的Item驱逐到flash或者直接丢弃. find函数则会先找DRAM然后LOC然后SOC. 如果是在DRAM中找到则返回的ItemHandle立刻就能使用,如果是在flash中找到则需要异步拉取,对应的Cachehandle会在Item被加载进DRAM会变得可用. 当发生cache miss时会返回一个空的ItemHandle.

DRAM cache. CacheLib在DRAM cache中使用链式hash表进行查找, DRAM cache可以被切分成带有不同eviction policy的pool(在allocate的时候通过PoolId指定).

为了性能考虑cache 内存是通过伙伴系统来分配的. CacheLib使用4MB的slabs并且实现了自己的slab分配器. 每一个slab需要7B(3B给内部元数据,4B用来标识object大小). 对于不同业务来讲每一个slab ckass的大小是可以采用不同配置的(对于小于64B和大于4MB的情况在4.3节讨论对应的优化).

对于不同的Slab系统也可以配置不同的eviction policy(CacheLib也能自定义开发新的policy). CacheLib会在Item上额外使用31B来支持这些对应的policy的配置.

Item metadata DRAM overhead Data type
Eviction policy state 12B 14B timestamp, 24B pointers
Item creation timestamp 4B 4B timestamp
Expiration time(for TTLs) 4B 4B timestamp
Key size + object size 4B 4B size_t
Reference counting 2B 13b public ref count, 3b internal count
hash table chaining 4B 4B pointer
Flags 1B 8 binary flags

为了保证metadata操作的原子性, Cachelib使用了细粒度Lock, 用户空间mutexes, C++院子操作等优化. 比如在LRU场景中,传统的方式里一个object被访问后会被放到most-recently-used的位置(在FB的场景中也很容易发生). CacheLib里每个Item在一段时间T内同一个Item不会被移动到MRU位置. 只要T比object在LRU list里走到尾端(也就是成为最不常用的那个)的时间短,这个方式都能有效的减少移动操作带来的contention. 另外FB也采用了比如flat combining的方式优化.

Flash cache. 从DRAM中淘汰后除了直接删除也可能会写入flash. CacheLib还必须处理flash cell的有限的写寿命.

为了尽量减少写flash的速率,CacheLib会选择性地将object写回flash. 如果一个存在于flash的object在DRAM中没被修改过那么就不会写回flash. 否则CacheLib通过一个admission policy来决定是否写回flash. 默认配置是通过一个概率p来决定是否写回flash. 这个p可以通过对flash的写入速率进行控制.

另一个考虑的因素是写放大(比如在写falsh时除了object的内容还有元数据). 不只是应用层的写放大还有设备层的.

LOC存储的object大小都大于等于2KB,这个大小决定了在LOC中的独特的object的数量大概在百万级,所以可以通过一个内存中的B+树来进行索引. LOC使用分段B+树来存储flash中Item的位置. Items在LOC中的flash page里是4KB对齐的, 所以B+树中的flash location是一个4B的4KB对齐的地址. 也就是说LOC中最多能索引16TB的数据.

LOC使用cache也进一步限制了DRAM index的size. Keys被hash成8B. 前4B表示B+树的segment, 后4B用来在对应segment里查找. LOG 在flash中也会存储整个key的内容,用来在load进DRAM后与进行hash的key进行比较以确定是否找到了对应的Item. 而不同size的object在flash上被存储在不同的partition,所以可以根据flash location直接判断object的size,这样就不需要在元数据里存储object的size了. 为了减少DRAM里存储的address的size,每一个4KB的flash page最多存储一个单独的object和对应的元数据. 因为LOC只存储超过2KB的object,这个策略是很space-efficient的. 因为LOC的read和write都是page粒度的,任何application级别的碎片化都会导致写放大.

LOC的remove是按照region级别进行的。也就是说一下子可以删除一个region上的多个Item(顺序删除,提供删除的性能,也摊平flash erasure的开销). 默认情况下region是按照FIFO的方式进行严格顺序的erase的. 当然也可以采用类似LRU的方式进行region管理.

SOC. 因为SOC都是小于2KB的object,如果还是使用LOC那样的精确查找的方式会消耗非常多的内存,所以SOC采用的是近似索引. SOC将Key映射到不同的set. 每个set表示一个4KB的flash page. 一个flash page能存储多个objects. set中的objects是按照FIFO的顺序进行驱逐的. 为了加快查找效率,cachelib会给每个set在内存中维护一个8B的bloom filter.

在SOC中控制写放大时很有挑战性的. 因为写一条object在flash上就是下刷4KB. 所以SOC需要有先进的admit policy来控制对SOC的写入.

Implementation of Advanced Feature

structured items

CacheLib原生支持arrays和map类型. 同时因为能提供raw access访问cached memory, 平坦的数据结构可以直接用cachelib api访问.

caching large and small objects

大于4MB的object会通过链表的方式将多个Item链到一起组成一个逻辑上的大item(每个item会使用4B的next pointer).

针对小对象还有compact cache的功能用来存储小于cacheline的对象(一般64B或者128B). 相同key size,相同object size的objects会被存储在一个cache line之中. compact cache中每一个cacheline都是通过key hash索引的. cacheline之中会进行LRU. compact cache可以用来处理negative caching(后段查询返回空值的情况).

resource monitoring

Cachelib会监控系统内存使用,系统free内存不足时释放自己的内存.

warm restarts

Cachelib使用POSIX shared memory来满足warmup需要.

Experience and Discussion

New features are adopted by many system

之前为别的cache系统开发的功能越来越多地被移植到cachelib上

Performance improvements help many systems

因为是一套general的系统,在cachelib上的一点点改进在部署方都可能带来很大的收益.

Improved stability

Cachelib作为一个通用系统也避免了以前各种不同实现带来的不稳定性.

No single caching system dominates

Flash caching signals a paradigm shift

有的人可能觉得cache的命中率够高时通过flash增加cache的容量收益不会很大. flash cache和dram cache的混合部署能降低纯DRAM的成本.

另外传统的思路里认为cache只是用来节省磁盘访问时间的,但是考虑到现在的服务结构模式,除了磁盘访问延时以外还有更高的比如网络耗时,后端数据库执行耗时等, 使用flash缓存只要能避免这些耗时操作也是很有价值的.

CacheLib dose not always lead to performance gains

毕竟作为一个通用系统,在面对很多corner case的时候还是很难比经过专门设计和调优的专有系统强的. 但是专有系统的功能可以在之后被添加到cachelib中(.

CacheLib does not work for every use case

一些广告服务依赖于nested 数据结构,这是cachelib不能支持的.

Apache Doris原先的compaction策略是horizontal compaction, 也就是说在读取不同版本数据做merge的这一步是把整行数据全部读到内存中进行聚合后再写入到compaction output之中的, 这样的方式面对大宽表场景会很容易把内存打满导致失败. Doris在最近的版本中引入了vertical compaction, 从整行一起compaction变成了按照列组的方式对不同的组进行compaction, 大大降低了compaction的内存消耗. 本文主要是根据代码解析到底是如何做到的. 本文不会涉及rowset的选择策略, 也不会涉及merge时读取数据聚合的过程,主要关注的是如何写不同的segment.

调用路径

compaction的触发方式有两种, 手动http触发, bg线程轮询触发. 本文主要介绍后者,前者比较简单,直接从compaction_action.cpp入手即可.

首先compaction任务是由_compaction_tasks_producer_thread周期性扫描磁盘上的tablets的情况后调用StorageEngine::_submit_compaction_task产生的. 对于每一个被挑选的tablet,会调用Tablet::prepare_compaction_and_calculate_permits计算其所需要的permits. 这一步具体的逻辑是各个compaction子类的prepare_compacte.

prepare_compacte主要的逻辑是从这个tablet下属的所有rowsets中挑选最合适的连续的一连串rowset(根据不同的compaction类型会有不同的策略)进行compaction. 刚刚提到的compaction需要的permits则是所有被挑选出来的rowsets的compaction score之和.

在顺利执行完上面的步骤后即可向不同的compaction线程池提交对应的任务, 任务主要的逻辑是调用对应tablet实例的execute_compaction方法. 而这个方法的本质是调用execute_compact方法进而调用不同compaction子类实现execute_compaction_impl().

这些execute_compaction_impl()方法之中都是先上锁然后计算compaction需要的permits然后调用Compaction::do_compaction(int64_t permits)方法. 其实这个方法的逻辑则是调用do_compaction_impl(int64_t permits)(如果开启了checksum配置则还会在调用前后执行检查checksum).

do_compaction_impl(int64_t permits)

这个函数的逻辑很长, 一点一点来解析他干了什么

linenums
1
2
3
4
5
6
Status Compaction::do_compaction_impl(int64_t permits) {
if (handle_ordered_data_compaction()) {
// 检查segment间是否无重叠
Compaction::do_compact_ordered_rowsets();
}
}

如果被选取的rowset之中的数据彼此间是顺序的(比如读取segment里面的zonemap发现min max区间是无重复的)并且segment文件大小合适, 那就可以很方便地compaction起来甚至不需要读取每个segment的数据聚合起来重新写到一个新的大的segment文件中. 可以简单的将input rowset下的segment文件硬链接到output rowset下使用新的segment id即可.

1
2
3
4
5
6
7
8
9
10
11
12
Status Compaction::do_compact_ordered_rowsets() {
construct_output_rowset_writer();
auto seg_id = 0; // 新的目录下seg文件的新名字
for (const auto& rowset: _input_rowsets) {
rowset->link_files_to(_tablet->tablet_path(), _output_rs_writer->rowset_id(), seg_id);
seg_id += rowset->num_segments();
// 记录rowset的key bounds
}
// 向output rowset的meta信息里写数据
// 包括刚刚记录的key bounds
// 数据大小等
}
linenums
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Status Compaction::do_compaction_impl(int64_t permits) {
// 处理ordered compaction

construct_input_rowset_reader(); // 从所有input rowset上构建reader, 之后需要读所有rs的数据聚合才是output的数据
construct_output_rowset_writer();

if (vertical_compaction) {
Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(),
get_avg_segment_rows(), &stats);
} else {
// 非vertical compaction的逻辑就不梳理了, 比较直观(当然代码不一定直观==)
}
}

接下来是vertical compaction的逻辑到底是怎么走的

vertical_merge_rowsets

第一步先看整个逻辑的入口也就是vertical_merge_rowsets这个函数, 先分析下他的入参

1
2
3
4
5
Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
TabletSchemaSPtr tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output)

第一个tablet表示是哪个tablet被选出来做compaction, 因为会涉及到修改元数据所以需要拿到tablet并获取其中的锁. 第二个reader_type中记录的是不同的compaction的类型, tablet_schema记录表的schema属性(比如key类型, 列的数量)之后切分列组时也会用到. src_rowset_readers里记录的是之前prepare_compaction中选出的所有参与compaction的rowset, 这些rowset对应不同版本的数据. dst_rowset_writer是用来构造compaction的结果rowset的其表示的范围是src_rowset_reader中的最小值到最大值的区间, max_rows_per_segment是最后生成的output rowset下的每个segment文件之中最多有多少行数据, stats_output里面记录的是compaction的stats包括merge了多少列,结果是多少列等.

接下来的逻辑中第一步是划分列, 我们考虑下为什么需要划分. 在建表的时候用户可以指定dup key, uniq key, agg key等key列, 每一行数据在逻辑上都是通过key列来辨识的, key列不同的时候就一定不是同一行. 在进行compaction的时候为了保证数据不重不漏, 需要先将key列组进行compaction, 这样整行的数据都是按照key列在遍历就不会有遗漏. 而key列的顺序也表示了最终数据的顺序. 在拆分出key列之后僧下的value列按照config::vertical_compaction_num_columns_per_group的大小为一组分成多个组.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// tablet_schema中拿到表的key列和列数等属性, column_groups中记录的是不同列组的下标集合
void Merger::vertical_split_columns(TabletSchemaSPtr tablet_schema,
std::vector<std::vector<uint32_t>>* column_groups) {
uint32_t num_key_cols = tablet_schema->num_key_columns();
uint32_t total_cols = tablet_schema->num_columns();
std::vector<uint32_t> key_columns;
// 前面几列都是key
for (auto i = 0; i < num_key_cols; ++i) {
key_columns.emplace_back(i);
}

// 根据不同key类型进行不同的处理

std::vector<uint32_t> value_columns;
// 按照个数拆分个多个group
for (auto i = num_key_cols; i < total_cols; ++i) {
if ((i - num_key_cols) % config::vertical_compaction_num_columns_per_group == 0) {
column_groups->emplace_back();
}
column_groups->back().emplace_back(i);
}
}

接下来的一行代码vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),reader_type);的作用目前还不明显, 我们可以暂时简单地理解为他是用来记录行的. 而这个类的实现可以概括如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class RowSourcesBuffer {
public:
RowSourcesBuffer(int64_t tablet_id, const std::string& tablet_path, ReaderType reader_type)
: _tablet_id(tablet_id),
_tablet_path(tablet_path),
_reader_type(reader_type),
_buffer(ColumnUInt16::create()) {}

~RowSourcesBuffer() {
_reset_buffer();
if (_fd > 0) {
::close(_fd);
}
}

// write batch row source
Status append(const std::vector<RowSource>& row_sources);
Status flush();

RowSource current() {
DCHECK(_buf_idx < _buffer->size());
return RowSource(_buffer->get_element(_buf_idx));
}
void advance(int32_t step = 1) {
DCHECK(_buf_idx + step <= _buffer->size());
_buf_idx += step;
}

private:
Status _create_buffer_file();
Status _serialize();
Status _deserialize();
void _reset_buffer() {
_buffer->clear();
_buf_idx = 0;
}

int64_t _tablet_id;
std::string _tablet_path;
ReaderType _reader_type = ReaderType::UNKNOWN;
uint64_t _buf_idx = 0;
int _fd = -1;
ColumnUInt16::MutablePtr _buffer;
uint64_t _total_size = 0;
};

里面存储的是多个RowSource, 每个rowsource之中是一块连续内存(顾名思义保存row的), 每次调用append将多个RowSource写到MutablePtr _buffer之中. 如果超过了_buffer的大小则会先序列化到磁盘并从_buffer头部开始继续写入.

接下来的逻辑是对于每一个column group进行一次merge, 也就是按照多列一起来merge. 这里也是和horizontal compaction的最大区别, 按照列组来进行merge显而易见的好处是内存压力会小很多, 假设列大小是均匀分布的, 平均每一行的内存消耗几乎只有最大列组数大小/列数(当然实际情况里肯定是非常不均的).

1
2
3
4
5
6
7
8
9
10
for (auto i = 0; i < column_groups.size(); ++i) {
bool is_key = (i == 0);
RETURN_IF_ERROR(vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output));
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
RETURN_IF_ERROR(row_sources_buf.seek_to_begin());
}

深入到vertical_compact_one_group之中, 传递进来的参数主要关注tablet_schema, src_rowset_reader, dst_rowset_writer的使用.
row_source_buf作为将数据读到内存时的内存缓冲传递给VerticalBlockReader的构造函数之中, 之后VerticalBlockReader读取input_rowset并聚合出数据后按照堆排序排序存入row_source_buf之中, 下面按照VerticalHeapMergeIterator::next_batch的部分代码为例说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Status VerticalHeapMergeIterator::next_batch(Block* block) {
size_t row_idx = 0;
VerticalMergeIteratorContext* pre_ctx = nullptr;
std::vector<RowSource> tmp_row_sources; // 读取的每一行数据
// 下面是按照堆排序读取row
while (_get_size(block) < _block_row_max) {
auto ctx = _merge_heap.top();
_merge_heap.pop();
// 先收集多个row
if (ctx->is_same() &&
(_keys_type == KeysType::UNIQUE_KEYS || _keys_type == KeysType::AGG_KEYS)) {
// skip cur row, copy pre ctx
++_merged_rows;
} else {
ctx->add_cur_batch();
if (pre_ctx != ctx) {
if (pre_ctx) {
RETURN_IF_ERROR(pre_ctx->copy_rows(block));
}
pre_ctx = ctx;
}
if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
// current block finished, ctx not advance
// so copy start_idx = (_index_in_block - _cur_batch_num + 1)
RETURN_IF_ERROR(ctx->copy_rows(block, false));
pre_ctx = nullptr;
}
}

RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
_merge_heap.push(ctx);
} else {
// push next iterator in same rowset into heap
}
}
// 这里就是将读取出来的row都放进`RowSourceBuf`之中
RETURN_IF_ERROR(_row_sources_buf->append(tmp_row_sources));
if (!_merge_heap.empty()) {
return Status::OK();
}
return Status::EndOfFile("no more data in segment");
}

现在我们回到vertical_compact_one_group之中, 刚刚说了row_source_buf的用处,现在来说一下这个VerticalBlockReader, 这个reader实际上是用来读取从input rowset之中数据的,读取出来的数据都放到了刚刚提到的row_source_buf之中, 既然要读取数据这里就涉及到数据的schema的问题. 比如考虑这样的情况, 一开始有colA, colB然后插入数据(1,2), 之后delete where colB = 2, 然后drop掉columnB, 现在add column columnB. 新的column B和之前的column B是不同的.

在往下有一行reader_params.return_columns = column_group;, 因为vertical compaction的逻辑是按照列组进行compaction的也就是说读取数据的时候只需要读取某几列即可,具体读取哪些列的数据就是靠reader_params.return_columns决定.

接下来是读取聚合后的数据并且写入output rowset的逻辑.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
size_t output_rows = 0;
bool eof = false;
while (!eof && !StorageEngine::instance()->stopped()) {
// Read one block from block reader
// reader通过reader_params.set_read_source拿到了所有input rowset
// 然后根据不同的key类型选择不同的聚合方式, 每次读取一个block的数据
RETURN_NOT_OK_STATUS_WITH_WARN(reader.next_block_with_aggregation(&block, &eof),
"failed to read next block when merging rowsets of tablet " +
std::to_string(tablet->tablet_id()));
// 这个函数名字叫add_columns, 但其实可能叫add_columns_block_data更直观一点
// 主要作用是将这个block写入segment文件中
RETURN_NOT_OK_STATUS_WITH_WARN(
dst_rowset_writer->add_columns(&block, column_group, is_key, max_rows_per_segment),
"failed to write block when merging rowsets of tablet " +
std::to_string(tablet->tablet_id()));

output_rows += block.rows();
block.clear_column_data();
}

从刚刚的代码逻辑里可以看到这个while循环会将input rowset里对应column_group的所有数据全部读出来然后写进output rowset之中. 这里要注意下只读取了对应column_group并写入. 这里也是为什么vertical compaction消耗的内存会低很多. 感兴趣的读者可以看一下Merger::vmerge_rowsets的实现,基本的逻辑几乎一模一样,最大的区别是一个读取的数据是row的部分,一个是整个row全部会读取.

到目前为止我们从merger的角度大概梳理了代码的流程,接下来我们看一下vertical_beta_rowset_writer的实现来理解vertical compaction的segment文件的数据写入.

vertical_beta_rowset_writer

vertical_beta_rowset_writer::add_columns

在vertical compaction中写入数据其实就是调用了这个add_columns接口, 我们从这个函数开始分析具体实现.
add_columns需要判断是否某一个column group写满了一个segment的对应区域, 如果写满了需要创建一个新的segment或者切换到下一个segment,但是又因为一个segment里面需要写多个column group, 所以得把segment都保留着,在最后一个 column group写完后才全部下刷持久化产生可用的文件.
先写key column group, 同时也只有key column group可能会创建新的segment.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block,
const std::vector<uint32_t>& col_ids, bool is_key,
uint32_t max_rows_per_segment) {
// 检查rows数以及更新每个segment最多有多少rows

if (_segment_writers.empty()) {
// 如果一个segment 都没有就创建一个新的segment并写入数据
RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer));
_cur_writer_idx = 0;
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
} else if (is_key) {
// 如果当前的segment写过的行数已经满了
if (_segment_writers[_cur_writer_idx]->num_rows_written() > max_rows_per_segment) {
// segment is full, need flush columns and create new segment writer
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx], true));
// 创建一个新的segment并跳到对应的新segment
RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer));
++_cur_writer_idx;
}
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
} else {
if (_cur_writer_idx == 0 && num_rows_written == 0) {
// init的作用是更新segment中对应的column id(其实就是上一轮column group写完了需要换到下一个column group了)
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
}
// 如果会将当前segment写满(也就是rows超过限制),那为了防止溢出智只会写一部分数据
// 并且写完后切换到下一个segment继续写
if (num_rows_written + num_rows >= num_rows_key_group &&
_cur_writer_idx < _segment_writers.size() - 1) {
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(
block, 0, num_rows_key_group - num_rows_written));
// 切换前需要flush写将当前segment的写flush了来
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx]));
start_offset = num_rows_key_group - num_rows_written;
limit = num_rows - start_offset;
++_cur_writer_idx;
// switch to next writer
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
num_rows_written = 0;
num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
}
// 这里既可能是刚切换完segment也可能是这次写入不会写满segment
if (limit > 0) {
RETURN_IF_ERROR(
_segment_writers[_cur_writer_idx]->append_block(block, start_offset, limit));
DCHECK(_segment_writers[_cur_writer_idx]->num_rows_written() <=
_segment_writers[_cur_writer_idx]->row_count());
}
}

}

接下来简单说一下创建segment writer的逻辑, Doris中数据文件实际上是通过segment在表示的,一个segment上的数据是按照列存形式管理的,所以创建segment实际上需要创建一个对应的file以及file writer. 另外在vertical compaction中由于每次写segment是按照列组在写的,所以对于一个segment其每次写入表示的column的id也是有所不同的,所以存在多次init的情况. 其实不同的列对应的就是不同的column writer,每一次init根据传递进去的column id都会构造新的column writer.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Status VerticalBetaRowsetWriter::_create_segment_writer(
const std::vector<uint32_t>& column_ids, bool is_key,
std::unique_ptr<segment_v2::SegmentWriter>* writer) {
// segment 文件的路径
auto path =
BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, _num_segment++);
auto fs = _rowset_meta->fs();
io::FileWriterPtr file_writer;
fs->create_file(path, &file_writer);
writer->reset(new segment_v2::SegmentWriter(
file_writer.get(), _num_segment, _context.tablet_schema, _context.tablet,
_context.data_dir, _context.max_rows_per_segment, writer_options, nullptr));
// 因为一个rowset可能有多个segment, 所以都通过_file_writer管理
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
}

auto s = (*writer)->init(column_ids, is_key);
}

vertical_beta_rowset_writer::flush_columns

上文有说到,vertical compaction是按照column group在进行数据的读取和写入的,但是考虑到Doris的数据表示是通过segment进行的,一个segment上就应该有同一行的所有数据,自然而然会想到一个问题,假设segment数量有多个,那么按照column group行读取部分列的方式进行写一定会出现一个segment交替的过程. 比如说一共1000行数据,有2个column group(key group和value group), 分成2个segment. 那么先按照key group写segment1写了500行数据,之后写segment2写了500行数据,之后切换到value group从segment1开始写,这里一定有一个segment遍历的过程. 查看vertical_beta_rowset_writer的实现会发现一个std::vector<std::unique_ptr<segment_v2::SegmentWriter>> _segment_writers;size_t _cur_writer_idx = 0;后者便是用来迭代遍历segment的下标.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Status VerticalBetaRowsetWriter::flush_columns(bool is_key) {
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx], is_key));
// 下一次从第一个segment开始写入
_cur_writer_idx = 0;
return Status::OK();
}
Status VerticalBetaRowsetWriter::_flush_columns(
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer, bool is_key) {
uint64_t index_size = 0;
RETURN_IF_ERROR((*segment_writer)->finalize_columns_data());
RETURN_IF_ERROR((*segment_writer)->finalize_columns_index(&index_size));
if (is_key) {
_total_key_group_rows += (*segment_writer)->row_count();
// record segment key bound, 计算zonemap的值
KeyBoundsPB key_bounds;
Slice min_key = (*segment_writer)->min_encoded_key();
Slice max_key = (*segment_writer)->max_encoded_key();
key_bounds.set_min_key(min_key.to_string());
key_bounds.set_max_key(max_key.to_string());
_segments_encoded_key_bounds.emplace_back(key_bounds);
}
_total_index_size +=
static_cast<int64_t>(index_size) + (*segment_writer)->get_inverted_index_file_size();
return Status::OK();
}

现在回头看一下vertical compaction的代码,可以发现在Merger::vertical_compact_one_group中最后一行调用了VerticalBetaRowsetWriter::_flush_columns方法,也就是在每一个column group结束时将segment从0开始. 用一个伪代码表示则是

1
2
3
4
5
6
7
8
9
10
11
12
13
Merger::vertical_merge_rowsets() {
for (column_group: column_groups) {
while (!eof) {
reader.next_block_with_aggregation();
dst_rowset_writer->add_columns(column_group);
}
dst_rowset_writer->flush_columns();
}
dst_rowset_writer->final_flush();
}

// final_flush中对每个segment调用finalize footer写入每个column的元数据信息, 在rowset_writer->build()时
// 持久所有数据

Abstract

大部分数据库都将调度策略委托给了操作系统本身,这个策略虽然能够简化数据库的设计但是也会带来一些问题. 比如在面对并发查询的时候自适应的资源分配就便得很难做,除此以外要在数据库中做一些调度调优也变得很困难(因为实际上更多的还是靠os自己在进行调度). 所以很多现代的现代都通过将一整条query的执行拆分成多个小的独立的任务以此来实现task-based并行. 基于task就使得数据库系统自己就能进行调度.

这篇论文主要是展示如何在task-based的设计上进行一些优化,论文作者提出了一种针对分析型workload的创新的无锁,自调优调度器. 通过动态地调整任务的优先级以及任务对应的粒度提供了很高的调度弹性. 即使在大压力导入下依旧能为短查询提供接近最低的延迟.

Introduction

分析型数据库面临的workload特别复杂,在高压导入时可能还有并行抵达的各种查询. 很多系统很难在这种压力下保持良好的查询性能.

下图作者以他们的系统和PG的进行了一个对比,在高频导入下的查询延迟的变化. Workload包括3/4的短查询和1/4的长查询. 同时系统的导入压力为其最大压力的95%并持续了25分钟.
Query Type

处于这种状态的系统对于用户来讲响应度变得更低,查询性能也变得难以预测在不同的时间跑相同的查询可能会收获不同的性能. 对于用户来说,在高压场景性能降级的影响应该尽量低. 类似PG的系统将执行的调度责任转交给了操作系统. 这类系统对每一次新的连接和查询都可能创建新的线程或者进程(当然更好的方式是池化), 在执行的过程中可能也会创建更多的线程进程来进行query内的并行. 为了避免线程数超过OS线程一般也会尽量控制线程的数量.

现代化的系统一般会做更细粒度的调度,会实现基于task-based的调度系统,一条query会被拆分成多个独立的任务,这些任务都可以在任何一个OS线程上执行,同时任务的调度也由数据库系统接手而非被动等待OS调度。数据库系统也可以很方便的基于任务数派发给不同的线程以动态调整query的并行度. 一些系统比如SAP HANA的调度策略是和OS 调度器共生的.

其他的比如HyPer和Umbra则是几乎不依赖OS的调度. 在启动时他们便启动同CPU核数相同数量的线程. 之后使用morsel-driven的方式进行调度. 一个morsel表示一个tuples的固定集合,morsel时query执行时的最小单元. 因为同一条queried的不同morsel可以并行执行所以这种方式可以实现query内并行和query间并行.

类似intel tbb的通用调度器更关注的是吞吐, 数据库系统更关注的是公平性和查询的响应度. DB系统调度器为了在高压导入时依旧能保证低延时会更倾向于执行short running query. 除此以外, 新开启任务的粒度也可以在执行时自适应调整.

本篇论文主要提出了一种创新的无锁自调优调度器. 接下来的内容主要是 Section2 调度器设计. Section 3 关注于morsel-driven数据库系统. 如何通过morsel数据结构让调度变得更鲁棒更可预测. Section 4, 在调度器上加上自适应调优.

Scalable Task Scheduling

在task-based的系统中挑选任务的步骤是在用户态进行的,所以调度策略需要尽可能地可拓展并且利用好硬件资源.

Background

假设$t_i$表示第i个任务,$p_i$表示任务的优先级. 每一个task被赋值stride $S_i=(p_i)^{-1}$. 假设所有任务在同一时间到达那么调度策略就变得很简单. 每一个task被映射到一个$P_i$, 这个值一开始设置为0. 之后的处理方式是: 拥有最小pass的任务被选出来执行一段时间片. 执行完后将pass更新为$P_i + S_i$. 任务$t_i$拥有的执行资源比例是$p_i / \sum_{k=1}^{n} p_k$. Stride调度策略在所有任务都拥有相同优先级的情况下时公平的.

但是如果要适配动态变化的任务则需要有一些修改. 如果一个任务在任意时刻加入则其需要一个初始值. 本文的scheduler会维护一个全局的stride $S_G$ = $\left(\sum_{k=1}^{n} p_k\right)^{-1}$ 以及一个全局的pass $P_G$. 每过一个调度时间片这个全局的pass都要增加一次全局的stride. 现在这个全局的pass可以用来给一个新到的任务计算初始的initial pass值. 只觉上可以认为这个全局的pass值表示scheduler的时间. 如果任务的pass值比全局pass值$P_G$小那么这个任务还没拿到他应该拥有的资源,反之比全局pass大则是已经使用了太多资源.

Stride调度可以很简单地被拓展到非抢占式设置. 如果一个任务$t_i$消耗了其分配的时间片的$f$那么他的pass就被更新为$P_i$+$fS_i$.同样的全局pass也变成$P_G$+$fS_G$. $f$在这里可能大于1.

Scheduling in Umbra

TaskStructureOfUmbra

每一条pipeline被映射成一个task set. 每一个task set中包含数个task,每个task由数个morsel组成. 同一条query可以有多个task set,这些task set都属于同一个resource group. 如上图中task set 1和2都属于rg1,因为rg1有两条pipeline. 而rg2只有一个所以只有一个task set. 每一个task可以有多个morsel,而morsel有3种状态, finished, running, pending. Worker thread每次pick一个task执行.

同一个task set的task可以被不同的worker thread并行执行以此来提供query的并行度. query的不同pipeline间可能有顺序依赖所以task set间也是存在顺序依赖的. 例如上图中左边的查询里的pipeline A部分必须在右边的pipeline B开始前完成. 因为hash join的build side必须比probe side先物化. Umbra通过将两条pipeline映射成顺序的task set来保证,在同一个resource group之中的task set必须等待其旗面的task set全部结束后才能开始执行. 这也方便我们在query的粒度追踪资源消耗.

类似Hyper的系统中task和morsel是一比一的关系也就是说一个task只有一个morsel. 但是umbra中一个task可以有任意个morsel. 在umbra中task并不是一开始就被静态创建好了的. 而是在运行时根据运行时的观测性动态调整的.

Thread local scheduling

Stride scheduling虽然能提供很强的确定性调度粒度但是他对现代多核硬件并不友好,因为会需要很多同步操作. 本章主要是提出了一种创新的task-based stride 调度实现. 可以在thread-local的底座上执行所有调度决策. Worker线程只会在活跃的task sets发生变化被notify.

当然,和传统的stride scheduling相比也增加了一些限制, 同时存在的resource group(也就是query的数量)是有上限的(但其实这样挺合理的,不做点反压系统搞不好直接被打爆了). 当数量特别多的时候会有一定的性能降级,新的resource group会在任务队列中等待.

本文的设计比较巧妙,虽然是针对stride scheduling algorithm的,但也可以通过仅修改thread-local scheduling的逻辑不需要修改别的部分从而切换到别的调度算法. 作者仅通过修改不了不到100行C++代码就实现了非确定性的lottery调度.

Thread local Decisions

全局scope内会维护一个数组,其中每一个slot是一个指针,指向某一个活跃的resource group中的task set. 当某一个RG的task set执行完毕后,会从RG中选一个新的task set放到同一个slot上. 这样的好处是调度时的优先级是绑定到了RG上而不是task set上从而简化了调度. 如果同一个RG的task sets可以被放到不同的slot会增加记账的逻辑.

除此以外所有的调度都是thread-local里发生的. 其中包括一个bitmask,它的作用是记录全局RG数组中的活跃项. 同时也负责优先级和pass value的映射记录. 除此以外每个worker thread也会存储其自己的global pass value. 就如下图所示.

SchedulingStructrue

如果全局的slot和worker thread的local activity mask是同步的那么调度就很简单, 直接挑选pass value最小的slot,然后在这个slot上进行一个atomic read以获取指向对应task set的指针. 之后便是执行,记录执行时间后更新本地对应的local pass value. 这种处理模式非常轻量, worker在pick 任务的时候不需要别的线程是否pick了相同的task set. 同时全局array中的slot只有在有新的task set的时候才会被写入新数据. 这类写操作发生的频率是比较低的,就不会带来大量的cache无效化同步开销.

修改active task sets

Figure4

大部分情况下会尽力最低化同步开销,但是有些全局的信息还是无法避免同步. Worker线程在以下3种事件时需要能过检测到

  1. 某一个全局array的slot里的task set执行完毕了. Worker必须把本地slot也disable掉(因为这个slot可能之后没有task set了 也就是这个query可能执行完了)
  2. 一个新的RG被赋值给一个slot时的初始化task set. worker需要pick这个RG对应的初始pass value和优先级. 并且更新local activity bitmask对应位置
  3. 全局slot对应位置的active RG中插入一条新的task set时(比如之前的task set执行完了就切换到下一个task set). worker需要更新本地的active slot并且设置初始pass value(在1中被disable掉了)

第一种事件可以只标记对应的pointer而无需通知worker. worker线程拿到slot时读取pointer就能发现已经是disable的状态并更新自己的local 数组

第二种和第三种事件需要引入新的组件. 在每一个worker中会维护两个原子的bitmask. 当worker在全局的slot中插入新的task set时, 所有worker的bitmask都会对应的更新. 两个bitmask分别称为change mask和return mask. 事件2更新change mask事件3更新return mask. 之后用update mask统称两个mask.

更新update bitmask的方式很简单,假如现在全局数组中的第k个slot接收到了一个新的RG的初始task set, 则只要更新每个worker的change mask的第k位为1. 通过原子的fetch or操作即可轻量级更新每一个worker. 类似地,worker线程首先通过原子的exchange操作将update mask都设为0并拿到刚刚被设置的值,并通过刚刚的值来获取全局的slot的状态以此决定是否更新调度策略. 如果根据bitmask获取的值没有特别大的变化则可以避免cache无效化(其实主要是是否更新worker对应的active task set, 如果不需要更新则可以继续pick对应的任务)。 例如在上图中, 第二幅图里全局slot中插入了两个新的task set. 所有worker的update mask也进行了对应的更新. 这里用了return mask来表示TS2是从一个已知的RG中来的,用change mask表示TS3是从一个全新的RG中来的. 此时所有的worker都还在执行TS3的task 他们在第二张image的时候还不会去拉取bitmask的更新到本地的调度状态里.在第三幅图片时第一个worker会将update mask中的信息同步更新, worker 2还在执行TS3的任务不会去更新. 这张figure说明了两个workers不需要同步各自的active task sets.

Task Set Finalization

如果RG中的task set A结束了那么需要激活他的下一个task set B(如果存在的话). 只有在task set b的所有前置task set都执行完后才会发生这一步. 当然为了灵活度考虑也允许task sets执行额外的finalization steps. 比如在sort时执行partitions的shuffling或者在grouping时merge部分的聚合结果.

worker会在试图从没有剩余的task的task set获取任务时被notified. 咋一想我们或许可以在这种事件发生时立即开启finalization. 但是别的worker可能还在执行刚刚从这个task set里拿到的task, 如果立马去finalization是很不对的,因为我们必须要在task set的所有任务都结束后才执行finalization, 这种情况也不应该一直等待别的task 完成. 为了解决这个情况Umbra的scheduler引入了一种轻量级的finalization phase.

当一个worker 挑选了一个slot进行执行时它会在全局的state数组上更新自己的决策(发生在原子read全局slot数组前). 之后第一个发现task set耗光了的worker会作为finalization phase的coordinator.

coordinator需要保证最后一个结束对应task的线程能够调用finalization逻辑. coordinator首先在全局slot 数组上将对应的slot里的pointer标记为无效, 这样之后pick这个slot的worker也会将其对应的local slot设置为无。 之后coordinator遍历全局state数组查找有哪些worker还在执行这个task set的task. 对于每一个满足条件的slot信息加上一个专用的finalization marker. 所有被mark的worker thread在结束他们当前的task之后必须显式注销这个task set. 只需要对每一个task set设置一个原子的counter即可做到. coordinator会在遍历完后将counter设置为其成功设置marker的worker数量(不能是设置一个marker就+1一次,一个是这样其实不高效另一个是这样不好确认到底是不是全部执行完了). 对应的worker thread在每次执行完一个task后检查全局state数组是否包含finalization marker,如果有则对counter减1(注意,这个counter可能变成负数, 因为有个worker可能在coordinator遍历完以前就执行完并减1了). 将counter置为0的worker可以保证是最后一个(因为这个counter只能一次性增加,就避免了刚+1就-1的情况), 他可以执行最终的finalization逻辑-> 在当前RG中查找是否有剩下的task set,如果有就设置如果没有就去全局wait queue中获取新的 RG.

这个finalization phase的开销几乎只是对全局state 数组的更新. 只要别的worker不在coordinator更新对应的slot的时候试图去pick task set则不会发生竞争. 并且只有pin了相同RG的worker才可能被影响. 所有会被影响到的线程理论上并不多(特别是在query比较多的场景,只要不是所有的worker都在处理同一个RG其实很难有特别多竞争).

Robust Morsel Scheduling

上一章介绍了lock-free的stride scheduler. 这一章介绍在此基础上利用morsel-based task进行的优化.

首先介绍如何将morsel-driven parallelism变得robust. 传统方式task:morsel的比例导致task粒度有方差从而导致不理想的调度失真. 调度的开销变得很难预测而且会有worker长时间被阻塞. 本文作者通过标准化task的执行时间来保证了调度开销可预测以及强响应度.

第二步是利用数据库的领域知识来优化混合分析负载中的查询延时. 通过优先级策略,本文的stride scheduler提供了细粒度的资源消耗控制. 而这是通过自适应的优先级调整策略做到的.

Adaptive Morsel Execution

类似Hyper的系统中morsel:task是1比1,但其实不同的morsel的执行耗时很很不同的(因为不同的pipeline执行的逻辑本身也不同,复杂度也不同)
. 比如假设一个pipeline中时简单的selectiton然后插入到hashtable, 那么在相同的morsel大小的情况下他的执行时间肯定是比一条包括复杂的字符串匹配和一系列hash table probe的pipeline要短的. 这意味着对于scheduler来说不同任务的粒度是差别很大的. 如果继续采用这种1:1映射那么在选择morsel大小时就需要考虑不同的tradeoff. 如果太小那么会产生更多次的schedule相应的schedule开销也更大, 如果太长, 那么有的morsel在有的pipeline中执行时间太长会影响整个系统的响应度.

传统的模式在两个维度是静态的:1. 依赖于固定的morsel大小 2. 依赖于静态的morsel和task的映射.

本文提出了一个创新的渐进式框架. 调度器定义一个目标时间$t_max$. 在挑选任务的时候调度器尽量调度能够准确满足这个目标的morsels. 也就是说这个框架能够1.使用动态的不定size morsel大小 2. 执行不同数量的morsels. 另外调度器对task的结构是无感的,这也让整个调度压力变得可预测了. Umbra将这个值设置为2ms. 作者认为这个大小兼顾了调度的负担以及响应度. 另外据作者的观测调度策略一般只需要不到1ms就能决定,也就是说负载大概也就0.05%

通过将pipeline转换为状态机以不同的执行阶段采取不同的策略来达到目标时间. 不同的状态暗示了需要pick多少个morsel. 因为morsel是在运行时从tuples的集合中算出来的,所以动态地通过不同的morsel size填充task size是可行的.

Default state

默认状态时会试着挑取一个能够满足$t_max$的morsel. 这需要系统能够提供一个准确的吞吐估算值$T$, 表示每秒多少tuple. 这样一个morsel则是T*$t_max$的tuples. 而当这个morsel执行完后便能拿到实际的执行时间$t$. 对刚刚的吞吐估算值进行一个修正$\hat{T}=(T\cdot t_{\max})/t$. 根据旧的吞吐量T和系数$\alpha \in [0, 1]$, 那么新的则是$T^{\prime}=\alpha\hat{T}+(1-\alpha).T$.

Startup State

这个state用来提供一个初始的吞吐估算. 这个state会按照指数关系不停pick不同size的morsel指到达到$t_max$

Optimizations

还引入了两个优化. 一个是除了上述两种state还有一个shutdown state. 通过当前的吞吐估算值以及剩余的tuples数量可以大概计算出剩余所需的pipeline执行时间. 假设我们有M个worker, 一旦可预测的剩余的总时间低于$W\cdot t_{max}$就会进入shutdown state. 假设预期剩余总时间为t,morsel的最短执行时间为$t_min$, 我们将morsels调度为$\max(\frac{t}{W},t_{\min})$. 另一个优化是为了应对不支持自适应morsel size的tasks. 这类必须高效处理因为有些任务不具有高度的自适应. 如果运行时发现有的任务消耗的只是target duration的一部分可以允许这类任务继续消费直到达到$t_max$

Evaluation

MorselSizeComparison

这个图能看出来不同阶段的效果, 特别是startup阶段的两倍指数

Adaptive Query Priorities

不同的任务可以有不同的优先级. 数据库不应该让用户处理workload管理中的繁琐细节. Umbra利用自适应查询优先级来透明化地按照短作业优先的方式处理. 不需要用户输入而是在运行时根据query的性质赋予优先级. 首先定义”desirable”延迟. 假设所有的query都是一样重要的. 在这个假设下提出两个基础原则.

  1. 查询延迟在load时也得保持可预测性.

如果系统同时接受两条query, 短一些query需要先结束.

  1. 查询延迟需要尽量低.

如果数据库遵守这两条那么是可以做到可预测和高性能的. 不过公平调度只能保证1不能保证2. 因为短作业一般不会特别影响到长作业的延迟. 比如我们有9成的短作业他们耗时10ms,只有一成的长作业,他们耗时1s. 即使使用短作业优先的方式,对于长作业也只有10%的影响.

本文提出的自适应优先级策略就是为了透明地给短作业赋予优先级. 处理起来和多级反馈队列很像,一个查询的优先级取决于他到目前为止消耗的CPU资源. 因为作者将query包装成了一个RG, 所以查询的优先级其实也就是query的优先级. 可以按照如下公式处理:

$P_{i+1}=\begin{cases}P_{i},i<d_{start}\\max(P_{min},\lambda P_{i}),i\geq d_{start}\end{cases}$

有3个参数, $d_start$表示RG的优先级开始衰退的时间, 衰退的速度控制在$\lambda\in[0,1]$. 优先级最低到$p_min$.

和公平调度相比,这种调度策略能提供更大的相对性能差异. 但是为了实现短作业任务的高性能也是必须的. 尽管如此还是保证了原则(1). 同时到达的两条query的优先级退化速度是一致的也就是说短的一个会在同样的资源分配下先结束.

自定义优先级

当然也可以让用户自己定义优先级. 有两种简单的方式实现

  1. 特别重要的query可以允许用户设置一个不同的静态初始优先级并且这个优先级是静态的不会衰退
  2. 也可以将优化级和用户绑定. 这样用户优先级可以影响他的所有query的衰退速度.

Self-tuning Scheduling

TODO

介绍

这篇博文主要是结合笔者以前在字节跳动实习时在内部做的一次分享以及近年来的一些新的感悟来谈一下协程的部分内容,这篇博文不会过多讨论协程如何进行调度以及如何进行任务的组合等部分,主要是围绕为何会有协程,协程的分类以及不同的实现,另外也简单聊一下使用协程的心得.

为什么我们想要协程

亦可参考Google于13年的一次分享

首先为什么会想要使用协程呢?先从网络编程中几种模式说起,这里我们基于asio的代码来讨论,先用最传统的阻塞式的网络编程模式实现echo server就如同如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int main()
{
try
{
boost::asio::io_context io_context;
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 8000));
while (true)
{
tcp::socket socket(io_context);
acceptor.accept(socket);
boost::system::error_code error;
while (true)
{
char data[1024];
size_t length = socket.read_some(boost::asio::buffer(data), error);
if (error == boost::asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw boost::system::system_error(error); // Some other error.
boost::asio::write(socket, boost::asio::buffer(data, length));
}
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}

因为使用的是阻塞的模式,这段代码的吞吐量特别慢,因为每次只能处理一个连接的请求,并且一次连接的请求没能处理完完全不会去处理下一段。

有的人会想或许我们可以采取每一条连接都开启一个新线程进行处理的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <iostream>
#include <thread>
#include <asio.hpp>

using namespace asio::ip;

void do_echo(tcp::socket socket)
{
try
{
char data[1024];
while (true)
{
std::size_t length = socket.read_some(asio::buffer(data));
asio::write(socket, asio::buffer(data, length));
}
}
catch (std::exception& e)
{
std::cerr << "Exception in thread: " << e.what() << "\n";
}
}

void do_accept(asio::ip::tcp::acceptor& acceptor, asio::io_service& io_service)
{
acceptor.async_accept(io_service,
[&acceptor, &io_service](std::error_code ec, tcp::socket socket)
{
if (!ec)
{
std::thread(do_echo, std::move(socket)).detach();
}

do_accept(acceptor, io_service);
});
}

int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: echo_server <port>\n";
return 1;
}

asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), std::atoi(argv[1])));
do_accept(acceptor, io_service);
io_service.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}

return 0;
}

上面这种形式在连接数较少时也是能work的,但是当连接数量达到几十万条甚至更多的时候呢?假设有10w个连接,Linux下每个线程的栈默认大小为8M,那么一下子就会消费80G的内存,再算上线程切换的开销,可以想象服务的负载会有多么大,这似乎并不是一条很靠谱的解决方案.
接下来我们试着用最简单的回调的方式进行异步化改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <iostream>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

void session(tcp::socket socket)
{
std::array<char, 1024> buffer;
boost::system::error_code error;

// 异步读取来自客户端的数据
socket.async_read_some(boost::asio::buffer(buffer), [&](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
// 将数据回显回客户端
boost::asio::async_write(socket, boost::asio::buffer(buffer, length),
[&](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
// 继续异步读取来自客户端的数据
session(std::move(socket));
}
});
}
});
}
int main()
{
try
{
boost::asio::io_context io_context;
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 8000));
while (true)
{
tcp::socket socket(io_context);
acceptor.accept(socket);
// 启动异步会话
session(std::move(socket));
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}

上面这段代码中所有的io操作都是异步的,即使只有一个线程的情况下也能提供很强的吞吐(参考nodejs),因为其中所有的IO操作都是非阻塞的,加上使用了IO多路复用技术可以保证一个线程中处理多个request时不会出现某个request的io卡住导致所有request全部饿死的情况。但是可以看到这种方式的代码中无可避免的会使用回调(echo server这种比较简单的情况只有2层回调,但是如果我们是http服务器呢?读到数据后先进行encode/decode,然后路由到对应的处理函数),当回调层数越深的时候丢失的代码上下文就越多,在debug的时候就更费劲.
另外异步+回调更容易让程序员迷失在对象的生命周期之中,一种很常见的问题是,对象A在线程1里析构了,但是被线程2访问了,这就会导致heap-use-after-free的问题。有的人会argue到使用shared_ptr来解决问题,但是如果shared_ptr引用的原子变量的开销在很多时候也不可小觑,更不用提enable_shared_from被滥用的话带来的问题。

下面我们用asio的协程来编写刚刚的功能.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void report_error(std::string_view component, sys::error_code ec)
{
std::cerr << component << " failure: "
<< ec << " ()" << ec.message() << ")\n";
}

asio::awaitable<void> session(tcp::socket socket)
{
try
{
char data[1024];
for (;;)
{
std::size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);
co_await async_write(socket, asio::buffer(data, n), asio::use_awaitable);
}
}
catch (sys::system_error const& e)
{
if (e.code() == asio::error::eof)
std::cerr << "Session done \n";
else
report_error("Session", e.code());
}
}

asio::awaitable<void> listener(asio::io_context& context, unsigned short port)
{
tcp::acceptor acceptor(context, {tcp::v4(), port});

try
{
for (;;)
{
tcp::socket socket = co_await acceptor.async_accept(asio::use_awaitable);
asio::co_spawn(context, session(std::move(socket)), asio::detached);
}
}
catch (sys::system_error const& e)
{
report_error("Listener", e.code());
}
}

int main()
{
try
{
asio::io_context context;

asio::signal_set signals(context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ context.stop(); });

auto listen = listener(context, 55555);
asio::co_spawn(context, std::move(listen), asio::detached);

context.run();
std::cerr << "Server done \n";
}
catch (std::exception& e)
{
std::cerr << "Server failure: " << e.what() << "\n";
}
}

可以看到在协程版本的代码中我们并不需要像异步版本的代码一样写很多的回调函数,更多的是co_await expression这样类似函数调用的写法,我们的代码不需要再非常的割裂,可以和同步编程时的代码似乎没有太大的差别,同时这样的代码也不会出现同步阻塞代码中处理一条请求时无法处理新来的请求的情况,程序的吞吐也能很可观。同时也不会出现per-thread-per-connection版本中的线程消耗问题.

下文中我们简单描述一下不同的协程的实现原理

原理

按照是否保存调用栈区分

首先有栈无栈协程在执行的时候都是要使用到程序内存空间当中的栈的,而他们的名字之中的有栈无栈指的是是否保存自己的调用栈,有栈协程会将从协程起始点开始到挂起点为止的所有栈上的变量都保存到自己的栈之中,当发生协程切换的时候会替换挂起协程和恢复协程的栈,这样就可以在另一个协程的挂起点上恢复。这里我们先卖个关子不谈这有什么差异

现在我们设想一个场景,在某个协程C的上下文中调用函数F,这个协程能在什么地方挂起?
在有栈协程中,假设C调用到F的时候调用栈如下:

1
2
3
4
5
6
7
8
9
10
11
12
|      Stack Frame f     |   <- 在这里的任意时刻挂起
+----------------------+
| Local Variables f |
| |
| |
| |
| |
+----------------------+
| Return Address f |
+----------------------+
| Ctx Of C |
+----------------------+

有栈协程能够将C中jmp F到F执行的任意时刻的堆栈给保存下来,之后也可以切换回来. 这里用Linux kernel中的process/thread调度做一个简单的类比,Linux中不管是process还是thread都是用task_struct保存其对应的属性的,内核在进行调度的时候也是将当前的process/thread对应的task_struct切出并替换成另一个task_struct,这样就完成了切换。所以我们甚至可以说如果有syscall api能够让用户自己指定将当前process/thread切换到pid所对应的process/thread,那么内核其实也提供了手动实现的有栈协程的手段.(实际上Google还真想这么干)

接下来我们讨论无栈协程的实现。无栈协程并不会单独保存协程的整个stack frame,类比上面有栈协程的图来对比的话就是无栈协程只能在被标注可以挂起的地方挂起. 假设我们有这样一段代码

1
2
3
4
5
6
7
8
Task<int> coro() {
int i = 0;
co_yield i;
i++;
co_yield i;
i++;
co_return i;
}

这段代码中只有3个co_xxx的地方可以挂起,这段代码可能会被编译器修改成这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct CoroutineState {
int i;
int state;
CoroutineState() : i(0), state(0) {}
int coro() {
switch (state) {
case 0:
state = 1;
return i;
case 1:
i++;
state = 2;
return i;
case 2:
i++;
return i;
default:
// shoule never come here
}
}
};

可以看到整个函数被转换成了一个状态机类,在函数内部的局部变量会被捕获到这个类中作为成员变量,而不同的挂起点对应不同的状态。可以认为无栈协程会被状态成状态机代码来执行(熟悉Js的小伙伴会不会想到Promise呢).敏锐的小伙伴肯定也意识到了,编译器帮我们将无栈协程转换成了对应的状态机或者结构体(在rust中的future,JS中的promise),那只要拿到这个对应的handler就能自己决定在哪儿resume(这也给众多的库作者提供了自己手写runtime scheduler的方式).

但是说了这么多无栈和有栈的区别能如何更具体的体现呢?我们按照一段伪代码来分析在无栈和有栈时的不同情况

1
2
3
4
5
6
7
8
function foo():
...
return 42

function coroFun():
yield 100
i = foo()
yield 101

无栈协程无法在foo()上挂起,因为foo中没有挂起点,这意味着没有告诉编译器怎么将foo()转变成一个结构体保存其对应的堆栈上下文也不知道在什么地方挂起和回复. 但是有栈协程因为从协程起始点开始保存了所有的栈信息所以即使到了foo()函数中也能一并保存,可以随时挂起.这就是为什么无栈协程only the top-level coroutine may be suspended.

协程的使用

  1. 首先第一个问题,协程是适合IO密集型任务还是计算密集任务?答案是IO密集型任务,协程最直观的好处在于他的切换代价非常低,所以对于IO密集型任务可以在IO发生阻塞时(所以一般需要结合非阻塞IO使用,当返回EWOULDBLOCK的时候就可以出让本协程了)切换到另一个协程,这样代价相比线程IO切换会低上很多。

  2. 那如果有了协程我的并行度会有提升吗?并行度的上限理论上还是和CPU core数量有关,同一时间能并行执行的任务并不会因为协程数量更多切换代价更低而提升。协程的好处是在避免因为过多的回调引起程序的非结构化的基础上还能提供有效的异步编程手段以提升程序的吞吐(参考单线程的NodeJS).

  3. 协程使用中应该使用什么同步原语?如果还是使用pthread_xxx同步原语的话同步的粒度是thread级别的,不管是有栈协程还是无栈协程对于操作系统来说都是运行在用户态的函数,比如一旦使用pthread_lock上锁那整个线程都将阻塞自然而然也就没有了切换协程的能力。这也是为什么在不同的协程实现中都会带上一套自己的同步原语(比如bmutex之于bthread, tokio::sync::Mutex之于rust). 之后的博文也会介绍如何实现不同的协程如何实现自己的同步原语. 当然,在库级别的协程的同步原语在多个库混杂时也会造成一些问题,比如我在bthread里面如果使用了folly::fiber的同步原语会发生什么?

介绍

译者一直对有关coroutine和execution的概念非常迷恋,近来希望将对这部分内容的学习心得简单分享一下提升下自己的理解,预计会有5 6篇博文产出,先以这篇CppCon的翻译起个头

下文会从pdf的第一章开始翻译

motivating futures/promises + actors

当工程师试图打造一款高性能且正确的分布式系统时会遇到许多关键挑战。概括下来有两条

  1. 在代码中不得不有等待的条件
  2. 在代码中有state

如何解决等待的问题

以下面的代码为例

1
2
3
std::string text="...";
text = SpellCehck(text);
text = GrammerCheck(text);

上面的代码可通过函数的组合修改成一行GrammerCheck(SpellCehck("...")). 不过接下来我们还是分开分析,先分析SpellCheck函数的实现

1
2
3
4
5
std::string SpellCheck(std::string text) {
auto body = http::UrlEncode({"text", text});
auto response = http::Post("https://www.online-spellcheck.com", body);
return response.body;
}

无论这段代码之中的Http::Post方法是阻塞的还是非阻塞都不会改变一件事实:你的代码不得不在这里等待
可能的解决方案如下:

  1. 就死等. 这个方案肯定是不会采纳的
  2. 使用线程. 成本高昂同时对正确性无益
  3. 使用coroutine. 作者当时是09年 还没有好用的C++ coroutine
  4. 使用不同的语言比如erlang 或者 把erlang带进C++之中
  5. 使用回调函数,之后会讨论这个
  6. 使用future/promise

接下来我们讨论future/promise。
future/promise就像buffered channel一样

1
2
3
4
5
6
7
8
Promise<std::string> promise;
---------------thread x-------------
Channel::Reader<std::string> reader = channel.Reader();
reader.Read(); // Blocks!
---------------thread y-------------
Channel::Writer<std::string> writer = channel.Writer();
writer.Write("..."); // Non-blocking!
writer.Close();

改为future代码如下

1
2
3
4
5
6
Promise<std::string> promise;
---------------thread x-------------
Future<std::string> future = promise.Future();
future.Get(); // Blocks!
---------------thread y-------------
promise.Set("..."); // Non-blocking!

接下来我们用future来修改SpellCheck函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 这里我们异步地请求Post方法,并通过future拿到对应的返回值
Future<std::string> SpellCheck(std::string text) {
auto body = http::UrlEncode({"text", text});
Promise<std::string> promise;
auto future = promise.Future();
http::AsyncPost(
"https://www.online-spellcheck.com" ,
body,
[promise = std::move(promise)]( auto&& response) {
if (response.code != 200) promise.Fail(response.code);
else promise.Set(response.body);
});
return future;
}

假如现在我们的SpellCheck和GrammarCheck两个函数都试用了future/promise的方法改造,那么我们会碰到接下来的问题

1
2
3
std::string text = "..."; 
text = SpellCheck(text). Get(); // Blocks!
text = GrammarCheck(text). Get(); // Blocks!

可以看到这两个方法的Get都会是阻塞的,那不相当于我们的代码又变成串行阻塞的了吗?这里的函数逻辑是在完成spellcheck后再调用grammarcheck,也就是说这里的Control Flow应该是SpellCheck -> GrammarCheck那么这里如果可以让两个future拥有future a异步执行完了之后(then)执行b的语义似乎能避免对应的阻塞,假设C++的future能提供类似JavaScript的.then接口继续讨论这个问题.那么对应代码或许可以这样修改

1
2
3
4
5
6
7
// 这里将SpellCheck和GrammerCheck组合到一个函数SpellAndGrammarCheck之中
Future<std::string> SpellAndGrammarCheck (std::string text) {
return SpellCheck(text)
.Then([](auto&& text) {
return GrammarCheck(text);// Can be a Future<T> or T.
});
}

到这里我们似乎解决了等待的问题.

如何解决状态的问题

使用future/promises执行代码的一些性质

  • 因为代码中从不block所以可以只有一个线程便执行(类型JavaScript的模型,只有基于libuv提供的单线程event loop)
  • 然后代码并不一定能原子地执行,因为当代码中出现必须等待别的代码时就出现了交互(在不得不等待时会有别的代码被执行就是问题的关键)
  • 许多人称这种情况为”concurrency” vs parallelism因为这种模式给了你并发执行的错觉然而你并没有同时在执行
  • 但是你依旧得忍受并发执行所带来的同步问题
  • 也可以基于多线程执行代码

可能的解决方案如下:

  • 1963年提出了mutexes和semaphore
  • 1973年提出了actors
  • 1974年提出了monitors
  • 1978年提出了communicating sequential processes
  • 1987年提出了statecharts

其中mutexes,semaphores和monitors都是基于threads的解决方案
而actors,csp,statecharts是没有线程的解决方案

没有线程会是什么情况?
没有线程的抽象包含了执行模型/语义和状态的同步
actors包括了执行,同步,状态

包括得更多 -> 更高级的抽象能带来

  • 更容易理解
  • 更容易在更多的硬件和平台运行
  • 更容易优化

actors的性质

  • 本地可修改状态
  • 消息队列
  • 一次接受并处理一条消息
  • actors之间发送消息是非阻塞的
  • 不论是local还是distributed模式都是同样的编程模型

下面我们看一段在C++中的actor模型的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
struct MyActor : public Actor {
void Receive(ActorId sender, Message message, void* arguments) override {
switch (message) {
case MESSAGE_FOO_REQUEST:
auto* request = (FooRequest*) arguments;
...
Send(sender, MESSAGE_FOO_RESPONSE, response);
break;
case MESSAGE_BAR_REQUEST:
...
}
}
};

只需要实现不同的request,actor在接受到 message后就能派发到对应的actor上去。
actors (visualized)

如果对所有的方法上锁(类似java的synchronized)能不能做到同样的效果呢?
threads (visualized)

actor的性能如何?
对于数据经常需要被共享或者在执行资源间移动的并发程序没啥影响

但是对于分布式和网络程序,数据经常只在别的机器上被共享,并且在任意的core之间交换数据会带来性能下降

似乎通过actor我们解决了state的问题

libprocess

作者在2009年的时候

  • 在UCB 构建分布式系统Apache Mesos
  • 使用了C++来避免如Java的gc语言会带来的运行时不确定性问题
  • 希望使用actors

于是在打造libprocess的时候他们想到了将futures/promises和actors结合!

为什么actor需要future/promises

回首上面的actor代码会发现很难描述清楚actor之间的执行流

  • 在actor 模型上收发信息就如同编写汇编语言一样(译者注:编写难度和体验)尽管他们确实解决了state的问题
  • message的处理就如同goto一样!而goto是非结构化的,是不被提倡的

抛弃goto这个玩意,我们想要的是

  • 非阻塞的函数调用(返回future)
  • 函数的可组合性(Then)

就像如下的代码

1
2
3
4
5
MyActor actor;
auto future = actor.Foo(...)
.Then([](auto&& response) {
return ...;
});

libprocess actor的伪代码如下

1
2
3
4
5
6
7
8
9
10
struct MyActor : public Actor {
Future<FooResponse> Foo(FooRequest request) {
// Execute the “message” on the actor ‘self()’.
return On(self(), [this, request]() {
FooResponse response;
...
return response;
});
}
};

注意这里On(self(), closure)的语义是在self()也就是这个actor实例本身这个执行资源上执行closure中的内容

自然的也可以在函数中通过then组合调用别的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
struct MyActor : public Actor {
Future<FooResponse> Foo(FooRequest request) {
// Execute the “message” on the actor ‘self()’.
return On(self(), [this, request]() {
...
return SomeOtherFunctionReturningAFuture()
.Then([](auto&& value) {
...
});
});
}
};

why futures/promises need actors

观察一下下面这段代码

1
2
3
4
5
6
7
8
9
struct MyObject {
Future<void> SomeMember() {
return SomeFunction()
.Then([](auto&& value) {
// Where should this lambda run?????
...
});
}
};

Then之中的函数逻辑应该运行在什么之上呢?一个简单的想法是直接使用SomeFunction返回的future所关联的promise所在的执行资源之上

完善一下上面的代码可能会是这样

1
2
3
4
5
6
7
8
9
10
11
12
13
struct MyObject {
Future<void> SomeMember() {
return SomeFunction()
.Then([this](auto&& value) {
// Where should this lambda run?????
std::unique_lock<std::mutex> lock(mutex_);
i += value;
});
}
private:
int i_;
std::mutex mutex_;
};

这可能会带来一个问题:在SomeFunction对应的promise的执行逻辑在调用promise.Set()的时候可能是阻塞的(可以理解成执行promise的逻辑和执行Then的逻辑可能会并行,这里就有锁会带来的同步代价)
或许可以使用asyncmutex试图解决这个问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct MyObject {
Future<void> SomeMember() {
return SomeFunction()
.Then([this](auto&& value) {
// Where should this lambda run?????
return mutex_.Acquire()
.Then([this]() {
i += value;
mutex_.Release();
});
});
}
private:
int i_;
AsyncMutex mutex_;
};

这里调用Release()将要/必须执行一个不会block的waiter,这也可能带来不确定的执行。

如果能够保证Then的内容严格在actor执行完SomeFunction()之后执行就能够避免上文中非确定性执行带来的同步问题

1
2
3
4
5
6
7
8
9
10
struct MyObject : public Actor {
Future<void> SomeMember() {
return SomeFunction()
.Then(DeferOn(self(), [this](auto&& value) { // Then之中的内容之后会在同一个actor的执行资源上执行
i_ += value;
}));
}
private:
int i_;
};
  • 上面的方式中actor提供了执行Then(continuation)的executor资源
  • 设置promise非常快是非阻塞的(DeferOn保证了在set之后才会执行Then之中的逻辑)
  • 无需同步

revisiting the problem

重新看一下SpellAndGrammerCheck函数

1
2
3
4
std::string SpellAndGrammarCheck(std::string text) { 
text = SpellCheck(text);
return GrammarCheck(text);
}

这段代码是顺序的,非并行的,即使并发执行也没有状态需要同步
修改成Future和Then的方式

1
2
3
4
5
6
Future<std::string> SpellAndGrammarCheck(std::string text) {
return SpellCheck(text)
.Then([](auto&& text) {
return GrammarCheck(text);
});
}

这之中会申请锁并且需要动态分配内存

是什么需要申请锁以及动态分配内存

我们先展开SpellCheck的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
Future<std::string> SpellAndGrammarCheck (std::string text) {
...
auto future = promise.future();
http::AsyncPost( // Non-blocking! Returns immediately!
...,
[promise = std::move(promise)]( auto&& response) {
promise.Set(response.body);
});
return future
.Then([](auto&& text) {
return GrammarCheck(text);
});
}

上面的代码中在promise.Set(response.body)与continuation通过.Then()组合之间存在race, 这两个动作可能在同时发生,于此我们需要锁来同步. 另外promise也可能在continuation被组合起来以前设置,所以需要动态内存分配.

有什么办法避免锁以及这一次动态内存分配吗?

evolution of libprocess

避免锁开销

上面的逻辑中SpellCheck的future的组装(将GrammarCheck的逻辑组合在一起)和promise的set操作之间是可能并行的,所以需要锁来同步,但如果将代码的逻辑做一下简单的修改,修改成SpellCheck中AsyncPost的逻辑执行完之后执行一个回调函数就可以避免锁的同步(也就是保证了GrammerCheck的逻辑一定在SpellCheck之后)

1
2
3
4
5
6
7
8
9
10
11
12
// f的逻辑其实就是GrammerCheck
void SpellCheck(std::string text, std::function<void(std::string)> f) {
auto body = http::UrlEncode({"text", text});

http::AsyncPost(
"https://www.online-spellcheck.com" ,
body,
[f = std::move(f)](auto&& response) {
f(response.body); // Invoke continuation without locks!
});
return future;
}

也可以修改成模板函数

1
2
3
4
5
6
7
8
9
10
11
template <typename K>
void SpellCheck(std::string text, K k) {
auto body = http::UrlEncode({"text", text});
http::AsyncPost(
"https://www.online-spellcheck.com" ,
body,
[k = std::move(k)](auto&& response) {
if (response.code != 200) k.Fail(response.code);
else k.Success(response.body);
});
}

译者注:作者在演讲里表示在他读大学的时候他们都用K来表示continuation,所以这里模板里他也用的是K而不是C

避免动态内存分配

以上面的模板函数为例,到底是在什么地方分配了内存呢?这里需要进入Http::AsyncPost的实现之中

1
2
3
4
5
6
7
8
9
10
11
12
namespace http {
template <typename K>
void Post(std::string url, std::string body, K k) {
void* data = new K(std::move(k));
...
http_post(url, body, data, +[](long code, const char* body, void* data) {
K* k = reinterpret_cast<K*>(data);
k->Success(http::Response{code, body});
delete k;
});
}
} // namespace http

在Post方法中还是分配了内存.

💡 如果将continuation K作为函数返回的结果中的一部分是不是就避免了分配?

接受一个continuation作为参数并返回一个continuation作为结果(返回值中组合/包括了作为参数传递的continuation)

修改一下上面的Post方法的实现,与其在函数的栈上分配一个堆上的void*,不如在函数中定义一个仅存在于该函数的struct并返回一个该struct的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename K>
void Post(std::string url, std::string body, K k) {
struct Continuation {
void Start() {
void* data = &k;
http_post(url, body, data, +[]( long code, const char* body, void* data)
{
K* k = reinterpret_cast<K*>(data);
k->Success( http::Response{code, body});
});
}
std::string url, body;
K k;
};
return Continuation{std::move(url), std::move(body), std::move(k)};
}

可以看到这段代码讲传递进入的url,body,k都传递进了struct Continuation之中,之后这三个值的生命周期将同返回的Continuation value绑定,在value析构时一同回收,同时这里的内存分配也只有一个栈上的值类型Continuation的内存分配.

lazy continuation

1
2
auto k = http::Post(url, body, /* k' */);
k.Start();
  • 返回值类型是”computational graph”
  • 这个graph是lazy的,当我们拿到这个graph时什么都不会发生(tradeoff for dynamic allocation) 并且如果我们想执行他的逻辑就必须显式调用start
  • graph可以分配在栈上或者堆上
  • 在完成以前内存必须有效

eventuals

作者将lazy continuation称为eventuals.

这一章的内容主要是介绍了eventuals并且通过eventuals将上文中传递continuation的代码风格进行了修正(传递continuation看着比较难看不符合人体工程学),译文会忽略这部分推导,感兴趣的读者可以访问eventuals查看这个项目的信息.

scheduling

本章节主要是探讨continuation应该在什么地方运行?回望下面这段代码

1
2
3
4
5
6
7
8
9
10
11
auto Post(std::string url, std::string body) {
return Eventual<http::Response>([url, body]( auto& k) {
using K = std::decay_t<decltype(k)>;
void* data = &k;
http_post(url, body, data, +[]( long code, const char* body, void* data)
{
K* k = reinterpret_cast<K*>(data);
k->Success(http::Response{code, body});
});
}
});

如果continuation是在event loop中被调用的,我们不希望continuation继续在event loop中继续运行(会阻塞其他的IO任务,IO线程池不应该执行太复杂的continuation任务)
同理,如果continuation是在actor中被调用的我们也不希望继续在actor的执行资源上运行,因为这样会阻塞别的message的处理

回首看一下motivating exemple的代码

1
2
3
4
5
std::string SpellCheck(std::string text) {
auto body = http::UrlEncode({"text", text});
auto response = http::Post("https://www.online-spellcheck.com", body);
return response.body;
}

得益于函数的抽象,我们可以分开考虑函数的接口以及实现.在这里不需要考虑http::Post到底是如何实现的.

  • 我们不关心是否在多线程上实现
  • 我们不关心是否在GPU上实现
  • 我们不关心是否在FPGA上实现

然而,如果执行完http::Post方法并将控制流返回时我们发现现在正在

  • 于我们开始执行时不同的线程上执行时,我们会很惊讶
  • 一块GPU的执行逻辑上,而我们是在CPU上开始执行的,我们会很惊讶

所以我们到底应该让continuation在什么地方执行呢?
使用在你不得不等待以前正在使用的执行资源

否则(译者注:这里作者意思大概就是否则我们每一次都应该检查接下来的函数调用的文档/实现去查看对应的函数会不会在eventloop上执行,如果是的话就需要自己通过ThreadPool::Schedule等方法重新调度),需要将代码进行类似的修改.

1
2
3
4
5
6
7
8
9
10
11
12
13
auto SpellAndGrammarCheck (std::string text) {
return ThreadPool::Schedule([text]() {
return SpellCheck(text)
// Rescheduling on thread pool because we looked at
// documentation of 'SpellCheck()' and it continues on the
// event loop which we don't want to be on.
| ThreadPool::Schedule([text]() {
return Then([](auto&& text) {
return GrammarCheck(text);
});
});
});
}

类似的内容可以查看std::execution的提案(译者后续的博文也会介绍这个提案).

另外也介绍了在PLDI也有类似的工作

“Composing Sorfware Efficiently with Lithe”(PLDI 2010)

  • 允许许多同时的scheduler负责计算图的子树
  • 所有的计算都拥有一个调度context
  • 对于冲洗提交任务给拥有context的接口提供了正式的接口

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment