0%

译From Eager Futures/Promises to Lazy Continuations

介绍

译者一直对有关coroutine和execution的概念非常迷恋,近来希望将对这部分内容的学习心得简单分享一下提升下自己的理解,预计会有5 6篇博文产出,先以这篇CppCon的翻译起个头

下文会从pdf的第一章开始翻译

motivating futures/promises + actors

当工程师试图打造一款高性能且正确的分布式系统时会遇到许多关键挑战。概括下来有两条

  1. 在代码中不得不有等待的条件
  2. 在代码中有state

如何解决等待的问题

以下面的代码为例

1
2
3
std::string text="...";
text = SpellCehck(text);
text = GrammerCheck(text);

上面的代码可通过函数的组合修改成一行GrammerCheck(SpellCehck("...")). 不过接下来我们还是分开分析,先分析SpellCheck函数的实现

1
2
3
4
5
std::string SpellCheck(std::string text) {
auto body = http::UrlEncode({"text", text});
auto response = http::Post("https://www.online-spellcheck.com", body);
return response.body;
}

无论这段代码之中的Http::Post方法是阻塞的还是非阻塞都不会改变一件事实:你的代码不得不在这里等待
可能的解决方案如下:

  1. 就死等. 这个方案肯定是不会采纳的
  2. 使用线程. 成本高昂同时对正确性无益
  3. 使用coroutine. 作者当时是09年 还没有好用的C++ coroutine
  4. 使用不同的语言比如erlang 或者 把erlang带进C++之中
  5. 使用回调函数,之后会讨论这个
  6. 使用future/promise

接下来我们讨论future/promise。
future/promise就像buffered channel一样

1
2
3
4
5
6
7
8
Promise<std::string> promise;
---------------thread x-------------
Channel::Reader<std::string> reader = channel.Reader();
reader.Read(); // Blocks!
---------------thread y-------------
Channel::Writer<std::string> writer = channel.Writer();
writer.Write("..."); // Non-blocking!
writer.Close();

改为future代码如下

1
2
3
4
5
6
Promise<std::string> promise;
---------------thread x-------------
Future<std::string> future = promise.Future();
future.Get(); // Blocks!
---------------thread y-------------
promise.Set("..."); // Non-blocking!

接下来我们用future来修改SpellCheck函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 这里我们异步地请求Post方法,并通过future拿到对应的返回值
Future<std::string> SpellCheck(std::string text) {
auto body = http::UrlEncode({"text", text});
Promise<std::string> promise;
auto future = promise.Future();
http::AsyncPost(
"https://www.online-spellcheck.com" ,
body,
[promise = std::move(promise)]( auto&& response) {
if (response.code != 200) promise.Fail(response.code);
else promise.Set(response.body);
});
return future;
}

假如现在我们的SpellCheck和GrammarCheck两个函数都试用了future/promise的方法改造,那么我们会碰到接下来的问题

1
2
3
std::string text = "..."; 
text = SpellCheck(text). Get(); // Blocks!
text = GrammarCheck(text). Get(); // Blocks!

可以看到这两个方法的Get都会是阻塞的,那不相当于我们的代码又变成串行阻塞的了吗?这里的函数逻辑是在完成spellcheck后再调用grammarcheck,也就是说这里的Control Flow应该是SpellCheck -> GrammarCheck那么这里如果可以让两个future拥有future a异步执行完了之后(then)执行b的语义似乎能避免对应的阻塞,假设C++的future能提供类似JavaScript的.then接口继续讨论这个问题.那么对应代码或许可以这样修改

1
2
3
4
5
6
7
// 这里将SpellCheck和GrammerCheck组合到一个函数SpellAndGrammarCheck之中
Future<std::string> SpellAndGrammarCheck (std::string text) {
return SpellCheck(text)
.Then([](auto&& text) {
return GrammarCheck(text);// Can be a Future<T> or T.
});
}

到这里我们似乎解决了等待的问题.

如何解决状态的问题

使用future/promises执行代码的一些性质

  • 因为代码中从不block所以可以只有一个线程便执行(类型JavaScript的模型,只有基于libuv提供的单线程event loop)
  • 然后代码并不一定能原子地执行,因为当代码中出现必须等待别的代码时就出现了交互(在不得不等待时会有别的代码被执行就是问题的关键)
  • 许多人称这种情况为”concurrency” vs parallelism因为这种模式给了你并发执行的错觉然而你并没有同时在执行
  • 但是你依旧得忍受并发执行所带来的同步问题
  • 也可以基于多线程执行代码

可能的解决方案如下:

  • 1963年提出了mutexes和semaphore
  • 1973年提出了actors
  • 1974年提出了monitors
  • 1978年提出了communicating sequential processes
  • 1987年提出了statecharts

其中mutexes,semaphores和monitors都是基于threads的解决方案
而actors,csp,statecharts是没有线程的解决方案

没有线程会是什么情况?
没有线程的抽象包含了执行模型/语义和状态的同步
actors包括了执行,同步,状态

包括得更多 -> 更高级的抽象能带来

  • 更容易理解
  • 更容易在更多的硬件和平台运行
  • 更容易优化

actors的性质

  • 本地可修改状态
  • 消息队列
  • 一次接受并处理一条消息
  • actors之间发送消息是非阻塞的
  • 不论是local还是distributed模式都是同样的编程模型

下面我们看一段在C++中的actor模型的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
struct MyActor : public Actor {
void Receive(ActorId sender, Message message, void* arguments) override {
switch (message) {
case MESSAGE_FOO_REQUEST:
auto* request = (FooRequest*) arguments;
...
Send(sender, MESSAGE_FOO_RESPONSE, response);
break;
case MESSAGE_BAR_REQUEST:
...
}
}
};

只需要实现不同的request,actor在接受到 message后就能派发到对应的actor上去。
actors (visualized)

如果对所有的方法上锁(类似java的synchronized)能不能做到同样的效果呢?
threads (visualized)

actor的性能如何?
对于数据经常需要被共享或者在执行资源间移动的并发程序没啥影响

但是对于分布式和网络程序,数据经常只在别的机器上被共享,并且在任意的core之间交换数据会带来性能下降

似乎通过actor我们解决了state的问题

libprocess

作者在2009年的时候

  • 在UCB 构建分布式系统Apache Mesos
  • 使用了C++来避免如Java的gc语言会带来的运行时不确定性问题
  • 希望使用actors

于是在打造libprocess的时候他们想到了将futures/promises和actors结合!

为什么actor需要future/promises

回首上面的actor代码会发现很难描述清楚actor之间的执行流

  • 在actor 模型上收发信息就如同编写汇编语言一样(译者注:编写难度和体验)尽管他们确实解决了state的问题
  • message的处理就如同goto一样!而goto是非结构化的,是不被提倡的

抛弃goto这个玩意,我们想要的是

  • 非阻塞的函数调用(返回future)
  • 函数的可组合性(Then)

就像如下的代码

1
2
3
4
5
MyActor actor;
auto future = actor.Foo(...)
.Then([](auto&& response) {
return ...;
});

libprocess actor的伪代码如下

1
2
3
4
5
6
7
8
9
10
struct MyActor : public Actor {
Future<FooResponse> Foo(FooRequest request) {
// Execute the “message” on the actor ‘self()’.
return On(self(), [this, request]() {
FooResponse response;
...
return response;
});
}
};

注意这里On(self(), closure)的语义是在self()也就是这个actor实例本身这个执行资源上执行closure中的内容

自然的也可以在函数中通过then组合调用别的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
struct MyActor : public Actor {
Future<FooResponse> Foo(FooRequest request) {
// Execute the “message” on the actor ‘self()’.
return On(self(), [this, request]() {
...
return SomeOtherFunctionReturningAFuture()
.Then([](auto&& value) {
...
});
});
}
};

why futures/promises need actors

观察一下下面这段代码

1
2
3
4
5
6
7
8
9
struct MyObject {
Future<void> SomeMember() {
return SomeFunction()
.Then([](auto&& value) {
// Where should this lambda run?????
...
});
}
};

Then之中的函数逻辑应该运行在什么之上呢?一个简单的想法是直接使用SomeFunction返回的future所关联的promise所在的执行资源之上

完善一下上面的代码可能会是这样

1
2
3
4
5
6
7
8
9
10
11
12
13
struct MyObject {
Future<void> SomeMember() {
return SomeFunction()
.Then([this](auto&& value) {
// Where should this lambda run?????
std::unique_lock<std::mutex> lock(mutex_);
i += value;
});
}
private:
int i_;
std::mutex mutex_;
};

这可能会带来一个问题:在SomeFunction对应的promise的执行逻辑在调用promise.Set()的时候可能是阻塞的(可以理解成执行promise的逻辑和执行Then的逻辑可能会并行,这里就有锁会带来的同步代价)
或许可以使用asyncmutex试图解决这个问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct MyObject {
Future<void> SomeMember() {
return SomeFunction()
.Then([this](auto&& value) {
// Where should this lambda run?????
return mutex_.Acquire()
.Then([this]() {
i += value;
mutex_.Release();
});
});
}
private:
int i_;
AsyncMutex mutex_;
};

这里调用Release()将要/必须执行一个不会block的waiter,这也可能带来不确定的执行。

如果能够保证Then的内容严格在actor执行完SomeFunction()之后执行就能够避免上文中非确定性执行带来的同步问题

1
2
3
4
5
6
7
8
9
10
struct MyObject : public Actor {
Future<void> SomeMember() {
return SomeFunction()
.Then(DeferOn(self(), [this](auto&& value) { // Then之中的内容之后会在同一个actor的执行资源上执行
i_ += value;
}));
}
private:
int i_;
};
  • 上面的方式中actor提供了执行Then(continuation)的executor资源
  • 设置promise非常快是非阻塞的(DeferOn保证了在set之后才会执行Then之中的逻辑)
  • 无需同步

revisiting the problem

重新看一下SpellAndGrammerCheck函数

1
2
3
4
std::string SpellAndGrammarCheck(std::string text) { 
text = SpellCheck(text);
return GrammarCheck(text);
}

这段代码是顺序的,非并行的,即使并发执行也没有状态需要同步
修改成Future和Then的方式

1
2
3
4
5
6
Future<std::string> SpellAndGrammarCheck(std::string text) {
return SpellCheck(text)
.Then([](auto&& text) {
return GrammarCheck(text);
});
}

这之中会申请锁并且需要动态分配内存

是什么需要申请锁以及动态分配内存

我们先展开SpellCheck的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
Future<std::string> SpellAndGrammarCheck (std::string text) {
...
auto future = promise.future();
http::AsyncPost( // Non-blocking! Returns immediately!
...,
[promise = std::move(promise)]( auto&& response) {
promise.Set(response.body);
});
return future
.Then([](auto&& text) {
return GrammarCheck(text);
});
}

上面的代码中在promise.Set(response.body)与continuation通过.Then()组合之间存在race, 这两个动作可能在同时发生,于此我们需要锁来同步. 另外promise也可能在continuation被组合起来以前设置,所以需要动态内存分配.

有什么办法避免锁以及这一次动态内存分配吗?

evolution of libprocess

避免锁开销

上面的逻辑中SpellCheck的future的组装(将GrammarCheck的逻辑组合在一起)和promise的set操作之间是可能并行的,所以需要锁来同步,但如果将代码的逻辑做一下简单的修改,修改成SpellCheck中AsyncPost的逻辑执行完之后执行一个回调函数就可以避免锁的同步(也就是保证了GrammerCheck的逻辑一定在SpellCheck之后)

1
2
3
4
5
6
7
8
9
10
11
12
// f的逻辑其实就是GrammerCheck
void SpellCheck(std::string text, std::function<void(std::string)> f) {
auto body = http::UrlEncode({"text", text});

http::AsyncPost(
"https://www.online-spellcheck.com" ,
body,
[f = std::move(f)](auto&& response) {
f(response.body); // Invoke continuation without locks!
});
return future;
}

也可以修改成模板函数

1
2
3
4
5
6
7
8
9
10
11
template <typename K>
void SpellCheck(std::string text, K k) {
auto body = http::UrlEncode({"text", text});
http::AsyncPost(
"https://www.online-spellcheck.com" ,
body,
[k = std::move(k)](auto&& response) {
if (response.code != 200) k.Fail(response.code);
else k.Success(response.body);
});
}

译者注:作者在演讲里表示在他读大学的时候他们都用K来表示continuation,所以这里模板里他也用的是K而不是C

避免动态内存分配

以上面的模板函数为例,到底是在什么地方分配了内存呢?这里需要进入Http::AsyncPost的实现之中

1
2
3
4
5
6
7
8
9
10
11
12
namespace http {
template <typename K>
void Post(std::string url, std::string body, K k) {
void* data = new K(std::move(k));
...
http_post(url, body, data, +[](long code, const char* body, void* data) {
K* k = reinterpret_cast<K*>(data);
k->Success(http::Response{code, body});
delete k;
});
}
} // namespace http

在Post方法中还是分配了内存.

💡 如果将continuation K作为函数返回的结果中的一部分是不是就避免了分配?

接受一个continuation作为参数并返回一个continuation作为结果(返回值中组合/包括了作为参数传递的continuation)

修改一下上面的Post方法的实现,与其在函数的栈上分配一个堆上的void*,不如在函数中定义一个仅存在于该函数的struct并返回一个该struct的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename K>
void Post(std::string url, std::string body, K k) {
struct Continuation {
void Start() {
void* data = &k;
http_post(url, body, data, +[]( long code, const char* body, void* data)
{
K* k = reinterpret_cast<K*>(data);
k->Success( http::Response{code, body});
});
}
std::string url, body;
K k;
};
return Continuation{std::move(url), std::move(body), std::move(k)};
}

可以看到这段代码讲传递进入的url,body,k都传递进了struct Continuation之中,之后这三个值的生命周期将同返回的Continuation value绑定,在value析构时一同回收,同时这里的内存分配也只有一个栈上的值类型Continuation的内存分配.

lazy continuation

1
2
auto k = http::Post(url, body, /* k' */);
k.Start();
  • 返回值类型是”computational graph”
  • 这个graph是lazy的,当我们拿到这个graph时什么都不会发生(tradeoff for dynamic allocation) 并且如果我们想执行他的逻辑就必须显式调用start
  • graph可以分配在栈上或者堆上
  • 在完成以前内存必须有效

eventuals

作者将lazy continuation称为eventuals.

这一章的内容主要是介绍了eventuals并且通过eventuals将上文中传递continuation的代码风格进行了修正(传递continuation看着比较难看不符合人体工程学),译文会忽略这部分推导,感兴趣的读者可以访问eventuals查看这个项目的信息.

scheduling

本章节主要是探讨continuation应该在什么地方运行?回望下面这段代码

1
2
3
4
5
6
7
8
9
10
11
auto Post(std::string url, std::string body) {
return Eventual<http::Response>([url, body]( auto& k) {
using K = std::decay_t<decltype(k)>;
void* data = &k;
http_post(url, body, data, +[]( long code, const char* body, void* data)
{
K* k = reinterpret_cast<K*>(data);
k->Success(http::Response{code, body});
});
}
});

如果continuation是在event loop中被调用的,我们不希望continuation继续在event loop中继续运行(会阻塞其他的IO任务,IO线程池不应该执行太复杂的continuation任务)
同理,如果continuation是在actor中被调用的我们也不希望继续在actor的执行资源上运行,因为这样会阻塞别的message的处理

回首看一下motivating exemple的代码

1
2
3
4
5
std::string SpellCheck(std::string text) {
auto body = http::UrlEncode({"text", text});
auto response = http::Post("https://www.online-spellcheck.com", body);
return response.body;
}

得益于函数的抽象,我们可以分开考虑函数的接口以及实现.在这里不需要考虑http::Post到底是如何实现的.

  • 我们不关心是否在多线程上实现
  • 我们不关心是否在GPU上实现
  • 我们不关心是否在FPGA上实现

然而,如果执行完http::Post方法并将控制流返回时我们发现现在正在

  • 于我们开始执行时不同的线程上执行时,我们会很惊讶
  • 一块GPU的执行逻辑上,而我们是在CPU上开始执行的,我们会很惊讶

所以我们到底应该让continuation在什么地方执行呢?
使用在你不得不等待以前正在使用的执行资源

否则(译者注:这里作者意思大概就是否则我们每一次都应该检查接下来的函数调用的文档/实现去查看对应的函数会不会在eventloop上执行,如果是的话就需要自己通过ThreadPool::Schedule等方法重新调度),需要将代码进行类似的修改.

1
2
3
4
5
6
7
8
9
10
11
12
13
auto SpellAndGrammarCheck (std::string text) {
return ThreadPool::Schedule([text]() {
return SpellCheck(text)
// Rescheduling on thread pool because we looked at
// documentation of 'SpellCheck()' and it continues on the
// event loop which we don't want to be on.
| ThreadPool::Schedule([text]() {
return Then([](auto&& text) {
return GrammarCheck(text);
});
});
});
}

类似的内容可以查看std::execution的提案(译者后续的博文也会介绍这个提案).

另外也介绍了在PLDI也有类似的工作

“Composing Sorfware Efficiently with Lithe”(PLDI 2010)

  • 允许许多同时的scheduler负责计算图的子树
  • 所有的计算都拥有一个调度context
  • 对于冲洗提交任务给拥有context的接口提供了正式的接口