2009年6月30日火曜日

thread_pool

そういえば、ブログの方には、ポストしてなかったので、thread_pool のコード、ポストしときます。
コンセプトは、スレッド・イニシャル時のランタイムの初期化等を省力化する事と、スリープ時には負荷を加えない事の2点です。この設計コンセプトは、BOOST_THREAD を定義すると守られません。

 まぁ、バグがあって、終了しないとか、あるかもしれないので、そこんところはよろしくです。

thread_pool.hpp

//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Copyright (c) 2007 OKI Miyuki (oki.miyuki at gmail dot com)
//
#include <windows.h>
#include <deque>
#include <boost/thread.hpp>
#include <boost/function.hpp>

//#define BOOST_THREAD

/// ワーカ関数型
typedef boost::function<void (void*)> worker_func_t;
/// スレッドの設定関数型
typedef boost::function<void (void)> setup_func_t;

/// ワーカ情報
/*!
@brief ワーカ・ホルダ
*/
struct worker_info {
public:
worker_info() : func_(), arg_() {}
worker_info(
worker_func_t f,
void* arg
);
public:
worker_func_t func_; //!< ワーカ関数型
void* arg_; //!< 引数
};

/// ワーカ・ホルダ・キュー
typedef std::deque<worker_info> worker_queue_t;

/// 何もしない関数
void empty_func();

/// スレッド・プール・クラス
class thread_pool {
private:
size_t pool_count_; ///< スレッド・プール数
#ifdef BOOST_THREAD
boost::condition cond_;
boost::mutex mutex_;
#else
union {
HANDLE ready_[2];
struct {
HANDLE sync_; ///< キュー同期用
HANDLE free_; ///< 空き待ちシグナル
};
};
#endif
boost::thread_group threads_;
worker_queue_t queue_; ///< ワーカ・キュー
volatile bool continue_; ///< 停止通知用
setup_func_t initializer_; ///< 初期化関数
setup_func_t uninitializer_; ///< 終了化関数

private:
/// スレッド・プール関数
void pool_func();

private:
/// クラス初期化処理
void init();
/// クラス終了処理
void term();

public:
/// コンストラクタ
explicit thread_pool(
setup_func_t initializer = empty_func,
setup_func_t uninitializer = empty_func,
size_t pools = 16
);

/// デストラクタ
~thread_pool();

/// ワーカをスケジュールする
void schedule(
worker_func_t func,
void* arg
);

/// スレッド・プールを終了する
void stop();

};


thread_pool.cpp

//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Copyright (c) 2007 OKI Miyuki (oki.miyuki at gmail dot com)
//
#include <thread_pool.hpp>
#include <boost/assert.hpp>
#include <boost/bind.hpp>

//#define HNS_THREADPOOL_DEBUG
#ifdef HNS_THREADPOOL_DEBUG
#include <iostream>
#endif

worker_info::worker_info(
worker_func_t f,
void* arg
) :
func_(f),
arg_(arg)
{
}

void empty_func() {}

void thread_pool::pool_func() {
initializer_();
while( continue_ ) {
bool do_work = false;
bool do_notify = false;
worker_info worker;
{
#ifdef BOOST_THREAD
boost::mutex::scoped_lock lk(mutex_);
#else
WaitForSingleObjectEx( free_, INFINITE, FALSE );
{
WaitForSingleObjectEx( sync_, INFINITE, FALSE );
#endif
if( !queue_.empty() ) {
worker = queue_.front();
queue_.pop_front();
do_work = true;
}
do_notify = !queue_.empty();
#ifndef BOOST_THREAD
if( do_notify && continue_ ) {
SetEvent( free_ );
} else {
ResetEvent( free_ );
}
}
ReleaseMutex( sync_ );
#endif
}
#ifdef BOOST_THREAD
if( do_notify && continue_ ) {
cond_.notify_one();
}
#endif
if( do_work && continue_ ) {
worker.func_( worker.arg_ );
}
}

#ifdef HNS_THREADPOOL_DEBUG
std::cout << "pool_func LEAVE:" << GetCurrentThreadId() << std::endl;
#endif
#ifndef BOOST_THREAD
SetEvent( free_ );
#endif
uninitializer_();
}

void thread_pool::init() {
#ifndef BOOST_THREAD
sync_ = CreateMutex( NULL, FALSE, 0 );
free_ = CreateEvent( 0, TRUE, FALSE, 0 );
#endif
for( size_t i = 0; i < pool_count_; ++i ) {
threads_.create_thread( boost::bind( &thread_pool::pool_func, this ) );
}
}

void thread_pool::term() {
continue_ = false;
#ifdef BOOST_THREAD
cond_.notify_all();
threads_.join_all();
#else
if( free_ && sync_ ) {
SetEvent( free_ );
threads_.join_all();
}
if( sync_ ) {
CloseHandle( sync_ );
sync_ = 0;
}
if( free_ ) {
CloseHandle( free_ );
free_ = 0;
}
#endif
}

thread_pool::thread_pool(
setup_func_t initializer,
setup_func_t uninitializer,
size_t pools
) :
initializer_(initializer),
uninitializer_(uninitializer),
pool_count_(pools),
#ifdef BOOST_THREAD
cond_(),
#else
sync_(),
free_(),
#endif
threads_(),
continue_(true)
{
init();
}

thread_pool::~thread_pool() {
term();
}

void thread_pool::stop() {
term();
}

void thread_pool::schedule(
worker_func_t func,
void* arg
) {
#ifdef BOOST_THREAD
{
boost::mutex::scoped_lock lk( mutex_ );
#else
WaitForSingleObjectEx( sync_, INFINITE, FALSE );
#endif
if( continue_ ) {
queue_.push_back( worker_info( func, arg ) );
SetEvent( free_ );
}
#ifdef BOOST_THREAD
}
cond_.notify_one();
#else
ReleaseMutex( sync_ );
#endif
}

#ifdef HNS_THREADPOOL_DEBUG

#include <boost/lexical_cast.hpp>
#include <boost/random.hpp>
#include <ctime>

// random generator class
/*!
@note Default random generator is too poor.
*/
struct random_generator {
private:
mutable boost::mt19937 mt_gen_;
public:
random_generator() :
mt_gen_(static_cast<unsigned long>(std::time(0)))
{}

int operator () (int n) const {
// Using mod is not uniform. but behavior is less.
return (mt_gen_() % n);
}
};

void test( void* arg ) {
std::cout << ">> test(" << (int)arg << ":" << GetCurrentThreadId() << ");" << std::endl;
Sleep( ((int)arg) * 250 );
std::cout << "<< test(" << (int)arg << ":" << GetCurrentThreadId() << ");" << std::endl;
}


void main() {
thread_pool pool;
random_generator rg;

for( int i = 0; i < 20; ++i ) {
pool.schedule( test, (void*)(rg(12)) );
pool.schedule( test, (void*)(rg(12)) );
pool.schedule( test, (void*)(rg(12)) );
pool.schedule( test, (void*)(rg(12)) );
pool.schedule( test, (void*)(rg(12)) );
pool.schedule( test, (void*)(rg(12)) );
pool.schedule( test, (void*)(rg(12)) );
std::cout << "...zzZ" << std::endl;
if( i < 12 ) Sleep( 2000 + rg(4000) );
std::cout << "Zzz..." << std::endl;
if( i == 10 ) pool.stop();
}
}

#endif



追記:2009/07/21 WaitForMutipleObjectsEx を分離する形に修正
 スケジューリングの関数自体と、スレッドの関数自体がスレッド・プールの同期オブジェクトをダイレクトに利用するため、ユーザ指定スレッドと複合してデッドロックを起こす可能性があるので、関数のスケジューラには、代理スレッドを立てた方が良いかと思案しているが、取り入れるには至っていない。

追記:2009/07/28 終了時のイベントの取り扱いが不味いのを修正。
 stop 関数を追加。
 テスト・ケースを修正
追記:2009/08/05 term 時のcontinue_ に対する同期保護は不要なので削除

0 件のコメント: