青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

posts - 2,  comments - 2,  trackbacks - 0
   epoll,event poll,on linux kernel 2.6.x.pthread,nptl-2.12
   LT/ET:ET也會(huì)多次發(fā)送event,當(dāng)然頻率遠(yuǎn)低于LT,但是epoll one shot才是真正的對(duì)"one connection VS one thread in worker thread pool,不依賴于任何connection-data-queue"的基礎(chǔ)支持 .我看到大部分對(duì)epoll_wait的處理模式如下,很教科化,因?yàn)閙an-pages就是這樣舉例子的。
man-pages epoll_wait handle:
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
for (;;)
{
   nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
   for(1~nfds)
       handle events[i];
   ......
}
epoll_ctl_add的當(dāng)然是EPOLLIN|EPOLLET,此外我就不知道處理上面代碼的是一條線程還是一堆線程(in threadpool),但愿不是一條線程吧!如果是的話,難不成等處理完這MAX_EVENTS 個(gè)再去處理接下來(lái)的MAX_EVENTS 個(gè)?慢否? 但是如果是一堆線程的話,你是否考慮過(guò)如何處理request-data in one connection在邏輯上的完整性,也就是一個(gè)request-data-packet可能會(huì)被分割成若干次發(fā)送,在上面的處理模式中你真的要好好設(shè)計(jì)一下了。
而我的epoll_wait處理模式如下:
struct epoll_event activeEvent;
for(;;)
{
   epoll_wait(epollfd, &activeEvent, 1/*很驚訝嗎,但絕不是一條線程在運(yùn)行這段代碼,而是一堆*/, timeout);
   if handle activeEvent success
      epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
  ......
}
處理上面代碼的當(dāng)然是一堆線程in threadpool,而且epoll_ctl_add的是EPOLLIN|EPOLLET|EPOLLONESHOT
因?yàn)槲业脑O(shè)計(jì)理念是嚴(yán)格遵守one connection VS one thread in worker thread pool。
所以我下面的server框架的基本模型是:
One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
在epollWorkerRoutine中有以下的職責(zé):
1.handle request,當(dāng)忙時(shí)增加epollWorkerThread數(shù)量但不超過(guò)maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
2.timeout時(shí)檢查是否空閑和當(dāng)前epollWorkerThread數(shù)量,當(dāng)空閑時(shí)保持或減少至minThreads數(shù)量.
3.對(duì)所有Accepted-socket管理生命周期,這里利用系統(tǒng)的keepalive probes,若想實(shí)現(xiàn)業(yè)務(wù)層"心跳探測(cè)"只需將QSS_SIO_KEEPALIVE_VALS_TIMEOUT 改回系統(tǒng)默認(rèn)的2小時(shí).這里并不維護(hù)所有連接列表,當(dāng)然你可以在/proc/getpid/fd下找到所有的socket fd.
4.linux上的non-blocking socket的操作仍然依賴recv,send,不像windows上的wsarecv+overlapped,即便不用fcntl fd o_nonblock也可以立即返回。我這里把send動(dòng)作實(shí)現(xiàn)成了blocking的(internalBlockingSender),同樣的道理,non-blocking send依然會(huì)形成響應(yīng)數(shù)據(jù)在邏輯上的碎片錯(cuò)序,特別是你如果采用上面那個(gè)教科化的處理模式的化,并且還是多線程的話,那么這簡(jiǎn)直就亂透了。當(dāng)然你可以使用response-data-queue來(lái)達(dá)到異步準(zhǔn)確發(fā)送數(shù)據(jù)的目的。

下面結(jié)合源碼,淺析一下epoll programming:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <string.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SOCKET_ERROR -1
#define INVALID_SOCKET -1
typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef unsigned short WORD;
typedef unsigned int DWORD;

#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
#define QSS_SIO_KEEPALIVE_VALS_COUNT 3
#define MAX_THREADS 100
#define MAX_THREADS_MIN  10
#define MIN_WORKER_WAIT_TIMEOUT  20*1000
#define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
#define MAX_THREADPOOLS  32

#define MAX_BUF_SIZE 1024
/* ulimit -n opened FDs per process.記得修改哦,否則還是select效果,就不是epoll效果了哦,呵呵*/
#define BLOCKING_SEND_TIMEOUT 20

typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

typedef struct {
  WORD passive;
  WORD port;//uint16_t
  WORD minThreads;
  WORD maxThreads;
  pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
  volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
  int  workerWaitTimeout;//wait timeout
  volatile int workerCounter;
  volatile int currentBusyWorkers;
  volatile int CSocketsCounter;
  CSocketLifecycleCallback cslifecb;
  InternalProtocolHandler protoHandler;
  SOCKET server_s;
  SOCKADDR_IN serv_addr;
  int epollFD;//main epoller.
  int BSendEpollFD;//For blocking send.
}QSocketServer;

typedef struct {
  SOCKET client_s;
  SOCKADDR_IN client_addr;
  uint32_t curEvents;

  char buf[MAX_BUF_SIZE];
  DWORD numberOfBytesTransferred;
  char * data;

  int BSendEpollFDRelated;
  pthread_mutex_t writableLock;
  pthread_cond_t  writableMonitor;
}QSSEPollEvent;//for per connection

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
int startSocketServer(QSocketServer *qss);
int shutdownSocketServer(QSocketServer *qss);
#endif

qsocketserver_model.c  //下面的代碼離生產(chǎn)環(huán)境還差內(nèi)存池logger哦!
 #include "socketserver.h"
#include <dirent.h>
#include <regex.h>
#define DIGIT_PATTERN_STRING "^[0-9]+$"
void *  epollWorkerRoutine(void *);
void *  blockingSendEpollerRoutine(void *);
int isDigitStr(const char *str){
    int ret=-1;
    regex_t regex;
    regmatch_t matchs[1];
    if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*這里不要傳0哦,否則nomatch*/)){
        ret=!regexec(&regex,str, 1,matchs,0);
        regfree(&regex);
    }
    return ret;
}

static int setNonBlocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts==-1)
    {
        perror("fcntl(sock,GETFL) failed!\n");
        return opts;
    }
    opts = opts|O_NONBLOCK;
    opts=fcntl(sock,F_SETFL,opts);
    if(opts==-1)
    {
        perror("fcntl(sock,SETFL,opts) failed!\n");
        return opts;
    }
    return 1;
}

static void adjustQSSWorkerLimits(QSocketServer *qss){
   //to adjust availabe size.
}
typedef struct{
 QSocketServer * qss;
 pthread_t th;
}QSSWORKER_PARAM;

static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
 WORD res=0;
 if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
 {
  QSSWORKER_PARAM * pParam=NULL;
  int i=0;
  pthread_spin_lock(&qss->g_spinlock);
  if(qss->workerCounter+addCounter<=qss->maxThreads)
   for(;i<addCounter;i++)
   {
    pParam=malloc(sizeof(QSSWORKER_PARAM));

    if(pParam){
     pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
     pParam->qss=qss;
     qss->workerCounter++,res++;
    }
   }
  pthread_spin_unlock(&qss->g_spinlock);
 }
 return res;
}

static void SOlogger(const char * msg,SOCKET s){
 perror(msg);
    if(s>0)
    close(s);
}

static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
    QSSEPollEvent *qssEPEvent=event->data.ptr;
    int ret;
    printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
    if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
        //sleep(5);
        ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
    }
     printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
 return ret;
}

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
{

 QSocketServer * qss=malloc(sizeof(QSocketServer));
 qss->passive=passive;
 qss->port=port;
 qss->minThreads=minThreads;
 qss->maxThreads=maxThreads;
 qss->workerWaitTimeout=workerWaitTimeout;
 qss->lifecycleStatus=0;
 pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
 qss->workerCounter=0;
 qss->currentBusyWorkers=0;
 qss->CSocketsCounter=0;
 qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
 if(!qss->protoHandler)
  qss->protoHandler=_InternalProtocolHandler;
 adjustQSSWorkerLimits(qss);
 *qss_ptr=qss;
 return 1;
}

int startSocketServer(QSocketServer *qss)
{
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==0){
            qss->lifecycleStatus=1;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    //bzero(&qss->serv_addr, sizeof(qss->serv_addr));

 qss->serv_addr.sin_family=AF_INET;
 qss->serv_addr.sin_port=htons(qss->port);
 inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
 //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

 qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
 if(setNonBlocking(qss->server_s)==-1)
 {
    SOlogger("setNonBlocking server_s failed.\n",0);
     return 0;
 }

 if(qss->server_s==INVALID_SOCKET)
 {
  SOlogger("socket failed.\n",0);
  return 0;
 }

 if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
 {
  SOlogger("bind failed.\n",qss->server_s);
  return 0;
 }

 if(listen(qss->server_s,SOMAXCONN/*這個(gè)宏windows也有,這里是128,當(dāng)然你可以設(shè)的小些,它影響開(kāi)銷的*/)==SOCKET_ERROR)
{
  SOlogger("listen failed.\n",qss->server_s);
  return 0;
}
    qss->epollFD=epoll_create1(0);/*這里不是epoll_create(size)哦,你可能不知道如何設(shè)置size,所以忽略它吧*/
    if(qss->epollFD==-1){
        SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
        return 0;
    }
    qss->BSendEpollFD=epoll_create1(0);//for blocking send.
    if(qss->BSendEpollFD==-1){
        SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
        return 0;
    }

    {//ADD ACCEPT EVENT
        struct epoll_event _epEvent;
        QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
        qssEPEvent->client_s=qss->server_s;
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
        _epEvent.data.ptr=qssEPEvent;
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
            SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
            free(qssEPEvent);
            return 0;
        }
    }
    {//starup blocking send epoller.
        QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
        pParam->qss=qss;
        pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
    }

 //initialize worker for epoll events.
 addQSSWorker(qss,qss->minThreads);
 qss->lifecycleStatus=2;
 return 1;
}

int shutdownSocketServer(QSocketServer *qss){
    //change qss->lifecycleStatus
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==2){
            qss->lifecycleStatus=3;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    /*shutdown server-listening socket,這里優(yōu)雅的做法是shutdown--notify-->epoll-->close.記得shutdown會(huì)發(fā)送EOF的哦*/
    shutdown(qss->server_s,SHUT_RDWR);

    // /proc/getpid/fd  shutdown all socket cs != serv_s
    {
        char dirBuf[64];
        struct dirent * de;
        DIR *pd=NULL;
        int sockFD;
        sprintf(dirBuf,"/proc/%d/fd/",getpid());
        pd=opendir(dirBuf);
        if(pd!=NULL){
            while((de=readdir(pd))!=NULL){
                if(isDigitStr(de->d_name)){
                    sockFD=atoi(de->d_name);
                    if(isfdtype(sockFD,S_IFSOCK))
                    shutdown(sockFD,SHUT_RDWR);
                }
            }
            closedir(pd);
        }
        /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
    }
 return 1;
}

static int onAcceptRoutine(QSocketServer * qss)
{
    SOCKADDR_IN client_addr;
    unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
    SOCKET cs;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
    if(cs==INVALID_SOCKET)
    {
        printf("onAccept failed:%d,%s\n",errno,strerror(errno));
        epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
        return 0;
    }
    if(setNonBlocking(cs)==-1)
    {
        printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
        return 0;
    }

    {// set keepalive option
        int keepAlive = 1;
        int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
        int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
        int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
        if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
           setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
           setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
           setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
           {
            printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
            return 0;
           }
    }
    qssEPEvent=malloc(sizeof(QSSEPollEvent));
    qssEPEvent->client_s=cs;
    {
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
        qssEPEvent->BSendEpollFDRelated=0;
        _epEvent.data.ptr=qssEPEvent;/*這里又和教科的不一樣哦,真正的user data用ptr,而不是單一的fd*/
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
            printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
            free(qssEPEvent);
            return 0;
        }else{
            pthread_spin_lock(&qss->g_spinlock);
            qss->CSocketsCounter++;
            pthread_spin_unlock(&qss->g_spinlock);
            if(qss->cslifecb)
                qss->cslifecb(cs,0);
        }
    }
    printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
    return 1;
}

typedef struct{
    QSocketServer * qss;
    QSSEPollEvent * event;
}InternalSenderBase_t;

static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
    InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
    char * _sbuf=_buf;
    int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

    QSSEPollEvent *qssEPEvent=NULL;
    struct epoll_event _epEvent;

    struct timespec sendTimeo;

    while(1){
        *errno_ptr=0;
        while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
            sum+=ret,_sbuf+=ret;
        if(sum==nbs||ret==0)
            break;
        else if(ret==-1){
            if(errno==EAGAIN&&sum<nbs){
                qssEPEvent=sb->event;
                _epEvent.data.ptr=qssEPEvent;
                _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                if(qssEPEvent->BSendEpollFDRelated==0){
                    pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                    pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                    qssEPEvent->BSendEpollFDRelated=1;
                    curEpoll_ctl_opt=EPOLL_CTL_ADD;
                }else{
                    curEpoll_ctl_opt=EPOLL_CTL_MOD;
                }

                {//wait writable.
                    int flag=0;
                    pthread_mutex_lock(&qssEPEvent->writableLock);
                    if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                        sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                        int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                        if(err)
                            flag=-1;
                    }else
                        flag=-1;
                    pthread_mutex_unlock(&qssEPEvent->writableLock);
                    if(flag==-1)
                        break;
                }

            }else{
                if(errno==EAGAIN&&sum==nbs)
                    ret=nbs;//it is ok;
                break;
            }
        }
    }//end while.
    return ret;
}
void *  blockingSendEpollerRoutine(void *_param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    //pthread_t * curThread=&pParam->th;
    struct epoll_event epEvents[qss->maxThreads];
    QSSEPollEvent *qssEPEvent=NULL;
    int pollRes,*errno_ptr=&errno;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(1){

        pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
        if(pollRes>=1){
            int i=0;
            for(;i<pollRes;i++)
            if(epEvents[i].events&EPOLLOUT){//這個(gè)epollfd只應(yīng)該做以下的事情,少做為快!
                qssEPEvent=epEvents[i].data.ptr;
                pthread_mutex_lock(&qssEPEvent->writableLock);
                pthread_cond_signal(&qssEPEvent->writableMonitor);
                pthread_mutex_unlock(&qssEPEvent->writableLock);
            }

        }else if(pollRes==-1){//errno 
            printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            break;
        }

    }

    return NULL;
}

void *  epollWorkerRoutine(void * _param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    pthread_t * curThread=&pParam->th;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    InternalSenderBase_t _senderBase;
    int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
    _senderBase.qss=qss;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(!exitCode){

        *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
        pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
        if(pollRes==1){
            qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

            if(qssEPEvent->client_s==qss->server_s)
            {//Accepted Socket.
               onAcceptRoutine(qss);
               continue;
            }else{
                if(qss->protoHandler){
                    _senderBase.event=_epEvent.data.ptr;
                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers++;
                    pthread_spin_unlock(&qss->g_spinlock);

                    addQSSWorker(qss,1);
                    handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers--;
                    pthread_spin_unlock(&qss->g_spinlock);

                    if(handleCode>0){
                        _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                        if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                            SOErrOccurred=2;
                    }else{
                        SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                    }
                }
            }

        }else if(pollRes==0){//timeout
            printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
            {
                pthread_spin_lock(&qss->g_spinlock);
                if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                    qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                    exitCode=2;
                }
                pthread_spin_unlock(&qss->g_spinlock);
            }else if(qss->lifecycleStatus>=4)
                    exitCode=4;

        }else if(pollRes==-1){//errno
            printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            exitCode=1;
        }

        if(SOErrOccurred){
            if(qss->cslifecb)
                qss->cslifecb(qssEPEvent->client_s,-1);
            /*if(qssEPEvent)*/{
                epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                close(qssEPEvent->client_s);
                if(qssEPEvent->BSendEpollFDRelated){
                    pthread_cond_destroy(&qssEPEvent->writableMonitor);
                    pthread_mutex_destroy(&qssEPEvent->writableLock);
                }
                free(qssEPEvent);
              }
            pthread_spin_lock(&qss->g_spinlock);
            if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
             //for qss workerSize,
             qss->lifecycleStatus=4;
             exitCode=3;
            }
            pthread_spin_unlock(&qss->g_spinlock);
        }//SOErrOccurred handle;

    }//end main while.

    if(exitCode!=2){
        int clearup=0;
        pthread_spin_lock(&qss->g_spinlock);
        if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
          clearup=1;
        }
        pthread_spin_unlock(&qss->g_spinlock);
        if(clearup){
         close(qss->epollFD);
         close(qss->BSendEpollFD);
         pthread_spin_destroy(&qss->g_spinlock);
         free(qss);
        }
    }//exitCode handle;
 return NULL;
}

 

 

 

 

posted on 2010-08-03 14:37 adapterofcoms 閱讀(5502) 評(píng)論(1)  編輯 收藏 引用

FeedBack:
# re: 一個(gè)基于Event Poll(epoll)的TCP Server Framework,淺析epoll
2010-08-04 13:12 | adapterofcoms
這篇文章昨天發(fā)的急,只是粘貼了源碼,沒(méi)有發(fā)表設(shè)計(jì)思路和對(duì)源碼的解析,今天已經(jīng)補(bǔ)上,希望大家多指教!  回復(fù)  更多評(píng)論
  

只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。
網(wǎng)站導(dǎo)航: 博客園   IT新聞   BlogJava   博問(wèn)   Chat2DB   管理



<2025年9月>
31123456
78910111213
14151617181920
21222324252627
2829301234
567891011

常用鏈接

留言簿

隨筆檔案(2)

搜索

  •  

最新評(píng)論

閱讀排行榜

評(píng)論排行榜

青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            欧美尤物一区| 国产欧美日韩精品在线| 亚洲人成啪啪网站| 欧美一区二区三区免费看| 午夜在线视频观看日韩17c| 国产精品美女诱惑| 欧美一进一出视频| 欧美在线观看视频一区二区| 亚洲曰本av电影| 欧美午夜精品一区| 欧美一区二区三区啪啪| 免费日本视频一区| 99精品热6080yy久久 | 亚洲精品资源美女情侣酒店| 久久国产99| 亚洲人成免费| 亚洲精选久久| 久久久999精品| 一区二区三区精品久久久| 国产精品久久97| 欧美丰满高潮xxxx喷水动漫| 亚洲精品影院| 亚洲国产欧美在线人成| 性欧美在线看片a免费观看| 午夜精品在线| 亚洲一区二区免费视频| 激情久久五月天| 国产精品福利网站| 美日韩精品免费观看视频| 久久精品国产99| 午夜一区二区三区不卡视频| 亚洲欧美日韩中文播放| 亚洲一区一卡| 中日韩高清电影网| 99国产精品99久久久久久| 欧美粗暴jizz性欧美20| 久久久久久久久久久久久9999| 亚洲综合日韩| 欧美在线黄色| 国产乱人伦精品一区二区 | 亚洲二区视频在线| 国产欧美一区二区三区久久人妖| 国产精品久久久久久久久免费桃花| 欧美视频在线观看 亚洲欧| 亚洲伊人观看| 亚洲黄色在线视频| 欧美图区在线视频| 免费欧美在线| 精品不卡在线| 欧美私人啪啪vps| 国产精品久久影院| 国产亚洲成精品久久| 国产精品久久久久久超碰| 亚洲女人小视频在线观看| 99视频精品| 久久久噜噜噜久久中文字免| 久久婷婷色综合| 欧美一区二区视频在线观看2020| 午夜国产精品视频免费体验区| 久久精品国产在热久久| 蜜桃精品久久久久久久免费影院| 美女视频黄免费的久久| 99视频一区二区三区| 久久gogo国模裸体人体| 亚洲性图久久| 亚洲新中文字幕| 亚洲午夜一二三区视频| 亚洲国产精选| 一本色道久久88精品综合| 亚洲国产日韩一区二区| 亚洲日本久久| 亚洲色图制服丝袜| 亚洲国产日韩美| 亚洲视频图片小说| 欧美多人爱爱视频网站| 国一区二区在线观看| 亚洲天天影视| 亚洲欧美日韩国产综合| 欧美一区二区在线免费播放| 久久av一区二区三区漫画| 久久蜜臀精品av| 国产嫩草一区二区三区在线观看| 国产精品爽爽爽| 亚洲精选大片| 久久精品综合一区| 亚洲欧洲精品一区二区三区 | 国产精品免费小视频| 国产欧美婷婷中文| 亚洲美女电影在线| 亚洲国产美女| 老鸭窝亚洲一区二区三区| 国产在线国偷精品产拍免费yy| 亚洲视频一起| 亚洲视频二区| 99精品国产在热久久| 欧美激情国产日韩精品一区18| 亚洲黄色片网站| 欧美激情一区二区三区成人| 欧美日本韩国一区| 亚洲自拍偷拍视频| 中文国产一区| 另类尿喷潮videofree | 国产精品高清在线| 欧美一区国产在线| 欧美大片专区| 欧美一区观看| 欧美精品一区二区三区视频| 午夜视频一区在线观看| 蜜桃久久精品乱码一区二区| 国产日韩欧美日韩| 亚洲国产视频直播| 国产精品一级在线| 亚洲精品黄色| 亚洲二区三区四区| 国产日韩欧美黄色| 亚洲午夜精品久久| 久久米奇亚洲| 老鸭窝亚洲一区二区三区| 香蕉成人伊视频在线观看| 欧美肥婆bbw| 亚洲人成7777| 亚洲卡通欧美制服中文| 久热精品视频在线| 久久久久久久久伊人| 国产在线精品自拍| 亚洲欧美日韩精品久久亚洲区 | 久久综合电影| 欧美在线视频在线播放完整版免费观看 | 亚洲一区观看| 亚洲国产日韩在线| 正在播放亚洲一区| 91久久国产综合久久| 先锋影音国产一区| 一区二区欧美日韩视频| 久久久亚洲午夜电影| 小嫩嫩精品导航| 国产精品久久久久久久第一福利| 欧美激情一区三区| 91久久久一线二线三线品牌| 香蕉成人啪国产精品视频综合网| 欧美日本国产视频| 一本色道久久综合亚洲91| 99re在线精品| 欧美日韩亚洲在线| 一区二区91| 亚洲综合第一| 在线欧美日韩国产| 一本色道久久99精品综合| 老司机精品福利视频| 91久久在线视频| 午夜视频一区| 日韩一区二区精品| 国产一区999| 欧美午夜电影在线观看| 久久久激情视频| 亚洲深夜福利在线| 欧美国产免费| 久久伊人免费视频| 亚洲欧美激情视频在线观看一区二区三区| 亚洲视频中文字幕| 美女脱光内衣内裤视频久久影院| 日韩亚洲综合在线| 黄色日韩在线| 国内精品福利| 激情91久久| 禁断一区二区三区在线| 国产日韩欧美综合精品| 亚洲欧美日韩综合国产aⅴ| 国产精品五区| 久久久999精品免费| 亚洲午夜激情网站| 亚洲精品少妇| 亚洲美女视频在线免费观看| 美国成人直播| 麻豆精品在线播放| 久久福利视频导航| 久久一区免费| 欧美a级一区| 欧美国产先锋| 91久久久在线| 中文无字幕一区二区三区| 一区二区三区精品视频在线观看| 亚洲伦理在线免费看| 一区二区三区四区国产| 亚洲与欧洲av电影| 久久国产日本精品| 欧美成人dvd在线视频| 亚洲激情综合| 亚洲高清在线观看一区| 欧美激情第二页| 国产精品高潮粉嫩av| 久久综合给合久久狠狠色| 久久精品盗摄| 免费成人黄色| 欧美国产免费| 国产精品男gay被猛男狂揉视频| 国产精品欧美精品| 亚洲黄色影院| 欧美亚洲三区| 亚洲国内高清视频|