0%

Self-Tuning Query Scheduling for Analytical Workloads

Abstract

大部分数据库都将调度策略委托给了操作系统本身,这个策略虽然能够简化数据库的设计但是也会带来一些问题. 比如在面对并发查询的时候自适应的资源分配就便得很难做,除此以外要在数据库中做一些调度调优也变得很困难(因为实际上更多的还是靠os自己在进行调度). 所以很多现代的现代都通过将一整条query的执行拆分成多个小的独立的任务以此来实现task-based并行. 基于task就使得数据库系统自己就能进行调度.

这篇论文主要是展示如何在task-based的设计上进行一些优化,论文作者提出了一种针对分析型workload的创新的无锁,自调优调度器. 通过动态地调整任务的优先级以及任务对应的粒度提供了很高的调度弹性. 即使在大压力导入下依旧能为短查询提供接近最低的延迟.

Introduction

分析型数据库面临的workload特别复杂,在高压导入时可能还有并行抵达的各种查询. 很多系统很难在这种压力下保持良好的查询性能.

下图作者以他们的系统和PG的进行了一个对比,在高频导入下的查询延迟的变化. Workload包括3/4的短查询和1/4的长查询. 同时系统的导入压力为其最大压力的95%并持续了25分钟.
Query Type

处于这种状态的系统对于用户来讲响应度变得更低,查询性能也变得难以预测在不同的时间跑相同的查询可能会收获不同的性能. 对于用户来说,在高压场景性能降级的影响应该尽量低. 类似PG的系统将执行的调度责任转交给了操作系统. 这类系统对每一次新的连接和查询都可能创建新的线程或者进程(当然更好的方式是池化), 在执行的过程中可能也会创建更多的线程进程来进行query内的并行. 为了避免线程数超过OS线程一般也会尽量控制线程的数量.

现代化的系统一般会做更细粒度的调度,会实现基于task-based的调度系统,一条query会被拆分成多个独立的任务,这些任务都可以在任何一个OS线程上执行,同时任务的调度也由数据库系统接手而非被动等待OS调度。数据库系统也可以很方便的基于任务数派发给不同的线程以动态调整query的并行度. 一些系统比如SAP HANA的调度策略是和OS 调度器共生的.

其他的比如HyPer和Umbra则是几乎不依赖OS的调度. 在启动时他们便启动同CPU核数相同数量的线程. 之后使用morsel-driven的方式进行调度. 一个morsel表示一个tuples的固定集合,morsel时query执行时的最小单元. 因为同一条queried的不同morsel可以并行执行所以这种方式可以实现query内并行和query间并行.

类似intel tbb的通用调度器更关注的是吞吐, 数据库系统更关注的是公平性和查询的响应度. DB系统调度器为了在高压导入时依旧能保证低延时会更倾向于执行short running query. 除此以外, 新开启任务的粒度也可以在执行时自适应调整.

本篇论文主要提出了一种创新的无锁自调优调度器. 接下来的内容主要是 Section2 调度器设计. Section 3 关注于morsel-driven数据库系统. 如何通过morsel数据结构让调度变得更鲁棒更可预测. Section 4, 在调度器上加上自适应调优.

Scalable Task Scheduling

在task-based的系统中挑选任务的步骤是在用户态进行的,所以调度策略需要尽可能地可拓展并且利用好硬件资源.

Background

假设$t_i$表示第i个任务,$p_i$表示任务的优先级. 每一个task被赋值stride $S_i=(p_i)^{-1}$. 假设所有任务在同一时间到达那么调度策略就变得很简单. 每一个task被映射到一个$P_i$, 这个值一开始设置为0. 之后的处理方式是: 拥有最小pass的任务被选出来执行一段时间片. 执行完后将pass更新为$P_i + S_i$. 任务$t_i$拥有的执行资源比例是$p_i / \sum_{k=1}^{n} p_k$. Stride调度策略在所有任务都拥有相同优先级的情况下时公平的.

但是如果要适配动态变化的任务则需要有一些修改. 如果一个任务在任意时刻加入则其需要一个初始值. 本文的scheduler会维护一个全局的stride $S_G$ = $\left(\sum_{k=1}^{n} p_k\right)^{-1}$ 以及一个全局的pass $P_G$. 每过一个调度时间片这个全局的pass都要增加一次全局的stride. 现在这个全局的pass可以用来给一个新到的任务计算初始的initial pass值. 只觉上可以认为这个全局的pass值表示scheduler的时间. 如果任务的pass值比全局pass值$P_G$小那么这个任务还没拿到他应该拥有的资源,反之比全局pass大则是已经使用了太多资源.

Stride调度可以很简单地被拓展到非抢占式设置. 如果一个任务$t_i$消耗了其分配的时间片的$f$那么他的pass就被更新为$P_i$+$fS_i$.同样的全局pass也变成$P_G$+$fS_G$. $f$在这里可能大于1.

Scheduling in Umbra

TaskStructureOfUmbra

每一条pipeline被映射成一个task set. 每一个task set中包含数个task,每个task由数个morsel组成. 同一条query可以有多个task set,这些task set都属于同一个resource group. 如上图中task set 1和2都属于rg1,因为rg1有两条pipeline. 而rg2只有一个所以只有一个task set. 每一个task可以有多个morsel,而morsel有3种状态, finished, running, pending. Worker thread每次pick一个task执行.

同一个task set的task可以被不同的worker thread并行执行以此来提供query的并行度. query的不同pipeline间可能有顺序依赖所以task set间也是存在顺序依赖的. 例如上图中左边的查询里的pipeline A部分必须在右边的pipeline B开始前完成. 因为hash join的build side必须比probe side先物化. Umbra通过将两条pipeline映射成顺序的task set来保证,在同一个resource group之中的task set必须等待其旗面的task set全部结束后才能开始执行. 这也方便我们在query的粒度追踪资源消耗.

类似Hyper的系统中task和morsel是一比一的关系也就是说一个task只有一个morsel. 但是umbra中一个task可以有任意个morsel. 在umbra中task并不是一开始就被静态创建好了的. 而是在运行时根据运行时的观测性动态调整的.

Thread local scheduling

Stride scheduling虽然能提供很强的确定性调度粒度但是他对现代多核硬件并不友好,因为会需要很多同步操作. 本章主要是提出了一种创新的task-based stride 调度实现. 可以在thread-local的底座上执行所有调度决策. Worker线程只会在活跃的task sets发生变化被notify.

当然,和传统的stride scheduling相比也增加了一些限制, 同时存在的resource group(也就是query的数量)是有上限的(但其实这样挺合理的,不做点反压系统搞不好直接被打爆了). 当数量特别多的时候会有一定的性能降级,新的resource group会在任务队列中等待.

本文的设计比较巧妙,虽然是针对stride scheduling algorithm的,但也可以通过仅修改thread-local scheduling的逻辑不需要修改别的部分从而切换到别的调度算法. 作者仅通过修改不了不到100行C++代码就实现了非确定性的lottery调度.

Thread local Decisions

全局scope内会维护一个数组,其中每一个slot是一个指针,指向某一个活跃的resource group中的task set. 当某一个RG的task set执行完毕后,会从RG中选一个新的task set放到同一个slot上. 这样的好处是调度时的优先级是绑定到了RG上而不是task set上从而简化了调度. 如果同一个RG的task sets可以被放到不同的slot会增加记账的逻辑.

除此以外所有的调度都是thread-local里发生的. 其中包括一个bitmask,它的作用是记录全局RG数组中的活跃项. 同时也负责优先级和pass value的映射记录. 除此以外每个worker thread也会存储其自己的global pass value. 就如下图所示.

SchedulingStructrue

如果全局的slot和worker thread的local activity mask是同步的那么调度就很简单, 直接挑选pass value最小的slot,然后在这个slot上进行一个atomic read以获取指向对应task set的指针. 之后便是执行,记录执行时间后更新本地对应的local pass value. 这种处理模式非常轻量, worker在pick 任务的时候不需要别的线程是否pick了相同的task set. 同时全局array中的slot只有在有新的task set的时候才会被写入新数据. 这类写操作发生的频率是比较低的,就不会带来大量的cache无效化同步开销.

修改active task sets

Figure4

大部分情况下会尽力最低化同步开销,但是有些全局的信息还是无法避免同步. Worker线程在以下3种事件时需要能过检测到

  1. 某一个全局array的slot里的task set执行完毕了. Worker必须把本地slot也disable掉(因为这个slot可能之后没有task set了 也就是这个query可能执行完了)
  2. 一个新的RG被赋值给一个slot时的初始化task set. worker需要pick这个RG对应的初始pass value和优先级. 并且更新local activity bitmask对应位置
  3. 全局slot对应位置的active RG中插入一条新的task set时(比如之前的task set执行完了就切换到下一个task set). worker需要更新本地的active slot并且设置初始pass value(在1中被disable掉了)

第一种事件可以只标记对应的pointer而无需通知worker. worker线程拿到slot时读取pointer就能发现已经是disable的状态并更新自己的local 数组

第二种和第三种事件需要引入新的组件. 在每一个worker中会维护两个原子的bitmask. 当worker在全局的slot中插入新的task set时, 所有worker的bitmask都会对应的更新. 两个bitmask分别称为change mask和return mask. 事件2更新change mask事件3更新return mask. 之后用update mask统称两个mask.

更新update bitmask的方式很简单,假如现在全局数组中的第k个slot接收到了一个新的RG的初始task set, 则只要更新每个worker的change mask的第k位为1. 通过原子的fetch or操作即可轻量级更新每一个worker. 类似地,worker线程首先通过原子的exchange操作将update mask都设为0并拿到刚刚被设置的值,并通过刚刚的值来获取全局的slot的状态以此决定是否更新调度策略. 如果根据bitmask获取的值没有特别大的变化则可以避免cache无效化(其实主要是是否更新worker对应的active task set, 如果不需要更新则可以继续pick对应的任务)。 例如在上图中, 第二幅图里全局slot中插入了两个新的task set. 所有worker的update mask也进行了对应的更新. 这里用了return mask来表示TS2是从一个已知的RG中来的,用change mask表示TS3是从一个全新的RG中来的. 此时所有的worker都还在执行TS3的task 他们在第二张image的时候还不会去拉取bitmask的更新到本地的调度状态里.在第三幅图片时第一个worker会将update mask中的信息同步更新, worker 2还在执行TS3的任务不会去更新. 这张figure说明了两个workers不需要同步各自的active task sets.

Task Set Finalization

如果RG中的task set A结束了那么需要激活他的下一个task set B(如果存在的话). 只有在task set b的所有前置task set都执行完后才会发生这一步. 当然为了灵活度考虑也允许task sets执行额外的finalization steps. 比如在sort时执行partitions的shuffling或者在grouping时merge部分的聚合结果.

worker会在试图从没有剩余的task的task set获取任务时被notified. 咋一想我们或许可以在这种事件发生时立即开启finalization. 但是别的worker可能还在执行刚刚从这个task set里拿到的task, 如果立马去finalization是很不对的,因为我们必须要在task set的所有任务都结束后才执行finalization, 这种情况也不应该一直等待别的task 完成. 为了解决这个情况Umbra的scheduler引入了一种轻量级的finalization phase.

当一个worker 挑选了一个slot进行执行时它会在全局的state数组上更新自己的决策(发生在原子read全局slot数组前). 之后第一个发现task set耗光了的worker会作为finalization phase的coordinator.

coordinator需要保证最后一个结束对应task的线程能够调用finalization逻辑. coordinator首先在全局slot 数组上将对应的slot里的pointer标记为无效, 这样之后pick这个slot的worker也会将其对应的local slot设置为无。 之后coordinator遍历全局state数组查找有哪些worker还在执行这个task set的task. 对于每一个满足条件的slot信息加上一个专用的finalization marker. 所有被mark的worker thread在结束他们当前的task之后必须显式注销这个task set. 只需要对每一个task set设置一个原子的counter即可做到. coordinator会在遍历完后将counter设置为其成功设置marker的worker数量(不能是设置一个marker就+1一次,一个是这样其实不高效另一个是这样不好确认到底是不是全部执行完了). 对应的worker thread在每次执行完一个task后检查全局state数组是否包含finalization marker,如果有则对counter减1(注意,这个counter可能变成负数, 因为有个worker可能在coordinator遍历完以前就执行完并减1了). 将counter置为0的worker可以保证是最后一个(因为这个counter只能一次性增加,就避免了刚+1就-1的情况), 他可以执行最终的finalization逻辑-> 在当前RG中查找是否有剩下的task set,如果有就设置如果没有就去全局wait queue中获取新的 RG.

这个finalization phase的开销几乎只是对全局state 数组的更新. 只要别的worker不在coordinator更新对应的slot的时候试图去pick task set则不会发生竞争. 并且只有pin了相同RG的worker才可能被影响. 所有会被影响到的线程理论上并不多(特别是在query比较多的场景,只要不是所有的worker都在处理同一个RG其实很难有特别多竞争).

Robust Morsel Scheduling

上一章介绍了lock-free的stride scheduler. 这一章介绍在此基础上利用morsel-based task进行的优化.

首先介绍如何将morsel-driven parallelism变得robust. 传统方式task:morsel的比例导致task粒度有方差从而导致不理想的调度失真. 调度的开销变得很难预测而且会有worker长时间被阻塞. 本文作者通过标准化task的执行时间来保证了调度开销可预测以及强响应度.

第二步是利用数据库的领域知识来优化混合分析负载中的查询延时. 通过优先级策略,本文的stride scheduler提供了细粒度的资源消耗控制. 而这是通过自适应的优先级调整策略做到的.

Adaptive Morsel Execution

类似Hyper的系统中morsel:task是1比1,但其实不同的morsel的执行耗时很很不同的(因为不同的pipeline执行的逻辑本身也不同,复杂度也不同)
. 比如假设一个pipeline中时简单的selectiton然后插入到hashtable, 那么在相同的morsel大小的情况下他的执行时间肯定是比一条包括复杂的字符串匹配和一系列hash table probe的pipeline要短的. 这意味着对于scheduler来说不同任务的粒度是差别很大的. 如果继续采用这种1:1映射那么在选择morsel大小时就需要考虑不同的tradeoff. 如果太小那么会产生更多次的schedule相应的schedule开销也更大, 如果太长, 那么有的morsel在有的pipeline中执行时间太长会影响整个系统的响应度.

传统的模式在两个维度是静态的:1. 依赖于固定的morsel大小 2. 依赖于静态的morsel和task的映射.

本文提出了一个创新的渐进式框架. 调度器定义一个目标时间$t_max$. 在挑选任务的时候调度器尽量调度能够准确满足这个目标的morsels. 也就是说这个框架能够1.使用动态的不定size morsel大小 2. 执行不同数量的morsels. 另外调度器对task的结构是无感的,这也让整个调度压力变得可预测了. Umbra将这个值设置为2ms. 作者认为这个大小兼顾了调度的负担以及响应度. 另外据作者的观测调度策略一般只需要不到1ms就能决定,也就是说负载大概也就0.05%

通过将pipeline转换为状态机以不同的执行阶段采取不同的策略来达到目标时间. 不同的状态暗示了需要pick多少个morsel. 因为morsel是在运行时从tuples的集合中算出来的,所以动态地通过不同的morsel size填充task size是可行的.

Default state

默认状态时会试着挑取一个能够满足$t_max$的morsel. 这需要系统能够提供一个准确的吞吐估算值$T$, 表示每秒多少tuple. 这样一个morsel则是T*$t_max$的tuples. 而当这个morsel执行完后便能拿到实际的执行时间$t$. 对刚刚的吞吐估算值进行一个修正$\hat{T}=(T\cdot t_{\max})/t$. 根据旧的吞吐量T和系数$\alpha \in [0, 1]$, 那么新的则是$T^{\prime}=\alpha\hat{T}+(1-\alpha).T$.

Startup State

这个state用来提供一个初始的吞吐估算. 这个state会按照指数关系不停pick不同size的morsel指到达到$t_max$

Optimizations

还引入了两个优化. 一个是除了上述两种state还有一个shutdown state. 通过当前的吞吐估算值以及剩余的tuples数量可以大概计算出剩余所需的pipeline执行时间. 假设我们有M个worker, 一旦可预测的剩余的总时间低于$W\cdot t_{max}$就会进入shutdown state. 假设预期剩余总时间为t,morsel的最短执行时间为$t_min$, 我们将morsels调度为$\max(\frac{t}{W},t_{\min})$. 另一个优化是为了应对不支持自适应morsel size的tasks. 这类必须高效处理因为有些任务不具有高度的自适应. 如果运行时发现有的任务消耗的只是target duration的一部分可以允许这类任务继续消费直到达到$t_max$

Evaluation

MorselSizeComparison

这个图能看出来不同阶段的效果, 特别是startup阶段的两倍指数

Adaptive Query Priorities

不同的任务可以有不同的优先级. 数据库不应该让用户处理workload管理中的繁琐细节. Umbra利用自适应查询优先级来透明化地按照短作业优先的方式处理. 不需要用户输入而是在运行时根据query的性质赋予优先级. 首先定义”desirable”延迟. 假设所有的query都是一样重要的. 在这个假设下提出两个基础原则.

  1. 查询延迟在load时也得保持可预测性.

如果系统同时接受两条query, 短一些query需要先结束.

  1. 查询延迟需要尽量低.

如果数据库遵守这两条那么是可以做到可预测和高性能的. 不过公平调度只能保证1不能保证2. 因为短作业一般不会特别影响到长作业的延迟. 比如我们有9成的短作业他们耗时10ms,只有一成的长作业,他们耗时1s. 即使使用短作业优先的方式,对于长作业也只有10%的影响.

本文提出的自适应优先级策略就是为了透明地给短作业赋予优先级. 处理起来和多级反馈队列很像,一个查询的优先级取决于他到目前为止消耗的CPU资源. 因为作者将query包装成了一个RG, 所以查询的优先级其实也就是query的优先级. 可以按照如下公式处理:

$P_{i+1}=\begin{cases}P_{i},i<d_{start}\\max(P_{min},\lambda P_{i}),i\geq d_{start}\end{cases}$

有3个参数, $d_start$表示RG的优先级开始衰退的时间, 衰退的速度控制在$\lambda\in[0,1]$. 优先级最低到$p_min$.

和公平调度相比,这种调度策略能提供更大的相对性能差异. 但是为了实现短作业任务的高性能也是必须的. 尽管如此还是保证了原则(1). 同时到达的两条query的优先级退化速度是一致的也就是说短的一个会在同样的资源分配下先结束.

自定义优先级

当然也可以让用户自己定义优先级. 有两种简单的方式实现

  1. 特别重要的query可以允许用户设置一个不同的静态初始优先级并且这个优先级是静态的不会衰退
  2. 也可以将优化级和用户绑定. 这样用户优先级可以影响他的所有query的衰退速度.

Self-tuning Scheduling

TODO