本篇將介紹客戶(hù)端與游戲邏輯服務(wù)器連接建立以后,mangosd如何接收、解析和處理客戶(hù)端發(fā)過(guò)來(lái)的協(xié)議。本篇不再討論mangosd與客戶(hù)端的認(rèn)證及建立最終RC4流加密的過(guò)程,想了解這部分內(nèi)容請(qǐng)看該系列的
第一篇。
一、acceptor socket的監(jiān)聽(tīng)啟動(dòng)及注冊(cè)
mangosd的main ()函數(shù)調(diào)用單例對(duì)象sMaster的Run ()函數(shù),啟動(dòng)監(jiān)聽(tīng)socket的代碼如下:
1: int Master::Run()
2: {
3: ........
4:
5: ///- Launch the world listener socket
6: uint16 wsport = sWorld.getConfig (CONFIG_UINT32_PORT_WORLD);
7: std::string bind_ip = sConfig.GetStringDefault ("BindIP", "0.0.0.0");
8:
9: if (sWorldSocketMgr->StartNetwork (wsport, bind_ip) == -1)
10: {
11: sLog.outError ("Failed to start network");
12: Log::WaitBeforeContinueIfNeed();
13: World::StopNow(ERROR_EXIT_CODE);
14: // go down and shutdown the server
15: }
16:
17: sWorldSocketMgr->Wait ();
18:
19: ........
20: }
mangosd用WorldSocketMgr類(lèi)來(lái)管理socket。StartNetwork ()會(huì)調(diào)用StartReactiveIO ()來(lái)啟動(dòng)監(jiān)聽(tīng)socket,處理代碼如下:
1: int WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
2: {
3: ........
4:
5: //(1)
6: m_NetThreadsCount = static_cast<size_t> (num_threads + 1);
7: m_NetThreads = new ReactorRunnable[m_NetThreadsCount];
8:
9: // -1 means use default
10: m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1);
11: m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536);
12: if ( m_SockOutUBuff <= 0 )
13: {
14: sLog.outError ("Network.OutUBuff is wrong in your config file");
15: return -1;
16: }
17:
18: //(2)
19: WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;
20: m_Acceptor = acc;
21:
22: ACE_INET_Addr listen_addr (port, address);
23: if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1)
24: {
25: sLog.outError ("Failed to open acceptor ,check if the port is free");
26: return -1;
27: }
28:
29: //(3)
30: for (size_t i = 0; i < m_NetThreadsCount; ++i)
31: m_NetThreads[i].Start ();
32:
33: return 0;
34: }
(1)ReactorRunnable類(lèi)繼承了ACE_Task_Base,ACE_Task_Base 是ACE 中的任務(wù)或主動(dòng)對(duì)象“處理結(jié)構(gòu)”的基類(lèi)。在A(yíng)CE 中使用了此類(lèi)來(lái)實(shí)現(xiàn)主動(dòng)對(duì)象模式。所有希望成為“主動(dòng)對(duì)象”的對(duì)象都必須從此類(lèi)派生。可以把ACE_TASK 看作是更高級(jí)的、更為面向?qū)ο蟮木€(xiàn)程類(lèi)[1]。ACE_Task_Base調(diào)用時(shí)繼承類(lèi)必須重寫(xiě)svc方法,并且在使用時(shí)保證調(diào)用了activate ()方法。
(2)指定監(jiān)聽(tīng)地址,端口并把Acceptor綁定到第一個(gè)線(xiàn)程的Reactor上。啟動(dòng)Acceptor開(kāi)始監(jiān)聽(tīng)網(wǎng)絡(luò)IO。
(3)啟動(dòng)所有線(xiàn)程,每個(gè)線(xiàn)程上有一個(gè)單獨(dú)的ACE_Reactor* m_Reactor;,這里的Reactor使用的是多線(xiàn)程的ACE_TP_Reactor。可以各自單獨(dú)完成事件的多路復(fù)用。
二、線(xiàn)程體函數(shù)
線(xiàn)程體函數(shù)ReactorRunnable::svc () 如下:
1: virtual int svc ()
2: {
3: //(1)
4: WorldDatabase.ThreadStart ();
5:
6: SocketSet::iterator i, t;
7: while (!m_Reactor->reactor_event_loop_done ())
8: {
9: // dont be too smart to move this outside the loop
10: // the run_reactor_event_loop will modify interval
11: ACE_Time_Value interval (0, 10000);
12:
13: //(2)
14: if (m_Reactor->run_reactor_event_loop (interval) == -1)
15: break;
16:
17: //(3)
18: AddNewSockets ();
19:
20: for (i = m_Sockets.begin (); i != m_Sockets.end ();)
21: {
22: //(4)
23: if ((*i)->Update () == -1)
24: {
25: t = i;
26: ++i;
27: (*t)->CloseSocket ();
28: (*t)->RemoveReference ();
29: --m_Connections;
30: m_Sockets.erase (t);
31: }
32: else
33: ++i;
34: }
35: }
36:
37: WorldDatabase.ThreadEnd ();
38: DEBUG_LOG ("Network Thread Exitting");
39:
40: return 0;
41: }
(1)會(huì)調(diào)用mysql_thread_init ()函數(shù),初始化與該線(xiàn)程相關(guān)的變量。
(2)run_reactor_event_loop ()函數(shù)為多路復(fù)用的等待函數(shù),當(dāng)注冊(cè)的事件發(fā)生、運(yùn)行超時(shí)或者出現(xiàn)錯(cuò)誤時(shí)返回。
(3)AddNewSockets ()函數(shù)會(huì)將緩存在m_NewSockets里的新到達(dá)的socket添加到SocketSet m_Sockets;里,同時(shí)檢查并處理m_Sockets;里已經(jīng)closed的socket。
(4)循環(huán)每一個(gè)WorldSocket,調(diào)用其Update ()方法,這里只處理每個(gè)socket的handle_output,即每個(gè)在此線(xiàn)程上的寫(xiě)事件,向客戶(hù)端發(fā)送數(shù)據(jù)。下一節(jié)詳細(xì)介紹:
三、WorldSocket::Update () 方法
Update方法用于處理每個(gè)socket的輸出:
1: int WorldSocket::Update (void)
2: {
3: if (closing_)
4: return -1;
5:
6: //(1)
7: if (m_OutActive || m_OutBuffer->length () == 0)
8: return 0;
9:
10: return handle_output (get_handle ());
11: }
(1)m_OutBuffer有數(shù)據(jù)時(shí)才會(huì)調(diào)用handle_output,handle_output ()用于處理輸出,如果輸出不能一次性做完,會(huì)調(diào)用schedule_wakeup_output ()再次激活write事件。當(dāng)輸出處理完畢后則調(diào)用cancel_wakeup_output ()取消激活write事件,使reactor恢復(fù)到正常的loop ()循環(huán)中。詳細(xì)過(guò)程如下:
1: int WorldSocket::handle_output (ACE_HANDLE)
2: {
3: //(1)
4: ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1);
5:
6: if (closing_)
7: return -1;
8:
9: const size_t send_len = m_OutBuffer->length ();
10: if (send_len == 0)
11: return cancel_wakeup_output (Guard);
12:
13: #ifdef MSG_NOSIGNAL
14: ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
15: #else
16: ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len);
17: #endif // MSG_NOSIGNAL
18:
19: if (n == 0)
20: return -1;
21: else if (n == -1)
22: {
23: if (errno == EWOULDBLOCK || errno == EAGAIN) //----------(2)
24: return schedule_wakeup_output (Guard);
25:
26: return -1;
27: }
28: else if (n < (ssize_t)send_len) //now n > 0 //----------(3)
29: {
30: m_OutBuffer->rd_ptr (static_cast<size_t> (n));
31:
32: // move the data to the base of the buffer
33: m_OutBuffer->crunch ();
34:
35: return schedule_wakeup_output (Guard);
36: }
37: else //now n == send_len //----------(4)
38: {
39: m_OutBuffer->reset ();
40:
41: if (!iFlushPacketQueue ())
42: return cancel_wakeup_output (Guard);
43: else
44: return schedule_wakeup_output (Guard);
45: }
46:
47: ACE_NOTREACHED (return 0);
48: }
(1)對(duì)m_OutBuffer加鎖。
(2)考慮信號(hào)打斷的情況等,暫時(shí)不能寫(xiě)。
(3)只發(fā)送了部分?jǐn)?shù)據(jù)則繼續(xù)wakeup該線(xiàn)程對(duì)應(yīng)的Reactor。
(4)檢查m_OutBuffer數(shù)據(jù)發(fā)送完畢同時(shí)等待buffer(PacketQueueT m_PacketQueue;)里已經(jīng)沒(méi)有數(shù)據(jù)時(shí),cancel wakeup讓Reactor恢復(fù)正常。
四、socket到對(duì)應(yīng)線(xiàn)程的指派
上一節(jié)內(nèi)容可以看到線(xiàn)程內(nèi)如何處理socket,及新到達(dá)的socket。但從第一節(jié)中可知只有第一個(gè)線(xiàn)程注冊(cè)為acceptor線(xiàn)程,那么新連接到達(dá)時(shí),是如何被指派到對(duì)應(yīng)的“接待”線(xiàn)程的呢?
可以先看一下ACE_Acceptor的處理時(shí)序圖:

圖3.1 連接到達(dá)處理時(shí)序 [2]
上圖可以看出,當(dāng)連接到達(dá)時(shí),acceptor會(huì)調(diào)用對(duì)應(yīng)的SVC_HANDLER的open ()函數(shù),在mangosd里就是acceptor對(duì)應(yīng)的int WorldSocket::open (void *a),如下:
1: int WorldSocket::open (void *a)
2: {
3: ........
4:
5: // Hook for the manager.
6: if (sWorldSocketMgr->OnSocketOpen (this) == -1)
7: return -1;
8:
9: ........
10: }
OnSocketOpen方法:
1: int WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
2: {
3: ........
4:
5: // we skip the Acceptor Thread
6: size_t min = 1;
7:
8: MANGOS_ASSERT (m_NetThreadsCount >= 1);
9:
10: //(1)
11: for (size_t i = 1; i < m_NetThreadsCount; ++i)
12: if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ())
13: min = i;
14:
15: return m_NetThreads[min].AddSocket (sock);
16: }
(1)將WorldSocket均衡的分配給每個(gè)線(xiàn)程。AddSocket ()將socket添加到m_NewSockets中做緩存,待該線(xiàn)程自行調(diào)用AddNewSockets ()添加到處理隊(duì)列里。
總結(jié):
mangosd對(duì)socket的處理因?yàn)槭褂昧薃CE,邏輯處理代碼相對(duì)比較簡(jiǎn)單,寫(xiě)事件的異常處理主要涉及
(1)一次不能寫(xiě)完則不斷的wakeup Reactor。
(2)信號(hào)中斷等錯(cuò)誤的判斷。似乎這里并沒(méi)有考慮全面(見(jiàn)附錄)
(3)使用另一個(gè)buffer緩存,因?qū)懢彺鎚_OutBuffer滿(mǎn)而帶來(lái)的多出的數(shù)據(jù)。
References:
[1] http://blog.csdn.net/yecao_kinux/article/details/1546914
[2] http://postfiles12.naver.net/data41/2009/4/11/187/33_kbkim007.jpg?type=w3