2021/12/18

c++20 Coroutine 활용

  

코루틴 배운지 1년이 넘었는데 심심삼아 이 놈을 어떻게 써 먹을까 여러가지 시험해 보기로 했다. cppcorounifex, concurrencpp 같은 코루틴 라이브러리 소스를 보면 압도 당하게 되고 걍 갖다 쓰자거나 아니면 라이브러리 사용법을 배우기도 참 힘들구나 싶다. 특히나 unifex는 아예 새로운 언어를 만든 느낌이다. 논란의 여지가 있지만 c++의 기본 철학이 쓰면 득(pay for what you use)이라지만 c++20부터는 정말 새로운 언어가 돼 버렸다. 그런데, 코루틴에 관한 한 간단히 몇 가지 기본 class만 만들어도 생각보다 코루틴을 쉽게 사용할 수 있더라.

구체적으로는 코루틴 객제 class인  Task와 Task가 모든 data type에 동작하도록 하기 위한 일반화된 Promise의 두 가지 class만 있어도 코루틴 라이브러리 없이 코루틴을 사용하는데 충분하다는 것이다.  코루틴을 지원하는 ThreadPool과 이를 동기화하기 위한 SyncTask class도 간단히 만들어 볼 수 있었다. 더구나 일반 함수도 co_return만 붙이면 코루틴이 되기 때문에 성능은 좀 떨어지겠지만 모든 함수에 코루틴 ThreadPool을 사용할 수 있다. 또, Generator의 경우에는 일반적인 Task class를 사용해도 되지만 무한 동작하는 Task이므로 memory leak이 생기지 않게 coroutine handle(이하 handle)을 최종적으로 소멸시켜야 하므로 별개의 class로 만드는게 낫긴 하다. 참고로, clang은 알아서 코루틴 종료시 handle을 잘 제거하기 때문에 Generator class를 분리할 필요도 없다. 우분투 21.10의 gcc 11.2에서는 handle을 적절히 제거해 주어야만 하더라. 두 컴파일러는 버전이 올라갈 수록 뭔가 처리 방식이 달라서 어느 하나에선 잘 돌아 가는데 다른 놈에선 안돌아가는 문제가 자주 생긴다. 최근에 Qt creator가 cmake 프로젝트를 잘 지원하기 때문에 Qt 라이브러리를 안 쓰더라도 gcc와 clang 컴파일러 환경을 한 번 맞춰주면 번갈아 가며 시험해 볼 수 있어 좋다.

이 글에서는 c++20 코루틴을 갖고 놀아본 것을 정리한다. c++23에 코루틴 표준 라이브러리가 제대로 포함되겠지만 그 전에 코루틴의 활용처가 꽤 늘어날 듯한 생각이 들어서 새로운 아이디어가 계속 생겨나길 바라기 때문이다. 코루틴이 비동기 실행 흐름을 논리적으로 구조화하는 것이 기본 목적이지만, 역사적으로 새로운 기술이나 발명품이 원래 의도와는 다른 방향으로 사용되는 경우도 많았다.

코루틴 객체 Task<> class

코루틴 반환 객체 class인 Task<>는 Promise<>와 함께 std::future<> / std::promise<>의 역할을 수행한다. Task는 copy할 수 없고 move만 가능하다. 소멸자인 ~Task()에서 완료된 handle만 소멸시킨다. 이 점이 특이 사항이다. Generator는 완료할 수 없는 무한 Task이므로 ~Generator() 소멸자에서는 handle이 nullptr가 아니면 무조건 제거해 주면 된다. 또한, Task 내부에 co_await 연산자를 overloading하고 있는데 다른 코루틴 Task 객체를 co_await하기 위한 것이다. 이로써 Task 자체도 Awaitable 객체가 된다. 즉, 부모 코루틴이 자식 코루틴을, 자식 코루틴이 손자 코루틴을, ..... 반복적으로 co_await 할 수 있게 된다. 사실 Python도 3.5 이후 코루틴을 제대로 지원하는 듯 한데 기본으로 코루틴 내부에서 다른 코루틴을 co_await 할 수 있더라. 그니까 다른 언어에서는 기본으로 사용하는 기능인데 c++에서는 직접 만들든 라이브러리를 쓰든 해야 한다는 야그다. 아무튼 Promise<>와 Awaiter<> class를 통해 이를 구현할 수 있다.

//! General coroutine tasks including generators.
//! Task object can be used as a 'future' with promise_type.
template <typename TR = void>
class Task
{
public:
using promise_type = detail::Promise<TR, Task>;

Task(Task&& task) noexcept : m_handle{task.m_handle} { task.m_handle = nullptr; }
template <typename TF, typename... TAs>
Task(TF&& fn, TAs&&... args)
{ *this = create_task(std::forward<TF>(fn), std::forward<TAs>(args)...); }
Task& operator=(Task&& task) noexcept {
if(std::addressof(task) != this) {
if(m_handle) m_handle.destroy();
m_handle = task.m_handle;
task.m_handle = nullptr;
}
return *this;
}
// Only destory if a Task is finished - unfinished temporary Tasks should be alive.
// Instead manual destruction is required for unfinished Tasks using destroy().
~Task() { if(m_handle && m_handle.done()) m_handle.destroy(); m_handle = nullptr; }

auto operator co_await() { return detail::Awaiter<promise_type>{m_handle}; }
//! Synchronize this Task and get value from the promise_type.
auto get() noexcept
{ if(!m_handle) return TR{}; SyncTask::run(*this); return m_handle.promise().value(); }
//! For generators use next() instead of get() to get value.
auto next() { resume(); return m_handle.promise().value(); }
//! Manual destruction is required for unfinished Tasks like generators(for GCC not Clang).
auto destroy() { if(m_handle) m_handle.destroy(); m_handle = nullptr; }
auto done() const noexcept { return m_handle.done(); }
auto resume() { if(!m_handle.done()) m_handle.resume(); return !m_handle.done(); }
auto handle() const noexcept { return m_handle; }

auto begin() {
if(m_handle) {
//m_handle(); // for eager coroutines
m_handle.resume(); // for lazy coroutines
if(m_handle.done()) m_handle.promise().rethrow_if_exception();
}
return detail::Iterator<TR, Task::promise_type>{m_handle};
}
auto end() const noexcept { return detail::EndIterator{}; }

template <typename TF, typename... TAs>
static Task create_task(TF&& fn, TAs&&... args)
{ co_return std::invoke(std::forward<TF>(fn), std::forward<TAs>(args)...); }
//! Generate task for TimerQueue.
template <typename TF, typename... TAs>
static Task generate_task(TF&& fn, TAs&&... args) {
while(true) {
if constexpr(std::is_void_v<TR>) {
std::invoke(std::forward<TF>(fn), std::forward<TAs>(args)...);
co_await suspend_always{};
}
else { co_yield std::invoke(std::forward<TF>(fn), std::forward<TAs>(args)...); }
}
}

private:
Task(coroutine_handle<promise_type> handle) : m_handle(handle) {}

coroutine_handle<promise_type> m_handle;
friend class detail::Promise<TR, Task>;
};

Promise<> class

Promise class는 c++20 코루틴 Promise Interface 표준에 따라 기본적으로 제공해야 할 함수들을 정의해야 한다. 아래 소스에 base_promise_t<TR> class를 보이지는 않았지만 Task<TR> 객체에 결과로써 제공할 데이터 값을 Promise class 내부에 저장했다가 task.get()이나 task.next() 호출시 제공한다. 따라서 template <TR>은 모든 데이터 반환 type에 대해 동작할 수 있도록 하기 위한 것이다. 앞서 Task class에서 다른 코루틴 객체를 반복해서 co_await하기 위한 역할을 하는 놈들이 Awaiter<>와 Promise의 final_suspend()에서 사용하는 FinalAwaiter이다. 즉, Awaiter는 부모 코루틴이 co_await할 때 자식 handle을 저장하는데, 자식 handle도 Task 객체이므로 Promise를 갖고 있고 거기에 부모 handle을 저장할 수 있다. 

원래 co_await 연산자는 코루틴 중단 후 caller에게 제어 흐름을 넘기는데, co_await overloading에 의해 부모 코루틴에서 co_await <자식 코루틴>; 하면 코루틴 중단 후 Awaiter를 통해 자식 코루틴에게 실행 흐름을 넘긴다. 자식 코루틴은 promise.initial_suspend() -> 자식 코루틴 body(사용자 코드) -> promise.final_suspend()의 실행 흐름을 거치는데 final_suspend()의 FinalAwaiter를 통해 자식 handle의 Promise에 저장했던 부모 handle을 return 함으로써 부모 코루틴이 자식 코루틴을 co_await 했던 지점으로 실행 흐름이 바뀐다. 그 지점은 Awaiter.await_resume()이다. 만약 자식 코루틴이 사용자 코드 실행 과정에서 Promise에 어떤 데이터 값 결과를 저장했다면 Awaiter는 그 값을 전달하게 된다. 그리고 부모 코루틴의 나머지 부분이 계속 실행된다.

여기서 c++ 코루틴이 무한한 확장성을 갖고 있음을 알 수 있는데, 코루틴 사용자 코드 실행 전후로 promise.initial_suspend()와 promise.final_suspend()가 실행되기 때문에 여기에 Task 마다 반복되는 코드나 callback 함수를 집어 넣을 수도 있겠다. 또한, co_await를 통해 코루틴 중단/재개가 이루어지는데 코루틴 중단 전/후에도 Awaitable 들을 이용해 실행 흐름을 바꾸거나 반복 코드/callback 삽입 등이 가능하다는 것이다.

template <typename TR, typename TT>
class Promise final : public detail::base_promise_t<TR>
{
public:
auto get_return_object() noexcept
{ return TT(coroutine_handle<Promise>::from_promise(*this)); }
//auto initial_suspend() const noexcept { return suspend_never{}; } // for eager coroutines
auto initial_suspend() const noexcept { return suspend_always{}; } // for lazy coroutines
auto final_suspend() const noexcept {
struct FinalAwaiter {
auto await_ready() const noexcept { return false; }
auto await_suspend(coroutine_handle<Promise> handle) noexcept
{ return handle.promise().m_parent_handle; }
auto await_resume() const noexcept {}
};
return FinalAwaiter{};
}
auto set_parent_handle(coroutine_handle<> handle) { m_parent_handle = handle; }

private:
coroutine_handle<> m_parent_handle{noop_coroutine()};
};

template <typename TP>
struct Awaiter
{
Awaiter(coroutine_handle<TP> handle) : m_handle{handle} {}
auto await_ready() const noexcept { return !m_handle || m_handle.done(); }
auto await_suspend(coroutine_handle<> handle) noexcept {
m_handle.promise().set_parent_handle(handle);
return m_handle;
}
auto await_resume() const { return m_handle.promise().value(); }
private:
coroutine_handle<TP> m_handle;
};

ThreadPool과 SyncTask class

ThreadPool class는 코루틴 Task 들을 multi-thread pool을 이용해서 실행하기 위한 class이다. 일반 함수도 코루틴으로 쉽게 만들 수 있기 때문에 모든 c++ callables을 실행할 수 있게 된다. 코루틴은 일반 함수로 바꿀 수 없기 때문에 일반 함수 용 ThreadPool은 사용할 수 없다. 구조적으로는 일반 함수용 ThreadPool과 거의 비슷한데 내부적으로 coroutine handle만 container에 저장하면 되기 때문에 std::function<>을 사용하지 않아도 되므로 구현하기도 어렵지 않다. 그리고, 코루틴 ThreadPool은 SyncTask class를 통해 간단히 동기화를 구현할 수 있는 장점도 있다.

SyncTask class는 Task 객체가 비동기적으로 실행되기 때문에 결과(값)를 얻기 위해서 최종적으로 동기화 시키는 역할을 수행하기 위해 필요하다. 단일 thread 환경에서는 task.resume()을 사용하면 되기 때문에 반드시 필요하지는 않다. Multi-thread 환경에서는 비동기로 실행된 thread 들을 동기화해 주어야 하기 때문에 필요한 놈이다.

이 두 class는 코루틴 사용을 위해 반드시 필요한 놈들은 아니다.

코루틴 사용 예

Generator는 대표적인 코루틴이다. 앞서 얘기했듯이 clang 환경에서는 Generator = Task 이다. 아래에 4개의 Generator가 있는데 하나는 0.1초 간격으로 fibonacci 문자열의 문자 1개를 돌아가면서 무한 생산하고, 하나는 0.2초 간격으로 fibonacci 수열 숫자를 무한 생산하며, 세 번째 놈 fibonacci()에서 두 개의 부품을 조립해서 단일 thread로 최종 제품을 무한 생산한다. 네 번째 놈은 생산 시간을 절약하기 위해 ThreadPool을 이용해 최종 제품을 무한 생산한다. 제품 연속 조립 공정과 동일한 로직이다. 요즘 같은 병렬 컴퓨팅 환경에서 코루틴이 얼마나 유용하게 사용될 수 있는지 보여주는 대표적인 예이다. 코루틴이 없다면 동기화를 위해 많은 노력이 필요하다. 아래의 예에서 보듯이 코루틴과 multi-thread는 굉장히 효율적인 조합이다. 코루틴을 사용하면 lock-free 병렬 연산이 가능하기 때문이다.

// yield: "F", "i", "b", "o", "n" ......
Generator<std::string_view> fibonacci_str()
{
constexpr std::string_view str{"Fibonacci"};
size_t max_pos = str.size() - 1;
for(size_t pos = 0; true; ++pos) {
if(pos > max_pos) pos = 0;
std::this_thread::sleep_for(100ms);
co_yield str.substr(pos, 1);
}
}

// yield: 1, 1, 2, 3, 5, 8, 13 ......
Generator<int> fibonacci_num()
{
int a = 0, b = 1;
while(true) {
std::this_thread::sleep_for(200ms);
co_yield b;
auto tmp = a;
a = b;
b += tmp;
}
}

// yield: "F ~ 1", "i ~ 1", "b ~ 2", "o ~ 3", "n ~ 5" ......
Generator<std::string> fibonacci()
{
auto task1 = fibonacci_str();
auto task2 = fibonacci_num();
while(true) {
co_yield std::string(task1.next()) + " ~ " + std::to_string(task2.next());
}
}

Generator<std::string> fibonacci_pool()
{
ThreadPool pool;
auto task1 = fibonacci_str();
auto task2 = fibonacci_num();
while(true) {
pool.run(task1);
pool.run(task2);
SyncTask::sync(&pool);
co_yield std::string(task1.get()) + " ~ " + std::to_string(task2.get());
}
}

void fibonacci_test()
{
Timer now;
int count = 0;
LOG("* Fibonacci without ThreadPool " << now << "ms");
for(auto& str : fibonacci()) {
if(++count > 10) break;
LOG("String " << count << ": " << str);
}

LOG("-----------------------------------")
LOG("* Fibonacci with ThreadPool " << now << "ms");
count = 0;
for(auto& str : fibonacci_pool()) {
if(++count > 10) break;
LOG("String " << count << ": " << str);
}
}

int main()
{
try {
Timer now;
LOG("========================================= " << now << "ms");
fibonacci_test();
LOG("========================================= " << now << "ms");
}
catch(const std::exception& e) { LOG("Error: exception: " << e.what()); }
catch(...) { LOG("Error: unhandled exception."); }
std::cout << "O.K~! The End.\n\n"; // Monitor segmentation fault.
return 0;
}

아래의 예는 nested coroutine을 어떻게 사용하는지에 대한 간단한 예이다. 부모 코루틴에서 자식 코루틴을 co_await 해서 결과 값을 얻어 내는 것이다.

Task<> co_sleep(int delay)
{
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
LOG("sleeping " << delay);
co_return;
}

Task<int> delay(int n)
{
LOG("before..." << n);
co_await co_sleep(n);
LOG("after..." << n);
co_return n;
}

Task<int> nested_delay()
{
LOG("pass 1");
auto d1 = delay(100);
auto d2 = delay(200);
auto r2 = co_await d2;
LOG("pass 2");
auto d3 = delay(300);
auto d23 = r2 + co_await d3;
LOG("Finally return total delayed time...");
co_return co_await d1 + d23;
}

void nested_test()
{
LOG("+++ Start");
auto task = nested_delay();
LOG("running...");
task.resume();
LOG("getting result...");
auto value = task.get();
LOG("Result: " << value);
LOG("--- End");
}

아래의 예는 Task/Promise가 std::future<>/std::promise<>와 유사하게 동작하고 있음을 보여 준다.

Task<int> task_square(int id)
{
LOG("Thread " << id << " started");
int delay = 1 + id % 3;
std::this_thread::sleep_for(std::chrono::seconds(delay));
LOG("Thread " << id << " finished after " << delay << "sec.");
co_return id * id;
}

void pool_test()
{
ThreadPool pool;
std::vector<Task<int>> tasks;
for(int i = 0; i < 5; ++i) {
tasks.push_back(task_square(i));
pool.run(tasks[i]);
}

pool.run(co_sleep(500));
pool.run(co_sleep(1000));
pool.run([]{ LOG("Async test: " << std::this_thread::get_id()); });

SyncTask::sync(&pool);
for(auto& task : tasks) { LOG("Result: " << task.get()); }
}

네트워크 chatting을 비롯한 비동기 네트워크 I/O에는 오래 전부터 Asio 라이브러리가 코루틴을 사용해 왔고 최근 버전에서는 c++20 코루틴을 적용하고 있더라.

맺음말

아무튼 코루틴은 그 자체로 재미있는 놈이다. 아직 c++20 컴파일러들도 아주 원활한 편은 아니라서 좀 아쉽긴 하지만, 이미 c++20 코루틴을 충분히 활용할 수 있는 수준이다. Python을 비롯한 다른 언어 환경에서 코루틴을 사용해 보면 c++20 코루틴을 이해하는데 큰 도움이 될 것이다. 대개 c++에서 원리를 이해하면 다른 언어에 구현된 것들을 이해하는 것이 식은 죽 먹기였는데 요즘은 다른 언어들이 한 발 앞서 나가더라.


댓글 없음:

댓글 쓰기