• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            posts - 297,  comments - 15,  trackbacks - 0
            epoll,event poll,on linux kernel 2.6.x.pthread,nptl-2.12
               LT/ET:ET也會(huì)多次發(fā)送event,當(dāng)然頻率遠(yuǎn)低于LT,但是epoll one shot才是真正的對(duì)"one connection VS one thread in worker thread pool,不依賴(lài)于任何connection-data-queue"的基礎(chǔ)支持 .我看到大部分對(duì)epoll_wait的處理模式如下,很教科化,因?yàn)閙an-pages就是這樣舉例子的。
            man-pages epoll_wait handle:
            #define MAX_EVENTS 10
            struct epoll_event events[MAX_EVENTS];
            for (;;) 
            {
               nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
               for(1~nfds)
                   handle events[i];
               ......
            }
            epoll_ctl_add的當(dāng)然是EPOLLIN|EPOLLET,此外我就不知道處理上面代碼的是一條線程還是一堆線程(in threadpool),但愿不是一條線程吧!如果是的話,難不成等處理完這MAX_EVENTS 個(gè)再去處理接下來(lái)的MAX_EVENTS 個(gè)?慢否? 但是如果是一堆線程的話,你是否考慮過(guò)如何處理request-data in one connection在邏輯上的完整性,也就是一個(gè)request-data-packet可能會(huì)被分割成若干次發(fā)送,在上面的處理模式中你真的要好好設(shè)計(jì)一下了。
            而我的epoll_wait處理模式如下:
            struct epoll_event activeEvent;
            for(;;)
            {
               epoll_wait(epollfd, &activeEvent, 1/*很驚訝嗎,但絕不是一條線程在運(yùn)行這段代碼,而是一堆*/, timeout);
               if handle activeEvent success
                  epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
              ......
            }
            處理上面代碼的當(dāng)然是一堆線程in threadpool,而且epoll_ctl_add的是EPOLLIN|EPOLLET|EPOLLONESHOT
            因?yàn)槲业脑O(shè)計(jì)理念是嚴(yán)格遵守one connection VS one thread in worker thread pool。
            所以我下面的server框架的基本模型是:
            One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
            在epollWorkerRoutine中有以下的職責(zé):
            1.handle request,當(dāng)忙時(shí)增加epollWorkerThread數(shù)量但不超過(guò)maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
            2.timeout時(shí)檢查是否空閑和當(dāng)前epollWorkerThread數(shù)量,當(dāng)空閑時(shí)保持或減少至minThreads數(shù)量.
            3.對(duì)所有Accepted-socket管理生命周期,這里利用系統(tǒng)的keepalive probes,若想實(shí)現(xiàn)業(yè)務(wù)層"心跳探測(cè)"只需將QSS_SIO_KEEPALIVE_VALS_TIMEOUT 改回系統(tǒng)默認(rèn)的2小時(shí).這里并不維護(hù)所有連接列表,當(dāng)然你可以在/proc/getpid/fd下找到所有的socket fd.
            4.linux上的non-blocking socket的操作仍然依賴(lài)recv,send,不像windows上的wsarecv+overlapped,即便不用fcntl fd o_nonblock也可以立即返回。我這里把send動(dòng)作實(shí)現(xiàn)成了blocking的(internalBlockingSender),同樣的道理,non-blocking send依然會(huì)形成響應(yīng)數(shù)據(jù)在邏輯上的碎片錯(cuò)序,特別是你如果采用上面那個(gè)教科化的處理模式的化,并且還是多線程的話,那么這簡(jiǎn)直就亂透了。當(dāng)然你可以使用response-data-queue來(lái)達(dá)到異步準(zhǔn)確發(fā)送數(shù)據(jù)的目的。

            下面結(jié)合源碼,淺析一下epoll programming:
            socketserver.h
            #ifndef __Q_SOCKET_SERVER__
            #define __Q_SOCKET_SERVER__
            #include <errno.h>
            #include <sys/socket.h>
            #include <netinet/in.h>
            #include <netinet/tcp.h>
            #include <arpa/inet.h>
            #include <sys/types.h>
            #include <string.h>
            #include <sys/epoll.h>
            #include <pthread.h>
            #include <unistd.h>
            #include <fcntl.h>
            #include <stdio.h>
            #include <stdlib.h>
            #include <time.h>
            #define SOCKET_ERROR -1
            #define INVALID_SOCKET -1
            typedef int SOCKET;
            typedef struct sockaddr_in SOCKADDR_IN;
            typedef unsigned short WORD;
            typedef unsigned int DWORD;

            #define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60
            #define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
            #define QSS_SIO_KEEPALIVE_VALS_COUNT 3
            #define MAX_THREADS 100
            #define MAX_THREADS_MIN  10
            #define MIN_WORKER_WAIT_TIMEOUT  20*1000
            #define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
            #define MAX_THREADPOOLS  32

            #define MAX_BUF_SIZE 1024
            /* ulimit -n opened FDs per process.記得修改哦,否則還是select效果,就不是epoll效果了哦,呵呵*/
            #define BLOCKING_SEND_TIMEOUT 20

            typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
            typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
            typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

            typedef struct {
              WORD passive;
              WORD port;//uint16_t
              WORD minThreads;
              WORD maxThreads;
              pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
              volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
              int  workerWaitTimeout;//wait timeout
              volatile int workerCounter;
              volatile int currentBusyWorkers;
              volatile int CSocketsCounter;
              CSocketLifecycleCallback cslifecb;
              InternalProtocolHandler protoHandler;
              SOCKET server_s;
              SOCKADDR_IN serv_addr;
              int epollFD;//main epoller.
              int BSendEpollFD;//For blocking send.
            }QSocketServer;

            typedef struct {
              SOCKET client_s;
              SOCKADDR_IN client_addr;
              uint32_t curEvents;

              char buf[MAX_BUF_SIZE];
              DWORD numberOfBytesTransferred;
              char * data;

              int BSendEpollFDRelated;
              pthread_mutex_t writableLock;
              pthread_cond_t  writableMonitor;
            }QSSEPollEvent;//for per connection

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
            int startSocketServer(QSocketServer *qss);
            int shutdownSocketServer(QSocketServer *qss);
            #endif

            qsocketserver_model.c  //下面的代碼離生產(chǎn)環(huán)境還差內(nèi)存池logger哦!
             #include "socketserver.h"
            #include <dirent.h>
            #include <regex.h>
            #define DIGIT_PATTERN_STRING "^[0-9]+$"
            void *  epollWorkerRoutine(void *);
            void *  blockingSendEpollerRoutine(void *);
            int isDigitStr(const char *str){
                int ret=-1;
                regex_t regex;
                regmatch_t matchs[1];
                if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*這里不要傳0哦,否則nomatch*/)){
                    ret=!regexec(&regex,str, 1,matchs,0);
                    regfree(&regex);
                }
                return ret;
            }

            static int setNonBlocking(int sock)
            {
                int opts;
                opts=fcntl(sock,F_GETFL);
                if(opts==-1)
                {
                    perror("fcntl(sock,GETFL) failed!\n");
                    return opts;
                }
                opts = opts|O_NONBLOCK;
                opts=fcntl(sock,F_SETFL,opts);
                if(opts==-1)
                {
                    perror("fcntl(sock,SETFL,opts) failed!\n");
                    return opts;
                }
                return 1;
            }

            static void adjustQSSWorkerLimits(QSocketServer *qss){
               //to adjust availabe size.
            }
            typedef struct{
             QSocketServer * qss;
             pthread_t th;
            }QSSWORKER_PARAM;

            static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
             WORD res=0;
             if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
             {
              QSSWORKER_PARAM * pParam=NULL;
              int i=0;
              pthread_spin_lock(&qss->g_spinlock);
              if(qss->workerCounter+addCounter<=qss->maxThreads)
               for(;i<addCounter;i++)
               {
                pParam=malloc(sizeof(QSSWORKER_PARAM));

                if(pParam){
                 pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
                 pParam->qss=qss;
                 qss->workerCounter++,res++;
                }
               }
              pthread_spin_unlock(&qss->g_spinlock);
             }
             return res;
            }

            static void SOlogger(const char * msg,SOCKET s){
             perror(msg);
                if(s>0)
                close(s);
            }

            static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
                QSSEPollEvent *qssEPEvent=event->data.ptr;
                int ret;
                printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
                if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
                    //sleep(5);
                    ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
                }
                 printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
             return ret;
            }

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
            {

             QSocketServer * qss=malloc(sizeof(QSocketServer));
             qss->passive=passive;
             qss->port=port;
             qss->minThreads=minThreads;
             qss->maxThreads=maxThreads;
             qss->workerWaitTimeout=workerWaitTimeout;
             qss->lifecycleStatus=0;
             pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
             qss->workerCounter=0;
             qss->currentBusyWorkers=0;
             qss->CSocketsCounter=0;
             qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
             if(!qss->protoHandler)
              qss->protoHandler=_InternalProtocolHandler;
             adjustQSSWorkerLimits(qss);
             *qss_ptr=qss;
             return 1;
            }

            int startSocketServer(QSocketServer *qss)
            {
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==0){
                        qss->lifecycleStatus=1;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                //bzero(&qss->serv_addr, sizeof(qss->serv_addr));

             qss->serv_addr.sin_family=AF_INET;
             qss->serv_addr.sin_port=htons(qss->port);
             inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
             //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

             qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
             if(setNonBlocking(qss->server_s)==-1)
             {
                SOlogger("setNonBlocking server_s failed.\n",0);
                 return 0;
             }

             if(qss->server_s==INVALID_SOCKET)
             {
              SOlogger("socket failed.\n",0);
              return 0;
             }

             if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
             {
              SOlogger("bind failed.\n",qss->server_s);
              return 0;
             }

             if(listen(qss->server_s,SOMAXCONN/*這個(gè)宏windows也有,這里是128,當(dāng)然你可以設(shè)的小些,它影響開(kāi)銷(xiāo)的*/)==SOCKET_ERROR)
            {
              SOlogger("listen failed.\n",qss->server_s);
              return 0;
            }
                qss->epollFD=epoll_create1(0);/*這里不是epoll_create(size)哦,你可能不知道如何設(shè)置size,所以忽略它吧*/
                if(qss->epollFD==-1){
                    SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
                    return 0;
                }
                qss->BSendEpollFD=epoll_create1(0);//for blocking send.
                if(qss->BSendEpollFD==-1){
                    SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
                    return 0;
                }

                {//ADD ACCEPT EVENT
                    struct epoll_event _epEvent;
                    QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
                    qssEPEvent->client_s=qss->server_s;
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
                    _epEvent.data.ptr=qssEPEvent;
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
                        SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
                        free(qssEPEvent);
                        return 0;
                    }
                }
                {//starup blocking send epoller.
                    QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
                    pParam->qss=qss;
                    pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
                }

             //initialize worker for epoll events.
             addQSSWorker(qss,qss->minThreads);
             qss->lifecycleStatus=2;
             return 1;
            }

            int shutdownSocketServer(QSocketServer *qss){
                //change qss->lifecycleStatus
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==2){
                        qss->lifecycleStatus=3;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                /*shutdown server-listening socket,這里優(yōu)雅的做法是shutdown--notify-->epoll-->close.記得shutdown會(huì)發(fā)送EOF的哦*/
                shutdown(qss->server_s,SHUT_RDWR);

                // /proc/getpid/fd  shutdown all socket cs != serv_s
                {
                    char dirBuf[64];
                    struct dirent * de;
                    DIR *pd=NULL;
                    int sockFD;
                    sprintf(dirBuf,"/proc/%d/fd/",getpid());
                    pd=opendir(dirBuf);
                    if(pd!=NULL){
                        while((de=readdir(pd))!=NULL){
                            if(isDigitStr(de->d_name)){
                                sockFD=atoi(de->d_name);
                                if(isfdtype(sockFD,S_IFSOCK))
                                shutdown(sockFD,SHUT_RDWR);
                            }
                        }
                        closedir(pd);
                    }
                    /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
                }
             return 1;
            }

            static int onAcceptRoutine(QSocketServer * qss)
            {
                SOCKADDR_IN client_addr;
                unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
                SOCKET cs;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
                if(cs==INVALID_SOCKET)
                {
                    printf("onAccept failed:%d,%s\n",errno,strerror(errno));
                    epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
                    return 0;
                }
                if(setNonBlocking(cs)==-1)
                {
                    printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
                    return 0;
                }

                {// set keepalive option
                    int keepAlive = 1;
                    int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
                    int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
                    int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
                    if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
                       {
                        printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
                        return 0;
                       }
                }
                qssEPEvent=malloc(sizeof(QSSEPollEvent));
                qssEPEvent->client_s=cs;
                {
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
                    qssEPEvent->BSendEpollFDRelated=0;
                    _epEvent.data.ptr=qssEPEvent;/*這里又和教科的不一樣哦,真正的user data用ptr,而不是單一的fd*/
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
                        printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
                        free(qssEPEvent);
                        return 0;
                    }else{
                        pthread_spin_lock(&qss->g_spinlock);
                        qss->CSocketsCounter++;
                        pthread_spin_unlock(&qss->g_spinlock);
                        if(qss->cslifecb)
                            qss->cslifecb(cs,0);
                    }
                }
                printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
                return 1;
            }

            typedef struct{
                QSocketServer * qss;
                QSSEPollEvent * event;
            }InternalSenderBase_t;

            static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
                InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
                char * _sbuf=_buf;
                int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

                QSSEPollEvent *qssEPEvent=NULL;
                struct epoll_event _epEvent;

                struct timespec sendTimeo;

                while(1){
                    *errno_ptr=0;
                    while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
                        sum+=ret,_sbuf+=ret;
                    if(sum==nbs||ret==0)
                        break;
                    else if(ret==-1){
                        if(errno==EAGAIN&&sum<nbs){
                            qssEPEvent=sb->event;
                            _epEvent.data.ptr=qssEPEvent;
                            _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                            if(qssEPEvent->BSendEpollFDRelated==0){
                                pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                                pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                                qssEPEvent->BSendEpollFDRelated=1;
                                curEpoll_ctl_opt=EPOLL_CTL_ADD;
                            }else{
                                curEpoll_ctl_opt=EPOLL_CTL_MOD;
                            }

                            {//wait writable.
                                int flag=0;
                                pthread_mutex_lock(&qssEPEvent->writableLock);
                                if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                                    sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                                    int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                                    if(err)
                                        flag=-1;
                                }else
                                    flag=-1;
                                pthread_mutex_unlock(&qssEPEvent->writableLock);
                                if(flag==-1)
                                    break;
                            }

                        }else{
                            if(errno==EAGAIN&&sum==nbs)
                                ret=nbs;//it is ok;
                            break;
                        }
                    }
                }//end while.
                return ret;
            }
            void *  blockingSendEpollerRoutine(void *_param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                //pthread_t * curThread=&pParam->th;
                struct epoll_event epEvents[qss->maxThreads];
                QSSEPollEvent *qssEPEvent=NULL;
                int pollRes,*errno_ptr=&errno;

                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(1){

                    pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
                    if(pollRes>=1){
                        int i=0;
                        for(;i<pollRes;i++)
                        if(epEvents[i].events&EPOLLOUT){//這個(gè)epollfd只應(yīng)該做以下的事情,少做為快!
                            qssEPEvent=epEvents[i].data.ptr;
                            pthread_mutex_lock(&qssEPEvent->writableLock);
                            pthread_cond_signal(&qssEPEvent->writableMonitor);
                            pthread_mutex_unlock(&qssEPEvent->writableLock);
                        }

                    }else if(pollRes==-1){//errno 
                        printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        break;
                    }

                }

                return NULL;
            }

            void *  epollWorkerRoutine(void * _param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                pthread_t * curThread=&pParam->th;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                InternalSenderBase_t _senderBase;
                int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
                _senderBase.qss=qss;
                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(!exitCode){

                    *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
                    pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
                    if(pollRes==1){
                        qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

                        if(qssEPEvent->client_s==qss->server_s)
                        {//Accepted Socket.
                           onAcceptRoutine(qss);
                           continue;
                        }else{
                            if(qss->protoHandler){
                                _senderBase.event=_epEvent.data.ptr;
                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers++;
                                pthread_spin_unlock(&qss->g_spinlock);

                                addQSSWorker(qss,1);
                                handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers--;
                                pthread_spin_unlock(&qss->g_spinlock);

                                if(handleCode>0){
                                    _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                                        SOErrOccurred=2;
                                }else{
                                    SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                                }
                            }
                        }

                    }else if(pollRes==0){//timeout
                        printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
                        if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
                        {
                            pthread_spin_lock(&qss->g_spinlock);
                            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                                qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                                exitCode=2;
                            }
                            pthread_spin_unlock(&qss->g_spinlock);
                        }else if(qss->lifecycleStatus>=4)
                                exitCode=4;

                    }else if(pollRes==-1){//errno
                        printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        exitCode=1;
                    }

                    if(SOErrOccurred){
                        if(qss->cslifecb)
                            qss->cslifecb(qssEPEvent->client_s,-1);
                        /*if(qssEPEvent)*/{
                            epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            close(qssEPEvent->client_s);
                            if(qssEPEvent->BSendEpollFDRelated){
                                pthread_cond_destroy(&qssEPEvent->writableMonitor);
                                pthread_mutex_destroy(&qssEPEvent->writableLock);
                            }
                            free(qssEPEvent);
                          }
                        pthread_spin_lock(&qss->g_spinlock);
                        if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
                         //for qss workerSize,
                         qss->lifecycleStatus=4;
                         exitCode=3;
                        }
                        pthread_spin_unlock(&qss->g_spinlock);
                    }//SOErrOccurred handle;

                }//end main while.

                if(exitCode!=2){
                    int clearup=0;
                    pthread_spin_lock(&qss->g_spinlock);
                    if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
                      clearup=1;
                    }
                    pthread_spin_unlock(&qss->g_spinlock);
                    if(clearup){
                     close(qss->epollFD);
                     close(qss->BSendEpollFD);
                     pthread_spin_destroy(&qss->g_spinlock);
                     free(qss);
                    }
                }//exitCode handle;
             return NULL;
            }


            from:

            http://m.shnenglu.com/adapterofcoms/archive/2010/08/03/122063.html

            posted on 2010-08-25 20:41 chatler 閱讀(2558) 評(píng)論(0)  編輯 收藏 引用 所屬分類(lèi): NetworkSocketLinux_Coding
            <2025年7月>
            293012345
            6789101112
            13141516171819
            20212223242526
            272829303112
            3456789

            常用鏈接

            留言簿(10)

            隨筆分類(lèi)(307)

            隨筆檔案(297)

            algorithm

            Books_Free_Online

            C++

            database

            Linux

            Linux shell

            linux socket

            misce

            • cloudward
            • 感覺(jué)這個(gè)博客還是不錯(cuò),雖然做的東西和我不大相關(guān),覺(jué)得看看還是有好處的

            network

            OSS

            • Google Android
            • Android is a software stack for mobile devices that includes an operating system, middleware and key applications. This early look at the Android SDK provides the tools and APIs necessary to begin developing applications on the Android platform using the Java programming language.
            • os161 file list

            overall

            搜索

            •  

            最新評(píng)論

            閱讀排行榜

            評(píng)論排行榜

            久久综合九色综合久99| 久久久久亚洲爆乳少妇无| 亚洲中文字幕无码一久久区| 久久久久亚洲av综合波多野结衣| 亚洲AV无码久久精品蜜桃| 国产精品青草久久久久婷婷 | 狠狠色婷婷久久一区二区| 久久无码人妻一区二区三区| 精品久久久久久国产三级| 久久久无码精品亚洲日韩京东传媒 | 亚洲伊人久久成综合人影院 | 国产情侣久久久久aⅴ免费| 久久亚洲国产成人精品无码区| 久久99久国产麻精品66| 久久伊人精品青青草原日本| 国产精品无码久久综合| 中文字幕无码久久精品青草| 伊人久久综在合线亚洲2019| 久久久久亚洲Av无码专| 亚洲欧美一区二区三区久久| 岛国搬运www久久| 久久这里只有精品首页| 久久青青草原精品国产| 亚洲精品无码久久久久久| 久久久精品人妻无码专区不卡| 成人综合伊人五月婷久久| 一本色道久久HEZYO无码| 亚洲а∨天堂久久精品| 久久国产香蕉视频| 国产精品va久久久久久久| 亚洲乱亚洲乱淫久久| 中文字幕一区二区三区久久网站| 91精品国产综合久久精品| 97久久精品无码一区二区天美 | 久久本道综合久久伊人| 国产精品久久毛片完整版| 久久精品国产只有精品2020 | 色诱久久av| 中文精品99久久国产| 久久精品国产色蜜蜜麻豆| 五月丁香综合激情六月久久|