2009年11月28日土曜日

boost::asio udp 考察

 boost::asio の udp について、少しコツがわかったので、ポストします。ごちゃごちゃと説明するよりは、コードを見てもらった方がはやいと思うので、いきなりコードです。

// ping_pong_server.cpp

#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <iostream>

using boost::asio::ip::udp;

//
// サーバ側から見た、通信の流れ
// 実装は、bind の順序が下図どおりではない点に注意
//
// CLIENT SERVER
// bind_accept
// ---- connect ----> => handle_accept
// bind_ack
// <---- ack -----
// => handle_ack
// bind_receive
// ---- data 1 ----> => handle_receive
// bind_ack
// <---- ack -----
// => handle_ack
// bind_receive
// ---- data 2 ----> => handle_receive
// bind_ack
// <---- ack -----
// => handle_ack
// bind_receive
// ---- data 3 ----> => handle_receive
// bind_ack
// <---- ack -----
// => handle_ack
// bind_receive
// ---- data 4 ----> => handle_receive
// bind_response
// <--- data 1+2+3+4 ---
// => handle_response
//
// Async 実装上のコツは、キャッチャーミットを構えておいて
// ボールを投げる。この一点につきるかもしれない。

class ping_pong_server {
private:
boost::asio::io_service io_service_;
udp::socket socket_;
udp::endpoint client_endpoint_;
std::string ball_;
int count_;
boost::array<char,512> buffer_;
boost::array<char,1> ack_buffer_;

private:
void bind_accept() {
//std::cout << "bind_accept" << std::endl;
socket_.async_receive_from(
boost::asio::buffer( buffer_ ),
client_endpoint_,
boost::bind(
&ping_pong_server::handle_accept, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}
void handle_accept(
const boost::system::error_code& error,
size_t bytes_recvd
) {
//std::cout << "handle_accept" << std::endl;
if( !error ) {
ball_.append( &buffer_[0], bytes_recvd );
std::cout << "*:" << client_endpoint_.port() << ":" << ball_ << std::endl;
++count_;
// ACK を送信する前に、受信の準備をしておく事が重要
bind_receive();
// 受信の準備なしに ACK を送信すると、受信が間に合わなくなる可能性がある。
bind_ack();
} else if( error != boost::asio::error::operation_aborted ) {
std::cout << error.message() << std::endl;
start();
}
}
void bind_ack() {
//std::cout << "bind_ack" << std::endl;
socket_.async_send_to(
boost::asio::buffer( ack_buffer_ ),
client_endpoint_,
boost::bind(
&ping_pong_server::handle_ack, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}
void handle_ack(
const boost::system::error_code& error,
size_t bytes_transferred
) {
// ACK の送信が完了した段階で、受信するコードを書く事ができるが、
// ここで bind_receive するのでは、タイミングが遅い可能性がある。
//std::cout << "handle_ack" << std::endl;
}
void bind_receive() {
//std::cout << "bind_receive" << std::endl;
socket_.async_receive(
boost::asio::buffer( buffer_ ),
boost::bind(
&ping_pong_server::handle_receive, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}
void handle_receive(
const boost::system::error_code& error,
size_t bytes_recvd
) {
//std::cout << "handle_receive" << std::endl;
if( !error ) {
ball_.append( &buffer_[0], bytes_recvd );
std::cout << client_endpoint_.port() << ":" << ball_ << std::endl;
++count_;
if( count_ < 4 ) {
// ACK を送信する前に、受信の準備をしておく事が重要
bind_receive();
// 受信の準備なしに ACK を送信すると、受信が間に合わなくなる可能性がある。
bind_ack();
} else {
// cleint_endpoint_ が更新されないうちに最後のレスポンスを送信
bind_response();
// 次の受信サイクルを開始する
start();
}
} else if( error != boost::asio::error::operation_aborted ) {
std::cout << error.message() << std::endl;
start();
}
}
void bind_response() {
//std::cout << "bind_response" << std::endl;
socket_.async_send_to(
boost::asio::buffer( ball_ ),
client_endpoint_,
boost::bind(
&ping_pong_server::handle_response, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}
void handle_response(
const boost::system::error_code& error,
size_t bytes_transferred
) {
// ここでも先手を打ってあるので、何もしない
//std::cout << "handle_response" << std::endl;
}

void start() {
ball_.clear();
count_ = 0;
bind_accept();
}

public:
ping_pong_server()
: io_service_()
//, socket_(io_service_,udp::endpoint(udp::v6(),8080))
, socket_(io_service_,udp::endpoint(udp::v4(),8080))
, client_endpoint_()
, ball_()
, count_()
{
}

void run() {
start();
io_service_.run();
}


};

int main() {
ping_pong_server pps;
pps.run();
return 0;
}


これに対して、クライアント側のコードです。


// ppclient.cpp

#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <string>
#include <iostream>

using boost::asio::ip::udp;

const char *msgs[] = { "ping", "pong", "pang", "pong" };

class ping_pong_client {
private:
boost::asio::io_service& io_service_;
udp::resolver resolver_;
udp::socket socket_;
udp::endpoint server_endpoint_;
int count_;
boost::array<char,512> buffer_;
std::string result_;

private:
void bind_send() {
//std::cout << "bind_send(" << msgs[count_] << ")" << std::endl;
socket_.async_send(
boost::asio::buffer( msgs[count_++], 4 ),
boost::bind(
&ping_pong_client::handle_send, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}
void handle_send(
const boost::system::error_code& error,
size_t bytes_transferred
) {
//std::cout << "handle_send" << std::endl;
if( error ) {
std::cout << error.message() << std::endl;
}
}

void bind_ack() {
//std::cout << "bind_ack" << std::endl;
socket_.async_receive_from(
boost::asio::buffer( buffer_ ),
server_endpoint_,
boost::bind(
&ping_pong_client::handle_ack, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}
void handle_ack(
const boost::system::error_code& error,
size_t bytes_transferred
) {
//std::cout << "handle_ack(" << count_ << ")" << std::endl;
if( !error ) {
if( count_ < 3 ) bind_ack();
else bind_receive();
bind_send();
} else {
std::cout << error.message() << std::endl;
}
}
void bind_receive() {
//std::cout << "bind_receive" << std::endl;
socket_.async_receive_from(
boost::asio::buffer( buffer_ ),
server_endpoint_,
boost::bind(
&ping_pong_client::handle_receive, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}
void handle_receive(
const boost::system::error_code& error,
size_t bytes_recvd
) {
//std::cout << "handle_receive" << std::endl;
if( !error ) {
result_.assign( &buffer_[0], bytes_recvd );
} else {
std::cout << error.message() << std::endl;
}
}
void start( const char* host, const char* port ) {
//std::cout << "start(" << host << "," << port << ")" << std::endl;
udp::resolver::query query(host, port);
resolver_.async_resolve(
query,
boost::bind(
&ping_pong_client::handle_resolve, this,
boost::asio::placeholders::error,
boost::asio::placeholders::iterator
)
);
}

void bind_connect(
udp::resolver::iterator endpoint_iterator
) {
//std::cout << "bind_connect" << std::endl;
server_endpoint_ = *endpoint_iterator;
//std::cout << server_endpoint_ << std::endl;
socket_.async_connect(
server_endpoint_,
boost::bind(
&ping_pong_client::handle_connect, this,
boost::asio::placeholders::error,
++endpoint_iterator
)
);
}

void handle_resolve(
const boost::system::error_code& error,
udp::resolver::iterator endpoint_iterator
) {
//std::cout << "handle_resolve" << std::endl;
if( error ) return;
bind_connect( endpoint_iterator );
}

void handle_connect(
const boost::system::error_code& error,
udp::resolver::iterator endpoint_iterator
) {
//std::cout << "handle_connect" << std::endl;
if( !error ) {
bind_ack();
bind_send();
} else if( endpoint_iterator != udp::resolver::iterator() ) {
bind_connect( ++endpoint_iterator );
} else {
std::cout << error.message() << std::endl;
}
}

public:
ping_pong_client(boost::asio::io_service& io_service)
: io_service_(io_service)
, resolver_(io_service_)
, socket_(io_service_)
, count_()
{
}

std::string run( const char* host, const char* port ) {
count_ = 0;
start( host, port );
io_service_.run();
return result_;
}

};

boost::asio::io_service io_service;

int main() {

ping_pong_client ppc( io_service );

std::cout << ppc.run("localhost","8080") << std::endl;

return 0;
}



 と、まぁ、1対1の通信ならば、これで、うまく行きます。では、同時に多対1の通信にするとどうでしょうか?というテストコードが以下です。


// 前半は同じ。

boost::asio::io_service io_service;
boost::array<std::string,10> slot;
boost::array<boost::thread,10> thr;

void tfunc( int i ) {
ping_pong_client ppc( io_service );
slot[i] = ppc.run( "localhost", "8080" );
}

int main() {
boost::thread_group threads;
for( int i = 0; i < 10; ++i ) {
thr[i] = boost::thread( &tfunc, i );
threads.add_thread( &thr[i] );
}
threads.join_all();
for( int i = 0; i < 10; ++i ) {
std::cout << slot[i] << std::endl;
}
return 0;
}


 これで実験してみたところ、async_receive_from と async_recieve も変わりなく、ごちゃ混ぜになりました。UDP は、コネクション・レスという事で、1Query 1Response のタイプのものが多いです。このような状況で、セッションという概念を持ち込むのは、かなりしんどそうです。このサンプルのように、通信をする場合は、トラスティッド・ネットワークで1対1が保障されるような場合にしか使えないという事のようです。

2009/12/01 追記、どのみちマルチ版は、まともに動作しないのですが(通信がちゃんと終了しないため)、boost::asio を使ったマルチスレッドのサンプルとしては、でたらめなので、後でフォローしなおします。

0 件のコメント: