• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            posts - 58,  comments - 75,  trackbacks - 0

              接受器/連接器模式設(shè)計(jì)用于降低連接建立與連接建立后所執(zhí)行的服務(wù)之間的耦合。例如,在WWW瀏覽器中,所執(zhí)行的服務(wù)或“實(shí)際工作”是解析和顯示客戶瀏覽器接收到的HTML頁(yè)面。連接建立是次要的,可能通過(guò)BSD socket或其他一些等價(jià)的IPC機(jī)制來(lái)完成。使用這些模式允許程序員專注于“實(shí)際工作”,而最少限度地去關(guān)心怎樣在服務(wù)器和客戶之間建立連接。而另外一方面,程序員也可以獨(dú)立于他所編寫(xiě)的、或?qū)⒁帉?xiě)的服務(wù)例程,去調(diào)諧連接建立的策略。

              因?yàn)樵撃J浇档土朔?wù)和連接建立方法之間的耦合,非常容易改動(dòng)其中一個(gè),而不影響另外一個(gè),從而也就可以復(fù)用以前編寫(xiě)的連接建立機(jī)制和服務(wù)例程的代碼。在同樣的例子中,使用這些模式的瀏覽器程序員一開(kāi)始可以構(gòu)造他的系統(tǒng)、使用特定的連接建立機(jī)制來(lái)運(yùn)行它和測(cè)試它;然后,如果先前的連接機(jī)制被證明不適合他所構(gòu)造的系統(tǒng),他可以決定說(shuō)他希望將底層連接機(jī)制改變?yōu)槎嗑€程的(或許使用線程池策略)。因?yàn)榇四J教峁┝藝?yán)格的去耦合,只需要極少的努力就可以實(shí)現(xiàn)這樣的變動(dòng)。

              在你能夠清楚地理解這一章中的許多例子,特別是更為高級(jí)的部分之前,你必須通讀有關(guān)反應(yīng)堆和IPC_SAP的章節(jié)(特別是接受器和連接器部分)。此外,你還可能需要參考線程和線程管理部分。

            ?

             

            ?

            7.1 接受器模式

            ??? 接受器通常被用在你本來(lái)會(huì)使用 BSD accept() 系統(tǒng)調(diào)用的地方。接受器模式也適用于同樣的環(huán)境,但就如我們將看到的,它提供了更多的功能。在 ACE 中,接收器模式借助名為 ACE_Acceptor 的“工廠”( Factory )實(shí)現(xiàn)。工廠(通常)是用于對(duì)助手對(duì)象的實(shí)例化過(guò)程進(jìn)行抽象的類。在面向?qū)ο笤O(shè)計(jì)中,復(fù)雜的類常常會(huì)將特定功能委托給助手類。復(fù)雜的類對(duì)助手的創(chuàng)建和委托必須很靈活。這種靈活性是在工廠的幫助下獲得的。工廠允許一個(gè)對(duì)象通過(guò)改變它所委托的對(duì)象來(lái)改變它的底層策略,而工廠提供給應(yīng)用的接口卻是一樣的,這樣,可能根本就無(wú)需對(duì)客戶代碼進(jìn)行改動(dòng)(有關(guān)工廠的更多信息,請(qǐng)閱讀“設(shè)計(jì)模式”中的參考文獻(xiàn))。

            ?

             

             

            ?

            7-1 工廠模式示意圖

            ?????? ACE_Acceptor 工廠允許應(yīng)用開(kāi)發(fā)者改變“助手”對(duì)象,以用于:

            • 被動(dòng)連接建立
            • 連接建立后的處理

            ?? 同樣地, ACE_Connector 工廠允許應(yīng)用開(kāi)發(fā)者改變“助手”對(duì)象,以用于:

            ?

             

            • 主動(dòng)連接建立
            • 連接建立后的處理

            ??? 下面的討論同時(shí)適用于接受器和連接器,所以作者將只討論接受器,而連接器同樣具有相應(yīng)的參數(shù)。

            ???????? ACE_Acceptor 被實(shí)現(xiàn)為模板容器,通過(guò)兩個(gè)類作為實(shí)參來(lái)進(jìn)行實(shí)例化。第一個(gè)參數(shù)實(shí)現(xiàn)特定的服務(wù)(類型為 ACE_Event_Handler 。因?yàn)樗挥糜谔幚?/font> I/O 事件,所以必須來(lái)自事件處理類層次),應(yīng)用在建立連接后執(zhí)行該服務(wù);第二個(gè)參數(shù)是“具體的”接受器(可以是在 IPC_SAP 一章中討論的各種變種)。

              特別要注意的是ACE_Acceptor工廠和底層所用的具體接受器是非常不同的。具體接受器可完全獨(dú)立于ACE_Acceptor工廠使用,而無(wú)需涉及我們?cè)谶@里討論的接受器模式(獨(dú)立使用接受器已在IPC_SAP一章中討論和演示)。ACE_Acceptor工廠內(nèi)在于接受器模式,并且不能在沒(méi)有底層具體接受器的情況下使用。ACE_Acceptor使用底層的具體接受器來(lái)建立連接。如我們已看到的,有若干ACE的類可被用作ACE_Acceptor工廠模板的第二個(gè)參數(shù)(也就是,具體接受器類)。但是服務(wù)處理類必須由應(yīng)用開(kāi)發(fā)者來(lái)實(shí)現(xiàn),而且其類型必須是ACE_Event_HandlerACE_Acceptor工廠可以這樣來(lái)實(shí)例化:

            typedef ACE_Acceptor<My_Service_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;

            ??? 這里,名為 My_Service_Handler 的事件處理器和具體接受器 ACE_SOCK_ACCEPTOR 被傳給 MyAcceptor ACE_SOCK_ACCEPTOR 是基于 BSD socket 流家族的 TCP 接受器(各種可傳給接受器工廠的不同類型的接受器,見(jiàn)表 7-1 IPC 一章)。請(qǐng)?jiān)俅巫⒁猓谑褂媒邮芷髂J綍r(shí),我們總是處理兩個(gè)接受器:名為 ACE_Acceptor 的工廠接受器,和 ACE 中的某種具體接受器,比如 ACE_SOCK_ACCEPTOR (你可以創(chuàng)建自定義的具體接受器來(lái)取代 ACE_SOCK_ACCEPTOR ,但你將很可能無(wú)需改變 ACE_Acceptor 工廠類中的任何東西)。

            重要提示

            ACE_SOCK_ACCEPTOR實(shí)際上是一個(gè)宏,其定義為:

            #define ACE_SOCK_ACCEPTOR ACE_SOCK_Acceptor, ACE_INET_Addr

            我們認(rèn)為這個(gè)宏的使用是必要的,因?yàn)樵陬愔械?/font>typedefs在某些平臺(tái)上無(wú)法工作。如果不是這樣的話,ACE_Acceptor就可以這樣來(lái)實(shí)例化:

            typedef ACE_Acceptor<My_Service_Handler,ACE_SOCK_Acceptor>MyAcceptor;

            在表7-1中對(duì)宏進(jìn)行了說(shuō)明。

            7.1.1 組件

            ??? 如上面的討論所說(shuō)明的,在接受器模式中有三個(gè)主要的參與類:

            ?

             

            ??? 具體接受器

            :它含有建立連接的特定策略,連接與底層的傳輸協(xié)議機(jī)制系在一起。下面是在ACE中的各種具體接受器的例子:ACE_SOCK_ACCEPTOR(使用TCP來(lái)建立連接)、ACE_LSOCK_ACCEPTOR(使用UNIXsocket來(lái)建立連接),等等。
            • 具體服務(wù)處理器
            :由應(yīng)用開(kāi)發(fā)者編寫(xiě),它的
            open()方法在連接建立后被自動(dòng)回調(diào)。接受器構(gòu)架假定服務(wù)處理類的類型是ACE_Event_Handler,這是ACE定義的接口類(該類已在反應(yīng)堆一章中詳細(xì)討論過(guò))。另一個(gè)特別為接受器和連接器模式的服務(wù)處理而創(chuàng)建的類是ACE_Svc_Handler。該類不僅基于ACE_Event_Handler接口(這是使用反應(yīng)堆所必需的),同時(shí)還基于在ASX流構(gòu)架中使用的ACE_Task類。ACE_Task類提供的功能有:創(chuàng)建分離的線程、使用消息隊(duì)列來(lái)存儲(chǔ)到來(lái)的數(shù)據(jù)消息、并發(fā)地處理它們,以及其他一些有用的功能。如果與接受器模式一起使用的具體服務(wù)處理器派生自ACE_Svc_Handler、而不是ACE_Event_Handler,它就可以獲得這些額外的功能。對(duì)ACE_Svc_Handler中的額外功能的使用,在這一章的高級(jí)課程里詳細(xì)討論。在下面的討論中,我們將使用ACE_Svc_Handler作為我們的事件處理器。在簡(jiǎn)單的ACE_Event_HandlerACE_Svc_Handler類之間的重要區(qū)別是,后者擁有一個(gè)底層通信流組件。這個(gè)流在ACE_Svc_Handler模板被實(shí)例化的時(shí)候設(shè)置。而在使用ACE_Event_Handler的情況下,我們必須自己增加I/O通信端點(diǎn)(也就是,流對(duì)象),作為事件處理器的私有數(shù)據(jù)成員。因而,在這樣的情況下,應(yīng)用開(kāi)發(fā)者應(yīng)該將他的服務(wù)處理器創(chuàng)建為ACE_Svc_Handler類的子類,并首先實(shí)現(xiàn)將被構(gòu)架自動(dòng)回調(diào)的open()方法。此外,因?yàn)?/font>ACE_Svc_Handler是一個(gè)模板,通信流組件和鎖定機(jī)制是作為模板參數(shù)被傳入的。
          1. 反應(yīng)堆
          2. :與
            ACE_Acceptor協(xié)同使用。如我們將看到的,在實(shí)例化接受器后,我們啟動(dòng)反應(yīng)堆的事件處理循環(huán)。反應(yīng)堆,如先前所解釋的,是一個(gè)事件分派類;而在此情況下,它被接受器用于將連接建立事件分派到適當(dāng)?shù)姆?wù)處理例程。

            7.1.2 用法

            ??? 通過(guò)觀察一個(gè)簡(jiǎn)單的例子,可以進(jìn)一步了解接受器。這個(gè)例子是一個(gè)簡(jiǎn)單的應(yīng)用,它使用接受器接受連接,隨后回調(diào)服務(wù)例程。當(dāng)服務(wù)例程被回調(diào)時(shí),它就讓用戶知道連接已經(jīng)被成功地建立。

            7-1

            #include ”ace/Reactor.h”

            #include ”ace/Svc_Handler.h”

            #include ”ace/Acceptor.h”

            #include ”ace/Synch.h”

            #include ”ace/SOCK_Acceptor.h”

             

            //Create a Service Handler whose open() method will be called back

            //automatically. This

             class MUST derive from ACE_Svc_Handler which is an

            //interface and as can be seen is a

             template container class itself. The

            //first parameter to ACE_Svc_Handler is the

             underlying stream that it

            //may use for communication. Since we are using TCP sockets the

             stream

            //is ACE_SOCK_STREAM. The second is the internal synchronization

            //mechanism it

             could use. Since we have a single threaded application we

            //pass it a “null” lock which

             will do nothing.

            class My_Svc_Handler:

            public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>

            {

            //the open method which will be called back automatically after the

            //connection has been

             established.

            public:

            int open(void*)

            {

            cout<<”Connection established”<<endl;

            }

            };

             

            // Create the acceptor as described above.

            typedef ACE_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;

             

            int main(int argc, char* argv[])

            {

            //create the address on which we wish to connect. The constructor takes

            //the port

             number on which to listen and will automatically take the

            //host’s IP address as the IP

             Address for the addr object 

            ?

            ACE_INET_Addr addr(PORT_NUM);

             

            //instantiate the appropriate acceptor object with the address on which

            //we wish to

             accept and the Reactor instance we want to use. In this

            //case we just use the global

             ACE_Reactor singleton. (Read more about

            //the reactor in the previous chapter)

            MyAcceptor acceptor(addr, ACE_Reactor::instance());

             

            while(1)

            // Start the reactor’s event loop

            ACE_Reactor::instance()->handle_events();

            }

            ??? 在上面的例子中,我們首先創(chuàng)建我們希望在其上接受連接的端點(diǎn)地址。因?yàn)槲覀儧Q定使用TCP/IP作為底層的連接協(xié)議,我們創(chuàng)建一個(gè)ACE_INET_Addr來(lái)作為我們的端點(diǎn),并將我們所要偵聽(tīng)的端口號(hào)傳給它。我們將這個(gè)地址和反應(yīng)堆單體的實(shí)例傳給我們要在此之后進(jìn)行實(shí)例化的接受器。這個(gè)接受器在被實(shí)例化后,將自動(dòng)接受任何在PORT_NUM端口上的連接請(qǐng)求,并且在為這樣的請(qǐng)求建立連接之后回調(diào)My_Svc_Handleropen()方法。注意當(dāng)我們實(shí)例化ACE_Acceptor工廠時(shí),我們傳給它的是我們想要使用的具體接受器(也就是ACE_SOCK_ACCEPTOR)和具體服務(wù)處理器(也就是My_Svc_Handler)。

              現(xiàn)在讓我們嘗試一下更為有趣的事情。在下一個(gè)例子中,我們將在連接請(qǐng)求到達(dá)、服務(wù)處理器被回調(diào)之后,將服務(wù)處理器登記回反應(yīng)堆。現(xiàn)在,如果新創(chuàng)建的連接上有任何數(shù)據(jù)到達(dá),我們的服務(wù)處理例程

            handle_input()方法都會(huì)被自動(dòng)回調(diào)。因而在此例中,我們?cè)谕瑫r(shí)使用反應(yīng)堆和接受器的特性:

            7-2

            #include ”ace/Reactor.h”

            #include ”ace/Svc_Handler.h”

            #include ”ace/Acceptor.h”

            #include ”ace/Synch.h”

            #include ”ace/SOCK_Acceptor.h”

            #define PORT_NUM 10101

            #define DATA_SIZE 12

            //forward declaration

            class My_Svc_Handler;

            //Create the Acceptor class

            typedef ACE_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR>

            MyAcceptor;

            //Create a service handler similar to as seen in example 1. Except this

            //time include the handle_input() method which will be called back

            //automatically by the reactor when new data arrives on the newly

            //established connection

            class My_Svc_Handler:

            public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>

            {

            public:

            My_Svc_Handler()

            {

            data= new char[DATA_SIZE];

            }

            int open(void*)

            {

            cout<<”Connection established”<<endl;

             

            //Register the service handler with the reactor

            ACE_Reactor::instance()->register_handler(this,

            ACE_Event_Handler::READ_MASK);

             

            return 0;

            }

            int handle_input(ACE_HANDLE)

            {

            //After using the peer() method of ACE_Svc_Handler to obtain a

            //reference to the underlying stream of the service handler class

            //we call recv_n() on it to read the data which has been received.

            //This data is stored in the data array and then printed out

            peer().recv_n(data,DATA_SIZE);

            ACE_OS::printf(”<< %s\n”,data);

             

            //keep yourself registered with the reactor

            return 0;

            }

            private:

            char* data;

            };

            int main(int argc, char* argv[])

            {

            ACE_INET_Addr addr(PORT_NUM);

             

            //create the acceptor

            MyAcceptor acceptor(addr, //address to accept on

            ACE_Reactor::instance()); //the reactor to use

             

            while(1)

            //Start the reactor’s event loop

            ACE_Reactor::instance()->handle_events();

            }

            ??? 這個(gè)例子和前一例子的唯一區(qū)別是我們?cè)诜?wù)處理器的open()方法中將服務(wù)處理器登記到反應(yīng)堆上。因此,我們必須編寫(xiě)handle_input()方法;當(dāng)數(shù)據(jù)在連接上到達(dá)時(shí),這個(gè)方法會(huì)被反應(yīng)堆回調(diào)。在此例中我們只是將我們接收到的數(shù)據(jù)打印到屏幕上。ACE_Svc_Handler類的peer()方法返回對(duì)底層的對(duì)端流的引用。我們使用底層流包裝類的recv_n()方法來(lái)獲取連接上接收到的數(shù)據(jù)。

              該模式真正的威力在于,底層的連接建立機(jī)制完全封裝在具體接受器中,從而可以很容易地改變。在下一個(gè)例子里,我們改變底層的連接建立機(jī)制,讓它使用

            UNIXsocket、而不是TCP socket。這個(gè)例子(下劃線標(biāo)明少量變動(dòng))如下所示:

            7-3

            class My_Svc_Handler:

            public ACE_Svc_Handler <ACE_LSOCK_STREAM,ACE_NULL_SYNCH>{

            public:

            int open(void*)

            {

            cout<<”Connection established”<<endl;

            ACE_Reactor::instance()

            ->register_handler(this,ACE_Event_Handler::READ_MASK);

            }

            int handle_input(ACE_HANDLE)

            {

            char* data= new char[DATA_SIZE];

            peer().recv_n(data,DATA_SIZE);

            ACE_OS::printf(”<< %s\n”,data);

            return 0;

            }

            };

            typedef ACE_Acceptor<My_Svc_Handler,ACE_LSOCK_ACCEPTOR> MyAcceptor;

             

            int main(int argc, char* argv[])

            {

            ACE_UNIX_Addr addr(”/tmp/addr.ace”);

            MyAcceptor acceptor(address, ACE_Reactor::instance());

             

            while(1) /* Start the reactor’s event loop */

            ACE_Reactor::instance()->handle_events();

            }

            ?

            7-2和例7-3不同的地方標(biāo)注了下劃線。正如我們所說(shuō)過(guò)的,兩個(gè)程序間的不同非常之少,但是它們使用的連接建立范式卻極不相同。ACE中可用的連接建立機(jī)制在表7-1中列出:

            接受器類型

            所用地址

            所用流

            具體接受器

            TCP

            流接受器

            ACE_INET_Addr

            ACE_SOCK_STREAM

            ACE_SOCK_ACCEPTOR

            UNIX

            域本地流socket接受器

            ACE_UNIX_Addr

            ACE_LSOCK_STREAM

            ACE_LSOCK_ACCEPTOR

            管道作為底層通信機(jī)制

            ACE_SPIPE_Addr

            ACE_SPIPE_STREAM

            ACE_SPIPE_ACCEPTOR

             

            ?

            7-1 ACE中的連接建立機(jī)制

            ?

             

            ?

            7.2

            連接器

            ?

              連接器與接受器非常類似。它也是一個(gè)工廠,但卻是用于主動(dòng)地連接遠(yuǎn)程主機(jī)。在連接建立后,它將自動(dòng)回調(diào)適當(dāng)?shù)姆?wù)處理對(duì)象的

            open()方法。連接器通常被用在你本來(lái)會(huì)使用BSD connect()調(diào)用的地方。在ACE中,連接器,就如同接受器,被實(shí)現(xiàn)為名為ACE_Connector的模板容器類。如先前所提到的,它需要兩個(gè)參數(shù),第一個(gè)是事件處理器類,它在連接建立時(shí)被調(diào)用;第二個(gè)是“具體的”連接器類。

              你必須注意,底層的具體連接器和

            ACE_Connector工廠是非常不一樣的。ACE_Connector工廠使用底層的具體連接器來(lái)建立連接。隨后ACE_Connector工廠使用適當(dāng)?shù)氖录蚍?wù)處理例程(通過(guò)模板參數(shù)傳入)來(lái)在具體的連接器建立起連接之后處理新連接。如我們?cè)?/font>IPC一章中看到的,沒(méi)有ACE_Connector工廠,也可以使用這個(gè)具體的連接器。但是,沒(méi)有具體的連接器類,就會(huì)無(wú)法使用ACE_Connector工廠(因?yàn)橐删唧w的連接器類來(lái)實(shí)際處理連接建立)。

              下面是對(duì)

            ACE_Connector類進(jìn)行實(shí)例化的一個(gè)例子:

            ?

             

            ?

            typedef ACE_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR> MyConnector;

            ?

             

            ?

            這個(gè)例子中的第二個(gè)參數(shù)是具體連接器類

            ACE_SOCK_CONNECTOR。連接器和接受器模式一樣,在內(nèi)部使用反應(yīng)堆來(lái)在連接建立后回調(diào)服務(wù)處理器的open()方法。我們可以復(fù)用我們?yōu)榍懊娴慕邮芷骼铀鶎?xiě)的服務(wù)處理例程。

              一個(gè)使用連接器的例子可以進(jìn)一步說(shuō)明這一點(diǎn):

            ?

             

            ?

            7-4

            ?

            typedef ACE_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR> MyConnector;

             

            int main(int argc, char * argv[])

            {

            ACE_INET_Addr addr(PORT_NO,HOSTNAME);

            My_Svc_Handler * handler= new My_Svc_Handler;

             

            //Create the connector

            MyConnector connector;

             

            //Connects to remote machine

            if(connector.connect(handler,addr)==-1)

            ACE_ERROR(LM_ERROR,”%P|%t, %p”,”Connection failed”);

             

            //Registers with the Reactor

            while(1)

            ACE_Reactor::instance()->handle_events();

            }

            ?

             

              在上面的例子中,

            HOSTNAMEPORT_NO是我們希望主動(dòng)連接到的機(jī)器和端口。在實(shí)例化連接器之后,我們調(diào)用它的連接方法,將服務(wù)例程(會(huì)在連接完全建立后被回調(diào)),以及我們希望連接到的地址傳遞給它。

            ?

             

            ?

            7.2.1 同時(shí)使用接受器和連接器

            ?

              一般而言,接受器和連接器模式會(huì)在一起使用。在客戶

            /服務(wù)器應(yīng)用中,服務(wù)器通常含有接受器,而客戶含有連接器。但是,在特定的應(yīng)用中,可能需要同時(shí)使用接受器和連接器。下面給出這樣的應(yīng)用的一個(gè)例子:一條消息被反復(fù)發(fā)送給對(duì)端機(jī)器,而與此同時(shí)也從遠(yuǎn)端接受另一消息。因?yàn)閮煞N功能必須同時(shí)執(zhí)行,簡(jiǎn)單的解決方案就是分別在不同的線程里發(fā)送和接收消息。

              這個(gè)例子同時(shí)包含接受器和連接器。用戶可以在命令行上給出參數(shù),告訴應(yīng)用它想要扮演服務(wù)器還是客戶角色。隨后應(yīng)用就相應(yīng)地調(diào)用

            main_accept()main_connect()

            ?

             

            ?

            7-5

            ?

            #include ”ace/Reactor.h”

            #include ”ace/Svc_Handler.h”

            #include ”ace/Acceptor.h”

            #include ”ace/Synch.h”

            #include ”ace/SOCK_Acceptor.h”

            #include ”ace/Thread.h”

             

            //Add our own Reactor singleton

            typedef ACE_Singleton<ACE_Reactor,ACE_Null_Mutex> Reactor;

             

            //Create an Acceptor

            typedef ACE_Acceptor<MyServiceHandler,ACE_SOCK_ACCEPTOR> Acceptor;

             

            //Create a Connector

            typedef ACE_Connector<MyServiceHandler,ACE_SOCK_CONNECTOR> Connector;

             

            class MyServiceHandler:

            public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_NULL_SYNCH>

            {

            public:

            //Used by the two threads “globally” to determine their peer stream

            static ACE_SOCK_Stream* Peer;

             

            //Thread ID used to identify the threads

            ACE_thread_t t_id;

             

            int open(void*)

            {

            cout<<”Acceptor: received new connection”<<endl;

             

            //Register with the reactor to remember this handle

            Reactor::instance()

            ->register_handler(this,ACE_Event_Handler::READ_MASK);

             

            //Determine the peer stream and record it globally

            MyServiceHandler::Peer=&peer();

             

            //Spawn new thread to send string every second

            ACE_Thread::spawn((ACE_THR_FUNC)send_data,0,THR_NEW_LWP,&t_id);

             

            //keep the service handler registered by returning 0 to the

            //reactor

            return 0;

            }

             

            static void* send_data(void*)

            {

            while(1)

            {

            cout<<”>>Hello World”<<endl;

            Peer->send_n(”Hello World”,sizeof(”Hello World”));

             

            //Go to sleep for a second before sending again

            ACE_OS::sleep(1);

            }

             

            return 0;

            }

             

            int handle_input(ACE_HANDLE)

            {

            char* data= new char[12];

             

            //Check if peer aborted the connection

            if(Peer.recv_n(data,12)==0)

            {

            cout<<”P(pán)eer probably aborted connection”);

            ACE_Thread::cancel(t_id); //kill sending thread ..

            return -1; //de-register from the Reactor.

            }

             

            //Show what you got..

            cout<<”<< %s\n”,data”<<endl;

             

            //keep yourself registered

            return 0;

            }

            };

             

            //Global stream identifier used by both threads

            ACE_SOCK_Stream * MyServiceHandler::Peer=0;

             

            void main_accept()

            {

            ACE_INET_Addr addr(PORT_NO);

            Acceptor myacceptor(addr,Reactor::instance());

            while(1)

            Reactor::instance()->handle_events();

             

            return 0;

            }

             

            void main_connect()

            {

            ACE_INET_Addr addr(PORT_NO,HOSTNAME);

            Connector myconnector;

            myconnector.connect(my_svc_handler,addr);

            while(1)

            Reactor::instance()->handle_events();

            }

             

            int main(int argc, char* argv[])

            {

            // Use ACE_Get_Opt to parse and obtain arguments and then call the

            // appropriate function for accept or connect.

            ...

            }

            ?

            這個(gè)簡(jiǎn)單的例子演示怎樣聯(lián)合使用接受器和連接模式來(lái)生成服務(wù)處理例程,該例程與底層的網(wǎng)絡(luò)連接建立方法是完全分離的。通過(guò)改變相應(yīng)的設(shè)定具體連接器和接受器的模板參數(shù),可以很容易地改用任何其他的底層網(wǎng)絡(luò)連接建立協(xié)議。

            ?

             

            ?

            7.3

            高級(jí)課程

            ?

              下面的部分更為詳細(xì)地解釋接受器和連接器模式實(shí)際上是如何工作的。如果你想要調(diào)諧服務(wù)處理和連接建立策略(其中包括調(diào)諧底層具體連接器將要使用的服務(wù)處理例程的創(chuàng)建和并發(fā)策略,以及連接建立策略),對(duì)該模式的進(jìn)一步了解就是必要的。此外,還有一部分內(nèi)容解釋怎樣使用通過(guò)

            ACE_Svc_Handler類自動(dòng)獲得的高級(jí)特性。最后,我們說(shuō)明怎樣與接受器和連接器模式一起使用簡(jiǎn)單的輕量級(jí)ACE_Event_Handler

            ?

             

            ?

            7.3.1 ACE_SVC_HANDLER

            ?

              如上面所提到的,

            ACE_Svc_Handler類基于ACE_Task(它是ASX流構(gòu)架的一部分)和ACE_Event_Handler接口類。因而ACE_Svc_Handler既是任務(wù),又是事件處理器。這里我們將簡(jiǎn)要介紹ACE_TaskACE_Svc_Handler的功能。

            ?

             

            ?

            7.3.1.1 ACE_Task

            ?

              ACE_Task

            被設(shè)計(jì)為與ASX流構(gòu)架一起使用;ASX基于UNIX系統(tǒng)V中的流機(jī)制。在設(shè)計(jì)上ASXLarry Peterson構(gòu)建的X-kernel協(xié)議工具非常類似。

            ?

              ASX

            的基本概念是:到來(lái)的消息會(huì)被分配給由若干模塊module)組成的流。每個(gè)模塊在到來(lái)的消息上執(zhí)行某種固定操作,然后把它傳遞給下一個(gè)模塊作進(jìn)一步處理,直到它到達(dá)流的末端為止。模塊中的實(shí)際處理由任務(wù)來(lái)完成。每個(gè)模塊通常有兩個(gè)任務(wù),一個(gè)用于處理到來(lái)的消息,一個(gè)用于處理外出的消息。在構(gòu)造協(xié)議棧時(shí),這種結(jié)構(gòu)是非常有用的。因?yàn)槊總€(gè)模塊都有固定的簡(jiǎn)單接口,所創(chuàng)建的模塊可以很容易地在不同的應(yīng)用間復(fù)用。例如,設(shè)想一個(gè)應(yīng)用,它處理來(lái)自數(shù)據(jù)鏈路層的消息。程序員會(huì)構(gòu)造若干模塊,每個(gè)模塊分別處理不同層次的協(xié)議。因而,他會(huì)構(gòu)造一個(gè)單獨(dú)的模塊,進(jìn)行網(wǎng)絡(luò)層處理;另一個(gè)進(jìn)行傳輸層處理;還有一個(gè)進(jìn)行表示層處理。在構(gòu)造這些模塊之后,它們可以(在ASX的幫助下)被“”成一個(gè)流來(lái)使用。如果后來(lái)創(chuàng)建了一個(gè)新的(也許是更好的)傳輸模塊,就可以在不對(duì)程序產(chǎn)生任何影響的情況下、在流中替換先前的傳輸模塊。注意模塊就像是容納任務(wù)的容器。這些任務(wù)是實(shí)際的處理元件。一個(gè)模塊可能需要兩個(gè)任務(wù),如同在上面的例子中;也可能只需要一個(gè)任務(wù)。如你可能會(huì)猜到的,ACE_Task是模塊中被稱為任務(wù)的處理元件的實(shí)現(xiàn)。

            ?

             

            ?

            7.3.1.2

            任務(wù)通信的體系結(jié)構(gòu)

            ?

              每個(gè)

            ACE_Task都有一個(gè)內(nèi)部的消息隊(duì)列,用以與其他任務(wù)、模塊或是外部世界通信。如果一個(gè)ACE_Task想要發(fā)送一條消息給另一個(gè)任務(wù),它就將此消息放入目的任務(wù)的消息隊(duì)列中。一旦目的任務(wù)收到此消息,它就會(huì)立即對(duì)它進(jìn)行處理。

              所有

            ACE_Task都可以作為0個(gè)或多個(gè)線程來(lái)運(yùn)行。消息可以由多個(gè)線程放入ACE_Task的消息隊(duì)列,或是從中取出,程序員無(wú)需擔(dān)心破壞任何數(shù)據(jù)結(jié)構(gòu)。因而任務(wù)可被用作由多個(gè)協(xié)作線程組成的系統(tǒng)的基礎(chǔ)構(gòu)建組件。各個(gè)線程控制都可封裝在ACE_Task中,與其他任務(wù)通過(guò)發(fā)送消息到它們的消息隊(duì)列來(lái)進(jìn)行交互。

              這種體系結(jié)構(gòu)的唯一問(wèn)題是,任務(wù)只能通過(guò)消息隊(duì)列與在同一進(jìn)程內(nèi)的其他任務(wù)相互通信。

            ACE_Svc_Handler解決了這一問(wèn)題,它同時(shí)繼承自ACE_TaskACE_Event_Handler,并且增加了一個(gè)私有數(shù)據(jù)流。這種結(jié)合使得ACE_Svc_Handler對(duì)象能夠用作這樣的任務(wù):它能夠處理事件、并與遠(yuǎn)地主機(jī)的任務(wù)間發(fā)送和接收數(shù)據(jù)。

            ?

              ACE_Task

            被實(shí)現(xiàn)為模板容器,它通過(guò)鎖定機(jī)制來(lái)進(jìn)行實(shí)例化。該鎖用于保證內(nèi)部的消息隊(duì)列在多線程環(huán)境中的完整性。如先前所提到的,ACE_Svc_Handler模板容器不僅需要鎖定機(jī)制,還需要用于與遠(yuǎn)地任務(wù)通信的底層數(shù)據(jù)流來(lái)作為參數(shù)。

            7.3.1.3 創(chuàng)建ACE_Svc_Handler

            ?

              ACE_Svc_Handler

            模板通過(guò)鎖定機(jī)制和底層流來(lái)實(shí)例化,以創(chuàng)建所需的服務(wù)處理器。如果應(yīng)用只是單線程的,就不需要使用鎖,可以用ACE_NULL_SYNCH來(lái)將其實(shí)例化。但是,如果我們想要在多線程應(yīng)用中使用這個(gè)模板,可以這樣來(lái)進(jìn)行實(shí)例化:

            ?

             

            ?

            class MySvcHandler:

            public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>

            ?

            {

            ?

            ...

            ?

            }

            ?

             

            ?

            7.3.1.4

            在服務(wù)處理器中創(chuàng)建多個(gè)線程

            ?

              在上面的例

            7-5中,我們使用ACE_Thread包裝類和它的靜態(tài)方法spawn(),創(chuàng)建了單獨(dú)的線程來(lái)發(fā)送數(shù)據(jù)給遠(yuǎn)地對(duì)端。但是,在我們完成此工作時(shí),我們必須定義使用C++ static修飾符的文件范圍內(nèi)的靜態(tài)send_data()方法。結(jié)果當(dāng)然就是,我們無(wú)法訪問(wèn)我們實(shí)例化的實(shí)際對(duì)象的任何數(shù)據(jù)成員。換句話說(shuō),我們被迫使send_data()成員函數(shù)成為class-wide的函數(shù),而這并不是我們所想要的。這樣做的唯一原因是,ACE_Thread::spawn()只能使用靜態(tài)成員函數(shù)來(lái)作為它所創(chuàng)建的線程的入口。另一個(gè)有害的副作用是到對(duì)端流的引用也必須成為靜態(tài)的。簡(jiǎn)而言之,這不是編寫(xiě)這些代碼的最好方式。

            ?

              ACE_Task

            提供了更好的機(jī)制來(lái)避免發(fā)生這樣的問(wèn)題。每個(gè)ACE_Task都有activate()方法,可用于為ACE_Task創(chuàng)建線程。所創(chuàng)建的線程的入口是非靜態(tài)成員函數(shù)svc()。因?yàn)?/font>svc()是非靜態(tài)函數(shù),它可以調(diào)用任何對(duì)象實(shí)例專有的數(shù)據(jù)或成員函數(shù)。ACE對(duì)程序員隱藏了該機(jī)制的所有實(shí)現(xiàn)細(xì)節(jié)。activate()方法有著非常多的用途,它允許程序員創(chuàng)建多個(gè)線程,所有這些線程都使用svc()方法作為它們的入口。還可以設(shè)置線程優(yōu)先級(jí)、句柄、名字,等等。activate()方法的原型是:

            ?

             

            // = Active object activation method.

            virtual int activate (long flags = THR_NEW_LWP,

            int n_threads = 1,

            int force_active = 0,

            long priority = ACE_DEFAULT_THREAD_PRIORITY,

            int grp_id = -1,

            ACE_Task_Base *task = 0,

            ACE_hthread_t thread_handles[] = 0,

            void *stack[] = 0,

            size_t stack_size[] = 0,

            ACE_thread_t thread_names[] = 0);

            ?

              第一個(gè)參數(shù)

            flags描述將要?jiǎng)?chuàng)建的線程所希望具有的屬性。在線程一章里有詳細(xì)描述。可用的標(biāo)志有:

            ?

             

            THR_CANCEL_DISABLE, THR_CANCEL_ENABLE, THR_CANCEL_DEFERRED,

            THR_CANCEL_ASYNCHRONOUS, THR_BOUND, THR_NEW_LWP, THR_DETACHED,

            THR_SUSPENDED, THR_DAEMON, THR_JOINABLE, THR_SCHED_FIFO,

            THR_SCHED_RR, THR_SCHED_DEFAULT

             

            ?

              第二個(gè)參數(shù)

            n_threads指定要?jiǎng)?chuàng)建的線程的數(shù)目。第三個(gè)參數(shù)force_active用于指定是否應(yīng)該創(chuàng)建新線程,即使activate()方法已在先前被調(diào)用過(guò)、因而任務(wù)或服務(wù)處理器已經(jīng)在運(yùn)行多個(gè)線程。如果此參數(shù)被設(shè)為false(0),且如果activate()是再次被調(diào)用,該方法就會(huì)設(shè)置失敗代碼,而不會(huì)生成更多的線程。

              第四個(gè)參數(shù)用于設(shè)置運(yùn)行線程的優(yōu)先級(jí)。缺省情況下,或優(yōu)先級(jí)被設(shè)為

            ACE_DEFAULT_THREAD_PRIORITY,方法會(huì)使用給定的調(diào)度策略(在flags中指定,例如,THR_SCHED_DEFAULT)的“適當(dāng)”優(yōu)先級(jí)。這個(gè)值是動(dòng)態(tài)計(jì)算的,并且是在給定策略的最低和最高優(yōu)先級(jí)之間。如果顯式地給定一個(gè)值,這個(gè)值就會(huì)被使用。注意實(shí)際的優(yōu)先級(jí)值極大地依賴于實(shí)現(xiàn),最好不要直接使用。在線程一章中,可讀到更多有關(guān)線程優(yōu)先級(jí)的內(nèi)容。

              還可以傳入將要?jiǎng)?chuàng)建的線程的線程句柄、線程名和棧空間,以在線程創(chuàng)建過(guò)程中使用。如果它們被設(shè)置為

            NULL,它們就不會(huì)被使用。但是如果要使用activate創(chuàng)建多個(gè)線程,就必須傳入線程的名字或句柄,才能有效地對(duì)它們進(jìn)行使用。

              下面的例子可以幫助你進(jìn)一步理解

            activate方法的使用:

            ?

             

            ?

            7-6

            ?

            #include ”ace/Reactor.h”

            #include ”ace/Svc_Handler.h”

            #include ”ace/Acceptor.h”

            #include ”ace/Synch.h”

            #include ”ace/SOCK_Acceptor.h”

             

            class MyServiceHandler; //forward declaration

            typedef ACE_Singleton<ACE_Reactor,ACE_Null_Mutex> Reactor;

            typedef ACE_Acceptor<MyServiceHandler,ACE_SOCK_ACCEPTOR> Acceptor;

             

            class MyServiceHandler:

            public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>

            {

            // The two thread names are kept here

            ACE_thread_t thread_names[2];

             

            public:

            int open(void*)

            {

            ACE_DEBUG((LM_DEBUG, ”Acceptor: received new connection \n”));

             

            //Register with the reactor to remember this handler..

            Reactor::instance()

            ->register_handler(this,ACE_Event_Handler::READ_MASK);

            ACE_DEBUG((LM_DEBUG,”Acceptor: ThreadID:(%t) open\n”));

             

            //Create two new threads to create and send messages to the

            //remote machine.

            activate(THR_NEW_LWP,

            2, //2 new threads

            0, //force active false, if already created don’t try again.

            ACE_DEFAULT_THREAD_PRIORITY,//Use default thread priority

            -1,

            this,//Which ACE_Task object to create? In this case this one.

            0,// don’t care about thread handles used

            0,// don’t care about where stacks are created

            0,//don’t care about stack sizes

            thread_names); // keep identifiers in thread_names

             

            //keep the service handler registered with the acceptor.

            return 0;

            }

             

            void send_message1(void)

            {

            //Send message type 1

            ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));

             

            //Send the data to the remote peer

            ACE_DEBUG((LM_DEBUG,”Sent message1”));

            peer().send_n(”Message1”,LENGTH_MSG_1);

            } //end send_message1

             

            int send_message2(void)

            {

            //Send message type 1

            ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));

             

            //Send the data to the remote peer

            ACE_DEBUG((LM_DEBUG,”Sent Message2”));

            peer().send_n(”Message2”,LENGTH_MSG_2);

            }//end send_message_2

             

            int svc(void)

            {

            ACE_DEBUG( (LM_DEBUG,”(%t) Svc thread \n”));

            if(ACE_Thread::self()== thread_names[0])

            while(1) send_message1(); //send message 1s forever

            else

            while(1) send_message2(); //send message 2s forever

             

            return 0; // keep the com, piler happy.

            }

             

            int handle_input(ACE_HANDLE)

            {

            ACE_DEBUG((LM_DEBUG,”(%t) handle_input ::”));

            char* data= new char[13];

             

            //Check if peer aborted the connection

            if(peer().recv_n(data,12)==0)

            {

            printf(”P(pán)eer probably aborted connection”);

            return -1; //de-register from the Reactor.

            }

             

            //Show what you got..

            ACE_OS::printf(”<< %s\n”,data);

             

            //keep yourself registered

            return 0;

            }

            };

             

            int main(int argc, char* argv[])

            {

            ACE_INET_Addr addr(10101);

            ACE_DEBUG((LM_DEBUG,”Thread: (%t) main”));

             

            //Prepare to accept connections

            Acceptor myacceptor(addr,Reactor::instance());

             

            // wait for something to happen.

            while(1)

            Reactor::instance()->handle_events();

             

            return 0;

            }

            ?

            在此例中,服務(wù)處理器在它的

            open()方法中被登記到反應(yīng)堆上,隨后程序調(diào)用activate()來(lái)創(chuàng)建2個(gè)線程。線程的名字被記錄下來(lái),以便在它們調(diào)用svc()例程時(shí),我們可以將它們區(qū)別開(kāi)。每個(gè)線程發(fā)送一條不同類型的消息給遠(yuǎn)地對(duì)端。注意在此例中,線程的創(chuàng)建是完全透明的。此外,因?yàn)槿肟谑瞧胀ǖ姆庆o態(tài)成員函數(shù),它不需要進(jìn)行“丑陋的”改動(dòng)來(lái)記住數(shù)據(jù)成員,比如說(shuō)對(duì)端流。無(wú)論何時(shí)只要我們需要,我們就可以簡(jiǎn)單地調(diào)用成員函數(shù)peer()來(lái)獲取底層的流。

            7.3.1.5使用服務(wù)處理器中的消息隊(duì)列機(jī)制

              如前面所提到的,

            ACE_Svc_Handler類擁有內(nèi)建的消息隊(duì)列。這個(gè)消息隊(duì)列被用作在ACE_Svc_Handler和外部世界之間的主要通信接口。其他任務(wù)想要發(fā)給該服務(wù)處理器的消息被放入它的消息隊(duì)列中。這些消息會(huì)在單獨(dú)的線程里(通過(guò)調(diào)用activate()方法創(chuàng)建)處理。隨后另一個(gè)線程就可以把處理過(guò)的消息通過(guò)網(wǎng)絡(luò)發(fā)送給另外的遠(yuǎn)地目的地(很可能是另外的ACE_Svc_Handler)。

              如先前所提到的,在這種多線程情況下,

            ACE_Svc_Handler會(huì)自動(dòng)地使用鎖來(lái)確保消息隊(duì)列的完整性。所用的鎖即通過(guò)實(shí)例化ACE_Svc_Handler模板類創(chuàng)建具體服務(wù)處理器時(shí)所傳遞的鎖。之所用通過(guò)這樣的方式來(lái)傳遞鎖,是因?yàn)檫@樣程序員就可以對(duì)他的應(yīng)用進(jìn)行“調(diào)諧”。不同平臺(tái)上的不同鎖定機(jī)制有著不同程度的開(kāi)銷。如果需要,程序員可以創(chuàng)建他自己的優(yōu)化的、遵從ACE的鎖接口定義的鎖,并將其用于服務(wù)處理器。這是程序員通過(guò)使用ACE可獲得的靈活性的又一范例。重要的是程序員必須意識(shí)到,在此服務(wù)處理例程中的額外線程將帶來(lái)顯著的鎖定開(kāi)銷。為將此開(kāi)銷降至最低,程序員必須仔細(xì)地設(shè)計(jì)他的程序,確保使這樣的開(kāi)銷最小化。特別地,上面描述的例子有可能導(dǎo)致過(guò)度的開(kāi)銷,在大多數(shù)情況下可能并不實(shí)用。

              ACE_Task

            ,進(jìn)而是ACE_Svc_Handler(因?yàn)榉?wù)處理器也是一種任務(wù)),具有若干可用于對(duì)底層隊(duì)列進(jìn)行設(shè)置、操作、入隊(duì)和出隊(duì)操作的方法。這里我們將只討論這些方法中的一部分。因?yàn)樵诜?wù)處理器中(通過(guò)使用msg_queue()方法)可以獲取指向消息隊(duì)列自身的指針,所以也可以直接調(diào)用底層隊(duì)列(也就是,ACE_Message_Queue)的所有公共方法。(有關(guān)消息隊(duì)列提供的所有方法的更多細(xì)節(jié),請(qǐng)參見(jiàn)后面的“消息隊(duì)列”一章。)

              如上面所提到的,服務(wù)處理器的底層消息隊(duì)列是

            ACE_Message_Queue的實(shí)例,它是由服務(wù)處理器自動(dòng)創(chuàng)建的。在大多數(shù)情況下,沒(méi)有必要調(diào)用ACE_Message_Queue的底層方法,因?yàn)樵?/font>ACE_Svc_Handler類中已對(duì)它們的大多數(shù)進(jìn)行了包裝。ACE_Message_Queue是用于使ACE_Message_Block進(jìn)隊(duì)或出隊(duì)的隊(duì)列。每個(gè)ACE_Message_Block都含有指向“引用計(jì)數(shù)”(reference-counted)的ACE_Data_Block的指針,ACE_Data_Block依次又指向存儲(chǔ)在塊中的實(shí)際數(shù)據(jù)(見(jiàn)“消息隊(duì)列”一章)。這使得ACE_Message_Block可以很容易地進(jìn)行數(shù)據(jù)共享。

              ACE_Message_Block

            的主要作用是進(jìn)行高效數(shù)據(jù)操作,而不帶來(lái)許多拷貝開(kāi)銷。每個(gè)消息塊都有一個(gè)讀指針和寫(xiě)指針。無(wú)論何時(shí)我們從塊中讀取時(shí),讀指針會(huì)在數(shù)據(jù)塊中向前增長(zhǎng)。類似地,當(dāng)我們向塊中寫(xiě)的時(shí)候,寫(xiě)指針也會(huì)向前移動(dòng),這很像在流類型系統(tǒng)中的情況。可以通過(guò)ACE_Message_Block的構(gòu)造器向它傳遞分配器,以用于分配內(nèi)存(有關(guān)Allocator的更多信息,參見(jiàn)“內(nèi)存管理”一章)。例如,可以使用ACE_Cached_Allocation_Strategy,它預(yù)先分配內(nèi)存并從內(nèi)存池中返回指針,而不是在需要的時(shí)候才從堆中分配內(nèi)存。這樣的功能在需要可預(yù)測(cè)的性能時(shí)十分有用,比如在實(shí)時(shí)系統(tǒng)中。

              下面的例子演示怎樣使用消息隊(duì)列的一些功能:

             

            7-7

            #include ”ace/Reactor.h”

            #include ”ace/Svc_Handler.h”

            #include ”ace/Acceptor.h”

            #include ”ace/Synch.h”

            #include ”ace/SOCK_Acceptor.h”

            #include ”ace/Thread.h”

            #define NETWORK_SPEED 3

            class MyServiceHandler; //forward declaration

            typedef ACE_Singleton<ACE_Reactor,ACE_Null_Mutex> Reactor;

            typedef ACE_Acceptor<MyServiceHandler,ACE_SOCK_ACCEPTOR> Acceptor;

             

            class MyServiceHandler:

            public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>{

            // The message sender and creator threads are handled here.

            ACE_thread_t thread_names[2];

             

            public:

            int open(void*)

            {

            ACE_DEBUG((LM_DEBUG, ”Acceptor: received new connection \n”));

             

            //Register with the reactor to remember this handler..

            Reactor::instance()

            ->register_handler(this,ACE_Event_Handler::READ_MASK);

            ACE_DEBUG((LM_DEBUG,”Acceptor: ThreadID:(%t) open\n”));

             

            //Create two new threads to create and send messages to the

            //remote machine.

            activate(THR_NEW_LWP,

            2, //2 new threads

            0,

            ACE_DEFAULT_THREAD_PRIORITY,

            -1,

            this,

            0,

            0,

            0,

            thread_names); // identifiers in thread_handles

             

            //keep the service handler registered with the acceptor.

            return 0;

            }

             

            void send_message(void)

            {

            //Dequeue the message and send it off

            ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));

             

            //dequeue the message from the message queue

            ACE_Message_Block *mb;

            ACE_ASSERT(this->getq(mb)!=-1);

            int length=mb->length();

            char *data =mb->rd_ptr();

             

            //Send the data to the remote peer

            ACE_DEBUG((LM_DEBUG,”%s \n”,data,length));

            peer().send_n(data,length);

             

            //Simulate very SLOW network.

            ACE_OS::sleep(NETWORK_SPEED);

             

            //release the message block

            mb->release();

            } //end send_message

             

            int construct_message(void)

            {

            // A very fast message creation algorithm

            // would lead to the need for queuing messages..

            // here. These messages are created and then sent

            // using the SLOW send_message() routine which is

            // running in a different thread so that the message

            //construction thread isn’t blocked.

            ACE_DEBUG((LM_DEBUG,”(%t)Constructing message::>> ”));

             

            // Create a new message to send

            ACE_Message_Block *mb;

            char *data=”Hello Connector”;

            ACE_NEW_RETURN (mb,ACE_Message_Block (16,//Message 16 bytes long

            ACE_Message_Block::MB_DATA,//Set header to data

            0,//No continuations.

            data//The data we want to send

            ), 0);

            mb->wr_ptr(16); //Set the write pointer.

             

            // Enqueue the message into the message queue

            // we COULD have done a timed wait for enqueuing in case

            // someone else holds the lock to the queue so it doesn’t block

            //forever..

            ACE_ASSERT(this->putq(mb)!=-1);

            ACE_DEBUG((LM_DEBUG,”Enqueued msg successfully\n”));

            }

             

            int svc(void)

            {

            ACE_DEBUG( (LM_DEBUG,”(%t) Svc thread \n”));

             

            //call the message creator thread

            if(ACE_Thread::self()== thread_names[0])

            while(1) construct_message(); //create messages forever

            else

            while(1) send_message(); //send messages forever

             

            return 0; // keep the compiler happy.

            }

             

            int handle_input(ACE_HANDLE)

            {

            ACE_DEBUG((LM_DEBUG,”(%t) handle_input ::”));

            char* data= new char[13];

             

            //Check if peer aborted the connection

            if(peer().recv_n(data,12)==0)

            {

            printf(”P(pán)eer probably aborted connection”);

            return -1; //de-register from the Reactor.

            }

             

            //Show what you got..

            ACE_OS::printf(”<< %s\n”,data);

             

            //keep yourself registered

            return 0;

            }

            };

             

            int main(int argc, char* argv[])

            {

            ACE_INET_Addr addr(10101);

            ACE_DEBUG((LM_DEBUG,”Thread: (%t) main”));

             

            //Prepare to accept connections

            Acceptor myacceptor(addr,Reactor::instance());

             

            // wait for something to happen.

            while(1)

            Reactor::instance()->handle_events();

             

            return 0;

            }

            這個(gè)例子演示怎樣使用

            putq()getq()方法來(lái)在隊(duì)列中放入或取出消息塊。它還演示怎樣創(chuàng)建消息塊,隨后設(shè)置它的寫(xiě)指針,并根據(jù)它的讀指針進(jìn)行讀取。注意消息塊中的實(shí)際數(shù)據(jù)的起始位置由消息塊的讀指針指示。消息塊的length()成員函數(shù)返回在消息塊中存儲(chǔ)的底層數(shù)據(jù)的長(zhǎng)度,其中不包括ACE_Message_Block中用于管理目的的部分。另外,我們也顯示了怎樣使用release()方法來(lái)釋放消息塊(mb)。

              要了解更多關(guān)于如何使用消息塊、數(shù)據(jù)塊或是消息隊(duì)列的信息,請(qǐng)閱讀此教程中有關(guān)“消息隊(duì)列”、

            ASX框架和其他相關(guān)的部分。

             

            7.4

            接受器和連接器模式工作原理

              接受器和連接器工廠(也就是

            ACE_ConnectorACE_Acceptor)有著非常類似的運(yùn)行結(jié)構(gòu)。它們的工作可大致劃分為三個(gè)階段:

             

            • 端點(diǎn)或連接初始化階段
            • 服務(wù)初始化階段
            • 服務(wù)處理階段
            •  

              7.4.1 端點(diǎn)或連接初始化階段

                在使用接受器的情況下,應(yīng)用級(jí)程序員可以調(diào)用

              ACE_Acceptor工廠的open()方法,或是它的缺省構(gòu)造器(它實(shí)際上會(huì)調(diào)用open()方法),來(lái)開(kāi)始被動(dòng)偵聽(tīng)連接。當(dāng)接受器工廠的open()方法被調(diào)用時(shí),如果反應(yīng)堆單體還沒(méi)有被實(shí)例化,open()方法就首先對(duì)其進(jìn)行實(shí)例化。隨后它調(diào)用底層具體接受器的open()方法。于是具體接受器會(huì)完成必要的初始化來(lái)偵聽(tīng)連接。例如,在使用ACE_SOCK_Acceptor的情況中,它打開(kāi)socket,將其綁定到用戶想要在其上偵聽(tīng)新連接的端口和地址上。在綁定端口后,它將會(huì)發(fā)出偵聽(tīng)調(diào)用。open方法隨后將接受器工廠登記到反應(yīng)堆。因而在接收到任何到來(lái)的連接請(qǐng)求時(shí),反應(yīng)堆會(huì)自動(dòng)回調(diào)接受器工廠的handle_input()方法。注意正是因?yàn)檫@一原因,接受器工廠才從ACE_Event_Handler類層次派生;這樣它才可以響應(yīng)ACCEPT事件,并被反應(yīng)堆自動(dòng)回調(diào)。

                在使用連接器的情況中,應(yīng)用程序員調(diào)用連接器工廠的

              connect()方法或connect_n()方法來(lái)發(fā)起到對(duì)端的連接。除了其他一些選項(xiàng),這兩個(gè)方法的參數(shù)包括我們想要連接到的遠(yuǎn)地地址,以及我們是想要同步還是異步地完成連接。我們可以同步或異步地發(fā)起NUMBER_CONN個(gè)連接:

               

              //Synchronous

              OurConnector.connect_n(NUMBER_CONN,ArrayofMySvcHandlers,Remote_Addr,0,

              ACE_Synch_Options::synch);

               

              //Asynchronous

              OurConnector.connect_n(NUMBER_CONN,ArrayofMySvcHandlers,Remote_Addr,0,

              ACE_Synch_Options::asynch);

               

              如果連接請(qǐng)求是異步的,

              ACE_Connector會(huì)在反應(yīng)堆上登記自己,等待連接被建立(ACE_Connector也派生自ACE_Event_Handler類層次)。一旦連接被建立,反應(yīng)堆將隨即自動(dòng)回調(diào)連接器。但如果連接請(qǐng)求是同步的,connect()調(diào)用將會(huì)阻塞,直到連接被建立、或是超時(shí)到期為止。超時(shí)值可通過(guò)改變特定的ACE_Synch_Options來(lái)指定。詳情請(qǐng)參見(jiàn)參考手冊(cè)。

               

              7.4.2 接受器的服務(wù)初始化階段

                在有連接請(qǐng)求在指定的地址和端口上到來(lái)時(shí),反應(yīng)堆自動(dòng)回調(diào)

              ACE_Acceptor工廠的handle_input()方法。

                該方法是一個(gè)“模板方法”(

              Template Method)。模板方法用于定義一個(gè)算法的若干步驟的順序,并允許改變特定步驟的執(zhí)行。這種變動(dòng)是通過(guò)允許子類定義這些方法的實(shí)現(xiàn)來(lái)完成的。(有關(guān)模板方法的更多信息見(jiàn)“設(shè)計(jì)模式”參考指南)。

                在我們的這個(gè)案例中,模板方法將算法定義如下:

               

              • make_svc_handler()
              :創(chuàng)建服務(wù)處理器。
            • accept_svc_handler()
            • :將連接接受進(jìn)前一步驟創(chuàng)建的服務(wù)處理器。
            • activate_svc_handler()
            • :?jiǎn)?dòng)這個(gè)新服務(wù)處理器。

               

                這些方法都可以被重新編寫(xiě),從而靈活地決定這些操作怎樣來(lái)實(shí)際執(zhí)行。

                這樣,

              handle_input()將首先調(diào)用make_svc_handler()方法,創(chuàng)建適當(dāng)類型的服務(wù)處理器(如我們?cè)谏厦娴睦又兴吹降哪菢樱?wù)處理器的類型由應(yīng)用程序員在ACE_Acceptor模板被實(shí)例化時(shí)傳入)。在缺省情況下,make_svc_handler()方法只是實(shí)例化恰當(dāng)?shù)姆?wù)處理器。但是,make_svc_handler()是一個(gè)“橋接”(bridge)方法,可被重載以提供更多復(fù)雜功能。(橋接是一種設(shè)計(jì)模式,它使類層次的接口與實(shí)現(xiàn)去耦合。參閱“設(shè)計(jì)模式”參考文獻(xiàn))。例如,服務(wù)處理器可創(chuàng)建為進(jìn)程級(jí)或線程級(jí)的單體,或者從庫(kù)中動(dòng)態(tài)鏈接,從磁盤(pán)中加載,甚或通過(guò)更復(fù)雜的方式創(chuàng)建,如從數(shù)據(jù)庫(kù)中查找并獲取服務(wù)處理器,并將它裝入內(nèi)存。

                在服務(wù)處理器被創(chuàng)建后,

              handle_input()方法調(diào)用accept_svc_handler()。該方法將連接“接受進(jìn)”服務(wù)處理器;缺省方式是調(diào)用底層具體接受器的accept()方法。在ACE_SOCK_Acceptor被用作具體接受器的情況下,它調(diào)用BSD accept()例程來(lái)建立連接(“接受”連接)。在連接建立后,連接句柄在服務(wù)處理器中被自動(dòng)設(shè)置(接受“進(jìn)”服務(wù)處理器);這個(gè)服務(wù)處理器是先前通過(guò)調(diào)用make_svc_handler()創(chuàng)建的。該方法也可被重載,以提供更復(fù)雜的功能。例如,不是實(shí)際創(chuàng)建新連接,而是“回收利用”舊連接。在我們演示各種不同的接受和連接策略時(shí),將更為詳盡地討論這一點(diǎn)。

               

              7.4.3 連接器的服務(wù)初始化階段

                應(yīng)用發(fā)出的

              connect()方法與接受器工廠中的handle_input()相類似,也就是,它是一個(gè)“模板方法”。

                在我們的這個(gè)案例中,模板方法

              connect()定義下面一些可被重定義的步驟:

               

              • make_svc_handler()
              :創(chuàng)建服務(wù)處理器。
            • connect_svc_handler()
            • :將連接接受進(jìn)前一步驟創(chuàng)建的服務(wù)處理器。
            • activate_svc_handler()
            • :?jiǎn)?dòng)這個(gè)新服務(wù)處理器。

               

                每一方法都可以被重新編寫(xiě),從而靈活地決定這些操作怎樣來(lái)實(shí)際執(zhí)行。

                這樣,在應(yīng)用發(fā)出

              connect()調(diào)用后,連接器工廠通過(guò)調(diào)用make_svc_handler()來(lái)實(shí)例化恰當(dāng)?shù)姆?wù)處理器,一如在接受器的案例中所做的那樣。其缺省行為只是實(shí)例化適當(dāng)?shù)念悾⑶乙部梢酝ㄟ^(guò)與接受器完全相同的方式重載。進(jìn)行這樣的重載的原因可以與上面提到的原因非常類似。

                在服務(wù)處理器被創(chuàng)建后,

              connect()調(diào)用確定連接是要成為異步的還是同步的。如果是異步的,在繼續(xù)下一步驟之前,它將自己登記到反應(yīng)堆,隨后調(diào)用connect_svc_handler()方法。該方法的缺省行為是調(diào)用底層具體連接器的connect()方法。在使用ACE_SOCK_Connector的情況下,這意味著將適當(dāng)?shù)倪x項(xiàng)設(shè)置為阻塞或非阻塞式I/O,然后發(fā)出BSD connect()調(diào)用。如果連接被指定為同步的,connect()調(diào)用將會(huì)阻塞、直到連接完全建立。在這種情況下,在連接建立后,它將在服務(wù)處理器中設(shè)置句柄,以與它現(xiàn)在連接到的對(duì)端通信(該句柄即是通過(guò)在服務(wù)處理器中調(diào)用peer()方法獲得的在流中存儲(chǔ)的句柄,見(jiàn)上面的例子)。在服務(wù)處理器中設(shè)置句柄后,連接器模式將進(jìn)行到最后階段:服務(wù)處理。

                如果連接被指定為異步的,在向底層的具體連接器發(fā)出非阻塞式

              connect()調(diào)用后,對(duì)connect_svc_handler()的調(diào)用將立即返回。在使用ACE_SOCK_Connector的情況中,這意味著發(fā)出非阻塞式BSD connect()調(diào)用。在連接稍后被實(shí)際建立時(shí),反應(yīng)堆將回調(diào)ACE_Connector工廠的handle_output()方法,該方法在通過(guò)make_svc_handler()方法創(chuàng)建的服務(wù)處理器中設(shè)置新句柄。然后工廠將進(jìn)行到下一階段:服務(wù)處理。

                與

              accept_svc_handler()情況一樣,connect_svc_handler()是一個(gè)“橋接”方法,可進(jìn)行重載以提供變化的功能。

               

              7.4.4 服務(wù)處理

                一旦服務(wù)處理器被創(chuàng)建、連接被建立,以及句柄在服務(wù)處理器中被設(shè)置,

              ACE_Acceptorhandle_input()方法(或者在使用ACE_Connector的情況下,是handle_output()connect_svc_handler())將調(diào)用activate_svc_handler()方法。該方法將隨即啟用服務(wù)處理器。其缺省行為是調(diào)用作為服務(wù)處理器的入口的open()方法。如我們?cè)谏厦娴睦又兴吹降模诜?wù)處理器開(kāi)始執(zhí)行時(shí),open()方法是第一個(gè)被調(diào)用的方法。是在open()方法中,我們調(diào)用activate()方法來(lái)創(chuàng)建多個(gè)線程控制;并在反應(yīng)堆上登記服務(wù)處理器,這樣當(dāng)新的數(shù)據(jù)在連接上到達(dá)時(shí),它會(huì)被自動(dòng)回調(diào)。該方法也是一個(gè)“橋接”方法,可被重載以提供更為復(fù)雜的功能。特別地,這個(gè)重載的方法可以提供更為復(fù)雜的并發(fā)策略,比如,在另一不同的進(jìn)程中運(yùn)行服務(wù)處理器。

               

              7.5

              調(diào)諧接受器和連接器策略

                如上面所提到的,因?yàn)槭褂昧丝梢灾剌d的橋接方法,很容易對(duì)接受器和連接器進(jìn)行調(diào)諧。橋接方法允許調(diào)諧:

               

              • 服務(wù)處理器的創(chuàng)建策略:
              • 通過(guò)重載接受器或連接器的make_svc_handler()方法來(lái)實(shí)現(xiàn)。例如,這可以意味著復(fù)用已有的服務(wù)處理器,或使用某種復(fù)雜的方法來(lái)獲取服務(wù)處理器,如上面所討論的那樣。
              • 連接策略:
              • 連接創(chuàng)建策略可通過(guò)重載
                connect_svc_handler()accept_svc_handler()方法來(lái)改變。
              • 服務(wù)處理器的并發(fā)策略:
              • 服務(wù)處理器的并發(fā)策略可通過(guò)重載
                activate_svc_handler()方法來(lái)改變。例如,服務(wù)處理器可以在另外的進(jìn)程中創(chuàng)建。

                 

                  如上所示,調(diào)諧是通過(guò)重載

                ACE_AcceptorACE_Connector類的橋接方法來(lái)完成的。ACE的設(shè)計(jì)使得程序員很容易完成這樣的重載和調(diào)諧。

                 

                7.5.1 ACE_Strategy_ConnectorACE_Strategy_Acceptor

                  為了方便上面所提到的對(duì)接受器和連接器模式的調(diào)諧方法,

                ACE提供了兩種特殊的“可調(diào)諧”接受器和連接器工廠,那就是ACE_Strategy_AcceptorACE_Strategy_Connector。它們和ACE_AcceptorACE_Connector非常類似,同時(shí)還使用了“策略”模式。

                  策略模式被用于使算法行為與類的接口去耦合。其基本概念是允許一個(gè)類(稱為

                Context Class,上下文類)的底層算法獨(dú)立于使用該類的客戶進(jìn)行變動(dòng)。這是通過(guò)具體策略類的幫助來(lái)完成的。具體策略類封裝執(zhí)行操作的算法或方法。這些具體策略類隨后被上下文類用于執(zhí)行各種操作(上下文類將“工作”委托給具體策略類)。因?yàn)樯舷挛念惒恢苯訄?zhí)行任何操作,當(dāng)需要改變功能時(shí),無(wú)需對(duì)它進(jìn)行修改。對(duì)上下文類所做的唯一修改是使用另一個(gè)具體策略類來(lái)執(zhí)行改變了的操作。(要閱讀有關(guān)策略模式的更多信息,參見(jiàn)“設(shè)計(jì)模式”的附錄)。

                  在

                ACE中,ACE_Strategy_ConnectorACE_Strategy_Acceptor使用若干具體策略類來(lái)改變算法,以創(chuàng)建服務(wù)處理器,建立連接,以及為服務(wù)處理器設(shè)置并發(fā)方法。如你可能已經(jīng)猜到的一樣,ACE_Strategy_ConnectorACE_Strategy_Acceptor利用了上面提到的橋接方法所提供的可調(diào)諧性。

                 

                7.5.1.1

                使用策略接受器和連接器

                  在

                ACE中已有若干具體的策略類可用于“調(diào)諧”策略接受器和連接器。當(dāng)類被實(shí)例化時(shí),它們作為參數(shù)被傳入策略接受器或連接器。表7-2顯示了可用于調(diào)諧策略接受器和連接器類的一些類。

                需要修改

                具體策略類

                描述

                創(chuàng)建策略

                (

                重定義make_svc_handler())

                ACE_NOOP_Creation_Strategy

                這個(gè)具體策略并實(shí)例化服務(wù)處理器,而只是一個(gè)空操作。

                ACE_Singleton_Strategy

                保證服務(wù)處理器被創(chuàng)建為單體。也就是,所有連接將有效地使用同一個(gè)服務(wù)處理例程。

                ACE_DLL_Strategy

                通過(guò)從動(dòng)態(tài)鏈接庫(kù)中動(dòng)態(tài)鏈接服務(wù)處理器來(lái)對(duì)它進(jìn)行創(chuàng)建。

                連接策略

                (

                重定義connect_svc_handler())

                ACE_Cached_Connect_Strategy

                檢查是否有已經(jīng)連接到特定的遠(yuǎn)地地址的服務(wù)處理器沒(méi)有在被使用。如果有這樣一個(gè)服務(wù)處理器,就對(duì)它進(jìn)行復(fù)用。

                并發(fā)策略

                (

                重定義activate_svc_handler())

                ACE_NOOP_Concurrency_Strategy

                一個(gè)“無(wú)為”(

                do-nothing)的并發(fā)策略。它甚至調(diào)用服務(wù)處理器的open()方法。

                ACE_Process_Strategy

                在另外的進(jìn)程中創(chuàng)建服務(wù)處理器,并調(diào)用它的

                open()方法。

                ACE_Reactive_Strategy

                先在反應(yīng)堆上登記服務(wù)處理器,然后調(diào)用它的

                open()方法。

                ACE_Thread_Strategy

                先調(diào)用服務(wù)處理器的

                open()方法,然后調(diào)用它的activate()方法,以讓另外的線程來(lái)啟動(dòng)服務(wù)處理器的svc()方法。

                 

                         表

                7-2 用于調(diào)諧策略接受器和連接器類的類

                 

                  下面的例子演示策略接受器和連接器類的使用。

                 

                7-8

                #include ”ace/Reactor.h”

                #include ”ace/Svc_Handler.h”

                #include ”ace/Acceptor.h”

                #include ”ace/Synch.h”

                #include ”ace/SOCK_Acceptor.h”

                #define PORT_NUM 10101

                #define DATA_SIZE 12

                //forward declaration

                class My_Svc_Handler;

                //instantiate a strategy acceptor

                typedef ACE_Strategy_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;

                //instantiate a concurrency strategy

                typedef ACE_Process_Strategy<My_Svc_Handler> Concurrency_Strategy;

                 

                // Define the Service Handler

                class My_Svc_Handler:

                public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>

                {

                private:

                char* data;

                public:

                My_Svc_Handler()

                {

                data= new char[DATA_SIZE];

                }

                 

                My_Svc_Handler(ACE_Thread_Manager* tm)

                {

                data= new char[DATA_SIZE];

                }

                 

                int open(void*)

                {

                cout<<”Connection established”<<endl;

                 

                //Register with the reactor

                ACE_Event_Handler::READ_MASK);

                 

                return 0;

                }

                 

                int handle_input(ACE_HANDLE)

                {

                peer().recv_n(data,DATA_SIZE);

                ACE_OS::printf(”<< %s\n”,data);

                 

                // keep yourself registered with the reactor

                return 0;

                }

                };

                 

                int main(int argc, char* argv[])

                {

                ACE_INET_Addr addr(PORT_NUM);

                 

                //Concurrency Strategy

                Concurrency_Strategy my_con_strat;

                 

                //Instantiate the acceptor

                MyAcceptor acceptor(addr, //address to accept on

                ACE_Reactor::instance(), //the reactor to use

                0,

                // don’t care about creation strategy

                0, // don’t care about connection estb. strategy

                &my_con_strat

                ); // use our new process concurrency strategy

                 

                while(1) /* Start the reactor’s event loop */

                ACE_Reactor::instance()->handle_events();

                }

                這個(gè)例子基于上面的例

                7-2。唯一的不同是它使用了ACE_Strategy_Acceptor,而不是使用ACE_Acceptor;并且它還使用ACE_Process_Strategy作為服務(wù)處理器的并發(fā)策略。這種并發(fā)策略保證一旦連接建立后,服務(wù)處理器在單獨(dú)的進(jìn)程中被實(shí)例化。如果在特定服務(wù)上的負(fù)載變得過(guò)于繁重,使用ACE_Process_Strategy可能是一個(gè)好主意。但是,在大多數(shù)情況下,使用ACE_Process_Strategy會(huì)過(guò)于昂貴,而ACE_Thread_Strategy可能是更好的選擇。

                 

                7.5.1.2

                使用ACE_Cached_Connect_Strategy進(jìn)行連接緩存

                 

                  在許多應(yīng)用中,客戶會(huì)連接到服務(wù)器,然后重新連接到同一服務(wù)器若干次;每次都要建立連接,執(zhí)行某些工作,然后掛斷連接(比如像在

                Web客戶中所做的那樣)。不用說(shuō),這樣做是非常低效而昂貴的,因?yàn)檫B接建立和掛斷是非常昂貴的操作。在這樣的情況下,連接者可以采用一種更好的策略:“記住”老連接并保持它,直到確定客戶不會(huì)再重新建立連接為止。ACE_Cached_Connect_Strategy就提供了這樣一種緩存策略。這個(gè)策略對(duì)象被ACE_Strategy_Connector用于提供基于緩存的連接建立。如果一個(gè)連接已經(jīng)存在,ACE_Strategy_Connector將會(huì)復(fù)用它,而不是創(chuàng)建新的連接。

                  當(dāng)客戶試圖重新建立連接到先前已經(jīng)連接的服務(wù)器時(shí),

                ACE_Cached_Connect_Strategy確保對(duì)老的連接和服務(wù)處理器進(jìn)行復(fù)用,而不是創(chuàng)建新的連接和服務(wù)處理器。因而,實(shí)際上,ACE_Cached_Connect_Strategy不僅管理連接建立策略,它還管理服務(wù)處理器創(chuàng)建策略。因?yàn)樵诖死校脩?b>不想創(chuàng)建新的服務(wù)處理器,我們將ACE_Null_Creation_Strategy傳遞給ACE_Strategy_Connector。如果連接先前沒(méi)有建立過(guò),ACE_Cached_Connect_Strategy將自動(dòng)使用內(nèi)部的創(chuàng)建策略來(lái)實(shí)例化適當(dāng)?shù)姆?wù)處理器,它是在這個(gè)模板類被實(shí)例化時(shí)傳入的。這個(gè)策略可被設(shè)置為用戶想要使用的任何一種策略。除此而外,也可以將ACE_Cached_Connect_Strategy自己在其構(gòu)造器中使用的創(chuàng)建、并發(fā)和recycling策略傳給它。下面的例子演示這些概念:

                 

                7-9

                #include ”ace/Reactor.h”

                #include ”ace/Svc_Handler.h”

                #include ”ace/Connector.h”

                #include ”ace/Synch.h”

                #include ”ace/SOCK_Connector.h”

                #include ”ace/INET_Addr.h”

                 

                #define PORT_NUM 10101

                #define DATA_SIZE 16

                 

                //forward declaration

                class My_Svc_Handler;

                //Function prototype

                static void make_connections(void *arg);

                 

                // Template specializations for the hashing function for the

                // hash_map which is used by the cache. The cache is used internally by the

                // Cached Connection Strategy . Here we use ACE_Hash_Addr

                // as our external identifier. This utility class has already

                // overloaded the == operator and the hash() method. (The

                // hashing function). The hash() method delegates the work to

                // hash_i() and we use the IP address and port to get a

                // a unique integer hash value.

                size_t ACE_Hash_Addr<ACE_INET_Addr>::hash_i (const ACE_INET_Addr &addr) const

                {

                return addr.get_ip_address () + addr.get_port_number ();

                }

                 

                //instantiate a strategy acceptor

                typedef ACE_Strategy_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR>

                STRATEGY_CONNECTOR;

                 

                //Instantiate the Creation Strategy

                typedef ACE_NOOP_Creation_Strategy<My_Svc_Handler>

                NULL_CREATION_STRATEGY;

                //Instantiate the Concurrency Strategy

                typedef ACE_NOOP_Concurrency_Strategy<My_Svc_Handler>

                NULL_CONCURRENCY_STRATEGY;

                //Instantiate the Connection Strategy

                typedef ACE_Cached_Connect_Strategy<My_Svc_Handler,

                ACE_SOCK_CONNECTOR,

                ACE_SYNCH_RW_MUTEX>

                CACHED_CONNECT_STRATEGY;

                 

                class My_Svc_Handler:

                public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_MT_SYNCH>

                {

                private:

                char* data;

                public:

                My_Svc_Handler()

                {

                data= new char[DATA_SIZE];

                }

                 

                My_Svc_Handler(ACE_Thread_Manager* tm)

                {

                data= new char[DATA_SIZE];

                }

                 

                //Called before the service handler is recycled..

                int

                 recycle (void *a=0)/P>

                {

                ACE_DEBUG ((LM_DEBUG,

                ”(%P|%t) recycling Svc_Handler %d with handle %d\n”,

                this, this->peer ().get_handle ()));

                return 0;

                }

                 

                int open(void*)

                {

                ACE_DEBUG((LM_DEBUG,”(%t)Connection established \n”));

                 

                //Register the service handler with the reactor

                ACE_Reactor::instance()

                ->register_handler(this,ACE_Event_Handler::READ_MASK);

                 

                activate(THR_NEW_LWP|THR_DETACHED);

                return 0;

                }

                 

                int handle_input(ACE_HANDLE)

                {

                ACE_DEBUG((LM_DEBUG,”Got input in thread: (%t) \n”));

                peer().recv_n(data,DATA_SIZE);

                ACE_DEBUG((LM_DEBUG,”<< %s\n”,data));

                 

                //keep yourself registered with the reactor

                return 0;

                }

                 

                int svc(void)

                {

                //send a few messages and then mark connection as idle so that it can

                // be recycled

                 later.

                ACE_DEBUG((LM_DEBUG,”Started the service routine \n”));

                for(int i=0;i<3;i++)

                {

                ACE_DEBUG((LM_DEBUG,”(%t)>>Hello World\n”));

                ACE_OS::fflush(stdout);

                peer().send_n(”Hello World”,sizeof(”Hello World”));

                }

                 

                //Mark the service handler as being idle now and let the

                //other threads reuse this connection

                this->idle(1);

                 

                //Wait for the thread to die

                this->thr_mgr()->wait();

                 

                return 0;

                }

                };

                 

                ACE_INET_Addr *addr;

                 

                int main(int argc, char* argv[])

                {

                addr= new ACE_INET_Addr(PORT_NUM,argv[1]);

                 

                //Creation Strategy

                NULL_CREATION_STRATEGY creation_strategy;

                 

                //Concurrency Strategy

                NULL_CONCURRENCY_STRATEGY concurrency_strategy;

                 

                //Connection Strategy

                CACHED_CONNECT_STRATEGY caching_connect_strategy;

                 

                //instantiate the connector

                STRATEGY_CONNECTOR connector(

                ACE_Reactor::instance(), //the reactor to use

                &creation_strategy,

                &caching_connect_strategy,

                &concurrency_strategy);

                 

                //Use the thread manager to spawn a single thread

                //to connect multiple times passing it the address

                //of the strategy connector

                if(ACE_Thread_Manager::instance()->spawn(

                (ACE_THR_FUNC) make_connections,

                (void *) &connector,

                THR_NEW_LWP) == -1)

                 

                ACE_ERROR ((LM_ERROR, ”(%P|%t) %p\n%a”, ”client thread spawn failed”));

                 

                while(1) /* Start the reactor’s event loop */

                ACE_Reactor::instance()->handle_events();

                }

                 

                //Connection establishment function, tries to establish connections

                //to the same server again and re-uses the connections from the cache

                void make_connections(void *arg)

                {

                ACE_DEBUG((LM_DEBUG,”(%t)Prepared to connect \n”));

                STRATEGY_CONNECTOR *connector= (STRATEGY_CONNECTOR*) arg;

                for (int i = 0; i < 10; i++)

                {

                My_Svc_Handler *svc_handler = 0;

                 

                // Perform a blocking connect to the server using the Strategy

                // Connector with a connection caching strategy. Since we are

                // connecting to the same <server_addr> these calls will return the

                // same dynamically allocated <Svc_Handler> for each <connect> call.

                if (connector->connect (svc_handler, *addr) == -1)

                {

                ACE_ERROR ((LM_ERROR, ”(%P|%t) %p\n”, ”connection failed\n”));

                return;

                }

                 

                // Rest for a few seconds so that the connection has been freed up

                ACE_OS::sleep (5);

                }

                }

                在上面的例子中,緩存式連接策略被用于緩存連接。要使用這一策略,需要一點(diǎn)額外的工作:定義

                ACE_Cached_Connect_Strategy在內(nèi)部使用的哈希映射管理器的hash()方法。這個(gè)hash()方法用于對(duì)服務(wù)處理器和ACE_Cached_Connect_Strategy內(nèi)部使用的連接進(jìn)行哈希運(yùn)算,放入緩存映射中。它簡(jiǎn)單地使用IP地址和端口號(hào)的總和作為哈希函數(shù),這也許并不是很好的哈希函數(shù)。

                  這個(gè)例子比至今為止我們所列舉的例子都要復(fù)雜一點(diǎn),所以有理由多進(jìn)行一點(diǎn)討論。

                  我們?yōu)?/p>ACE_Strategy_Acceptor使用空操作并發(fā)和創(chuàng)建策略。使用空操作創(chuàng)建策略必須的。如上面所解釋的,如果沒(méi)有使用ACE_NOOP_Creation_StrategyACE_Cached_Connection_Strategy將會(huì)產(chǎn)生斷言失敗。但是,在使用ACE_Cached_Connect_Strategy時(shí),任何并發(fā)策略都可以和策略接受器一起使用。如上面所提到的,ACE_Cached_Connect_Strategys所用的底層創(chuàng)建策略可以由用戶來(lái)設(shè)置。還可以設(shè)置recycling策略。這是在實(shí)例化caching_connect_strategy時(shí),通過(guò)將所需的創(chuàng)建和recycling策略的對(duì)象傳給它的構(gòu)造器來(lái)完成的。在這里我們沒(méi)有這樣做,而是使用了缺省的創(chuàng)建和recycling策略。

                  在適當(dāng)?shù)卦O(shè)置連接器后,我們使用

                Thread_Manager來(lái)派生新線程,且將make_connections()方法作為線程的入口。該方法使用我們的新的策略連接器來(lái)連接到遠(yuǎn)地站點(diǎn)。在連接建立后,該線程休眠5秒鐘,然后使用我們的緩存式連接器來(lái)重新創(chuàng)建同樣的連接。于是該線程應(yīng)該在連接器緩存中找到這個(gè)連接并復(fù)用它。

                  和平常一樣,一旦連接建立后,反應(yīng)堆回調(diào)我們的服務(wù)處理器(

                My_Svc_Handler)。隨后My_Svc_Handleropen()方法通過(guò)調(diào)用My_Svc_Handleractivate()方法來(lái)使它成為主動(dòng)對(duì)象。svc()方法隨后發(fā)送三條消息給遠(yuǎn)地主機(jī),并通過(guò)調(diào)用服務(wù)處理器的idle()方法將該連接標(biāo)記為空閑。注意this->thr_mrg_wait()要求線程管理器等待所有在線程管理器中的線程終止。如果你不要求線程管理器等待其它線程,根據(jù)在ACE中設(shè)定的語(yǔ)義,一旦ACE_Task(在此例中是ACE_Task類型的ACE_Svc_Handler)中的線程終止,ACE_Task對(duì)象(在此例中是ACE_My_Svc_Handler)就會(huì)被自動(dòng)刪除。如果發(fā)生了這種情況,在Cache_Connect_Strategy查找先前緩存的連接時(shí),它就不會(huì)如我們期望的那樣找到My_Svc_Handler,因?yàn)樗呀?jīng)被刪除掉了。

                  在

                My_Svc_Handler中還重載了ACE_Svc_Handler中的recycle()方法。當(dāng)有舊連接被ACE_Cache_Connect_Strategy找到時(shí),這個(gè)方法就會(huì)被自動(dòng)回調(diào),這樣服務(wù)處理器就可以在此方法中完成回收利用所特有的操作。在我們的例子中,我們只是打印出在緩存中找到的處理器的this指針的地址。在程序運(yùn)行時(shí),每次連接建立后所使用的句柄的地址是相同的,從而說(shuō)明緩存工作正常。

                 

                7.6

                通過(guò)接受器和連接器模式使用簡(jiǎn)單事件處理器

                  有時(shí), ,使用重量級(jí)的

                ACE_Svc_Handler作為接受器和連接器的處理器不僅沒(méi)有必要,而且會(huì)導(dǎo)致代碼臃腫。在這樣情況下,用戶可以使用較輕的ACE_Event_Handler方法來(lái)作為反應(yīng)堆在連接一旦建立時(shí)所回調(diào)的類。要采用這種方法,程序員需要重載get_handle()方法,并包含將要用于事件處理器的具體底層流。下面的例子有助于演示這些變動(dòng)。這里我們還編寫(xiě)了新的peer()方法,它返回底層流的引用(reference),就像在ACE_Svc_Handler類中所做的那樣。

                 

                7-10

                #include ”ace/Reactor.h”

                #include ”ace/Svc_Handler.h”

                #include ”ace/Acceptor.h”

                #include ”ace/Synch.h”

                #include ”ace/SOCK_Acceptor.h”

                 

                #define PORT_NUM 10101

                #define DATA_SIZE 12

                 

                //forward declaration

                class My_Event_Handler;

                 

                //Create the Acceptor class

                typedef ACE_Acceptor<My_Event_Handler,ACE_SOCK_ACCEPTOR>

                MyAcceptor;

                 

                //Create an event handler similar to as seen in example 2. We have to

                //overload the get_handle() method and write the peer() method. We also

                //provide the data member peer_ as the underlying stream which is

                //used.

                class My_Event_Handler: public ACE_Event_Handler

                {

                private:

                char* data;

                 

                //Add a new attribute for the underlying stream which will be used by

                //the Event Handler

                ACE_SOCK_Stream peer_;

                 

                public:

                My_Event_Handler()

                {

                data= new char[DATA_SIZE];

                }

                 

                int open(void*)

                {

                cout<<”Connection established”<<endl;

                 

                //Register the event handler with the reactor

                ACE_Reactor::instance()->register_handler(this,

                ACE_Event_Handler::READ_MASK);

                 

                return 0;

                }

                 

                int handle_input(ACE_HANDLE)

                {

                // After using the peer() method of our ACE_Event_Handler to obtain a

                //reference to the underlying stream of the service handler class we

                //call recv_n() on it to read the data which has been received. This

                //data is stored in the data array and then printed out

                peer().recv_n(data,DATA_SIZE);

                ACE_OS::printf(”<< %s\n”,data);

                 

                // keep yourself registered with the reactor

                return 0;

                }

                 

                // new method which returns the handle to the reactor when it

                //asks for it.

                ACE_HANDLE get_handle(void) const

                {

                return this->peer_.get_handle();

                }

                 

                //new method which returns a reference to the peer stream

                ACE_SOCK_Stream &peer(void) const

                {

                return (ACE_SOCK_Stream &) this->peer_;

                }

                };

                 

                int main(int argc, char* argv[])

                {

                ACE_INET_Addr addr(PORT_NUM);

                 

                //create the acceptor

                MyAcceptor acceptor(addr, //address to accept on

                ACE_Reactor::instance()); //the reactor to use

                 

                while(1) /* Start the reactor’s event loop */

                ACE_Reactor::instance()->handle_events();

                }

                文轉(zhuǎn)載至ACE開(kāi)發(fā)者/FONT>
                posted on 2007-02-27 21:40 walkspeed 閱讀(7256) 評(píng)論(1)  編輯 收藏 引用 所屬分類: ACE Farmeworks

                FeedBack:
                # re: ACE的接受器(Acceptor)和連接器(Connector):連接建立模式
                2007-09-10 15:16 | 李奕濤
                什么都可以
                  回復(fù)  更多評(píng)論
                  

                <2009年9月>
                303112345
                6789101112
                13141516171819
                20212223242526
                27282930123
                45678910

                常用鏈接

                留言簿(4)

                隨筆分類(64)

                隨筆檔案(58)

                文章分類(3)

                文章檔案(3)

                相冊(cè)

                收藏夾(9)

                C++零碎

                好友

                搜索

                •  

                積分與排名

                • 積分 - 161397
                • 排名 - 163

                最新評(píng)論

                閱讀排行榜

                評(píng)論排行榜

                久久亚洲精品国产亚洲老地址| 亚洲精品无码久久久久| 久久香蕉国产线看观看99| 日产久久强奸免费的看| 久久亚洲综合色一区二区三区 | 囯产精品久久久久久久久蜜桃 | 人人狠狠综合久久亚洲| 久久精品中文闷骚内射| 女同久久| 久久久久亚洲AV无码网站| 色综合久久精品中文字幕首页| 香蕉久久夜色精品升级完成| 97久久久精品综合88久久| 久久久青草青青国产亚洲免观| 精品国产热久久久福利| 国产精品久久久久久久久免费| 午夜视频久久久久一区| 99久久精品国产综合一区| 久久夜色精品国产亚洲| 26uuu久久五月天| 国内精品欧美久久精品| AAA级久久久精品无码区| 亚洲午夜久久影院| 99久久精品无码一区二区毛片 | 成人国内精品久久久久一区| 精品国产乱码久久久久久郑州公司 | 久久大香香蕉国产| 久久婷婷五月综合97色直播| 亚洲国产成人精品女人久久久| 99国内精品久久久久久久| 国产精品久久久久久久午夜片| 久久国产V一级毛多内射| 欧美国产精品久久高清| 久久亚洲AV成人无码软件| 亚洲精品乱码久久久久久久久久久久| 亚洲va久久久噜噜噜久久男同 | 色8激情欧美成人久久综合电| 伊人久久一区二区三区无码| 99久久99久久精品国产片果冻| 国产成年无码久久久久毛片| 精品免费久久久久国产一区|