• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            Networking /C++/Linux

              C++博客 :: 首頁 :: 聯(lián)系 :: 聚合  :: 管理
              11 Posts :: 14 Stories :: 1 Comments :: 0 Trackbacks

            常用鏈接

            留言簿(4)

            我參與的團(tuán)隊

            搜索

            •  

            最新評論

            閱讀排行榜

            評論排行榜

             這篇文章介紹下libevent在socket異步編程中的應(yīng)用。在一些對性能要求較高的網(wǎng)絡(luò)應(yīng)用程序中,為了防止程序阻塞在socket I/O操作上造成程序性能的下降,需要使用異步編程,即程序準(zhǔn)備好讀寫的函數(shù)(或接口)并向系統(tǒng)注冊,然后在需要的時候只向系統(tǒng)提交讀寫的請求之后就繼續(xù)做自己的事情,實際的讀寫操作由系統(tǒng)在合適的時候調(diào)用我們程序注冊的接口進(jìn)行。異步編程會給一些程序猿帶來一些理解和編寫上的困難,因為我們通常寫的一些簡單的程序都是順序執(zhí)行的,而異步編程將程序的執(zhí)行順序打亂了,有些代碼什么情況下執(zhí)行往往不是太清晰,因此也使得編程的復(fù)雜度大大增加。

                Note:這里系統(tǒng)這個詞使用的不準(zhǔn)確,實際上可以是自己封裝的異步調(diào)用機制,更常見的是一些可用的庫,比如libevent,ACE等

             

                想了解libevent的工作原理可以自行查詢資料,網(wǎng)上相關(guān)的介紹一大堆,也可以自己閱讀源碼進(jìn)行分析,本文僅從使用的角度做一個簡單的介紹,看如何快速的將libevent引入我們的程序中。任何應(yīng)用都免不了需要承載其功能的底層OS,libevent也不例外,其內(nèi)部是通過封裝操作系統(tǒng)的IO復(fù)用機制實現(xiàn)的,在linux系統(tǒng)上可能是epoll、kqueu之類的,取決于具體的OS所支持的IO復(fù)用方式,在我的系統(tǒng)上是epoll,因此可以理解為libevent提供了一個比epoll更為友好的操作接口,將程序猿從網(wǎng)絡(luò)IO處理的細(xì)節(jié)中解放出來,使其可以專注于目標(biāo)問題的處理上。

             

                首先,安裝libevent到任意目錄下

            wget http://monkey.org/~provos/libevent-1.4.13-stable.tar.gz
            tar –xzvf libevent-1.4.13-stable.tar.gz
            cd libevent-1.4.13-stable
            ./configure --prefix=/home/mydir/libevent
            make && make install

             

                現(xiàn)在假定我們要設(shè)計一個服務(wù)器程序,用于接收客戶端的數(shù)據(jù),并將接收的數(shù)據(jù)回寫給客戶端。下面來構(gòu)造該程序,由于本僅僅是展示一個Demo,因此程序中將不對錯誤進(jìn)行處理,假設(shè)所有的調(diào)用都成功

            2 #define PORT 25341
            3 #define BACKLOG 5
            4 #define MEM_SIZE 1024
            5 
            6 struct event_base* base;
            7 
            8 int main(int argc, char* argv[])
            9 {
            10     struct sockaddr_in my_addr;
            11     int sock;
            12 
            13     sock = socket(AF_INET, SOCK_STREAM, 0); 
            14     int yes = 1;
            15     setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
            16     memset(&my_addr, 0sizeof(my_addr));
            17     my_addr.sin_family = AF_INET;
            18     my_addr.sin_port = htons(PORT);
            19     my_addr.sin_addr.s_addr = INADDR_ANY;
            20     bind(sock, (struct sockaddr*)&my_addr, sizeof(struct sockaddr));
            21     listen(sock, BACKLOG);
            22 
            23     struct event listen_ev;
            24     base = event_base_new();
            25     event_set(&listen_ev, sock, EV_READ|EV_PERSIST, on_accept, NULL);
            26     event_base_set(base&listen_ev);
            27     event_add(&listen_ev, NULL);
            28     event_base_dispatch(base);
            29 
            30     return 0;
            31 }

                第13行說明創(chuàng)建的是一個TCP socket。第15行是服務(wù)器程序的通常做法,設(shè)置了該選項后,在父子進(jìn)程模型中,當(dāng)子進(jìn)程為客戶服務(wù)的時候如果父進(jìn)程退出,可以重新啟動程序完成服務(wù)的無縫升級,否則在所有父子進(jìn)程完全退出前再啟動程序會在該端口上綁定失敗,也即不能完成無縫升級的操作(更多信息可以參考該函數(shù)說明或Steven先生的<網(wǎng)絡(luò)編程>)。第24行用于創(chuàng)建一個事件處理的全局變量,可以理解為這是一個負(fù)責(zé)集中處理各種出入IO事件的總管家,它負(fù)責(zé)接收和派發(fā)所有輸入輸出IO事件的信息,這里調(diào)用的是函數(shù)event_base_new(), 很多程序里這里用的是event_init(),區(qū)別就是前者是線程安全的、而后者是非線程安全的,后者在其官方說明中已經(jīng)被標(biāo)志為過時的函數(shù)、且建議用前者代替,libevent中還有很多類似的函數(shù),比如建議用event_base_dispatch代替event_dispatch,用event_assign代替event_set和event_base_set等,關(guān)于libevent接口的詳細(xì)說明見其官方說明libevent_doc. 第25行說明在listen_en這個事件監(jiān)聽sock這個描述字的讀操作,當(dāng)讀消息到達(dá)是調(diào)用on_accept函數(shù),EV_PERSIST參數(shù)告訴系統(tǒng)持續(xù)的監(jiān)聽sock上的讀事件,如果不加該參數(shù),每次要監(jiān)聽該事件時就要重復(fù)的調(diào)用26行的event_add函數(shù),從前面的代碼可知,sock這個描述字是bind到本地的socket端口上,因此其對應(yīng)的可讀事件自然就是來自客戶端的連接到達(dá),我們就可以調(diào)用accept無阻塞的返回客戶的連接了。第26行將listen_ev注冊到base這個事件中,相當(dāng)于告訴處理IO的管家請留意我的listen_ev上的事件。第27行相當(dāng)于告訴處理IO的管家,當(dāng)有我的事件到達(dá)時你發(fā)給我(調(diào)用on_accept函數(shù)),至此對listen_ev的初始化完畢。第28行正式啟動libevent的事件處理機制,使系統(tǒng)運行起來,運行程序的話會發(fā)現(xiàn)event_base_dispatch是一個無限循環(huán)。

             

            下面是on_accept函數(shù)的內(nèi)容

               1: void on_accept(int sock, short event, void* arg)
               2: {
               3:     struct sockaddr_in cli_addr;
               4:     int newfd, sin_size;
               5:     // read_ev must allocate from heap memory, otherwise the program would crash from segmant fault
               6:     struct event* read_ev = (struct event*)malloc(sizeof(struct event));;
               7:     sin_size = sizeof(struct sockaddr_in);
               8:     newfd = accept(sock, (struct sockaddr*)&cli_addr, &sin_size);
               9:     event_set(read_ev, newfd, EV_READ|EV_PERSIST, on_read, read_ev);
              10:     event_base_set(base, read_ev);
              11:     event_add(read_ev, NULL);
              12: } 

                第9-12與前面main函數(shù)的24-26相同,即在代表客戶的描述字newfd上監(jiān)聽可讀事件,當(dāng)有數(shù)據(jù)到達(dá)是調(diào)用on_read函數(shù)。這里有亮點需要注意,一是read_ev需要從堆里malloc出來,如果是在棧上分配,那么當(dāng)函數(shù)返回時變量占用的內(nèi)存會被釋放,因此事件主循環(huán)event_base_dispatch會訪問無效的內(nèi)存而導(dǎo)致進(jìn)程崩潰(即crash);第二個要注意的是第9行read_ev作為參數(shù)傳遞給了on_read函數(shù)。

             

            下面是on_read函數(shù)的內(nèi)容

               1: void on_read(int sock, short event, void* arg)
               2: {
               3:     struct event* write_ev;
               4:     int size;
               5:     char* buffer = (char*)malloc(MEM_SIZE);
               6:     bzero(buffer, MEM_SIZE);
               7:     size = recv(sock, buffer, MEM_SIZE, 0);
               8:     printf("receive data:%s, size:%d\n", buffer, size);
               9:     if (size == 0) {
              10:         event_del((struct event*)arg);
              11:         free((struct event*)arg);
              12:         close(sock);
              13:         return;
              14:     }
              15:     write_ev = (struct event*) malloc(sizeof(struct event));;
              16:     event_set(write_ev, sock, EV_WRITE, on_write, buffer);
              17:     event_base_set(base, write_ev);
              18:     event_add(write_ev, NULL);
              19: }

                第9行,當(dāng)從socket讀返回0標(biāo)志對方已經(jīng)關(guān)閉了連接,因此這個時候就沒必要繼續(xù)監(jiān)聽該套接口上的事件,由于EV_READ在on_accept函數(shù)里是用EV_PERSIST參數(shù)注冊的,因此要顯示的調(diào)用event_del函數(shù)取消對該事件的監(jiān)聽。第18-21行與on_accept函數(shù)的6-11行類似,當(dāng)可寫時調(diào)用on_write函數(shù),注意第19行將buffer作為參數(shù)傳遞給了on_write。這段程序還有比較嚴(yán)重的問題,后面進(jìn)行說明。

             

            on_write函數(shù)的實現(xiàn)

            1 void on_write(int sock, short eventvoid* arg)
            2 {
            3     char* buffer = (char*)arg;
            4     send(sock, buffer, strlen(buffer), 0); 
            5 
            6     free(buffer);

            7 } 

                 on_write函數(shù)中向客戶端回寫數(shù)據(jù),然后釋放on_read函數(shù)中malloc出來的buffer。在很多書合編程指導(dǎo)中都很強調(diào)資源的所有權(quán),經(jīng)常要求誰分配資源、就由誰釋放資源,這樣對資源的管理指責(zé)就更明確,不容易出問題,但是通過該例子我們發(fā)現(xiàn)在異步編程中資源的分配與釋放往往是由不同的所有者操作的,因此也是比較容易出問題的地方。

             

                其實在on_read函數(shù)中從socket讀取數(shù)據(jù)后程序就可以直接調(diào)用write/send接口向客戶回寫數(shù)據(jù)了,因為寫事件已經(jīng)滿足,不存在異步不異步的問題,這里進(jìn)行on_write的異步操作僅僅是為了說明異步編程中資源的管理與釋放的問題,另外一方面,直接調(diào)用write/send函數(shù)向客戶端寫數(shù)據(jù)可能導(dǎo)致程序較長時間阻塞在IO操作上,比如socket的輸出緩沖區(qū)已滿,則write/send操作阻塞到有可用的緩沖區(qū)之后才能進(jìn)行實際的寫操作,而通過向?qū)懯录詏n_accept函數(shù),那么libevent會在合適的時間調(diào)用我們的callback函數(shù),(比如對于會引起IO阻塞的情況比如socket輸出緩沖區(qū)滿,則由libevent設(shè)計算法來處理,如此當(dāng)回調(diào)on_accept函數(shù)時我們在調(diào)用IO操作就不會發(fā)生真正的IO之外的阻塞)。注:前面括號中是我個人認(rèn)為一個庫應(yīng)該實現(xiàn)的功能,至于libevent是不是實現(xiàn)這樣的功能并不清楚也無意深究。

             

                再來看看前面提到的on_read函數(shù)中存在的問題,首先write_ev是動態(tài)分配的內(nèi)存,但是沒有釋放,因此存在內(nèi)存泄漏,另外,on_read中進(jìn)行malloc操作,那么當(dāng)多次調(diào)用該函數(shù)的時候就會造成內(nèi)存的多次泄漏。這里的解決方法是對socket的描述字可以封裝一個結(jié)構(gòu)體來保護(hù)讀、寫的事件以及數(shù)據(jù)緩沖區(qū),整理后的完整代碼如下

            #include <sys/socket.h>
            #include 
            <sys/types.h>
            #include 
            <netinet/in.h>
            #include 
            <stdio.h>

            #include 
            <event.h>


            #define PORT        25341
            #define BACKLOG     5
            #define MEM_SIZE    1024

            struct event_base* base;
            struct sock_ev {
                
            struct event* read_ev;
                
            struct event* write_ev;
                
            char* buffer;
            };

            void release_sock_event(struct sock_ev* ev)
            {
                event_del(ev
            ->read_ev);
                free(ev
            ->read_ev);
                free(ev
            ->write_ev);
                free(ev
            ->buffer);
                free(ev);
            }

            void on_write(int sock, short eventvoid* arg)
            {
                
            char* buffer = (char*)arg;
                send(sock, buffer, strlen(buffer), 
            0);

                free(buffer);
            }

            void on_read(int sock, short eventvoid* arg)
            {
                
            struct event* write_ev;
                
            int size;
                
            struct sock_ev* ev = (struct sock_ev*)arg;
                ev
            ->buffer = (char*)malloc(MEM_SIZE);
                bzero(ev
            ->buffer, MEM_SIZE);
                size 
            = recv(sock, ev->buffer, MEM_SIZE, 0);
                printf(
            "receive data:%s, size:%d\n", ev->buffer, size);
                
            if (size == 0) {
                    release_sock_event(ev);
                    close(sock);
                    
            return;
                }
                event_set(ev
            ->write_ev, sock, EV_WRITE, on_write, ev->buffer);
                event_base_set(
            base, ev->write_ev);
                event_add(ev
            ->write_ev, NULL);
            }

            void on_accept(int sock, short eventvoid* arg)
            {
                
            struct sockaddr_in cli_addr;
                
            int newfd, sin_size;
                
            struct sock_ev* ev = (struct sock_ev*)malloc(sizeof(struct sock_ev));
                ev
            ->read_ev = (struct event*)malloc(sizeof(struct event));
                ev
            ->write_ev = (struct event*)malloc(sizeof(struct event));
                sin_size 
            = sizeof(struct sockaddr_in);
                newfd 
            = accept(sock, (struct sockaddr*)&cli_addr, &sin_size);
                event_set(ev
            ->read_ev, newfd, EV_READ|EV_PERSIST, on_read, ev);
                event_base_set(
            base, ev->read_ev);
                event_add(ev
            ->read_ev, NULL);
            }

            int main(int argc, char* argv[])
            {
                
            struct sockaddr_in my_addr;
                
            int sock;

                sock 
            = socket(AF_INET, SOCK_STREAM, 0);
                
            int yes = 1;
                setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
            &yes, sizeof(int));
                memset(
            &my_addr, 0sizeof(my_addr));
                my_addr.sin_family 
            = AF_INET;
                my_addr.sin_port 
            = htons(PORT);
                my_addr.sin_addr.s_addr 
            = INADDR_ANY;
                bind(sock, (
            struct sockaddr*)&my_addr, sizeof(struct sockaddr));
                listen(sock, BACKLOG);

                
            struct event listen_ev;
                
            base = event_base_new();
                event_set(
            &listen_ev, sock, EV_READ|EV_PERSIST, on_accept, NULL);
                event_base_set(
            base&listen_ev);
                event_add(
            &listen_ev, NULL);
                event_base_dispatch(
            base);

                
            return 0;

            }

             

                程序編譯的時候要加 -levent 連接選項,以連接libevent的共享庫,但是執(zhí)行的時候依然爆出如下錯誤:error while loading shared libraries: libevent-1.4.so.2: cannot open shared object file: No such file or directory, 這個是程序找不到共享庫的位置,通過執(zhí)行echo $LD_LIBRARY_PATH可以看到系統(tǒng)庫的環(huán)境變量里沒有我們安裝的路徑,即由--prefix制定的路徑,執(zhí)行export LD_LIBRARY_PATH=/home/mydir/libevent/lib/:$LD_LIBRARY_PATH將該路徑加入系統(tǒng)環(huán)境變量里,再執(zhí)行程序就可以了。

            轉(zhuǎn)自:http://www.cnblogs.com/cnspace/archive/2011/07/19/2110891.html

            posted on 2011-12-27 14:20 likun 閱讀(7743) 評論(1)  編輯 收藏 引用 所屬分類: C/C++Linux environment programming

            Feedback

            # re: libevent example 2013-05-12 22:50 n?w?e?m?t?f
            buffer被釋放了兩次!  回復(fù)  更多評論
              

            久久久久久国产a免费观看黄色大片 | 超级碰久久免费公开视频| 亚洲AV日韩精品久久久久久| 久久只有这精品99| 中文字幕乱码人妻无码久久| 久久精品国产第一区二区三区| 青青青伊人色综合久久| 久久久久国产一区二区三区| 精品久久亚洲中文无码| 亚洲国产精品久久久久久| 亚洲精品tv久久久久| 久久国产精品久久精品国产| 亚洲国产精品成人久久蜜臀 | 久久超碰97人人做人人爱| 日本免费一区二区久久人人澡 | 精品久久久久久久国产潘金莲| 69SEX久久精品国产麻豆| 久久精品国产精品亚洲艾草网美妙 | 亚洲精品成人网久久久久久| 久久国产精品77777| 亚洲国产精品无码久久久蜜芽| 国产精品久久久久久久午夜片| 欧洲成人午夜精品无码区久久| 久久天天躁狠狠躁夜夜av浪潮| 久久久久成人精品无码中文字幕 | 香蕉久久永久视频| 久久美女人爽女人爽| 久久青青草原精品国产| 久久无码AV中文出轨人妻| 久久婷婷五月综合成人D啪| 国产福利电影一区二区三区久久老子无码午夜伦不 | 久久婷婷是五月综合色狠狠| 久久精品国产亚洲av瑜伽| 久久免费高清视频| 久久久久久久尹人综合网亚洲| 久久久噜噜噜久久中文福利| 色诱久久久久综合网ywww| 7777久久久国产精品消防器材| 久久只有这精品99| 亚洲精品国产字幕久久不卡| 亚洲欧美日韩久久精品第一区|