0%

paper reading: Exploiting Cloud Object Storage for High-Performance Analytics

Abstract

过去因为网络带宽限制,对于analytic 大表的场景local storage还是不可替代的,但是随着带宽升级网络访问和本地nvme的带宽越来越接近所以现在对象存储作为底座也越来越有吸引力。 本篇文章在研究后提出了一个成本和性能最佳的检索配置. 也开源了一个叫AnyBlob的downloader library。 后文会展示在umbra上集成这个downloader的威力.

Introduction

21年的时候cloud dbms的收入就已经达到了本地部署的系统的收入,到了23年肯定更高了. 云上最吸引的人就是弹性和资源供给的灵活度.
各家云厂商提供的对象存储服务允许将计算资源和存储资源分离。 他们同时也提供了很强的持久性保证,几乎无限的容量以及可拓展的访问带宽.
在18年AWS提供给每个实例的网络就是100Gbit/s(约12GB/s) 这极大地减轻了对网络访问和本地内存访问带宽差异的担忧(甚至比访问hdd都高).
过去的研究主要是关注于如何通过cache避免拉去远程的数据. 另一部分研究则是关注于cloud上的OLTP系统[27]. 没有多少对于对象存储上的AP系统的研究和结论.
另外作者认为有3大挑战

  1. 如何有效利用机器带宽打满网卡
  2. 如何应对网络请求带来的CPU负载
  3. 如何提供良好的多云支持

本文作者研究了不同云的对象存储总结了一套价格性能都优化过的配置. 另外他们通过自己实现了一个异步的对象存储downloader来减轻cpu消耗. 并且他们无缝地将对象下载和数据库的scan算子结合了起来. 作者自称umbra搭载了这些优化后,在不开启cache的情况下性能都和别的开启了local cache的stoa 数仓一致(跑的是TPCH,跟snowflake和spark对比). 另外他们的这套模式在切换实例的时候还不会带来性能的抖动(这里意思比较绕,大概是说在没有warm cache的情况下切换不同配置的实例的拓展性良好).

贡献

  1. 定义了一个能够最小化成本同时打满带宽的request size range.
  2. 开源了Anyblob这个库
  3. 将扫描对象和数据库系统集成的蓝图

Cloud Storage Characteristics

Object Storage Architecture

对象存储提供过用户的接口是各种HTTP接口,用户发送过来的请求在经过负载均衡后会落到对应的Api server上. 而云厂商的服务器上一份文件的数据通常会切成多个chunk存储在不同的Object storage机器上, 这些机器本身也是跨 AZ的,所以数据的持久性保证很强.

另外因为数据拆成了多个chunk那么之后在读取的时候也需要有对应的索引或者记录chunk位置的header信息(小文件的情况甚至索引上就可以顺带记录着文件数据减少后续的网络IO). 这部分元信息也会存储在对应的metadata storage服务器上(听说有的厂这里metadata storage甚至就是mysql).

figure1

Object Storage Cost

table1

IO操作是要收费的,不管是读/写/跨region网络传输都要收费. 但是在相同region内部,下载的费用和下载的数据的大小是无关的,只和下载请求的次数有关. 另外对象存储是持久化的,只有写到了S3/GCP上就会被复制冗余到多个跨region的多AZ副本上.
除了对象存储以外也有别的存储选择,比如EBS等, 他们的价格会更贵, 而云上的HDD价格可以更便宜但是带宽又不如对象存储. 另外比如EBS只能attach到一台节点上,也不能做到存算分离上的共享存储. 除此以外他们也无法提供S3的持久性.

Latency

对象存储的延迟比SSD会更高.基于不同的request size测量了对应的latency. 这里用来区分总耗时和时延的是第一个字节被读取到的时候. 取第一次冷读和连续20次的热读.

figure2

对于小的请求来说第一个字节的延迟基本决定了整个时延. 这也说明round-trip时延限制了总的吞吐. 对于大的请求来说带宽则是限制因素(可以理解成要读的数据越大的时候读取性能基本没变化,这个时候的时间基本上是size/带宽).(笔者注: 这里first byte分析可能也不是特别有意义,如上文所说在cloud vendor机器上数据也是拆分的,另外还有读取元数据的部分)

另外作者还测试发现工作日晚上以及周日性能会更强(因为没人来卷了xs).

Throughput

对于AP系统来讲,查询的时候下载的数据的大小可能是非常大的,所以first byte arrival latency的重要性没有吞吐大.

作者接着测试发现不同地区吞吐有差别,以及冷数据还可能有更大的吞吐.

此外小机型的机器存储一段时间的性能喷射的情况(还有论文研究如何靠这个薅云厂商羊毛, 另外也有利用这个做cache的).

Optimal Request Size

在请求的request这里就有个tradeoff,如果请求的size越大,获取同样大小的数据IO次数相对来说就更少,花的钱就少. 当然吞吐可能不能打满.
但是如果size越小,并行的请求就可以越多,吞吐能打得更高,但是相对的IO次数多了花的钱就多了. 作者这里测试出来对于OLAP系统来说8MB-16MB是价格吞吐都友好的。

Encryption

比较无聊,跳过

Tail Latency & Request Hedging

这里的长尾延时情况是有不到5%的16MB request会超过600ms还没下载完成. 也有不到5%的first byte latency超过了200ms. 也就是超过600ms的情况重新发一个request是挺有效的. 可以减轻tail部分.

Model for Cloud Storage Retrieval

对于给定的期望throughput,需要的并发request请求数量是:

formula

AnyBlob

普通的云厂商SDK比如AWS的每一个http请求都开一个线程,如果要达到上文中打满带宽的需求不知道会有多少个线程,这带来的context switch开销也无法忽视。所以本文提出了一套基于io uring的异步download库,和查询引擎共享线程资源.
AnyBlob Design

  1. 每个线程异步发送多个request
  2. 使用io_uring减低CPU负载
  3. 状态机驱动的message: 基于HTTP请求实现了多阶段的状态机,一直到对象被完全下载下来(其实理解起来就是协程呗).
  4. 异步系统调用. 尽量不卡住线程. 避免影响吞吐. 通过Loop来定期从io_uring拿到complete entry
  5. Task-based send-receiver调度器
  6. 多个send-receiver 组

figure11

整体来看除了io_uring这一层就是一个很典型的网络库应用, 在http request完整以前都通过非阻塞IO不停地读取/写入,拿到一条完整的request就执行,执行的结果可能是继续产生新的send/receive, 也可能是执行完了就执行上层的回调(估计是scan算子在拿到完整的数据后push到下游的算子继续进行计算). 他这里状态机这一套感觉用coroutine来实现挺好的.
这里AnyBlob的example代码里也没写怎么和Scan Operator整合在一起的. 不过从后文的描述来看其实就是scan算子执行的过程里包括了异步下载远端的对象存储数据而已,这里AnyBlob作为一个库也就是直接被调用而已,没啥好说的感觉.

Authentication & Security

感觉没啥好说的

Domain Name Resolver Strategires

  1. 解析endpoint也是一种开销,所以会把endpoint ip cache住
  2. 记录了不同的endpoint ip的throughput,如果某个endpoint的吞吐不太行就替换
  3. MTU相关的 太细了== 可以参考netapp的 MTU调参(同一家云的机房里通过调整MTU可以省掉很多header的开销)

Cloud Storage Intergration

前文提到现在download和query engine是共享线程了,一种可能的实现是scan算子需要读取数据的时候会注册一个回调到AnyBlob上,下载到具体的数据后在某次event loop里通过回调的方式继续执行scan上层的逻辑.接下来大概是他们怎么做这个scheduler的逻辑. 我理解里他们这个scheduler是一段在定制点代码处调用的schedule逻辑(见https://15721.courses.cs.cmu.edu/spring2023/papers/07-scheduling/wagner-sigmod21.pdf). 同时这个Object Manager估计也是.
先贴一张他们的大概的架构图

figure13

Database Engine Design

Umbra中的worker thread可以干三种事情

  1. 执行查询
  2. 准备新的对象存储request
  3. 作为网络线程
    这设计大概率是一个EventLoop,每次通过拿到的不同的Job实现切换任务就是了. 这里我比较好奇的是Umbra有没有做任务的interrupt,可能也不好做,估计粒度上还是得做完一次Job. 以及从他的描述中来看这个Object Manager应该不是一个单独的线程,估计是类似内核的调度一样每次执行这段逻辑就帮你计算下. 不是类似SAP HANA一样的有个Watch Dog线程.
    Umbra的调度器可以动态切换thread的角色

任务自适应性 :
为了避免长查询一直占着资源把系统响应度给拉低,有的系统把查询拆成task(morsel-driven),这些task可以被挂起,或者会通过分配自适应数量的tuples控制执行时间. 这么做的目的是让系统能够快速应对不同的workload. Umbra的这个模式下如果是在执行的时候需要查数据可以做到在worker上切换task(不过感觉没说他们到底能不能挂起之类的 感觉自适应这个还是看这篇https://15721.courses.cs.cmu.edu/spring2023/papers/07-scheduling/wagner-sigmod21.pdf).

Columnar format

Raw data是按照列存切成多个chunk的block丢到cloud上的。每个block的header里面是column types,offsets等.
DB的schema也是丢在cloud上的(所以启动的时候需要先fetch).

Table Metadata retrieval

在图13里scan operator一开始会去拿到这些相关的table的元数据(比如有哪些blocks) 都下载完了才去进行数据的retrieval.

Worker thread scheduling

初始化完scan后会选出多个worker thread来执行操作. Umbra这里没有选择把worker thread拆分成下载线程和处理现场(因为他们觉得这样会在执行query的时候去适应, 这句话没明白啥意思但是我猜是比如下载线程和执行线程一起执行的时候如果下载慢了执行的部分要去等待,比如下载的线程结束了是不是要转换成执行线程之类的调度会比较难做). 所以他们的实现是引入了一个object manager. 每一个scanner线程询问scheduler应该干什么job. 如果已经有足够的数据了就进行查询处理(如4A), 否则就进行下载准备block. 这里umbra认为执行jobs的时间很短,所以这里的ask scheduler的决策是可以很快调整的.

Download preparation

为了打满带宽所以需要有比较多的download线程和downloadrequest请求持续发给cloud. 在4B里面preparation worker thread创建请求的时候不会打断retrieval 线程执行event loop. Object Manager中保存着表的元数据以及block和对应的chunk 数据.

Table Scan Operator

Umbra会尽量利用上所有的thread来给operator加速. Umbra将tasks拆分成morsels, 每个morsel里面都是task的一小块数据.
Table scan在初始化完后worker线程会调用pickMorsel方法将chunk分配给worker线程. 在本文的修改版本中,这些worker线程不仅需要处理数据还会需要准备新的block以及从对象上下载数据. Object Scheduelr会基于统计信息和过去的处理来决定某个worker thread的应该做什么工作. 这里如果worker thread拿到了morsel那么他会去处理,如果不是的话他就去执行prepare或者retrieve的逻辑.

figure14

图里就是3种任务,4个线程在processing,一个在prepare,3个在下载.
这里下载线程的callback干的事是标记这个block为ready. Download线程结束任务后还可以被复用为processing或者preparing线程.
Processing线程只会去处理ready的block上的morsel.

Object Scheduler

目标是达成下载和处理性能的平衡. 这个scheduler会根据统计信息采取对应的算法更新不同的线程应该有的数量

这里会有一系列动态的调整,首先这里的统计信息会使用lock-free的方式进行维护. 之后在执行的时候会尽量保证在当前的retrieval线程的数量下带宽能打满. 这里大概的意思是,记录有多少线程用于下载,并且控制该类线程的数量. 同时通过记录outstanding request(正在下载或者await中的prepared HTTP请求)的数量可以计算出带宽的上界. Scheduler会尽量调度出能够匹配retrieval threads数量的prepare job.
同时也会根据查询性能和下载速度动态调整下载线程和处理线程.

Relation & Storage Format

列格式:每个column分成多个chunk 然后在元数据里记录min max zonemap 每个block使用byte-level encoding的方式压缩(frame-of-reference & dictionary)

每一个block中的tuple数量: 前文有提过他们的实验跑下来每一个Get请求的content size在8M-16MB是最优的所以这里在对象上的每个block的chunk也希望尽量是16MB. 查询引擎在执行的时候的数据处理粒度也是block. 同时这里有要求一个block中的所有列的tuple数量相同. 对于不同的数据类型来说这就会导致chunk的大小很难相同. 对于非变长类型的列而言在encode过后每个tuple的size大小分布在1到16字节. 所以在存储的时候需要对于不同列的chunk的tuple 数量做一个平衡. 在构建block的时候就会自适应地计算一个平均tuple count, 尽量让没编码的列小于2MB. 对于某些定长和变长类型(比如varchar(65535)或者更长的奇奇怪怪的类型),则会采取在下载的时候拆成多个更小的range request的方式.

Zero user-space copies:其实比较好理解,Buffer Pool在分配内存的时候预留http header,然后将blocks对齐到buffer pool的page大小. 将Buffer丢给io_uring下载了直接拿出来取offset用就行.

Transparent paging:这里用了匿名pages拓展这个buffer manager,相当于对于同一个page的请求就被缓冲起来,请求缓存命中就直接返回,没命中就再下载一次.

Structure of metadata: 其实比较直白,大概来讲就是db_prefix/table_prefix/. 目录下是header的list,headers,data blocks. 和iceberg的manifest files很像https://iceberg.apache.org/terms/ . 相当于db_prefix/table_prefix/list这个文件下载拿到的是所有的header部分的内容(至于具体是啥内容貌似没说清楚,但如果是为了多版本考虑的话可以理解为是一个snapshot,里面保留的是某个版本下的数据的所有的header). 在对象存储上每次更新这个db_prefix/table_prefix/list文件都是原子的. 可以参考下Iceberg的这篇文章. 不过这里没说怎么决定当前是哪个snapshot,其实我理解这里不管是Iceberg还是这个umbra的实现以及vertica的eon mode(cluster_info.json)都是类似的,会在一个文件里面写着.

下面是iceberg

iceberg

下面是umbra

15

Scan optimizations: 其实就是通过header的min max过滤. 除此以外Umbra把数据的decode和process拆成了两部分. 相当于可以重复decode一部分然后塞到chunk里丢给process.

Cloud DBMS: 这里就大概讲了下Redshift使用了Aqua来在计算层加速. 另外一直到最近,由于带宽的限制,caching都是不可或缺的. 但是由于网络带宽和NVME带宽越来越接近所以cloud storage越来越有吸引力。 像Amazon基于Presto的Athena系统就是直接在remote data上执行的.77比较了这些系统的架构. 16,29,30,81的OLTP系统是专门用来处理cloud era的. 2,25,33,58,59,67,79,87则是OLAP

Spot instances:这里大概意思是想说有些关于如何利用竞价实例的研究74,76

Cloud storage for DBMS: Apache Iceberg和Delta Lake都像Umbra这样把元数据丢在了S3上并且基于此来提供一致性快照服务. 也有一些对于cache的研究37,46,85,89. Cache的解决方案从在本地节点使用语义化的cache37拓展到了利用竞价实例来作为cache89.

Memory disaggregation: 讨论了一下CPU和内存分离的内容50, 83, 90, 91