青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

   C++ 技術中心

   :: 首頁 :: 聯系 ::  :: 管理
  160 Posts :: 0 Stories :: 87 Comments :: 0 Trackbacks

公告

鄭重聲明:本BLOG所發表的原創文章,作者保留一切權利。必須經過作者本人同意后方可轉載,并注名作者(天空)和出處(CppBlog.com)。作者Email:coder@luckcoder.com

留言簿(27)

搜索

  •  

最新隨筆

最新評論

評論排行榜

epoll,event poll,on linux kernel 2.6.x.pthread,nptl-2.12
   LT/ET:ET也會多次發送event,當然頻率遠低于LT,但是epoll one shot才是真正的對"one connection VS one thread in worker thread pool,不依賴于任何connection-data-queue"的基礎支持 .我看到大部分對epoll_wait的處理模式如下,很教科化,因為man-pages就是這樣舉例子的。
man-pages epoll_wait handle:
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
for (;;) 
{
   nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
   for(1~nfds)
       handle events[i];
   ......
}
epoll_ctl_add的當然是EPOLLIN|EPOLLET,此外我就不知道處理上面代碼的是一條線程還是一堆線程(in threadpool),但愿不是一條線程吧!如果是的話,難不成等處理完這MAX_EVENTS 個再去處理接下來的MAX_EVENTS 個?慢否? 但是如果是一堆線程的話,你是否考慮過如何處理request-data in one connection在邏輯上的完整性,也就是一個request-data-packet可能會被分割成若干次發送,在上面的處理模式中你真的要好好設計一下了。
而我的epoll_wait處理模式如下:
struct epoll_event activeEvent;
for(;;)
{
   epoll_wait(epollfd, &activeEvent1/*很驚訝嗎,但絕不是一條線程在運行這段代碼,而是一堆*/, timeout);
   if handle activeEvent success
      epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
  ......
}
處理上面代碼的當然是一堆線程in threadpool,而且epoll_ctl_add的是EPOLLIN|EPOLLET|EPOLLONESHOT
因為我的設計理念是嚴格遵守one connection VS one thread in worker thread pool。
所以我下面的server框架的基本模型是:
One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
在epollWorkerRoutine中有以下的職責:
1.handle request,當忙時增加epollWorkerThread數量但不超過maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
2.timeout時檢查是否空閑和當前epollWorkerThread數量,當空閑時保持或減少至minThreads數量.
3.對所有Accepted-socket管理生命周期,這里利用系統的keepalive probes,若想實現業務層"心跳探測"只需將QSS_SIO_KEEPALIVE_VALS_TIMEOUT 改回系統默認的2小時.這里并不維護所有連接列表,當然你可以在/proc/getpid/fd下找到所有的socket fd.
4.linux上的non-blocking socket的操作仍然依賴recv,send,不像windows上的wsarecv+overlapped,即便不用fcntl fd o_nonblock也可以立即返回。我這里把send動作實現成了blocking的(internalBlockingSender),同樣的道理,non-blocking send依然會形成響應數據在邏輯上的碎片錯序,特別是你如果采用上面那個教科化的處理模式的化,并且還是多線程的話,那么這簡直就亂透了。當然你可以使用response-data-queue來達到異步準確發送數據的目的。

下面結合源碼,淺析一下epoll programming:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <string.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SOCKET_ERROR -1
#define INVALID_SOCKET -1
typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef unsigned short WORD;
typedef unsigned int DWORD;

#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
#define QSS_SIO_KEEPALIVE_VALS_COUNT 3
#define MAX_THREADS 100
#define MAX_THREADS_MIN  10
#define MIN_WORKER_WAIT_TIMEOUT  20*1000
#define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
#define MAX_THREADPOOLS  32

#define MAX_BUF_SIZE 1024
/* ulimit -n opened FDs per process.記得修改哦,否則還是select效果,就不是epoll效果了哦,呵呵*/
#define BLOCKING_SEND_TIMEOUT 20

typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

typedef struct {
  WORD passive;
  WORD port;//uint16_t
  WORD minThreads;
  WORD maxThreads;
  pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
  volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
  int  workerWaitTimeout;//wait timeout
  volatile int workerCounter;
  volatile int currentBusyWorkers;
  volatile int CSocketsCounter;
  CSocketLifecycleCallback cslifecb;
  InternalProtocolHandler protoHandler;
  SOCKET server_s;
  SOCKADDR_IN serv_addr;
  int epollFD;//main epoller.
  int BSendEpollFD;//For blocking send.
}QSocketServer;

typedef struct {
  SOCKET client_s;
  SOCKADDR_IN client_addr;
  uint32_t curEvents;

  char buf[MAX_BUF_SIZE];
  DWORD numberOfBytesTransferred;
  char * data;

  int BSendEpollFDRelated;
  pthread_mutex_t writableLock;
  pthread_cond_t  writableMonitor;
}QSSEPollEvent;//for per connection

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
int startSocketServer(QSocketServer *qss);
int shutdownSocketServer(QSocketServer *qss);
#endif

qsocketserver_model.c  //下面的代碼離生產環境還差內存池logger哦!
 #include "socketserver.h"
#include <dirent.h>
#include <regex.h>
#define DIGIT_PATTERN_STRING "^[0-9]+$"
void *  epollWorkerRoutine(void *);
void *  blockingSendEpollerRoutine(void *);
int isDigitStr(const char *str){
    int ret=-1;
    regex_t regex;
    regmatch_t matchs[1];
    if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*這里不要傳0哦,否則nomatch*/)){
        ret=!regexec(&regex,str, 1,matchs,0);
        regfree(&regex);
    }
    return ret;
}

static int setNonBlocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts==-1)
    {
        perror("fcntl(sock,GETFL) failed!\n");
        return opts;
    }
    opts = opts|O_NONBLOCK;
    opts=fcntl(sock,F_SETFL,opts);
    if(opts==-1)
    {
        perror("fcntl(sock,SETFL,opts) failed!\n");
        return opts;
    }
    return 1;
}

static void adjustQSSWorkerLimits(QSocketServer *qss){
   //to adjust availabe size.
}
typedef struct{
 QSocketServer * qss;
 pthread_t th;
}QSSWORKER_PARAM;

static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
 WORD res=0;
 if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
 {
  QSSWORKER_PARAM * pParam=NULL;
  int i=0;
  pthread_spin_lock(&qss->g_spinlock);
  if(qss->workerCounter+addCounter<=qss->maxThreads)
   for(;i<addCounter;i++)
   {
    pParam=malloc(sizeof(QSSWORKER_PARAM));

    if(pParam){
     pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
     pParam->qss=qss;
     qss->workerCounter++,res++;
    }
   }
  pthread_spin_unlock(&qss->g_spinlock);
 }
 return res;
}

static void SOlogger(const char * msg,SOCKET s){
 perror(msg);
    if(s>0)
    close(s);
}

static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
    QSSEPollEvent *qssEPEvent=event->data.ptr;
    int ret;
    printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
    if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
        //sleep(5);
        ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
    }
     printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
 return ret;
}

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
{

 QSocketServer * qss=malloc(sizeof(QSocketServer));
 qss->passive=passive;
 qss->port=port;
 qss->minThreads=minThreads;
 qss->maxThreads=maxThreads;
 qss->workerWaitTimeout=workerWaitTimeout;
 qss->lifecycleStatus=0;
 pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
 qss->workerCounter=0;
 qss->currentBusyWorkers=0;
 qss->CSocketsCounter=0;
 qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
 if(!qss->protoHandler)
  qss->protoHandler=_InternalProtocolHandler;
 adjustQSSWorkerLimits(qss);
 *qss_ptr=qss;
 return 1;
}

int startSocketServer(QSocketServer *qss)
{
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==0){
            qss->lifecycleStatus=1;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    //bzero(&qss->serv_addr, sizeof(qss->serv_addr));

 qss->serv_addr.sin_family=AF_INET;
 qss->serv_addr.sin_port=htons(qss->port);
 inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
 //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

 qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
 if(setNonBlocking(qss->server_s)==-1)
 {
    SOlogger("setNonBlocking server_s failed.\n",0);
     return 0;
 }

 if(qss->server_s==INVALID_SOCKET)
 {
  SOlogger("socket failed.\n",0);
  return 0;
 }

 if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
 {
  SOlogger("bind failed.\n",qss->server_s);
  return 0;
 }

 if(listen(qss->server_s,SOMAXCONN/*這個宏windows也有,這里是128,當然你可以設的小些,它影響開銷的*/)==SOCKET_ERROR)
{
  SOlogger("listen failed.\n",qss->server_s);
  return 0;
}
    qss->epollFD=epoll_create1(0);/*這里不是epoll_create(size)哦,你可能不知道如何設置size,所以忽略它吧*/
    if(qss->epollFD==-1){
        SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
        return 0;
    }
    qss->BSendEpollFD=epoll_create1(0);//for blocking send.
    if(qss->BSendEpollFD==-1){
        SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
        return 0;
    }

    {//ADD ACCEPT EVENT
        struct epoll_event _epEvent;
        QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
        qssEPEvent->client_s=qss->server_s;
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
        _epEvent.data.ptr=qssEPEvent;
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
            SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
            free(qssEPEvent);
            return 0;
        }
    }
    {//starup blocking send epoller.
        QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
        pParam->qss=qss;
        pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
    }

 //initialize worker for epoll events.
 addQSSWorker(qss,qss->minThreads);
 qss->lifecycleStatus=2;
 return 1;
}

int shutdownSocketServer(QSocketServer *qss){
    //change qss->lifecycleStatus
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==2){
            qss->lifecycleStatus=3;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    /*shutdown server-listening socket,這里優雅的做法是shutdown--notify-->epoll-->close.記得shutdown會發送EOF的哦*/
    shutdown(qss->server_s,SHUT_RDWR);

    // /proc/getpid/fd  shutdown all socket cs != serv_s
    {
        char dirBuf[64];
        struct dirent * de;
        DIR *pd=NULL;
        int sockFD;
        sprintf(dirBuf,"/proc/%d/fd/",getpid());
        pd=opendir(dirBuf);
        if(pd!=NULL){
            while((de=readdir(pd))!=NULL){
                if(isDigitStr(de->d_name)){
                    sockFD=atoi(de->d_name);
                    if(isfdtype(sockFD,S_IFSOCK))
                    shutdown(sockFD,SHUT_RDWR);
                }
            }
            closedir(pd);
        }
        /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
    }
 return 1;
}

static int onAcceptRoutine(QSocketServer * qss)
{
    SOCKADDR_IN client_addr;
    unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
    SOCKET cs;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
    if(cs==INVALID_SOCKET)
    {
        printf("onAccept failed:%d,%s\n",errno,strerror(errno));
        epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
        return 0;
    }
    if(setNonBlocking(cs)==-1)
    {
        printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
        return 0;
    }

    {// set keepalive option
        int keepAlive = 1;
        int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
        int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
        int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
        if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
           setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
           setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
           setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
           {
            printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
            return 0;
           }
    }
    qssEPEvent=malloc(sizeof(QSSEPollEvent));
    qssEPEvent->client_s=cs;
    {
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
        qssEPEvent->BSendEpollFDRelated=0;
        _epEvent.data.ptr=qssEPEvent;/*這里又和教科的不一樣哦,真正的user data用ptr,而不是單一的fd*/
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
            printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
            free(qssEPEvent);
            return 0;
        }else{
            pthread_spin_lock(&qss->g_spinlock);
            qss->CSocketsCounter++;
            pthread_spin_unlock(&qss->g_spinlock);
            if(qss->cslifecb)
                qss->cslifecb(cs,0);
        }
    }
    printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
    return 1;
}

typedef struct{
    QSocketServer * qss;
    QSSEPollEvent * event;
}InternalSenderBase_t;

static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
    InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
    char * _sbuf=_buf;
    int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

    QSSEPollEvent *qssEPEvent=NULL;
    struct epoll_event _epEvent;

    struct timespec sendTimeo;

    while(1){
        *errno_ptr=0;
        while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
            sum+=ret,_sbuf+=ret;
        if(sum==nbs||ret==0)
            break;
        else if(ret==-1){
            if(errno==EAGAIN&&sum<nbs){
                qssEPEvent=sb->event;
                _epEvent.data.ptr=qssEPEvent;
                _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                if(qssEPEvent->BSendEpollFDRelated==0){
                    pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                    pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                    qssEPEvent->BSendEpollFDRelated=1;
                    curEpoll_ctl_opt=EPOLL_CTL_ADD;
                }else{
                    curEpoll_ctl_opt=EPOLL_CTL_MOD;
                }

                {//wait writable.
                    int flag=0;
                    pthread_mutex_lock(&qssEPEvent->writableLock);
                    if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                        sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                        int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                        if(err)
                            flag=-1;
                    }else
                        flag=-1;
                    pthread_mutex_unlock(&qssEPEvent->writableLock);
                    if(flag==-1)
                        break;
                }

            }else{
                if(errno==EAGAIN&&sum==nbs)
                    ret=nbs;//it is ok;
                break;
            }
        }
    }//end while.
    return ret;
}
void *  blockingSendEpollerRoutine(void *_param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    //pthread_t * curThread=&pParam->th;
    struct epoll_event epEvents[qss->maxThreads];
    QSSEPollEvent *qssEPEvent=NULL;
    int pollRes,*errno_ptr=&errno;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(1){

        pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
        if(pollRes>=1){
            int i=0;
            for(;i<pollRes;i++)
            if(epEvents[i].events&EPOLLOUT){//這個epollfd只應該做以下的事情,少做為快!
                qssEPEvent=epEvents[i].data.ptr;
                pthread_mutex_lock(&qssEPEvent->writableLock);
                pthread_cond_signal(&qssEPEvent->writableMonitor);
                pthread_mutex_unlock(&qssEPEvent->writableLock);
            }

        }else if(pollRes==-1){//errno 
            printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            break;
        }

    }

    return NULL;
}

void *  epollWorkerRoutine(void * _param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    pthread_t * curThread=&pParam->th;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    InternalSenderBase_t _senderBase;
    int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
    _senderBase.qss=qss;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(!exitCode){

        *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
        pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
        if(pollRes==1){
            qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

            if(qssEPEvent->client_s==qss->server_s)
            {//Accepted Socket.
               onAcceptRoutine(qss);
               continue;
            }else{
                if(qss->protoHandler){
                    _senderBase.event=_epEvent.data.ptr;
                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers++;
                    pthread_spin_unlock(&qss->g_spinlock);

                    addQSSWorker(qss,1);
                    handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers--;
                    pthread_spin_unlock(&qss->g_spinlock);

                    if(handleCode>0){
                        _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                        if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                            SOErrOccurred=2;
                    }else{
                        SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                    }
                }
            }

        }else if(pollRes==0){//timeout
            printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
            {
                pthread_spin_lock(&qss->g_spinlock);
                if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                    qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                    exitCode=2;
                }
                pthread_spin_unlock(&qss->g_spinlock);
            }else if(qss->lifecycleStatus>=4)
                    exitCode=4;

        }else if(pollRes==-1){//errno
            printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            exitCode=1;
        }

        if(SOErrOccurred){
            if(qss->cslifecb)
                qss->cslifecb(qssEPEvent->client_s,-1);
            /*if(qssEPEvent)*/{
                epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                close(qssEPEvent->client_s);
                if(qssEPEvent->BSendEpollFDRelated){
                    pthread_cond_destroy(&qssEPEvent->writableMonitor);
                    pthread_mutex_destroy(&qssEPEvent->writableLock);
                }
                free(qssEPEvent);
              }
            pthread_spin_lock(&qss->g_spinlock);
            if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
             //for qss workerSize,
             qss->lifecycleStatus=4;
             exitCode=3;
            }
            pthread_spin_unlock(&qss->g_spinlock);
        }//SOErrOccurred handle;

    }//end main while.

    if(exitCode!=2){
        int clearup=0;
        pthread_spin_lock(&qss->g_spinlock);
        if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
          clearup=1;
        }
        pthread_spin_unlock(&qss->g_spinlock);
        if(clearup){
         close(qss->epollFD);
         close(qss->BSendEpollFD);
         pthread_spin_destroy(&qss->g_spinlock);
         free(qss);
        }
    }//exitCode handle;
 return NULL;
}

 

posted on 2013-12-01 21:37 C++技術中心 閱讀(2015) 評論(0)  編輯 收藏 引用 所屬分類: Linux 編程
青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            欧美日韩美女一区二区| av成人免费在线| 亚洲日本电影| 亚洲国产天堂网精品网站| 亚洲国产欧美日韩精品| 亚洲大片精品永久免费| 亚洲国产成人精品久久久国产成人一区| 狠狠色噜噜狠狠狠狠色吗综合| 精品福利av| 99精品黄色片免费大全| 午夜天堂精品久久久久| 久久久人成影片一区二区三区| 美女视频黄a大片欧美| 亚洲大片一区二区三区| 久久久国产一区二区三区| 免费久久精品视频| 欧美在线观看一区二区三区| 久久久人成影片一区二区三区观看 | 欧美国产综合视频| 欧美三级电影一区| 国产精品一区久久| 欧美一级专区免费大片| 久久久精品国产免大香伊| 能在线观看的日韩av| 欧美日韩一二三四五区| 国内精品久久久久影院 日本资源| 亚洲三级国产| 久久久一区二区三区| 亚洲精品一区二区三区99| 欧美亚洲在线观看| 欧美日韩一区二区在线观看视频| 韩日精品中文字幕| 亚洲免费综合| 亚洲日本成人在线观看| 久久九九99| 国产欧美亚洲一区| 制服丝袜激情欧洲亚洲| 欧美电影免费观看| 久久爱另类一区二区小说| 欧美三级视频在线观看| 91久久久久久久久久久久久| 久久久精品五月天| 亚洲一级黄色av| 欧美日韩在线一二三| 亚洲精品视频免费观看| 免费中文字幕日韩欧美| 欧美一区二区三区婷婷月色 | 亚洲免费观看高清完整版在线观看熊 | 一本色道久久综合一区| 久久女同互慰一区二区三区| 国产精品久久久久婷婷| 亚洲一区二区三区免费在线观看 | 美日韩精品视频免费看| 国产在线拍偷自揄拍精品| 欧美一级专区免费大片| 黄色一区二区在线观看| 日韩网站在线| 亚洲一区自拍| 亚洲国产裸拍裸体视频在线观看乱了 | 亚洲国产视频一区| 久久一区二区三区av| 午夜激情综合网| 国产精品久久久久国产精品日日| 一本久久青青| 日韩小视频在线观看| 欧美激情日韩| 亚洲免费电影在线观看| 亚洲黄色在线视频| 欧美噜噜久久久xxx| 在线一区二区三区做爰视频网站| 亚洲国产网站| 欧美体内she精视频在线观看| 亚洲一区二区黄色| 亚洲欧美一区二区在线观看| 国产一区二区三区在线观看免费视频 | 午夜亚洲性色视频| 激情久久久久久久久久久久久久久久| 久久精品国产亚洲aⅴ| 久久激情中文| 亚洲欧洲精品一区二区三区 | 国产精品乱子乱xxxx| 小黄鸭视频精品导航| 欧美一区二区三区啪啪| 1000部国产精品成人观看| 亚洲丰满少妇videoshd| 欧美日韩视频一区二区| 欧美在线影院在线视频| 久久免费午夜影院| 一区二区三区 在线观看视| 亚洲一区二区三区成人在线视频精品 | 国产精品久久久久久久app| 久久久久久久999精品视频| 免费成人黄色| 亚洲欧美国产一区二区三区| 久久aⅴ国产欧美74aaa| 亚洲精品欧洲| 亚洲欧美日韩成人高清在线一区| 亚洲图片欧美午夜| 尤物九九久久国产精品的特点| 亚洲国产精选| 国产又爽又黄的激情精品视频| 亚洲国产成人在线| 国产日韩精品在线播放| 亚洲三级色网| 激情亚洲网站| 这里只有精品电影| 91久久国产综合久久91精品网站| 亚洲天堂成人| 日韩午夜在线| 亚洲欧美日韩综合aⅴ视频| 久久久九九九九| 午夜精品视频| 欧美人在线观看| 另类av导航| 国产精品视频精品视频| 最新成人av网站| 亚洲动漫精品| 欧美一区二区三区播放老司机| 亚洲午夜精品一区二区三区他趣| 美女视频一区免费观看| 久久久精品国产免费观看同学 | 樱花yy私人影院亚洲| 亚洲一区免费| 亚洲影院免费| 欧美色图一区二区三区| 欧美激情网友自拍| 1769国产精品| 久久一区二区三区国产精品| 久久久国产成人精品| 国产精品亚洲а∨天堂免在线| 亚洲美女av电影| a4yy欧美一区二区三区| 欧美成人亚洲成人| 亚洲东热激情| 亚洲精品三级| 欧美国产综合| 亚洲人成网站精品片在线观看| 亚洲精品午夜精品| 欧美精品三区| 99精品国产一区二区青青牛奶| 99视频一区二区| 欧美区一区二| 在线一区二区三区四区五区| 亚洲与欧洲av电影| 国产精品久久久久久久久久ktv| 中文在线资源观看视频网站免费不卡| 一区二区三区精品视频在线观看| 欧美日韩不卡一区| 在线亚洲一区观看| 久久久7777| 亚洲激情视频网站| 欧美日韩一区高清| 午夜伦理片一区| 美女图片一区二区| 99ri日韩精品视频| 国产精品美女xx| 久久不射网站| 亚洲国产精品va在线观看黑人| 99这里只有精品| 国产女精品视频网站免费| 久久精品理论片| 欧美激情导航| 亚洲一区免费视频| 狠狠色综合网| 日韩一级片网址| 亚洲欧美日韩视频一区| 亚洲精品日韩在线观看| 欧美激情第一页xxx| 夜夜嗨av色一区二区不卡| 午夜激情亚洲| 亚洲高清视频在线观看| 欧美日韩精品三区| 欧美中文字幕在线视频| 亚洲高清av| 欧美在线啊v| 亚洲精品久久久久久一区二区 | 黄页网站一区| 欧美国产日韩一区二区在线观看| 亚洲欧洲日韩在线| 国产精品视频免费在线观看| 久热精品视频在线观看一区| 一区二区三区视频免费在线观看 | 欧美一区二区三区四区在线| 亚洲大胆视频| 国产日韩一区二区| 欧美精品国产一区| 久久精品色图| 国产精品99久久久久久久久| 麻豆av一区二区三区| 亚洲深夜福利| 亚洲人精品午夜在线观看| 国产精品一二一区| 欧美精品久久一区二区| 久久精品视频免费| 欧美一区二区精品久久911| 亚洲九九精品| 欧美激情第4页| 美女日韩欧美| 久久综合伊人77777| 欧美一区不卡|