2023/02/12

Signed Distance Field를 이용한 폰트 렌더링

 

Signed Distance Field(SDF; 부호 거리 장)는 임의의 위치에서 어떤 경계까지의 최단 거리들을, 경계 안은 양수로 경계 밖은 음수로 표현한 것이다(부호를 반대로 표현하기도 한다). 가령, 타원이 경계선이라면 타원 접선 들과의 직교(gradient 방향) 선들의 길이를 SDF로 나타낼 수 있다. 등고선 지도(contour map)가 대표적인 SDF이다. SDF 활용 분야는 많지만 여기서는 font rendering 기법에 대한 것이다. 실제 활용시에는 Signed Distance Function (SDF; 부호 거리 함수)이라는 개념이 유용해 보인다. Signed Distance를 계산하는 함수이다. 함수든 장이든 SDF를 혼용해도 무리가 없다.

OpenGL에서 truetype font(vector font)를 FreeType 라이브러리로 rendering 한 후, 이미지 파일로 font atlas(지도)를 저장했다가 text rendering 시에 사용했던 적이 있다. 그 때 SDF atlas를 쓰면 bitmap atlas와 달리 글자 크기에 상관없이 사용할 수 있다는 글을 보았는데, SDF에 대해 좀 찾아 보다가 복잡해 보여서 그러려니 했었다. 그런데, 아이디어는 어느날 갑자기 문득 떠오른다. 그리고 아이디어를 기록하지 않으면 영원히 잊혀진다.

SDF를 이용한 폰트 렌더링

원래 SDF를 제대로 계산하려면 경계선인 직선이나 곡선들을 Bezier 함수 등으로 근사하여 contour 경계들을 만들어서 각 지점들과의 최단 직교 거리를 계산해야 한다. 폰트 렌더링 시에 흔히 쓰이는 방식은, vector font를 확대한 bitmap으로 렌더링해서 pixel 경계들과의 Euclidean distance를 구한 후 downsampling 해서 SDF 폰트 atlas를 만드는 것이다. 아무튼, 엄밀하게 하자면 SDF 계산 알고리즘 자체도 복잡하고 한글이나 한자는 글자 수가 많아 계산량도 만만치 않다.
내게 떠오른 아이디어는 정수(integer) SDF를 이용해서 폰트를 렌더링하자는 것이다. 간단하지만 특허도 받을 수 있는 아이디어이다. 하지만, 아이디어가 활용되는 것이 더 낫다고 본다. 폰트 렌더링은 물체 모델링과 유사한데, 충돌 감지 같은 다른 분야에서도 SDF 활용시, 상대적인 값이 의미 있고 굳이 정확한 값이 필요하지 않다면, 정수 SDF를 충분히 활용해 볼 수 있을 것이다. 계산 성능을 높이면서도 효과적인 방법이기 때문이다.

폰트 렌더링 시에 pixel 들간의 거리는 정수로 표현될 수 밖에 없으니 단순무식(?) 더하기/빼기(= Manhattan distance)로 정수 값의 SDF를 구해서 사용해도 충분하리라는 것이 기본적인 생각이었다. 아래 그림은 FreeType으로 생성한 32- pixel 'g' 문자(glyph)를 정수 SDF로 표현한 것이다. Vector font이기 때문에 32 x 32 pixel이 아니고 문자 마다 matrix 크기는 달라진다. 어렴풋이 '0'을 경계로 'g'가 보일 것이다. pixel 경계는 SDF 값이 0이고 경계 안은 양수, 밖은 음수로 나타낸 경계까지의 거리이다. 다르게 보면, SDF가 등고선 들이므로, 각각의 지점에서 0은 해수면이 되고 음수는 해수면 아래 높이, 양수는 해수면 위의 높이가 된다. 입체 글자 모형이 물위에 떠 있다고 생각할 수 있다.

 -5 -4 -2 -1 -1  0  0  0  0  0 -1 -1 -1 -1  0  0 -1
-3 -2 -1 0 0 0 -1 -1 -1 0 0 0 0 0 0 0 -1
-2 -1 0 0 -1 -1 -2 -2 -2 -1 0 1 0 -1 -1 -1 -2
-2 -1 0 0 -1 -2 -3 -3 -3 -2 -1 0 0 -1 -2 -2 -3
-1 0 1 0 -1 -2 -3 -4 -3 -2 -1 0 0 -1 -2 -3 -4
-1 0 1 0 -1 -2 -3 -4 -3 -2 -1 0 1 0 -1 -2 -3
-1 0 1 0 -1 -2 -3 -4 -3 -2 -1 0 0 -1 -2 -3 -4
-1 0 1 0 -1 -2 -3 -3 -3 -2 -1 0 0 -1 -2 -3 -4
-2 -1 0 0 -1 -1 -2 -2 -2 -1 0 1 0 -1 -2 -3 -4
-3 -2 -1 0 0 0 -1 -1 -1 0 0 0 -1 -2 -3 -4 -5
-3 -2 -1 0 0 0 0 0 0 0 -1 -1 -2 -3 -4 -5 -6
-2 -1 0 0 -1 -1 -1 -1 -1 -1 -2 -2 -3 -4 -5 -6 -7
-2 -1 0 -1 -2 -2 -2 -2 -2 -2 -3 -3 -3 -3 -4 -5 -6
-1 0 0 -1 -2 -2 -2 -2 -2 -2 -2 -2 -2 -2 -3 -4 -5
-1 0 1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -2 -3 -4
-2 -1 0 1 0 0 0 0 0 0 0 0 0 0 -1 -2 -3
-3 -2 -1 0 0 0 0 0 0 0 0 0 1 1 0 -1 -2
-2 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 1 1 0 -1
-1 0 0 -1 -2 -2 -2 -2 -2 -2 -2 -2 -1 0 1 0 -1
-1 0 0 -1 -2 -3 -3 -3 -3 -3 -3 -3 -2 -1 0 0 -1
0 0 -1 -2 -3 -4 -4 -4 -4 -4 -3 -3 -2 -1 0 0 -1
0 1 0 -1 -2 -3 -3 -3 -3 -3 -2 -2 -1 0 0 -1 -2
-1 0 0 -1 -1 -2 -2 -2 -2 -2 -1 -1 0 0 -1 -2 -3
-2 -1 0 0 0 -1 -1 -1 -1 -1 0 0 0 -1 -2 -3 -4
-3 -2 -1 0 0 0 0 0 0 0 0 -1 -1 -2 -3 -4 -5

그런데, 경계 지점이 곡선 중심이라면 직교 거리는 대각선 방향이고 실제 거리는 2의 제곱근 = (1+1)^0.5 = 1.414씩 증가한다. 즉, 정수로 SDF를 표현하면 분명히 대각선 근처 SDF 값들은 오차가 커진다. 하지만 어차피 Euclidean distance를 사용하더라도 rounding 오차가 발생할 수 밖에 없으므로 내 아이디어를 검증해 볼 만한다. SDF 값이 같은 놈들끼리 contour를 형성하는데, 정수 오차가 발생하더라도 contour 간 거리 비율은 일정하기 때문에 오차가 증폭되지 않을 뿐 아니라, 폰트 렌더링시에도 SDF 값들의 비율에 따라 pixel 강도가 변하기 때문에 SDF 사용 취지에도 잘 부합하게 된다. Euclidean distance를 사용하는 것에 비해 성능을 높일 수 있고 atlas 용량도 줄일 수 있다. 아이디어를 검증하기 위해 Manhattan distance를 정확히 계산하지도 않았는데, 알고리즘의 정확성에 비해 효용성이 별로 없으리라 생각했기 때문이다. 이것이 단순무식 더하기/빼기라고 했던 이유이다. 사실, 정수 오차값들은 일종의 filter 역할을 하는데 대각선 근처의 pixel들 중 먼 거리에 있는 놈들은 걸러내도 폰트 품질에 큰 영향을 주지는 않는다. 실제로 Euclidean distance를 사용한 SDF 값들을 가지고 text 렌더링 했을 때와 비교해도 육안으로는 거의 차이를 느끼기 어려웠다.
아무튼, 위의 SDF 값을 가지고 ASCII 문자로 SDF 값에 따라 pixel 강도를 달리 표현해 렌더링한 결과가 아래 그림이다. OpenGL을 사용하지 않아도 어떻게 렌더링 될지 대략 테스트 해 볼 수 있다.

    ..:iiiiiiiiiiiiii
    ::iiiiiiiiiiiiiii
    :iiiii:::iioiiii:
    :iiii:::::iiii:::
    iioii::.::iiii::.
    iioii::.::iioii::
    iioii::.::iiii::.
    iioii:::::iiii::.
    :iiiii:::iioii::.
    ::iiiiiiiiiii::..
    ::iiiiiiiiii::.. 
    :iiiiiiiii:::..  
    :iii::::::::::.. 
    iiii:::::::::::..
    iioiiiiiiiiiii::.
    :iioiiiiiiiiiii::
    ::iiiiiiiiiiooii:
    :iiiiiiiiiiiiooii
    iiii::::::::iioii
    iiii:::::::::iiii
    iii::.....:::iiii
    ioii::::::::iiii:
    iiiii:::::iiiii::
    :iiiiiiiiiiiii::.
    ::iiiiiiiiiii::..

SDF 렌더링 방식에서는 pixel 강도 값을 spread 값에 따라 조정할 수 있는데 spread는 SDF 상의 최대 거리이다. 가령, 'g'의 SDF matrix에서 spread를 2로 한다는 의미는 모든 SDF 값들을 [-2, 2] 범위로 한정한다는 의미이다. spread 값이 커지면 글자가 굵고 흐릿해지며, 작아지면 가늘고 또렷해 진다. spread가 일종의 anti-aliasing 기능을 수행해 준다. Shader를 사용하면 이런 특성을 이용해서 글자에 외곽선, 음영, 반짝임, 입체효과 등등 여러가지 효과를 줄 수 있다. 아래 그림은 32-pixel SDF 폰트 atlas를 만들어 본 것이다. spread가 커질수록 SDF 계산 시간과 atlas 용량이 늘어난다.

그런데, 전에 본 글에서 글자 확대시 bitmap 폰트 atlas로 렌더링한 text의 품질이 SDF 폰트 atlas로 렌더링한 text 보다 못하다는 말은 사실이 아닌 것 같다. 같은 폰트 size의 atlas로 둘을 비교해 보니 확대시 오히려 bitmap 폰트 atlas를 이용할 때가 품질이 더 좋았다. 물론 같은 조건으로 Shader를 사용했을 때 얘기다. 다만, SDF의 장점은 1개의 폰트 atlas를 가지고도 Shader의 설정을 바꿔 주기만 하면 다양한 글자 효과를 줄 수 있다는 점이다. SDF로 렌더링한 폰트의 품질을 높이려면  고해상도의 truetype 폰트를 이용해서 SDF를 만들고 SDF 폰트의 pixel size도 64이상이면 좋다. 품질을 높이려면 계산 시간과 atlas 용량 증가를 감수해야 한다.
아래 그림은 256-pixel truetype 폰트로 64-pixel SDF 폰트 atlas를 만들어 text를 렌더링해 본 것이다.


아래 그림은 동일한 atlas를 가지고 외곽선 효과를 준 것이다. bitmap 폰트 atlas는 spread가 거의 없기 때문에 글자에 효과를 주는데는 한계가 있다. 일부러 명조체(Serif) 폰트를 사용했는데 고딕체(Sans Serif) 폰트는 상대적으로 글자 효과를 주기 쉽기 때문이다.

글자에 입체 효과 주기

SDF 폰트 atlas의 SDF 정보와 Phong Lighting 기법을 이용하면 Shader를 이용해 글자에 다양한 입체 효과를 줄 수 있다. 3D 모델링에서 SDF를 이용한 rendering 방식은 기존의 triangle mesh를 이용한 rendering 방식과는 차이가 있는데, 객체 모델을 vertex로 표현하는 것이 아니고 SDF로 표현함으로써 Shader에서 SDF surface의 normal vector를 수학적으로 계산하여 lighting 기법을 바로 적용할 수 있는 장점이 있다. Normal vector는 SDF surface의 gradient vector를 normalize 하면 얻어지기 때문에 정확하게 계산할 수 있다. 3D 객체 들은 SDF 함수들의 조합으로 모델링(Constructive Solid Geometry)할 수 있는데  3D 객체 들을 모두 수학 함수로 표현할 수 있다면 실감나는 3D 장면을 연출 할 수 있다. Ray casting 또는 ray marching 기법을 사용해 SDF 값을 얻어낸 후 rendering에 사용하는 것이다.

폰트 렌더링시에는 SDF가 수학 함수가 아니고 이미 계산된 정보이기 때문에 ray marching 기법을 사용할 필요는 없다. 물론, 3D 장면 속의 글자들이 다른 객체들과 어우러져 lighting 효과를 주어야 한다면 ray marching 기법을 사용해야 하겠지만 너무 복잡하기 때문에 여기서는 개별 글자에 입체 효과를 주는 것에 만족한다. 수학 함수로 글자를 표현한 것이 아니기 때문에 normal vector는 SDF surface의 gradient vector를 근사하여 구한다. SDF 값에 다양한 math를 가함으로써 아래의 예와 같은 다양한 효과를 얻을 수 있다.

 

맺음말

정수 SDF를 폰트 렌더링에 사용함으로써 SDF 계산 성능을 높이고 atlas 용량을 줄이지만 렌더링된 text의 품질 저하를 유발하지는 않는다. SDF 폰트 atlas가 좋은 점은, Shader를 사용함으로써 1개의 atlas만으로도 다양한 글자 크기를 표현 할 수 있고, 글자에 다양한 효과를 줄 수 있다는 것이다. 단, 글자가 일정 크기 이상이 되어야만 이런 효과를 맛볼 수 있다.

2021/12/18

c++20 Coroutine 활용

  

코루틴 배운지 1년이 넘었는데 심심삼아 이 놈을 어떻게 써 먹을까 여러가지 시험해 보기로 했다. cppcorounifex, concurrencpp 같은 코루틴 라이브러리 소스를 보면 압도 당하게 되고 걍 갖다 쓰자거나 아니면 라이브러리 사용법을 배우기도 참 힘들구나 싶다. 특히나 unifex는 아예 새로운 언어를 만든 느낌이다. 논란의 여지가 있지만 c++의 기본 철학이 쓰면 득(pay for what you use)이라지만 c++20부터는 정말 새로운 언어가 돼 버렸다. 그런데, 코루틴에 관한 한 간단히 몇 가지 기본 class만 만들어도 생각보다 코루틴을 쉽게 사용할 수 있더라.

구체적으로는 코루틴 객제 class인  Task와 Task가 모든 data type에 동작하도록 하기 위한 일반화된 Promise의 두 가지 class만 있어도 코루틴 라이브러리 없이 코루틴을 사용하는데 충분하다는 것이다.  코루틴을 지원하는 ThreadPool과 이를 동기화하기 위한 SyncTask class도 간단히 만들어 볼 수 있었다. 더구나 일반 함수도 co_return만 붙이면 코루틴이 되기 때문에 성능은 좀 떨어지겠지만 모든 함수에 코루틴 ThreadPool을 사용할 수 있다. 또, Generator의 경우에는 일반적인 Task class를 사용해도 되지만 무한 동작하는 Task이므로 memory leak이 생기지 않게 coroutine handle(이하 handle)을 최종적으로 소멸시켜야 하므로 별개의 class로 만드는게 낫긴 하다. 참고로, clang은 알아서 코루틴 종료시 handle을 잘 제거하기 때문에 Generator class를 분리할 필요도 없다. 우분투 21.10의 gcc 11.2에서는 handle을 적절히 제거해 주어야만 하더라. 두 컴파일러는 버전이 올라갈 수록 뭔가 처리 방식이 달라서 어느 하나에선 잘 돌아 가는데 다른 놈에선 안돌아가는 문제가 자주 생긴다. 최근에 Qt creator가 cmake 프로젝트를 잘 지원하기 때문에 Qt 라이브러리를 안 쓰더라도 gcc와 clang 컴파일러 환경을 한 번 맞춰주면 번갈아 가며 시험해 볼 수 있어 좋다.

이 글에서는 c++20 코루틴을 갖고 놀아본 것을 정리한다. c++23에 코루틴 표준 라이브러리가 제대로 포함되겠지만 그 전에 코루틴의 활용처가 꽤 늘어날 듯한 생각이 들어서 새로운 아이디어가 계속 생겨나길 바라기 때문이다. 코루틴이 비동기 실행 흐름을 논리적으로 구조화하는 것이 기본 목적이지만, 역사적으로 새로운 기술이나 발명품이 원래 의도와는 다른 방향으로 사용되는 경우도 많았다.

코루틴 객체 Task<> class

코루틴 반환 객체 class인 Task<>는 Promise<>와 함께 std::future<> / std::promise<>의 역할을 수행한다. Task는 copy할 수 없고 move만 가능하다. 소멸자인 ~Task()에서 완료된 handle만 소멸시킨다. 이 점이 특이 사항이다. Generator는 완료할 수 없는 무한 Task이므로 ~Generator() 소멸자에서는 handle이 nullptr가 아니면 무조건 제거해 주면 된다. 또한, Task 내부에 co_await 연산자를 overloading하고 있는데 다른 코루틴 Task 객체를 co_await하기 위한 것이다. 이로써 Task 자체도 Awaitable 객체가 된다. 즉, 부모 코루틴이 자식 코루틴을, 자식 코루틴이 손자 코루틴을, ..... 반복적으로 co_await 할 수 있게 된다. 사실 Python도 3.5 이후 코루틴을 제대로 지원하는 듯 한데 기본으로 코루틴 내부에서 다른 코루틴을 co_await 할 수 있더라. 그니까 다른 언어에서는 기본으로 사용하는 기능인데 c++에서는 직접 만들든 라이브러리를 쓰든 해야 한다는 야그다. 아무튼 Promise<>와 Awaiter<> class를 통해 이를 구현할 수 있다.

//! General coroutine tasks including generators.
//! Task object can be used as a 'future' with promise_type.
template <typename TR = void>
class Task
{
public:
using promise_type = detail::Promise<TR, Task>;

Task(Task&& task) noexcept : m_handle{task.m_handle} { task.m_handle = nullptr; }
template <typename TF, typename... TAs>
Task(TF&& fn, TAs&&... args)
{ *this = create_task(std::forward<TF>(fn), std::forward<TAs>(args)...); }
Task& operator=(Task&& task) noexcept {
if(std::addressof(task) != this) {
if(m_handle) m_handle.destroy();
m_handle = task.m_handle;
task.m_handle = nullptr;
}
return *this;
}
// Only destory if a Task is finished - unfinished temporary Tasks should be alive.
// Instead manual destruction is required for unfinished Tasks using destroy().
~Task() { if(m_handle && m_handle.done()) m_handle.destroy(); m_handle = nullptr; }

auto operator co_await() { return detail::Awaiter<promise_type>{m_handle}; }
//! Synchronize this Task and get value from the promise_type.
auto get() noexcept
{ if(!m_handle) return TR{}; SyncTask::run(*this); return m_handle.promise().value(); }
//! For generators use next() instead of get() to get value.
auto next() { resume(); return m_handle.promise().value(); }
//! Manual destruction is required for unfinished Tasks like generators(for GCC not Clang).
auto destroy() { if(m_handle) m_handle.destroy(); m_handle = nullptr; }
auto done() const noexcept { return m_handle.done(); }
auto resume() { if(!m_handle.done()) m_handle.resume(); return !m_handle.done(); }
auto handle() const noexcept { return m_handle; }

auto begin() {
if(m_handle) {
//m_handle(); // for eager coroutines
m_handle.resume(); // for lazy coroutines
if(m_handle.done()) m_handle.promise().rethrow_if_exception();
}
return detail::Iterator<TR, Task::promise_type>{m_handle};
}
auto end() const noexcept { return detail::EndIterator{}; }

template <typename TF, typename... TAs>
static Task create_task(TF&& fn, TAs&&... args)
{ co_return std::invoke(std::forward<TF>(fn), std::forward<TAs>(args)...); }
//! Generate task for TimerQueue.
template <typename TF, typename... TAs>
static Task generate_task(TF&& fn, TAs&&... args) {
while(true) {
if constexpr(std::is_void_v<TR>) {
std::invoke(std::forward<TF>(fn), std::forward<TAs>(args)...);
co_await suspend_always{};
}
else { co_yield std::invoke(std::forward<TF>(fn), std::forward<TAs>(args)...); }
}
}

private:
Task(coroutine_handle<promise_type> handle) : m_handle(handle) {}

coroutine_handle<promise_type> m_handle;
friend class detail::Promise<TR, Task>;
};

Promise<> class

Promise class는 c++20 코루틴 Promise Interface 표준에 따라 기본적으로 제공해야 할 함수들을 정의해야 한다. 아래 소스에 base_promise_t<TR> class를 보이지는 않았지만 Task<TR> 객체에 결과로써 제공할 데이터 값을 Promise class 내부에 저장했다가 task.get()이나 task.next() 호출시 제공한다. 따라서 template <TR>은 모든 데이터 반환 type에 대해 동작할 수 있도록 하기 위한 것이다. 앞서 Task class에서 다른 코루틴 객체를 반복해서 co_await하기 위한 역할을 하는 놈들이 Awaiter<>와 Promise의 final_suspend()에서 사용하는 FinalAwaiter이다. 즉, Awaiter는 부모 코루틴이 co_await할 때 자식 handle을 저장하는데, 자식 handle도 Task 객체이므로 Promise를 갖고 있고 거기에 부모 handle을 저장할 수 있다. 

원래 co_await 연산자는 코루틴 중단 후 caller에게 제어 흐름을 넘기는데, co_await overloading에 의해 부모 코루틴에서 co_await <자식 코루틴>; 하면 코루틴 중단 후 Awaiter를 통해 자식 코루틴에게 실행 흐름을 넘긴다. 자식 코루틴은 promise.initial_suspend() -> 자식 코루틴 body(사용자 코드) -> promise.final_suspend()의 실행 흐름을 거치는데 final_suspend()의 FinalAwaiter를 통해 자식 handle의 Promise에 저장했던 부모 handle을 return 함으로써 부모 코루틴이 자식 코루틴을 co_await 했던 지점으로 실행 흐름이 바뀐다. 그 지점은 Awaiter.await_resume()이다. 만약 자식 코루틴이 사용자 코드 실행 과정에서 Promise에 어떤 데이터 값 결과를 저장했다면 Awaiter는 그 값을 전달하게 된다. 그리고 부모 코루틴의 나머지 부분이 계속 실행된다.

여기서 c++ 코루틴이 무한한 확장성을 갖고 있음을 알 수 있는데, 코루틴 사용자 코드 실행 전후로 promise.initial_suspend()와 promise.final_suspend()가 실행되기 때문에 여기에 Task 마다 반복되는 코드나 callback 함수를 집어 넣을 수도 있겠다. 또한, co_await를 통해 코루틴 중단/재개가 이루어지는데 코루틴 중단 전/후에도 Awaitable 들을 이용해 실행 흐름을 바꾸거나 반복 코드/callback 삽입 등이 가능하다는 것이다.

template <typename TR, typename TT>
class Promise final : public detail::base_promise_t<TR>
{
public:
auto get_return_object() noexcept
{ return TT(coroutine_handle<Promise>::from_promise(*this)); }
//auto initial_suspend() const noexcept { return suspend_never{}; } // for eager coroutines
auto initial_suspend() const noexcept { return suspend_always{}; } // for lazy coroutines
auto final_suspend() const noexcept {
struct FinalAwaiter {
auto await_ready() const noexcept { return false; }
auto await_suspend(coroutine_handle<Promise> handle) noexcept
{ return handle.promise().m_parent_handle; }
auto await_resume() const noexcept {}
};
return FinalAwaiter{};
}
auto set_parent_handle(coroutine_handle<> handle) { m_parent_handle = handle; }

private:
coroutine_handle<> m_parent_handle{noop_coroutine()};
};

template <typename TP>
struct Awaiter
{
Awaiter(coroutine_handle<TP> handle) : m_handle{handle} {}
auto await_ready() const noexcept { return !m_handle || m_handle.done(); }
auto await_suspend(coroutine_handle<> handle) noexcept {
m_handle.promise().set_parent_handle(handle);
return m_handle;
}
auto await_resume() const { return m_handle.promise().value(); }
private:
coroutine_handle<TP> m_handle;
};

ThreadPool과 SyncTask class

ThreadPool class는 코루틴 Task 들을 multi-thread pool을 이용해서 실행하기 위한 class이다. 일반 함수도 코루틴으로 쉽게 만들 수 있기 때문에 모든 c++ callables을 실행할 수 있게 된다. 코루틴은 일반 함수로 바꿀 수 없기 때문에 일반 함수 용 ThreadPool은 사용할 수 없다. 구조적으로는 일반 함수용 ThreadPool과 거의 비슷한데 내부적으로 coroutine handle만 container에 저장하면 되기 때문에 std::function<>을 사용하지 않아도 되므로 구현하기도 어렵지 않다. 그리고, 코루틴 ThreadPool은 SyncTask class를 통해 간단히 동기화를 구현할 수 있는 장점도 있다.

SyncTask class는 Task 객체가 비동기적으로 실행되기 때문에 결과(값)를 얻기 위해서 최종적으로 동기화 시키는 역할을 수행하기 위해 필요하다. 단일 thread 환경에서는 task.resume()을 사용하면 되기 때문에 반드시 필요하지는 않다. Multi-thread 환경에서는 비동기로 실행된 thread 들을 동기화해 주어야 하기 때문에 필요한 놈이다.

이 두 class는 코루틴 사용을 위해 반드시 필요한 놈들은 아니다.

코루틴 사용 예

Generator는 대표적인 코루틴이다. 앞서 얘기했듯이 clang 환경에서는 Generator = Task 이다. 아래에 4개의 Generator가 있는데 하나는 0.1초 간격으로 fibonacci 문자열의 문자 1개를 돌아가면서 무한 생산하고, 하나는 0.2초 간격으로 fibonacci 수열 숫자를 무한 생산하며, 세 번째 놈 fibonacci()에서 두 개의 부품을 조립해서 단일 thread로 최종 제품을 무한 생산한다. 네 번째 놈은 생산 시간을 절약하기 위해 ThreadPool을 이용해 최종 제품을 무한 생산한다. 제품 연속 조립 공정과 동일한 로직이다. 요즘 같은 병렬 컴퓨팅 환경에서 코루틴이 얼마나 유용하게 사용될 수 있는지 보여주는 대표적인 예이다. 코루틴이 없다면 동기화를 위해 많은 노력이 필요하다. 아래의 예에서 보듯이 코루틴과 multi-thread는 굉장히 효율적인 조합이다. 코루틴을 사용하면 lock-free 병렬 연산이 가능하기 때문이다.

// yield: "F", "i", "b", "o", "n" ......
Generator<std::string_view> fibonacci_str()
{
constexpr std::string_view str{"Fibonacci"};
size_t max_pos = str.size() - 1;
for(size_t pos = 0; true; ++pos) {
if(pos > max_pos) pos = 0;
std::this_thread::sleep_for(100ms);
co_yield str.substr(pos, 1);
}
}

// yield: 1, 1, 2, 3, 5, 8, 13 ......
Generator<int> fibonacci_num()
{
int a = 0, b = 1;
while(true) {
std::this_thread::sleep_for(200ms);
co_yield b;
auto tmp = a;
a = b;
b += tmp;
}
}

// yield: "F ~ 1", "i ~ 1", "b ~ 2", "o ~ 3", "n ~ 5" ......
Generator<std::string> fibonacci()
{
auto task1 = fibonacci_str();
auto task2 = fibonacci_num();
while(true) {
co_yield std::string(task1.next()) + " ~ " + std::to_string(task2.next());
}
}

Generator<std::string> fibonacci_pool()
{
ThreadPool pool;
auto task1 = fibonacci_str();
auto task2 = fibonacci_num();
while(true) {
pool.run(task1);
pool.run(task2);
SyncTask::sync(&pool);
co_yield std::string(task1.get()) + " ~ " + std::to_string(task2.get());
}
}

void fibonacci_test()
{
Timer now;
int count = 0;
LOG("* Fibonacci without ThreadPool " << now << "ms");
for(auto& str : fibonacci()) {
if(++count > 10) break;
LOG("String " << count << ": " << str);
}

LOG("-----------------------------------")
LOG("* Fibonacci with ThreadPool " << now << "ms");
count = 0;
for(auto& str : fibonacci_pool()) {
if(++count > 10) break;
LOG("String " << count << ": " << str);
}
}

int main()
{
try {
Timer now;
LOG("========================================= " << now << "ms");
fibonacci_test();
LOG("========================================= " << now << "ms");
}
catch(const std::exception& e) { LOG("Error: exception: " << e.what()); }
catch(...) { LOG("Error: unhandled exception."); }
std::cout << "O.K~! The End.\n\n"; // Monitor segmentation fault.
return 0;
}

아래의 예는 nested coroutine을 어떻게 사용하는지에 대한 간단한 예이다. 부모 코루틴에서 자식 코루틴을 co_await 해서 결과 값을 얻어 내는 것이다.

Task<> co_sleep(int delay)
{
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
LOG("sleeping " << delay);
co_return;
}

Task<int> delay(int n)
{
LOG("before..." << n);
co_await co_sleep(n);
LOG("after..." << n);
co_return n;
}

Task<int> nested_delay()
{
LOG("pass 1");
auto d1 = delay(100);
auto d2 = delay(200);
auto r2 = co_await d2;
LOG("pass 2");
auto d3 = delay(300);
auto d23 = r2 + co_await d3;
LOG("Finally return total delayed time...");
co_return co_await d1 + d23;
}

void nested_test()
{
LOG("+++ Start");
auto task = nested_delay();
LOG("running...");
task.resume();
LOG("getting result...");
auto value = task.get();
LOG("Result: " << value);
LOG("--- End");
}

아래의 예는 Task/Promise가 std::future<>/std::promise<>와 유사하게 동작하고 있음을 보여 준다.

Task<int> task_square(int id)
{
LOG("Thread " << id << " started");
int delay = 1 + id % 3;
std::this_thread::sleep_for(std::chrono::seconds(delay));
LOG("Thread " << id << " finished after " << delay << "sec.");
co_return id * id;
}

void pool_test()
{
ThreadPool pool;
std::vector<Task<int>> tasks;
for(int i = 0; i < 5; ++i) {
tasks.push_back(task_square(i));
pool.run(tasks[i]);
}

pool.run(co_sleep(500));
pool.run(co_sleep(1000));
pool.run([]{ LOG("Async test: " << std::this_thread::get_id()); });

SyncTask::sync(&pool);
for(auto& task : tasks) { LOG("Result: " << task.get()); }
}

네트워크 chatting을 비롯한 비동기 네트워크 I/O에는 오래 전부터 Asio 라이브러리가 코루틴을 사용해 왔고 최근 버전에서는 c++20 코루틴을 적용하고 있더라.

맺음말

아무튼 코루틴은 그 자체로 재미있는 놈이다. 아직 c++20 컴파일러들도 아주 원활한 편은 아니라서 좀 아쉽긴 하지만, 이미 c++20 코루틴을 충분히 활용할 수 있는 수준이다. Python을 비롯한 다른 언어 환경에서 코루틴을 사용해 보면 c++20 코루틴을 이해하는데 큰 도움이 될 것이다. 대개 c++에서 원리를 이해하면 다른 언어에 구현된 것들을 이해하는 것이 식은 죽 먹기였는데 요즘은 다른 언어들이 한 발 앞서 나가더라.


2021/03/24

LIPO Algorithm

 

LIPO(Malherbe and Vayatis, 2017)는 우연히 발견한 전역 최적화(global optimization) 알고리즘인데 새로운 아이디어들이 쏟아져 나오는 세상이라 무감각해지긴 했지만 짚어볼 가치가 충분한 놈이다. Lipshitz 연속성 개념을 이용한 알고리즘인데 그 개념에 대해서는 Wikipedia에 잘 설명해 놓았더라. 이 놈도 Evolutionary Algorithm이나 Monte Carlo Search와 같이 추계적 탐색(stochastic search)을 사용한다. 즉, 미분을 사용하지 않는다. LIPO의 새로운 발상은 random하게 최적해를 탐색하되 좋은 놈만 골라서 sampling하다 보면 최적해를 빨리 찾을 수 있다는 것이다. 좋은 놈을 골라 내려면 기준이 필요한데 좋은 놈인지 알기는 어렵지만, 나쁜 놈은 쉽게 알 수 있다는 것이고 나쁜 놈들을 버리다 보면 좋은 놈만 남게 된다는 원리다. 그 기준이 되는 것이 Lipshitz upper bound(UB; 상한)이다. 이 기준을 적용하면 좋은 놈들의 목적함수 값만 알면 되기 때문에 목적함수 계산을 최소화할 수 있다는 것이다. 인공신경망과 같이 parameter 수가 많은 목적함수들은 목적함수 계산 비용도 어마무시하게 발생하기 때문에 이 놈이 그만한 가치가 있는 것이다.

Lipshitz Upper Bound

어떤 목적함수 f(x)가 있을 때 Lipshitz UB(L(x))는 모든 x에 대해서 f(x) 보다 크거나 같은 함수인데 아래의 조건을 만족하는 함수이다.
                                   |f(x1) - f(x2)| <= k|x1 - x2|     (단,  k >= 0)
즉, L(x)는 아래와 같이 정의할 수 있다.
                                   L(x) = f(x_i) + k|x - x_i|         (단, i = 1, ..., t)
밑에 그림과 같이 L(x)는 k값에 따라 무수히 많은 놈이 있을 수 있지만 min L(x)인 k값이 존재한다는 것이고 min k 값을 특별히 Lipshitz 상수라 하더라. 참고로 Lipshitz UB를 만족하는 함수 f(x)를 Lipshitz 연속 함수라 하고 이런 함수들은 연속적이고 미분 가능하다. 또한 직관적으로 Lipshitz 상수는 f(x)의 최대 기울기(= 최대 미분값)와 일치함을 알수 있다. 역으로 연속 함수이고 미분 가능하다고 해서 Lipshitz UB가 항상 존재하는 것은 아니다. f(x) = x^2과 같이 이차 포물선 함수들이 대표적인 예이다. 하지만, 특정 구간에서는 Lipshitz UB가 존재할 수 있기 때문에 Lipshitz UB 개념은 불연속이고 미분 가능하지 않더라도 확장해서 사용할 수 있다. 그리고, 수치 미분을 사용할 경우에는 Lipshitz UB를 적용함으로써 함수의 연속성과 미분 가능성을 보장해 줄 수 있다.

LIPO Algorithm

알고리즘은 아래 그림으로 한 눈에 어떤 원리인지 알 수 있기 때문에 자세한 설명은 논문을 보거나 구글링해 보면 된다. 아래 그림은 전역 최대 값을 탐색할 때 random sampling한 x_i가 좋은 놈인지 Lipshitz UB로 판단할 수 있음을 보여준다. 즉, random하게 매번 sampling시 마다 Lipshitz UB(L(x))를 구할 수 있는데 max f(x) 값보다 min L(x) 값이 항상 크거나 같을 때만 좋은 놈이라고 보고 sampling을 하는 것이다. 그림과 같이 4번 sampling한 상태에서 다음 sampling한 놈이 좋은 놈이 되려면 현재 까지의 f(x) 최대값 보다 큰 구간에서만 x값을 sampling하면 되는 것이다. 빨간 선 아래에 있는 놈들은 모두 나쁜 놈이 되는 것이다. 즉, 좋은 놈 골라내기를 반복하다 보면 최소한의 목적함수 값만 계산하더라도 최대값을 찾아낼 수 있다는 것이 LIPO 알고리즘의 핵심 원리이다.

문제는 k값을 미리 알기 어렵다는 것인데 이 놈이 LIPO 알고리즘의 유일한 매개 변수(hyper parameter)가 된다. 그림에서는 k=2가 주어진 목적함수의 최대 기울기이자 L(x)가 최소가 되도록하는, 즉 Lipshitz 상수이다. k=10일 때와 비교해 보면 다음 번에 sampling할 구간이 굉장히 작다는 것을 알 수 있다. 또한, k값이 Lipshitz 상수보다 너무 작다면 아예 sampling을 하지 않게 되는 문제가 발생할 수도 있음을 알 수 있다. 즉, k값도 최적값이 존재하는데 L(x)가 최소가 되는 k값을 구하는 최적화 문제로 귀결된다. 원저자들의 논문에서는 adaptive LIPO(AdaLIPO) 알고리즘까지 제시하고 있는데 이미 sampling한 놈들의 최대 기울기를 k값으로 사용하면 된다는 것이 요지이다. 독립변수 x가 여러개 일때에도 각 변수 마다의 k값을 각각의 기울기를 이용해서 쉽게 구할 수 있다.

LIPO 테스트

간단한 단변수 multi-modal 함수(f(x) = -(8/x)*sin(pi*x), 단, x = 1e-10 if |x| < 1e-10)의 최소값 찾기 테스트 결과를 아래 그림에 보였다. 참고로, AdaLIPO 보다는 단순한 adaptive LIPO 로직을 적용했다. 목적함수가 x=0 근처에서 기울기가 무한대가 되는 불연속 함수인데 0으로 나누기 오류가 발생할 수 있어서 1e-10값을 할당했다. k값은 목적함수의 기울기를 따라가기 때문에 무한대가 될 수 있는데 k값이 너무 커지면 Lipshitz UB를 사용하는 의미가 없어진다. 대략 1000을 넘지 않도록 해주면 된다. k값 자체를 엄밀하게 Quadratic Programming 등으로 최적화해서 사용할 수도 있는데 크게 의미 있어 보이진 않는다. random sampling을 사용하기 때문에 실제 목적 함수 값 계산 횟수(number of function evaluations; nfe)는 편자가 심하지만 Differential Evolution(DE) 알고리즘 보다는 확실히 적은 횟수만으로 준 최적값을 찾아낸다. 하지만 정확도까지 따지면 그닥 좋은 편은 아니다. 그래서 Quadratic Regression(QR)을 같이 사용한 결과가 아래 그림이다. 즉, exploration은 LIPO가 하고 exploitation은 QR이 하도록 했더니 좋은 결과가 나온 것이다.

흠, 하지만 공짜 점심 없음 정리(no free-lunch theorem)에 따라 배보다 배꼽이 큰 문제가 발생할 수 있음이 확인된 것이다. 그러니까, 목적함수 계산 비용과 Lipshitz UB + local search 계산 비용 중에 어떤 비용이 많이 드는 지에 따라 알고리즘의 효용성이 달라질 수 있는 것이다. 일반적으로 바꿔 말하자면 sampling 비용과 목적함수 계산 비용에 따라 LIPO의 가치가 달라질 수 있다. 목적함수의 독립 변수 수가 어마무시하다면 LIPO의 효용성이 커질 수 있다.

맺음말

결론적으로, 최소한의 sampling으로 전역 최적해를 찾을 수 있다는 LIPO의 기본 발상은 매우 훌륭하다. Deep Learning에서도 최소의 data를 가지고 학습시키는 것이 중요한 연구 분야이다. 더불어 Lipshitz UB에 대해서 깊이 생각해 볼수 있었다. random search에서 sampling 문제를 인공신경망과 같은 일반적인 regression 문제에 대해서도 확장해서 생각해 볼 수 있는데, 이미 sampling한 data를 가지고 regression 목적 함수를 최적화 시키는 것에 대한 것이다. 즉, Lipshitz UB 개념을 학습에 사용되는 data들에도 적용할 수 있겠다는 것이다. training data들 중에도 좋은 놈이 있고 나쁜 놈이 있을 수 있는데, 이놈들을 걸러낼 수 있는 기준이 될 수 있겠다는 점이다. 아니면, 학습 data를 쉽게 얻을 수 있는 것은 아니기 때문에 data/parameter transformation을 통해 Lipshitz UB 조건을 충족하도록 할 수도 있겠다. 그래서 구글링했더니 GAN(Generative Adversarial Network)에서 gradient exploding 문제 때문에 Spectral Normalization을 적용해서 학습이 잘 되도록 했다는 연구 결과가 있더라. 아무튼 이 글을 읽은 사람들 중에 아이디어를 얻어서 연구에 활용해 보기를 기대한다.