轉(zhuǎn)自:http://blog.csdn.net/caisini_vc/article/details/4474910
把這兩天做Proactor的一些經(jīng)驗(yàn)和心得寫一下,可能會(huì)給一些人幫助。
Proactor是異步模式的網(wǎng)絡(luò)處理器,ACE中叫做“前攝器”。
先講幾個(gè)概念:
前攝器(Proactor)-異步的事件多路分離器、處理器,是核心處理類。啟動(dòng)后由3個(gè)線程組成(你不需要關(guān)心這三個(gè)線程,我只是讓你知道一下有這回事存在)。
接受器(Acceptor)-用于服務(wù)端,監(jiān)聽(tīng)在一個(gè)端口上,接受用戶的請(qǐng)求。
連接器(Connector)-用于客戶端,去連接遠(yuǎn)程的監(jiān)聽(tīng)。當(dāng)然,如果遠(yuǎn)程是ACE寫的,就是Acceptor。
異步模式-即非阻塞模式。網(wǎng)絡(luò)的傳輸速度一般來(lái)講為10Mbps、100Mbps、1000Mbps。拿千兆網(wǎng)來(lái)說(shuō),實(shí)際的傳輸速度為1000Mbps/8大概為128KB左右。我們的CPU一般為P4 3.0GHZ,如果是32位的處理器,一秒鐘大概可以處理6G的字節(jié),那么,128KB的網(wǎng)絡(luò)速度是遠(yuǎn)遠(yuǎn)及不上處理器的速度的。網(wǎng)絡(luò)發(fā)送數(shù)據(jù)是一位一位發(fā)送出去的,如果CPU等在這里,發(fā)送完成函數(shù)才結(jié)束,那么,處理器浪費(fèi)了大量時(shí)間在網(wǎng)絡(luò)傳輸上。
操作系統(tǒng)提供了異步的模式來(lái)傳輸網(wǎng)絡(luò)數(shù)據(jù),工作模式即:應(yīng)用程序把要發(fā)送的數(shù)據(jù)交給操作系統(tǒng),操作系統(tǒng)把數(shù)據(jù)放在系統(tǒng)緩沖區(qū)后就告訴應(yīng)用程序OK了,我?guī)湍惆l(fā),應(yīng)用程序該干嘛干嘛去。操作系統(tǒng)發(fā)送完成后,會(huì)給應(yīng)用系統(tǒng)一個(gè)回執(zhí),告訴應(yīng)用程序:剛才那個(gè)包發(fā)送完成了!
舉個(gè)例子:你有幾封郵件和包裹要發(fā),最有效率的辦法是什么?你把郵件和包裹及交給總臺(tái),總臺(tái)MM說(shuō),好了,你幫你發(fā),你忙去吧!然后你去工作了。過(guò)了一會(huì),總臺(tái)MM打電話告訴你:“剛才我叫快遞公司的人來(lái)了,把你的包裹發(fā)出去了。郵局的人也來(lái)了,取走了郵件,放心好了”。同樣,如果你知道今天會(huì)有包裹來(lái),比如你在淘寶上購(gòu)物了,你能成天等在總臺(tái)?你應(yīng)該告訴總臺(tái)MM:“今天可能有我的一個(gè)快遞,你幫我收一下,晚上請(qǐng)你肯德基!”。MM:“看在肯得基的面子上,幫你收了”。某個(gè)時(shí)間,MM打電話來(lái)了:“帥哥,你的包裹到了,我?guī)湍愫炇樟耍靵?lái)拿吧。”
因?yàn)椴僮飨到y(tǒng)是很有效率的,所有,他在后臺(tái)收發(fā)是很快的。應(yīng)用程序也很簡(jiǎn)單。Proactor就是這種異步模式的。Proactor就是總臺(tái)MM;ACE_Service_Handle就是總臺(tái)代為收發(fā)郵件的公司流程。
我們看一個(gè)實(shí)例:
//***********************************************************
class TPTCPAsynchServerImpl : public ACE_Service_Handler
{
public:
TPTCPAsynchServerImpl(void);
~TPTCPAsynchServerImpl(void);
virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block);
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
virtual void handle_time_out (const ACE_Time_Value &tv, const void *act=0);
private:
int initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result);
ACE_Asynch_Read_Stream rs_;
ACE_Asynch_Write_Stream ws_;
};
這個(gè)例子從ACE_Service_Handler繼承過(guò)來(lái),ACE_Service_Handle主要就是定義了一些回調(diào)函數(shù)。
1、 virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block);
當(dāng)有客戶端連接上來(lái),連接建立成功后Proactor會(huì)調(diào)用這個(gè)方法。
2、 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
當(dāng)用戶要讀的數(shù)據(jù)讀好了后,調(diào)用這個(gè)方法
3、virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
當(dāng)用戶要寫的數(shù)據(jù)在網(wǎng)卡上發(fā)送成功后,Proactor會(huì)回調(diào)這個(gè)方法
4、 virtual void handle_time_out (const ACE_Time_Value &tv, const void *act=0);
當(dāng)用戶設(shè)定的時(shí)鐘到期了,這個(gè)方法會(huì)被調(diào)用。
這跟和總臺(tái)MM的聯(lián)絡(luò)方法是不是一樣的?
對(duì)還缺點(diǎn)東西,缺少怎么向總臺(tái)MM交待任務(wù)的方法。下面看看:
首先,創(chuàng)建一個(gè)監(jiān)聽(tīng)器。
ACE_Asynch_Acceptor<TPTCPAsynchServerImpl> acceptor_;
看到?jīng)],就是我們剛才寫的類,因?yàn)樗^承了回調(diào)接口,并實(shí)現(xiàn)了自已的代碼,模板中ACE_Asynch_Acceptor會(huì)在合適的時(shí)候回調(diào)這些方法。
//創(chuàng)建一個(gè)地址對(duì)象
ACE_INET_Addr addr(port, ip);
acceptor_.open (addr, 8 * 1024, 1);
Open后,就開(kāi)始監(jiān)聽(tīng)了。其它的,向Proactor注冊(cè)一些事件的事模板類中都替你做了,你不需要做很多事。
那么,已經(jīng)開(kāi)始監(jiān)聽(tīng)了,我的程序從哪里開(kāi)始呢?對(duì)于一個(gè)服務(wù)程序來(lái)講,程序是被用戶的連接驅(qū)動(dòng)的,一個(gè)用戶程序想和通訊,必須先創(chuàng)建連接,就是Socket中的connect操作。這個(gè)操作Proactor會(huì)替我們做一些工作,當(dāng)連接創(chuàng)建完成后,上面講的Open方法會(huì)被調(diào)用,我們看看Open方法中都有些什么代碼:
void TPTCPAsynchServerImpl::open (ACE_HANDLE handle, ACE_Message_Block &message_block)
{
ACE_DEBUG ((LM_DEBUG, "%N:%l:TPTCPAsynchServerImpl::open()
"));
//構(gòu)造讀流
if (rs_.open (*this, handle) == -1)
{
ACE_ERROR ((LM_ERROR, "%N:%l: ", "TPTCPAsynchServerImpl::open() Error"));
return;
}
//構(gòu)造寫流
if (ws_.open(*this, handle) == -1)
{
ACE_ERROR ((LM_ERROR, "%N:%l: ", "TPTCPAsynchServerImpl::open() Error"));
return;
}
//獲取客戶端連接地址和端口
ACE_INET_Addr addr;
ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(handle);
size_t addr_size=1;
ass.get_local_addrs(&addr,addr_size);
this->server_->onClientConnect((int)handle, addr.get_ip_address(), addr.get_port_number());
//如果客戶連接時(shí)同時(shí)提交了數(shù)據(jù),需要偽造一個(gè)結(jié)果,然后呼叫讀事件
if (message_block.length () != 0)
{
// ACE_DEBUG((LM_DEBUG, "message_block.length() != 0 "));
// 復(fù)制消息塊
ACE_Message_Block &duplicate = *message_block.duplicate ();
// 偽造讀結(jié)果,以便進(jìn)行讀完成回調(diào)
ACE_Asynch_Read_Stream_Result_Impl *fake_result =
ACE_Proactor::instance ()->create_asynch_read_stream_result (this->proxy (),
this->handle_,
duplicate,
1024,
0,
ACE_INVALID_HANDLE,
0,
0);
size_t bytes_transferred = message_block.length ();
// Accept事件處理完成,wr_ptr指針會(huì)被向前移動(dòng),將其移動(dòng)到開(kāi)始位置
duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);
// 這個(gè)方法將調(diào)用回調(diào)函數(shù)
fake_result->complete (message_block.length (), 1, 0);
// 銷毀偽造的讀結(jié)果
delete fake_result;
}
// 否則,通知底層,準(zhǔn)備讀取用戶數(shù)據(jù)
//創(chuàng)建一個(gè)消息塊。這個(gè)消息塊將用于從套接字中異步讀
ACE_Message_Block *mb = 0;
ACE_NEW (mb, ACE_Message_Block (_bufSize));
if (rs_.read (*mb, mb->size () - 1) == -1)
{
delete mb;
ACE_ERROR ((LM_ERROR, "%N:%l:open init read failed!"));
return;
}
}
我們看到,首先創(chuàng)建了兩個(gè)流,就是前面類定義中定義的一個(gè)異步寫流,一個(gè)異步讀流。以后對(duì)網(wǎng)絡(luò)的讀和寫就通過(guò)這兩個(gè)流進(jìn)行。我還給出了一段讀客戶端地址和端口的代碼。然后是讀取客戶Connect可能附帶的數(shù)據(jù),那段代碼不用看懂,以后使用照抄就行。然后就是
if (rs_.read (*mb, mb->size () - 1) == -1)
{
delete mb;
ACE_ERROR ((LM_ERROR, "%N:%l:open init read failed!"));
return;
}
這段代碼使用讀流讀一段數(shù)據(jù)。這段代碼就是向總臺(tái)MM交待:我要收包裹,收好了叫我!
也就是說(shuō),這段代碼99%的可能是讀不出數(shù)據(jù)的,只是向Proactor注冊(cè)讀的事件,具體的等待、讀取操作由Proactor讀,讀到了,就回調(diào)Handle_Read_Stream方法。ACE_Message_Block是消息塊,數(shù)據(jù)就是存放在消息塊中的。
下面看看Handle_Read_Stream方法的代碼:
void TPTCPAsynchServerImpl::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '/0';
ACE_DEBUG ((LM_DEBUG, "********************/n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "bytes_to_read", result.bytes_to_read ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "handle", result.handle ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "bytes_transfered", result.bytes_transferred ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "act", (u_long) result.act ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "success", result.success ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "completion_key", (u_long) result.completion_key ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "error", result.error ()));
ACE_DEBUG ((LM_DEBUG, "********************/n"));
result.message_block().release();
if (this->initiate_read_stream (result) == -1)
{
ACE_ERROR((LM_ERROR, "%N:%l:read stream failed!connection closed, remove it:%d/n", result.handle()));
closeConnection(result.handle());
}
}
這個(gè)函數(shù)被調(diào)用,就表明有數(shù)據(jù)已經(jīng)讀好了,包裹已經(jīng)在總臺(tái)了。Proactor比總臺(tái)MM還好,給你送上門了,數(shù)據(jù)就在Result里,上面演示了Result中的數(shù)據(jù)。然后把消息塊釋放了,然后調(diào)用initiate_read_stream繼續(xù)監(jiān)聽(tīng)網(wǎng)絡(luò)上可能到來(lái)的數(shù)據(jù)。看看initiate_read_stream好了:
int TPTCPAsynchServerImpl::initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
ACE_DEBUG((LM_TRACE, "%N:%l:TPTCPAsynchServerImpl::initiate_read_stream() "));
//創(chuàng)建一個(gè)消息塊。這個(gè)消息塊將用于從套接字中異步讀
ACE_Message_Block *mb = new ACE_Message_Block(_bufSize);
if (mb == NULL)
{
ACE_DEBUG((LM_ERROR, "%N:%l:can't allock ACE_Message_Block. "));
return -1;
}
if (rs_.read (*mb, mb->size () - 1) == -1)
{
delete mb;
ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:rs->read() failed, clientID=%d ", result.handle()), -1);
}
return 0;
}
代碼很簡(jiǎn)單,就是創(chuàng)建一個(gè)新的消息塊,然后使用讀流注冊(cè)一個(gè)讀消息就可以了。
到此為止,Proactor的讀流程很清楚了吧?
下面再說(shuō)一個(gè)寫流程。
寫流程其實(shí)更簡(jiǎn)單,在任意想向客戶端寫數(shù)據(jù)的地方,調(diào)用相應(yīng)代碼就行了,比如,我們提供了SendData方法來(lái)發(fā)送數(shù)據(jù),在任意想發(fā)送數(shù)據(jù)的地方調(diào)用SendData就行了,SendData的代碼如下:
int TPTCPAsynchServerImpl::sendData(int clientID, const char *data, int dataLen, unsigned int &id)
{
ACE_DEBUG((LM_DEBUG, "TPTCPAsynchServerImpl::sendData(void) "));
ACE_Message_Block *mb;
ACE_NEW_RETURN(mb, ACE_Message_Block(dataLen + 1), -1);
mb->wr_ptr((char*)data);
ACE_OS::memcpy(mb->base(),(char*)data, dataLen);
id = GlobleSingleton::instance()->getIndex();
mb->msg_type((int)id);
//向操作系統(tǒng)發(fā)送數(shù)據(jù)
if (connection->ws->write (*mb , dataLen ) == -1)
{
ACE_ERROR_RETURN((LM_ERROR, "%N:%l:sendData failed! clientID=%d ", clientID),-1);
}
return 0;
}
簡(jiǎn)單說(shuō),就是創(chuàng)建了一個(gè)消息塊,把用戶數(shù)據(jù)拷貝進(jìn)來(lái),然后調(diào)用寫流WS向Proactor發(fā)送一個(gè)Write事件就可以了,發(fā)送成功后,Handle_write_handle會(huì)被調(diào)用,看一下:
void
TPTCPAsynchServerImpl::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
ACE_DEBUG ((LM_DEBUG,
"handle_write_stream called "));
// Reset pointers.
result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
ACE_DEBUG ((LM_DEBUG, "******************** "));
ACE_DEBUG ((LM_DEBUG, "%s = %d ", "bytes_to_write", result.bytes_to_write ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d ", "handle", result.handle ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d ", "bytes_transfered", result.bytes_transferred ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d ", "act", (u_long) result.act ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d ", "success", result.success ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d ", "completion_key", (u_long) result.completion_key ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d ", "error", result.error ()));
ACE_DEBUG ((LM_DEBUG, "******************** "));
#if 0
ACE_DEBUG ((LM_DEBUG, "%s = %s ", "message_block", result.message_block ().rd_ptr ()));
#endif
// Release message block.
result.message_block ().release ();
}
代碼中使用了result中發(fā)數(shù)據(jù),然后把消息塊釋放了,就這么簡(jiǎn)單。
////////////////////////////////////////////////////////////////////////////////////////////////////
這是簡(jiǎn)單的proactor用法,當(dāng)然,復(fù)雜也基本就這樣用。所謂不基本的不是Proactor的內(nèi)容,而是服務(wù)器編程本身的麻煩。比如說(shuō),多個(gè)連接的管理、重發(fā)機(jī)制、發(fā)送隊(duì)列等等,這都不是ACE的內(nèi)容。這些要大家自己思考了,并添加。
在這里,我要說(shuō)幾個(gè)重要的問(wèn)題:連接的管理。Acceptor是一個(gè)類,但是在每一個(gè)連接,Proactor都用了某種辦法創(chuàng)建了一個(gè)實(shí)例,所以,連接管理的群集類一定不能在Acceptor類中,不然得到的結(jié)果就是始終只有一條記錄。因?yàn)槊總€(gè)Acceptor都有一個(gè)實(shí)例,實(shí)例對(duì)應(yīng)一個(gè)連接,群集類也就每個(gè)實(shí)例一個(gè)了。要采取的方法是一個(gè)全局的容器對(duì)象就可以了。比如我這個(gè)類:
typedef ACE_Map_Manager <ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionMap;
typedef ACE_Map_Iterator<ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionIterator;
typedef ACE_Map_Entry <ACE_HANDLE, ConnectionBean *> ConnectionEntry;
class Globle
{
public:
Globle(void);
~Globle(void);
ITPServer* server_;
ConnectionMap _connections;
unsigned getIndex(void);
long getTimerId(void);
private:
unsigned int index_;
long timerId_;
};
typedef ACE_Singleton<Globle, ACE_Null_Mutex> GlobleSingleton;
我使用ACE的Singleton模板創(chuàng)建這個(gè)類,每一個(gè)Acceptor要使用ConnectionMap,都使用這里的_connections,方法如下 :
GlobleSingleton::instance()->connection.bind()......
這個(gè)問(wèn)題可是我花費(fèi)了2天時(shí)間找出來(lái)的,諸位同仁不可不戒啊,給點(diǎn)掌聲:)