2018/10/14

Signals & Slots - SigLots multithreading 지원


공부도 할겸해서 siglots에 multithread 기능을 넣어 보았다. 구현하면서 부딪치는 문제들을 해결하는 쏠쏠한 재미가 있다. 그 문제 들을 정리하면 아래와 같다.
  • single thread와 multi thread를 동시에 지원하려면? dummy mutex를 만들어서 일종의 template policy로 해결했다.
  • std::atomic_flag를 std::mutex 대신 써보면 어떨까? 기특하게 잘 돌아간다. Signal 객체가 기본적으로 multi thread로 동작하도록 하고 필요시 single thread를 사용할 수 있도록 했다.
  • siglots에 threadpool을 적용해 보면 어떨까? 잘 돌아간다. 덤으로 Qt의 Direct/Queued/ BlockingQueued Connection을 쉽게 구현할 수 있었다. Unique Connection은 siglots가 기본적으로 사용하는 방식이다. 다만 Auto Connection은 나중에 생각해 볼 여지가 있다.
  • Signal 객체에 multi thread를 지원하면서 mutex와 thread 객체를 포함하게 됐기 때문에 Signal 객체 자체는 copy/move가 안된다. 해결 방법은? 나중에 여기를 참고해서...
  • Qt의 QObject::moveToThread()에 대한 생각은? Tracker가 QObject 처럼 시조할배 노릇을 하게되면 구현할 수 있다. 하지만 siglots가 괴물 framework이 될 수도...
  • siglots의 threadpool에서 std::function과 std::packaged_task, std::future 등등을 사용할 필요가 있나? 시간이 되면 나중에 시도해 볼 수도...
  • single이든 multi thread이든 Slot 들의 실행 결과를 활용할 방법은? 필요시 검토...
  • clang++ 6.0.0에서는 lambda가 아무 문제 없는데 g++ 7.3.0에서는 lambda가 rvalue이면서 capture를 사용할때 문제가 생기더라. g++ 버그인가? 좀 찾아보니 lambda는 clang이 좀 안정적인듯... => 참고로, ubuntu 18.10의 gcc/g++ 8.2.0에서는 문제가 해결됐다.

Signal class에서 바뀐 주요 사항

multi thread를 기본으로 사용하면서 Signal class의 prototype이 아래 소스와 같이 바뀌었다. 따라서 single thread를 사용하려면 아래의 예와 같이 변수를 선언해야 한다.

Signal<void(int), SingleThread> signal_single; // for single-threaded Signal
Signal<void(int)> signal_multi;                          // for multi-threaded Signal (default)

아래 소스에서 LockGuard<TM>이 SingleThread/MultiThread의 template parameter 설정에 따라 dummy 또는 atomic mutex를 사용하게 된다. 그리고, 기존의 signal 변수에 SingleThread 옵션을 주면 connect() 함수의 muti thread 관련 설정들은 모두 무시해 버리기 때문에 안전하게 single thread로 실행할 수 있다. 즉, 기존의 multi thread 설정을 single thread로 바꾸려면, Signal 변수 선언 부분의 SingleThread 옵션만 주면 다른 소스 부분을 손댈 필요가 없다. 흠... 물론 Signal 변수 갯수가 밤하늘의 별처럼 많다면(?) 문제가 좀 있겠네... 사실은, 별볼일 없는 세상이고, single thread 신경 쓸 필요 없는 세상일 수도...???

소스에서는 ThreadPool에 thread 2개를 기본으로 사용하고 있지만 1개 이상이면 항상 multi thread를 사용하게 되는 셈이다. CPU core 수에 따라 적절히 조정하면 된다. ThreadPool은 내부적으로 thread queue 1개를 갖고 있다. main thread가 queue에 연결된 Slot들을 넣어 주면 ThreadPool 내의 thread 들이 알아서 일을 해준다. 주의할 점은, main thread는 이후에 자기할 일을 계속하는데, 일(Slots)을 맡겨 놓고 ThreadPool에서 일을 하고 있는 도중에 일을 없애(disconnect) 버릴 수도 있게 된다. 이러면 crash가 생길 수 있다. 그래서 소스에서 보듯이 disconnect() 함수에서 waitforThreads()를 이용해서 ThreadPool의 작업이 종료되지 않은 상황에서는 main thread가 기다렸다가 disconnect()를 수행하도록 하고 있다.

여기서, waitforThreads() 함수 사용시 각별한 주의가 필요하다. milli 초 주기로 ThreadPool이 Slot 실행을 완료했는지 확인하기 때문에 Slot 실행이 금방 끝나는 경우엔 효율적일 수 있지만, 예를 들어 30초 이상 작업 시간이 걸린다면 큰 문제가 될 수 있다. CPU 점유율 100%의 쓴 맛을 보게 될 것이다. std::condition_variable을 사용하는게 좋을 수도 있지만 dead lock 문제가 생길 수 있다. 아무튼 현재 조건에서 최선의 방법은, Slot이 30초 이상 작업을 해야 한다면, 다음에 설명할 BlockingQueued ConnectMode는 사용하지 말아야 한다. 그리고, disconnect() 함수들은 프로그램 종료시가 아니고서는 웬만하면 사용할 필요가 없기 때문에 큰 문제가 되지는 않는다.

connect() 함수에서 Slot의 ConnectMode를 옵션으로 설정할 수 있도록 했고, emit() 함수에서 ConnectMode 또는 SingleThread 여부를 판단해서 signals and slots가 원활히 동작하도록 해준다.

template<typename T, typename TM = MultiThread> class Signal;
template<typename TM, typename TR, typename... TAs>
class Signal<TR(TAs...), TM>
{
  ......
  // Waits for the threads in a ThreadPool to finish work.
  // Disconnecting Slots on which the threads are working may cause disaster.
  void waitforThreads()
  { while(m_threads.running()) std::this_thread::sleep_for(std::chrono::milliseconds(1)); }

  template<typename TO, typename TPMF>
  TypeID connect(TO&& pobj, TPMF&& pmf, ConnectMode mode = Queued)
  {
    LockGuard<TM> lock{m_mutex};
    TypeSlot slot(pobj, std::forward<TPMF>(pmf));
    if(!slot.object()) return 0;
    TypeID id = contains(slot);
    if(!id) {
      slot.setMode(mode);
      m_slots.push_back(slot);
    }
    attachTracker(pobj);
    addToMap(slot.object(), std::forward<TO>(pobj));
    return slot.id();
  }

  // Rvalue-lambda with capture shows abnormal behavior on gcc 7.3.0.
  // Just fine for all cases on clang 6.0.0.
  template<typename TO>
  TypeID connect(TO&& pobj, ConnectMode mode = Queued)
  {
    LockGuard<TM> lock{m_mutex};
    TypeSlot slot(pobj);
    if(!slot.object()) return 0;
    TypeID id = contains(slot);
    if(!id) {
      slot.setMode(mode);
      m_slots.push_back(slot);
    }
    addToMap(slot.object(), std::forward<TO>(pobj));
    return slot.id();
  }
 
  // Disconnects all the Slots connected to this Signal.
  void disconnect()
  {
    LockGuard<TM> lock{m_mutex};
    waitforThreads();
    while(!m_slots.empty()) {
      detachTracker();
      m_slots.pop_back();
    }
  }
  ......
private:
  mutable TM m_mutex;
  ThreadPool m_threads{2};
};

template<typename TM, typename TR, typename... TAs>
void Signal<TR(TAs...), TM>::emit(TAs... args)
{
  LockGuard<TM> lock{m_mutex};
  for(auto it = m_slots.begin(); it != m_slots.end();) {
    ......
    if(it->blocked()) { ++it; continue; }
    if(it->mode() == ConnectMode::Direct || it->mode() == ConnectMode::Auto ||
       std::is_same<SingleThread, TM>::value)
      //TODO: we can do somthing more with returned results if TR is not void.
      //TR result = (*it++).emit(args...);
      (*it++).emit(args...);
    else if(it->mode() & ConnectMode::Queued) {
      //TODO: we can do somthing more with returned results if TR is not void.
      //TR reult = m_threads.run(*it, args...).get();
      m_threads.run(*it, args...);
      if(it->mode() == ConnectMode::BlockingQueued) waitforThreads();
      ++it;
    }
  }
}

siglots에서 multi-thread 사용 예

아래의 소스를 참고하면 Slot ConnectMode가 어떻게 동작하는지 이해할 수 있다. Direct는 Signal에 연결된 Slot 함수를 직접 호출해서 실행한다. Queued는 thread queue에 Slot을 쌓아 놓고 실행되도록 한다. BlockingQueued는 Queued와 동일하지만 ThreadPool이 해당 Slot의 실행을 마친 후에야 Signal을 호출한 thread가 다른 일을 할 수 있다.

아래 소스를 실행해 보면, Signal 객체(sig) 자체가 main thread에서 생성되었기 때문에 single thread와 Direct 방식은 결과가 비슷하다. ThreadPool을 사용하지 않기 때문에 t1과 t2 thread가 일을 한다. BlockingQueued나 Queued는 main thread와 t1, t2 thread, 그리고 ThreadPool의 기본 thread 2개를 합쳐 5개의 thread id가 나타날 것이다. 이 두 방식의 차이는 t2 쓰레드 부분을 모두 comment 처리해서 실행해 보면 확연히 드러난다. 즉, BlockingQueued 방식을 사용하면 Slot 들의 실행 순서를 정해 줄 수 있게 된다.

#include "Signal.h"
std::mutex MX;
#define LOG(x) {std::lock_guard<std::mutex> l{MX}; std::cout << std::this_thread::get_id()\
                 << ' ' << __FUNCTION__ << '(' << __LINE__ << ")> " << x << '\n';}

using namespace siglots;

Signal<void(int, std::thread::id)> sig;
// Signal<void(int, std::thread::id), SingleThread> sig;

void emitN(int n)
{
  for(int i = 0; i < n; ++i) sig.emit(i, std::this_thread::get_id());
}

int main()
{
  auto fl = [](int i, std::thread::id tid) { LOG(i << " - emitter id: " << tid); };

  sig.connect(fl, Queued);
//  sig.connect(fl, Direct);
//  sig.connect(fl, BlockingQueued);
  std::thread t1{emitN, 20};
  std::thread t2{emitN, 10};
  LOG("main thread id: " << std::this_thread::get_id() << '\n');
  LOG("t1 id: " << t1.get_id() << ", t2 id: " << t2.get_id());
  t1.join();
  t2.join();
}

기타 multi-thread 사용시 참고 사항

앞서 multi-thread 환경에서 Slot이 실행되는 중에 그 Slot을 disconnect() 시키면 crash가 생길 수 있다고 했다. 이와 마찬가지로 Slot 객체를 new로 memory allocation 해서 Signal에 연결해서 사용하다가 어디선가 delete 해버리면 crash가 발생하게 된다. 이는 Signal에 국한된 문제는 아니다. 하지만, 이런 문제가 생기면 근본적으로 원인을 찾기가 어렵다. 특히나 multi-thread 환경에서는 엉뚱한 곳에서 crash가 발생하는 듯이 보이기 때문이다. 그니까 raw pointer는 잘 모르고 함부로 사용하면 안된다.

그리고, multi-thread 환경에서는 deadlock이 발생할 수 있는데 crash가 아니고, 말 그대로 프로그램이 멈춰서 동작하지 않는 경우이다. 흔한 경우는 한 thread가 lock을 걸어 놓은 상태에서 다시 lock을 걸 때인데, 함수 내에서 lock_guard를 사용할 때 조심해야 한다. 그 함수 내에서 호출하는 함수에 lock_guard를 다시 설정해 놓는 실수를 하게 된다. 또, 한 가지 흔한 경우는 두 개의 thread가 각각의 자원에 lock을 걸어 놓은 상태에서 상대방 thread의 자원을 요구하는 경우이다. 순환 참조 형태는 늘 문제를 유발시킨다. 순서를 정해 실행되도록 해야 한다.

끝으로, multi-thread 환경에서 공유 자원을 사용할 때는 lock을 사용하는 것이 원칙이지만, 이는 순서대로 thread들이 실행되도록 하는 것이므로 매우 비효율적인 것이기도 하다는 점이다. lock-free multi-threading이 이상적이지만 현실과는 괴리가 좀 있다. 가령, siglots의 ThreadPool이 실행 중인지를 main thread가 알고 싶을때 read-only로 running 상태만 알면 되기 때문에 굳이 lock을 사용할 필요는 없다.

댓글 없음:

댓글 쓰기