• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            C小加

            厚德 博學(xué) 求真 至善 The bright moon and breeze
            posts - 145, comments - 195, trackbacks - 0, articles - 0
              C++博客 :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理

            轉(zhuǎn)自:http://blog.csdn.net/caisini_vc/article/details/4474910
            把這兩天做Proactor的一些經(jīng)驗和心得寫一下,可能會給一些人幫助。
                Proactor是異步模式的網(wǎng)絡(luò)處理器,ACE中叫做“前攝器”。
                先講幾個概念:
                前攝器(Proactor)-異步的事件多路分離器、處理器,是核心處理類。啟動后由3個線程組成(你不需要關(guān)心這三個線程,我只是讓你知道一下有這回事存在)。
                接受器(Acceptor)-用于服務(wù)端,監(jiān)聽在一個端口上,接受用戶的請求。
                連接器(Connector)-用于客戶端,去連接遠(yuǎn)程的監(jiān)聽。當(dāng)然,如果遠(yuǎn)程是ACE寫的,就是Acceptor。
                異步模式-即非阻塞模式。網(wǎng)絡(luò)的傳輸速度一般來講為10Mbps、100Mbps、1000Mbps。拿千兆網(wǎng)來說,實際的傳輸速度為1000Mbps/8大概為128KB左右。我們的CPU一般為P4 3.0GHZ,如果是32位的處理器,一秒鐘大概可以處理6G的字節(jié),那么,128KB的網(wǎng)絡(luò)速度是遠(yuǎn)遠(yuǎn)及不上處理器的速度的。網(wǎng)絡(luò)發(fā)送數(shù)據(jù)是一位一位發(fā)送出去的,如果CPU等在這里,發(fā)送完成函數(shù)才結(jié)束,那么,處理器浪費(fèi)了大量時間在網(wǎng)絡(luò)傳輸上。
                操作系統(tǒng)提供了異步的模式來傳輸網(wǎng)絡(luò)數(shù)據(jù),工作模式即:應(yīng)用程序把要發(fā)送的數(shù)據(jù)交給操作系統(tǒng),操作系統(tǒng)把數(shù)據(jù)放在系統(tǒng)緩沖區(qū)后就告訴應(yīng)用程序OK了,我?guī)湍惆l(fā),應(yīng)用程序該干嘛干嘛去。操作系統(tǒng)發(fā)送完成后,會給應(yīng)用系統(tǒng)一個回執(zhí),告訴應(yīng)用程序:剛才那個包發(fā)送完成了!
               舉個例子:你有幾封郵件和包裹要發(fā),最有效率的辦法是什么?你把郵件和包裹及交給總臺,總臺MM說,好了,你幫你發(fā),你忙去吧!然后你去工作了。過了一會,總臺MM打電話告訴你:“剛才我叫快遞公司的人來了,把你的包裹發(fā)出去了。郵局的人也來了,取走了郵件,放心好了”。同樣,如果你知道今天會有包裹來,比如你在淘寶上購物了,你能成天等在總臺?你應(yīng)該告訴總臺MM:“今天可能有我的一個快遞,你幫我收一下,晚上請你肯德基!”。MM:“看在肯得基的面子上,幫你收了”。某個時間,MM打電話來了:“帥哥,你的包裹到了,我?guī)湍愫炇樟耍靵砟冒伞?#8221;
               因為操作系統(tǒng)是很有效率的,所有,他在后臺收發(fā)是很快的。應(yīng)用程序也很簡單。Proactor就是這種異步模式的。Proactor就是總臺MM;ACE_Service_Handle就是總臺代為收發(fā)郵件的公司流程。

            我們看一個實例:


            //***********************************************************
            class TPTCPAsynchServerImpl : public ACE_Service_Handler
            {
            public:
             TPTCPAsynchServerImpl(
            void);
             
            ~TPTCPAsynchServerImpl(void);
             
            virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); 
             
            virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
             
            virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
             
            virtual void  handle_time_out (const ACE_Time_Value &tv, const void *act=0);
            private:
             
            int initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result);
             ACE_Asynch_Read_Stream rs_;
             ACE_Asynch_Write_Stream ws_;
            };


            這個例子從ACE_Service_Handler繼承過來,ACE_Service_Handle主要就是定義了一些回調(diào)函數(shù)。
            1、 virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block);
              當(dāng)有客戶端連接上來,連接建立成功后Proactor會調(diào)用這個方法。

            2、 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
            當(dāng)用戶要讀的數(shù)據(jù)讀好了后,調(diào)用這個方法

            3、virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
            當(dāng)用戶要寫的數(shù)據(jù)在網(wǎng)卡上發(fā)送成功后,Proactor會回調(diào)這個方法

            4、 virtual void  handle_time_out (const ACE_Time_Value &tv, const void *act=0);
            當(dāng)用戶設(shè)定的時鐘到期了,這個方法會被調(diào)用。

            這跟和總臺MM的聯(lián)絡(luò)方法是不是一樣的?

            對還缺點東西,缺少怎么向總臺MM交待任務(wù)的方法。下面看看:

            首先,創(chuàng)建一個監(jiān)聽器。

             ACE_Asynch_Acceptor<TPTCPAsynchServerImpl> acceptor_;
            看到?jīng)],就是我們剛才寫的類,因為他繼承了回調(diào)接口,并實現(xiàn)了自已的代碼,模板中ACE_Asynch_Acceptor會在合適的時候回調(diào)這些方法。

            //創(chuàng)建一個地址對象
             ACE_INET_Addr addr(port, ip);
            acceptor_.open (addr, 8 * 1024, 1);
                Open后,就開始監(jiān)聽了。其它的,向Proactor注冊一些事件的事模板類中都替你做了,你不需要做很多事。
                那么,已經(jīng)開始監(jiān)聽了,我的程序從哪里開始呢?對于一個服務(wù)程序來講,程序是被用戶的連接驅(qū)動的,一個用戶程序想和通訊,必須先創(chuàng)建連接,就是Socket中的connect操作。這個操作Proactor會替我們做一些工作,當(dāng)連接創(chuàng)建完成后,上面講的Open方法會被調(diào)用,我們看看Open方法中都有些什么代碼:


            void TPTCPAsynchServerImpl::open (ACE_HANDLE handle, ACE_Message_Block &message_block)
            {
             ACE_DEBUG ((LM_DEBUG, 
            "%N:%l:TPTCPAsynchServerImpl::open() "));
             
            //構(gòu)造讀流
             if (rs_.open (*this, handle) == -1)
             {
              ACE_ERROR ((LM_ERROR, 
            "%N:%l: ""TPTCPAsynchServerImpl::open() Error"));
              
            return;
                }
             
            //構(gòu)造寫流
             if (ws_.open(*this, handle) == -1)
             {
              ACE_ERROR ((LM_ERROR, 
            "%N:%l: ""TPTCPAsynchServerImpl::open() Error"));
              
            return;
                }
             
            //獲取客戶端連接地址和端口
             ACE_INET_Addr addr; 
             ACE_SOCK_SEQPACK_Association ass
            =ACE_SOCK_SEQPACK_Association(handle); 
             size_t addr_size
            =1
             ass.get_local_addrs(
            &addr,addr_size);
             
            this->server_->onClientConnect((int)handle, addr.get_ip_address(), addr.get_port_number());

             

             
            //如果客戶連接時同時提交了數(shù)據(jù),需要偽造一個結(jié)果,然后呼叫讀事件
             if (message_block.length () != 0)
             {
             
            // ACE_DEBUG((LM_DEBUG, "message_block.length() != 0 "));
              
            // 復(fù)制消息塊
              ACE_Message_Block &duplicate =  *message_block.duplicate ();
              
            // 偽造讀結(jié)果,以便進(jìn)行讀完成回調(diào)
              ACE_Asynch_Read_Stream_Result_Impl *fake_result =
                    ACE_Proactor::instance ()
            ->create_asynch_read_stream_result (this->proxy (),
                                                                                 
            this->handle_,
                                                                                 duplicate,
                                                                                 
            1024,
                                                                                 
            0,
                                                                                 ACE_INVALID_HANDLE,
                                                                                 
            0,
                                                                                 
            0);
              size_t bytes_transferred 
            = message_block.length ();
              
            // Accept事件處理完成,wr_ptr指針會被向前移動,將其移動到開始位置
              duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);
              
            // 這個方法將調(diào)用回調(diào)函數(shù)
              fake_result->complete (message_block.length (), 10);
              
            // 銷毀偽造的讀結(jié)果
              delete fake_result;
             }

             
            // 否則,通知底層,準(zhǔn)備讀取用戶數(shù)據(jù)
             
            //創(chuàng)建一個消息塊。這個消息塊將用于從套接字中異步讀 
             ACE_Message_Block *mb = 0;
              ACE_NEW (mb, ACE_Message_Block (_bufSize));
             
            if (rs_.read (*mb, mb->size () - 1== -1)
             {
              delete mb;
              ACE_ERROR ((LM_ERROR, 
            "%N:%l:open init read failed!"));
              
            return;
             }
            }

             

            我們看到,首先創(chuàng)建了兩個流,就是前面類定義中定義的一個異步寫流,一個異步讀流。以后對網(wǎng)絡(luò)的讀和寫就通過這兩個流進(jìn)行。我還給出了一段讀客戶端地址和端口的代碼。然后是讀取客戶Connect可能附帶的數(shù)據(jù),那段代碼不用看懂,以后使用照抄就行。然后就是

             


            if (rs_.read (*mb, mb->size () - 1) == -1)
             {
              delete mb;
              ACE_ERROR ((LM_ERROR, "%N:%l:open init read failed!"));
              return;
             }

            這段代碼使用讀流讀一段數(shù)據(jù)。這段代碼就是向總臺MM交待:我要收包裹,收好了叫我!
            也就是說,這段代碼99%的可能是讀不出數(shù)據(jù)的,只是向Proactor注冊讀的事件,具體的等待、讀取操作由Proactor讀,讀到了,就回調(diào)Handle_Read_Stream方法。ACE_Message_Block是消息塊,數(shù)據(jù)就是存放在消息塊中的。
            下面看看Handle_Read_Stream方法的代碼:


            void TPTCPAsynchServerImpl::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
            {
             result.message_block ().rd_ptr ()[result.bytes_transferred ()] 
            = '/0';
              ACE_DEBUG ((LM_DEBUG, 
            "********************/n"));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d/n""bytes_to_read", result.bytes_to_read ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d/n""handle", result.handle ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d/n""bytes_transfered", result.bytes_transferred ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d/n""act", (u_long) result.act ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d/n""success", result.success ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d/n""completion_key", (u_long) result.completion_key ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d/n""error", result.error ()));
              ACE_DEBUG ((LM_DEBUG, 
            "********************/n"));
             result.message_block().release();
             
            if (this->initiate_read_stream (result) == -1)
             {
              ACE_ERROR((LM_ERROR, 
            "%N:%l:read stream failed!connection closed, remove it:%d/n", result.handle()));
              closeConnection(result.handle());
             } 
            }
             

            這個函數(shù)被調(diào)用,就表明有數(shù)據(jù)已經(jīng)讀好了,包裹已經(jīng)在總臺了。Proactor比總臺MM還好,給你送上門了,數(shù)據(jù)就在Result里,上面演示了Result中的數(shù)據(jù)。然后把消息塊釋放了,然后調(diào)用initiate_read_stream繼續(xù)監(jiān)聽網(wǎng)絡(luò)上可能到來的數(shù)據(jù)。看看initiate_read_stream好了:


            int TPTCPAsynchServerImpl::initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result)
            {
             ACE_DEBUG((LM_TRACE, 
            "%N:%l:TPTCPAsynchServerImpl::initiate_read_stream() "));
             
            //創(chuàng)建一個消息塊。這個消息塊將用于從套接字中異步讀 
             ACE_Message_Block *mb = new ACE_Message_Block(_bufSize);
             
            if (mb == NULL)
             {
              ACE_DEBUG((LM_ERROR, 
            "%N:%l:can't allock ACE_Message_Block.  ")); 
              
            return -1;
             }
             
             
            if (rs_.read (*mb, mb->size () - 1== -1)
             {
              delete mb;
              ACE_ERROR_RETURN ((LM_ERROR, 
            "%N:%l:rs->read() failed, clientID=%d ", result.handle()),  -1);
             }
             
            return 0;
            }

             

            代碼很簡單,就是創(chuàng)建一個新的消息塊,然后使用讀流注冊一個讀消息就可以了。

            到此為止,Proactor的讀流程很清楚了吧?

            下面再說一個寫流程。

            寫流程其實更簡單,在任意想向客戶端寫數(shù)據(jù)的地方,調(diào)用相應(yīng)代碼就行了,比如,我們提供了SendData方法來發(fā)送數(shù)據(jù),在任意想發(fā)送數(shù)據(jù)的地方調(diào)用SendData就行了,SendData的代碼如下:

            int TPTCPAsynchServerImpl::sendData(int clientID, const char *data, int dataLen, unsigned int &id)
            {
             ACE_DEBUG((LM_DEBUG, 
            "TPTCPAsynchServerImpl::sendData(void) "));
             ACE_Message_Block 
            *mb; 
             ACE_NEW_RETURN(mb, ACE_Message_Block(dataLen 
            + 1), -1);
             mb
            ->wr_ptr((char*)data);                  
             ACE_OS::memcpy(mb
            ->base(),(char*)data, dataLen);
             id 
            = GlobleSingleton::instance()->getIndex();
             mb
            ->msg_type((int)id);
             
            //向操作系統(tǒng)發(fā)送數(shù)據(jù)
             if (connection->ws->write (*mb , dataLen ) == -1)
             {
              ACE_ERROR_RETURN((LM_ERROR, 
            "%N:%l:sendData failed! clientID=%d ", clientID),-1);
             }
             
            return 0;
            }

             

            簡單說,就是創(chuàng)建了一個消息塊,把用戶數(shù)據(jù)拷貝進(jìn)來,然后調(diào)用寫流WS向Proactor發(fā)送一個Write事件就可以了,發(fā)送成功后,Handle_write_handle會被調(diào)用,看一下:


            void
            TPTCPAsynchServerImpl::handle_write_stream (
            const ACE_Asynch_Write_Stream::Result &result)
            {
              ACE_DEBUG ((LM_DEBUG,
                          
            "handle_write_stream called "));
              
            // Reset pointers.
              result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
              ACE_DEBUG ((LM_DEBUG, 
            "******************** "));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d ""bytes_to_write", result.bytes_to_write ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d ""handle", result.handle ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d ""bytes_transfered", result.bytes_transferred ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d ""act", (u_long) result.act ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d ""success", result.success ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d ""completion_key", (u_long) result.completion_key ()));
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %d ""error", result.error ()));
              ACE_DEBUG ((LM_DEBUG, 
            "******************** "));
            #if 0
              ACE_DEBUG ((LM_DEBUG, 
            "%s = %s ""message_block", result.message_block ().rd_ptr ()));
            #endif
              
            // Release message block.
              result.message_block ().release ();
            }

             

            代碼中使用了result中發(fā)數(shù)據(jù),然后把消息塊釋放了,就這么簡單。


            ////////////////////////////////////////////////////////////////////////////////////////////////////

            這是簡單的proactor用法,當(dāng)然,復(fù)雜也基本就這樣用。所謂不基本的不是Proactor的內(nèi)容,而是服務(wù)器編程本身的麻煩。比如說,多個連接的管理、重發(fā)機(jī)制、發(fā)送隊列等等,這都不是ACE的內(nèi)容。這些要大家自己思考了,并添加。

            在這里,我要說幾個重要的問題:連接的管理。Acceptor是一個類,但是在每一個連接,Proactor都用了某種辦法創(chuàng)建了一個實例,所以,連接管理的群集類一定不能在Acceptor類中,不然得到的結(jié)果就是始終只有一條記錄。因為每個Acceptor都有一個實例,實例對應(yīng)一個連接,群集類也就每個實例一個了。要采取的方法是一個全局的容器對象就可以了。比如我這個類:


            typedef ACE_Map_Manager 
            <ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionMap;
            typedef ACE_Map_Iterator
            <ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionIterator;
            typedef ACE_Map_Entry   
            <ACE_HANDLE, ConnectionBean *> ConnectionEntry;
            class Globle
            {
            public:
             Globle(
            void);
             
            ~Globle(void);
             ITPServer
            * server_;
             ConnectionMap _connections;
             unsigned getIndex(
            void); 
             
            long getTimerId(void);
            private:
             unsigned 
            int index_;
             
            long timerId_;
            };
            typedef ACE_Singleton
            <Globle, ACE_Null_Mutex> GlobleSingleton;

             

            我使用ACE的Singleton模板創(chuàng)建這個類,每一個Acceptor要使用ConnectionMap,都使用這里的_connections,方法如下 :
              GlobleSingleton::instance()->connection.bind()......

            這個問題可是我花費(fèi)了2天時間找出來的,諸位同仁不可不戒啊,給點掌聲:)

            Feedback

            # re: ACE前攝器Proactor最好的講解(轉(zhuǎn)載)  回復(fù)  更多評論   

            2013-04-12 19:59 by coolypf
            128KB, 太離譜了

            # re: ACE前攝器Proactor最好的講解(轉(zhuǎn)載)  回復(fù)  更多評論   

            2013-04-13 11:08 by reborntercel
            實際的傳輸速度為1000Mbps/8大概為128KB左右,樓豬是不是搞錯了啊??????????????????????????

            # re: ACE前攝器Proactor最好的講解(轉(zhuǎn)載)  回復(fù)  更多評論   

            2014-12-30 15:31 by www
            人家說的對,是你們沒弄懂。1024/8=128KB
            久久精品国产WWW456C0M| 欧美激情一区二区久久久| 国产精品久久久久久五月尺| 国产成人精品久久一区二区三区av | 久久免费视频网站| 久久精品人人做人人爽97| 久久久久久久精品妇女99| 国内精品伊人久久久影院| 亚洲国产成人精品91久久久| 久久国产免费| 成人免费网站久久久| 日产精品久久久久久久| 亚洲精品蜜桃久久久久久| 色8激情欧美成人久久综合电| 亚洲精品国产美女久久久| 最新久久免费视频| 久久精品国产亚洲AV不卡| 欧美日韩成人精品久久久免费看| 国内高清久久久久久| 久久精品国产亚洲av麻豆图片 | 久久无码一区二区三区少妇 | 欧美精品国产综合久久| 久久婷婷五月综合色99啪ak| 理论片午午伦夜理片久久 | 99久久精品影院老鸭窝| 成人国内精品久久久久一区| 精品无码久久久久久尤物| 久久精品视频免费| 色综合久久天天综线观看| 少妇高潮惨叫久久久久久| 青青草原综合久久大伊人精品| 久久综合伊人77777麻豆| 日日噜噜夜夜狠狠久久丁香五月| 嫩草影院久久国产精品| 人妻无码精品久久亚瑟影视| 久久亚洲精品人成综合网| 国产女人aaa级久久久级| 精品国产乱码久久久久久呢 | 色综合久久久久网| 国内精品伊人久久久影院| 国产高潮国产高潮久久久|