0%

浅谈协程

介绍

这篇博文主要是结合笔者以前在字节跳动实习时在内部做的一次分享以及近年来的一些新的感悟来谈一下协程的部分内容,这篇博文不会过多讨论协程如何进行调度以及如何进行任务的组合等部分,主要是围绕为何会有协程,协程的分类以及不同的实现,另外也简单聊一下使用协程的心得.

为什么我们想要协程

亦可参考Google于13年的一次分享

首先为什么会想要使用协程呢?先从网络编程中几种模式说起,这里我们基于asio的代码来讨论,先用最传统的阻塞式的网络编程模式实现echo server就如同如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int main()
{
try
{
boost::asio::io_context io_context;
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 8000));
while (true)
{
tcp::socket socket(io_context);
acceptor.accept(socket);
boost::system::error_code error;
while (true)
{
char data[1024];
size_t length = socket.read_some(boost::asio::buffer(data), error);
if (error == boost::asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw boost::system::system_error(error); // Some other error.
boost::asio::write(socket, boost::asio::buffer(data, length));
}
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}

因为使用的是阻塞的模式,这段代码的吞吐量特别慢,因为每次只能处理一个连接的请求,并且一次连接的请求没能处理完完全不会去处理下一段。

有的人会想或许我们可以采取每一条连接都开启一个新线程进行处理的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <iostream>
#include <thread>
#include <asio.hpp>

using namespace asio::ip;

void do_echo(tcp::socket socket)
{
try
{
char data[1024];
while (true)
{
std::size_t length = socket.read_some(asio::buffer(data));
asio::write(socket, asio::buffer(data, length));
}
}
catch (std::exception& e)
{
std::cerr << "Exception in thread: " << e.what() << "\n";
}
}

void do_accept(asio::ip::tcp::acceptor& acceptor, asio::io_service& io_service)
{
acceptor.async_accept(io_service,
[&acceptor, &io_service](std::error_code ec, tcp::socket socket)
{
if (!ec)
{
std::thread(do_echo, std::move(socket)).detach();
}

do_accept(acceptor, io_service);
});
}

int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: echo_server <port>\n";
return 1;
}

asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), std::atoi(argv[1])));
do_accept(acceptor, io_service);
io_service.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}

return 0;
}

上面这种形式在连接数较少时也是能work的,但是当连接数量达到几十万条甚至更多的时候呢?假设有10w个连接,Linux下每个线程的栈默认大小为8M,那么一下子就会消费80G的内存,再算上线程切换的开销,可以想象服务的负载会有多么大,这似乎并不是一条很靠谱的解决方案.
接下来我们试着用最简单的回调的方式进行异步化改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <iostream>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

void session(tcp::socket socket)
{
std::array<char, 1024> buffer;
boost::system::error_code error;

// 异步读取来自客户端的数据
socket.async_read_some(boost::asio::buffer(buffer), [&](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
// 将数据回显回客户端
boost::asio::async_write(socket, boost::asio::buffer(buffer, length),
[&](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
// 继续异步读取来自客户端的数据
session(std::move(socket));
}
});
}
});
}
int main()
{
try
{
boost::asio::io_context io_context;
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 8000));
while (true)
{
tcp::socket socket(io_context);
acceptor.accept(socket);
// 启动异步会话
session(std::move(socket));
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}

上面这段代码中所有的io操作都是异步的,即使只有一个线程的情况下也能提供很强的吞吐(参考nodejs),因为其中所有的IO操作都是非阻塞的,加上使用了IO多路复用技术可以保证一个线程中处理多个request时不会出现某个request的io卡住导致所有request全部饿死的情况。但是可以看到这种方式的代码中无可避免的会使用回调(echo server这种比较简单的情况只有2层回调,但是如果我们是http服务器呢?读到数据后先进行encode/decode,然后路由到对应的处理函数),当回调层数越深的时候丢失的代码上下文就越多,在debug的时候就更费劲.
另外异步+回调更容易让程序员迷失在对象的生命周期之中,一种很常见的问题是,对象A在线程1里析构了,但是被线程2访问了,这就会导致heap-use-after-free的问题。有的人会argue到使用shared_ptr来解决问题,但是如果shared_ptr引用的原子变量的开销在很多时候也不可小觑,更不用提enable_shared_from被滥用的话带来的问题。

下面我们用asio的协程来编写刚刚的功能.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void report_error(std::string_view component, sys::error_code ec)
{
std::cerr << component << " failure: "
<< ec << " ()" << ec.message() << ")\n";
}

asio::awaitable<void> session(tcp::socket socket)
{
try
{
char data[1024];
for (;;)
{
std::size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);
co_await async_write(socket, asio::buffer(data, n), asio::use_awaitable);
}
}
catch (sys::system_error const& e)
{
if (e.code() == asio::error::eof)
std::cerr << "Session done \n";
else
report_error("Session", e.code());
}
}

asio::awaitable<void> listener(asio::io_context& context, unsigned short port)
{
tcp::acceptor acceptor(context, {tcp::v4(), port});

try
{
for (;;)
{
tcp::socket socket = co_await acceptor.async_accept(asio::use_awaitable);
asio::co_spawn(context, session(std::move(socket)), asio::detached);
}
}
catch (sys::system_error const& e)
{
report_error("Listener", e.code());
}
}

int main()
{
try
{
asio::io_context context;

asio::signal_set signals(context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ context.stop(); });

auto listen = listener(context, 55555);
asio::co_spawn(context, std::move(listen), asio::detached);

context.run();
std::cerr << "Server done \n";
}
catch (std::exception& e)
{
std::cerr << "Server failure: " << e.what() << "\n";
}
}

可以看到在协程版本的代码中我们并不需要像异步版本的代码一样写很多的回调函数,更多的是co_await expression这样类似函数调用的写法,我们的代码不需要再非常的割裂,可以和同步编程时的代码似乎没有太大的差别,同时这样的代码也不会出现同步阻塞代码中处理一条请求时无法处理新来的请求的情况,程序的吞吐也能很可观。同时也不会出现per-thread-per-connection版本中的线程消耗问题.

下文中我们简单描述一下不同的协程的实现原理

原理

按照是否保存调用栈区分

首先有栈无栈协程在执行的时候都是要使用到程序内存空间当中的栈的,而他们的名字之中的有栈无栈指的是是否保存自己的调用栈,有栈协程会将从协程起始点开始到挂起点为止的所有栈上的变量都保存到自己的栈之中,当发生协程切换的时候会替换挂起协程和恢复协程的栈,这样就可以在另一个协程的挂起点上恢复。这里我们先卖个关子不谈这有什么差异

现在我们设想一个场景,在某个协程C的上下文中调用函数F,这个协程能在什么地方挂起?
在有栈协程中,假设C调用到F的时候调用栈如下:

1
2
3
4
5
6
7
8
9
10
11
12
|      Stack Frame f     |   <- 在这里的任意时刻挂起
+----------------------+
| Local Variables f |
| |
| |
| |
| |
+----------------------+
| Return Address f |
+----------------------+
| Ctx Of C |
+----------------------+

有栈协程能够将C中jmp F到F执行的任意时刻的堆栈给保存下来,之后也可以切换回来. 这里用Linux kernel中的process/thread调度做一个简单的类比,Linux中不管是process还是thread都是用task_struct保存其对应的属性的,内核在进行调度的时候也是将当前的process/thread对应的task_struct切出并替换成另一个task_struct,这样就完成了切换。所以我们甚至可以说如果有syscall api能够让用户自己指定将当前process/thread切换到pid所对应的process/thread,那么内核其实也提供了手动实现的有栈协程的手段.(实际上Google还真想这么干)

接下来我们讨论无栈协程的实现。无栈协程并不会单独保存协程的整个stack frame,类比上面有栈协程的图来对比的话就是无栈协程只能在被标注可以挂起的地方挂起. 假设我们有这样一段代码

1
2
3
4
5
6
7
8
Task<int> coro() {
int i = 0;
co_yield i;
i++;
co_yield i;
i++;
co_return i;
}

这段代码中只有3个co_xxx的地方可以挂起,这段代码可能会被编译器修改成这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct CoroutineState {
int i;
int state;
CoroutineState() : i(0), state(0) {}
int coro() {
switch (state) {
case 0:
state = 1;
return i;
case 1:
i++;
state = 2;
return i;
case 2:
i++;
return i;
default:
// shoule never come here
}
}
};

可以看到整个函数被转换成了一个状态机类,在函数内部的局部变量会被捕获到这个类中作为成员变量,而不同的挂起点对应不同的状态。可以认为无栈协程会被状态成状态机代码来执行(熟悉Js的小伙伴会不会想到Promise呢).敏锐的小伙伴肯定也意识到了,编译器帮我们将无栈协程转换成了对应的状态机或者结构体(在rust中的future,JS中的promise),那只要拿到这个对应的handler就能自己决定在哪儿resume(这也给众多的库作者提供了自己手写runtime scheduler的方式).

但是说了这么多无栈和有栈的区别能如何更具体的体现呢?我们按照一段伪代码来分析在无栈和有栈时的不同情况

1
2
3
4
5
6
7
8
function foo():
...
return 42

function coroFun():
yield 100
i = foo()
yield 101

无栈协程无法在foo()上挂起,因为foo中没有挂起点,这意味着没有告诉编译器怎么将foo()转变成一个结构体保存其对应的堆栈上下文也不知道在什么地方挂起和回复. 但是有栈协程因为从协程起始点开始保存了所有的栈信息所以即使到了foo()函数中也能一并保存,可以随时挂起.这就是为什么无栈协程only the top-level coroutine may be suspended.

协程的使用

  1. 首先第一个问题,协程是适合IO密集型任务还是计算密集任务?答案是IO密集型任务,协程最直观的好处在于他的切换代价非常低,所以对于IO密集型任务可以在IO发生阻塞时(所以一般需要结合非阻塞IO使用,当返回EWOULDBLOCK的时候就可以出让本协程了)切换到另一个协程,这样代价相比线程IO切换会低上很多。

  2. 那如果有了协程我的并行度会有提升吗?并行度的上限理论上还是和CPU core数量有关,同一时间能并行执行的任务并不会因为协程数量更多切换代价更低而提升。协程的好处是在避免因为过多的回调引起程序的非结构化的基础上还能提供有效的异步编程手段以提升程序的吞吐(参考单线程的NodeJS).

  3. 协程使用中应该使用什么同步原语?如果还是使用pthread_xxx同步原语的话同步的粒度是thread级别的,不管是有栈协程还是无栈协程对于操作系统来说都是运行在用户态的函数,比如一旦使用pthread_lock上锁那整个线程都将阻塞自然而然也就没有了切换协程的能力。这也是为什么在不同的协程实现中都会带上一套自己的同步原语(比如bmutex之于bthread, tokio::sync::Mutex之于rust). 之后的博文也会介绍如何实现不同的协程如何实现自己的同步原语. 当然,在库级别的协程的同步原语在多个库混杂时也会造成一些问题,比如我在bthread里面如果使用了folly::fiber的同步原语会发生什么?