锘??xml version="1.0" encoding="utf-8" standalone="yes"?> #define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60 #define MAX_BUF_SIZE 1024 typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose typedef struct { typedef struct { char buf[MAX_BUF_SIZE]; int BSendEpollFDRelated; int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout); qsocketserver_model.c //涓嬮潰鐨勪唬鐮佺鐢熶駭鐜榪樺樊鍐呭瓨姹?/strong>鍜?strong>logger鍝︼紒 static int setNonBlocking(int sock) static void adjustQSSWorkerLimits(QSocketServer *qss){ static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){ if(pParam){ static void SOlogger(const char * msg,SOCKET s){ static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){ int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout) QSocketServer * qss=malloc(sizeof(QSocketServer)); int startSocketServer(QSocketServer *qss) qss->serv_addr.sin_family=AF_INET; qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP); if(qss->server_s==INVALID_SOCKET) if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR) if(listen(qss->server_s,SOMAXCONN/*榪欎釜瀹弚indows涔熸湁錛岃繖閲屾槸128錛屽綋鐒朵綘鍙互璁劇殑灝忎簺錛屽畠褰卞搷寮閿鐨?/strong>*/)==SOCKET_ERROR) {//ADD ACCEPT EVENT //initialize worker for epoll events. int shutdownSocketServer(QSocketServer *qss){ // /proc/getpid/fd shutdown all socket cs != serv_s {// set keepalive option typedef struct{ static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){ QSSEPollEvent *qssEPEvent=NULL; struct timespec sendTimeo; while(1){ {//wait writable. }else{ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); free(pParam); pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1); }else if(pollRes==-1){//errno } return NULL; free(pParam); *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL; if(qssEPEvent->client_s==qss->server_s) addQSSWorker(qss,1); pthread_spin_lock(&qss->g_spinlock); if(handleCode>0){ }else if(pollRes==0){//timeout }else if(pollRes==-1){//errno if(SOErrOccurred){ }//end main while. if(exitCode!=2){ }QSSOverlapped;//for per connection #define MAX_THREADS 100 #define MAX_BUF_SIZE 1024 typedef struct Q_SOCKET_SERVER SocketServer; #endif typedef struct { DWORD acceptorRoutine(LPVOID); static void adjustQSSWorkerLimits(QSocketServer *qss){ typedef struct{ static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){ static void SOlogger(const char * msg,SOCKET s,int clearup){ static int _InternalEchoProtocolHandler(LPWSAOVERLAPPED overlapped){ DWORD initializeSocketServer(SocketServer ** ssp,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,long workerWaitTimeout){ DWORD startSocketServer(SocketServer *ss){ SOlogger("WSAStartup failed.\n",0,0); DWORD shutdownSocketServer(SocketServer *ss){ DWORD acceptorRoutine(LPVOID ss){ }else{ if(qssOl) return 1; static int postRecvCompletionPacket(QSSOverlapped * qssOl,int SOErrOccurredCode){ DWORD completionWorkerRoutine(LPVOID ss){ printf("worker[%d] GetQueuedCompletionStatus F:%d \n",GetCurrentThreadId(),GetLastError()); if(SOErrOccurred){ //last to do 琛屾枃浠撲績(jī),闅懼厤鏈夐敊璇垨涓嶈凍涔嬪,甯屾湜澶у韙婅穬鎸囨璇勮,璋㈣阿! 榪欎釜妯″瀷鍦ㄦц兘涓婅繕鏄湁鏀硅繘鐨勭┖闂村摝錛?br>
LT/ET:ET涔熶細(xì)澶氭鍙戦乪vent,褰撶劧棰戠巼榪滀綆浜嶭T,浣嗘槸epoll one shot鎵嶆槸鐪熸鐨勫"one connection VS one thread in worker thread pool,涓嶄緷璧栦簬浠諱綍connection-data-queue"鐨勫熀紜鏀寔 .鎴戠湅鍒板ぇ閮ㄥ垎瀵筫poll_wait鐨勫鐞嗘ā寮忓涓嬶紝寰堟暀縐戝寲錛屽洜涓簃an-pages灝辨槸榪欐牱涓句緥瀛愮殑銆?br>man-pages epoll_wait handle:
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
for (;;)
{
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
for(1~nfds)
handle events[i];
......
}
epoll_ctl_add鐨勫綋鐒舵槸EPOLLIN|EPOLLET,姝ゅ鎴戝氨涓嶇煡閬撳鐞嗕笂闈唬鐮佺殑鏄竴鏉$嚎紼嬭繕鏄竴鍫嗙嚎紼嬶紙in threadpool錛夛紝浣嗘効涓嶆槸涓鏉$嚎紼嬪惂錛佸鏋滄槸鐨勮瘽錛岄毦涓嶆垚絳夊鐞嗗畬榪?strong>MAX_EVENTS 涓啀鍘誨鐞嗘帴涓嬫潵鐨?strong>MAX_EVENTS 涓紵鎱㈠惁錛?浣嗘槸濡傛灉鏄竴鍫嗙嚎紼嬬殑璇濓紝浣犳槸鍚﹁冭檻榪囧浣曞鐞唕equest-data in one connection鍦ㄩ昏緫涓婄殑瀹屾暣鎬э紝涔熷氨鏄竴涓猺equest-data-packet鍙兘浼?xì)琚垎鍓叉垚鑻ヲq叉鍙戦侊紝鍦ㄤ笂闈㈢殑澶勭悊妯″紡涓綘鐪熺殑瑕佸ソ濂借璁′竴涓嬩簡(jiǎn)銆?br>鑰屾垜鐨別poll_wait澶勭悊妯″紡濡備笅錛?br>struct epoll_event activeEvent;
for(;;)
{
epoll_wait(epollfd, &activeEvent, 1/*寰堟儕璁跺悧錛屼絾緇濅笉鏄竴鏉$嚎紼嬪湪榪愯榪欐浠g爜,鑰屾槸涓鍫?/, timeout);
if handle activeEvent success
epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
......
}
澶勭悊涓婇潰浠g爜鐨勫綋鐒舵槸涓鍫嗙嚎紼媔n threadpool,鑰屼笖epoll_ctl_add鐨勬槸EPOLLIN|EPOLLET|EPOLLONESHOT
鍥犱負(fù)鎴戠殑璁捐鐞嗗康鏄弗鏍奸伒瀹?strong>one connection VS one thread in worker thread pool銆?br>鎵浠ユ垜涓嬮潰鐨剆erver妗嗘灦鐨勫熀鏈ā鍨嬫槸:
One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
鍦╡pollWorkerRoutine涓湁浠ヤ笅鐨勮亴璐?
1.handle request,褰撳繖鏃跺鍔爀pollWorkerThread鏁伴噺浣嗕笉瓚呰繃maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
2.timeout鏃舵鏌ユ槸鍚︾┖闂插拰褰撳墠epollWorkerThread鏁伴噺,褰撶┖闂叉椂淇濇寔鎴栧噺灝戣嚦minThreads鏁伴噺.
3.瀵規(guī)墍鏈堿ccepted-socket綆$悊鐢熷懡鍛ㄦ湡,榪欓噷鍒╃敤緋葷粺鐨刱eepalive probes,鑻ユ兂瀹炵幇涓氬姟灞?蹇?jī)锜╂帰娴?鍙渶灝哘SS_SIO_KEEPALIVE_VALS_TIMEOUT 鏀瑰洖緋葷粺榛樿鐨?灝忔椂.榪欓噷騫朵笉緇存姢鎵鏈夎繛鎺ュ垪琛紝褰撶劧浣犲彲浠ュ湪/proc/getpid/fd涓嬫壘鍒版墍鏈夌殑socket fd.
4.linux涓婄殑non-blocking socket鐨勬搷浣滀粛鐒朵緷璧杛ecv,send錛屼笉鍍弚indows涓婄殑wsarecv+overlapped,鍗充究涓嶇敤fcntl fd o_nonblock涔熷彲浠ョ珛鍗寵繑鍥炪傛垜榪欓噷鎶妔end鍔ㄤ綔瀹炵幇鎴愪簡(jiǎn)blocking鐨勶紙internalBlockingSender錛夛紝鍚屾牱鐨勯亾鐞嗭紝non-blocking send渚濈劧浼?xì)迮炴垚鍝嶅簲鏁版嵁鍦ㄩ昏緫涓婄殑紕庣墖閿欏簭錛岀壒鍒槸浣犲鏋滈噰鐢ㄤ笂闈㈤偅涓暀縐戝寲鐨勫鐞嗘ā寮忕殑鍖栵紝騫朵笖榪樻槸澶氱嚎紼嬬殑璇濓紝閭d箞榪欑畝鐩村氨涔遍忎簡(jiǎn)銆傚綋鐒朵綘鍙互浣跨敤response-data-queue鏉ヨ揪鍒板紓姝ュ噯紜彂閫佹暟鎹殑鐩殑銆?br>
涓嬮潰緇撳悎婧愮爜,嫻呮瀽涓涓媏poll programming:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <string.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SOCKET_ERROR -1
#define INVALID_SOCKET -1
typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef unsigned short WORD;
typedef unsigned int DWORD;
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
#define QSS_SIO_KEEPALIVE_VALS_COUNT 3
#define MAX_THREADS 100
#define MAX_THREADS_MIN 10
#define MIN_WORKER_WAIT_TIMEOUT 20*1000
#define MAX_WORKER_WAIT_TIMEOUT 60*MIN_WORKER_WAIT_TIMEOUT
#define MAX_THREADPOOLS 32
/* ulimit -n opened FDs per process.璁板緱淇敼鍝︼紝鍚﹀垯榪樻槸select鏁堟灉,灝變笉鏄痚poll鏁堟灉浜?jiǎn)鍝﹀Q屽懙鍛?/
#define BLOCKING_SEND_TIMEOUT 20
typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR
WORD passive;
WORD port;//uint16_t
WORD minThreads;
WORD maxThreads;
pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
int workerWaitTimeout;//wait timeout
volatile int workerCounter;
volatile int currentBusyWorkers;
volatile int CSocketsCounter;
CSocketLifecycleCallback cslifecb;
InternalProtocolHandler protoHandler;
SOCKET server_s;
SOCKADDR_IN serv_addr;
int epollFD;//main epoller.
int BSendEpollFD;//For blocking send.
}QSocketServer;
SOCKET client_s;
SOCKADDR_IN client_addr;
uint32_t curEvents;
DWORD numberOfBytesTransferred;
char * data;
pthread_mutex_t writableLock;
pthread_cond_t writableMonitor;
}QSSEPollEvent;//for per connection
int startSocketServer(QSocketServer *qss);
int shutdownSocketServer(QSocketServer *qss);
#endif
#include "socketserver.h"
#include <dirent.h>
#include <regex.h>
#define DIGIT_PATTERN_STRING "^[0-9]+$"
void * epollWorkerRoutine(void *);
void * blockingSendEpollerRoutine(void *);
int isDigitStr(const char *str){
int ret=-1;
regex_t regex;
regmatch_t matchs[1];
if(!regcomp(®ex,DIGIT_PATTERN_STRING,REG_EXTENDED/*榪欓噷涓嶈浼?鍝︼紝鍚﹀垯nomatch*/)){
ret=!regexec(®ex,str, 1,matchs,0);
regfree(®ex);
}
return ret;
}
{
int opts;
opts=fcntl(sock,F_GETFL);
if(opts==-1)
{
perror("fcntl(sock,GETFL) failed!\n");
return opts;
}
opts = opts|O_NONBLOCK;
opts=fcntl(sock,F_SETFL,opts);
if(opts==-1)
{
perror("fcntl(sock,SETFL,opts) failed!\n");
return opts;
}
return 1;
}
//to adjust availabe size.
}
typedef struct{
QSocketServer * qss;
pthread_t th;
}QSSWORKER_PARAM;
WORD res=0;
if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
{
QSSWORKER_PARAM * pParam=NULL;
int i=0;
pthread_spin_lock(&qss->g_spinlock);
if(qss->workerCounter+addCounter<=qss->maxThreads)
for(;i<addCounter;i++)
{
pParam=malloc(sizeof(QSSWORKER_PARAM));
pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
pParam->qss=qss;
qss->workerCounter++,res++;
}
}
pthread_spin_unlock(&qss->g_spinlock);
}
return res;
}
perror(msg);
if(s>0)
close(s);
}
QSSEPollEvent *qssEPEvent=event->data.ptr;
int ret;
printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
//sleep(5);
ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
}
printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
return ret;
}
{
qss->passive=passive;
qss->port=port;
qss->minThreads=minThreads;
qss->maxThreads=maxThreads;
qss->workerWaitTimeout=workerWaitTimeout;
qss->lifecycleStatus=0;
pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
qss->workerCounter=0;
qss->currentBusyWorkers=0;
qss->CSocketsCounter=0;
qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
if(!qss->protoHandler)
qss->protoHandler=_InternalProtocolHandler;
adjustQSSWorkerLimits(qss);
*qss_ptr=qss;
return 1;
}
{
if(qss==NULL)
return 0;
else{
pthread_spin_lock(&qss->g_spinlock);
if(qss->lifecycleStatus==0){
qss->lifecycleStatus=1;
pthread_spin_unlock(&qss->g_spinlock);
}else{
pthread_spin_unlock(&qss->g_spinlock);
return 0;
}
}
//bzero(&qss->serv_addr, sizeof(qss->serv_addr));
qss->serv_addr.sin_port=htons(qss->port);
inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
//qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");
if(setNonBlocking(qss->server_s)==-1)
{
SOlogger("setNonBlocking server_s failed.\n",0);
return 0;
}
{
SOlogger("socket failed.\n",0);
return 0;
}
{
SOlogger("bind failed.\n",qss->server_s);
return 0;
}
{
SOlogger("listen failed.\n",qss->server_s);
return 0;
}
qss->epollFD=epoll_create1(0);/*榪欓噷涓嶆槸epoll_create(size)鍝︼紝浣犲彲鑳戒笉鐭ラ亾濡備綍璁劇疆size,鎵浠ュ拷鐣ュ畠鍚?/
if(qss->epollFD==-1){
SOlogger("epoll_create1 0, main epollFD failed.\n",qss->server_s);
return 0;
}
qss->BSendEpollFD=epoll_create1(0);//for blocking send.
if(qss->BSendEpollFD==-1){
SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
return 0;
}
struct epoll_event _epEvent;
QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
qssEPEvent->client_s=qss->server_s;
_epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
_epEvent.data.ptr=qssEPEvent;
if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
free(qssEPEvent);
return 0;
}
}
{//starup blocking send epoller.
QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
pParam->qss=qss;
pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
}
addQSSWorker(qss,qss->minThreads);
qss->lifecycleStatus=2;
return 1;
}
//change qss->lifecycleStatus
if(qss==NULL)
return 0;
else{
pthread_spin_lock(&qss->g_spinlock);
if(qss->lifecycleStatus==2){
qss->lifecycleStatus=3;
pthread_spin_unlock(&qss->g_spinlock);
}else{
pthread_spin_unlock(&qss->g_spinlock);
return 0;
}
}
/*shutdown server-listening socket,榪欓噷浼橀泤鐨勫仛娉曟槸shutdown--notify-->epoll-->close.璁板緱shutdown浼?xì)鍙戦丒OF鐨勫摝*/
shutdown(qss->server_s,SHUT_RDWR);
{
char dirBuf[64];
struct dirent * de;
DIR *pd=NULL;
int sockFD;
sprintf(dirBuf,"/proc/%d/fd/",getpid());
pd=opendir(dirBuf);
if(pd!=NULL){
while((de=readdir(pd))!=NULL){
if(isDigitStr(de->d_name)){
sockFD=atoi(de->d_name);
if(isfdtype(sockFD,S_IFSOCK))
shutdown(sockFD,SHUT_RDWR);
}
}
closedir(pd);
}
/*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
}
return 1;
}
static int onAcceptRoutine(QSocketServer * qss)
{
SOCKADDR_IN client_addr;
unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
SOCKET cs;
struct epoll_event _epEvent;
QSSEPollEvent *qssEPEvent=NULL;
cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
if(cs==INVALID_SOCKET)
{
printf("onAccept failed:%d,%s\n",errno,strerror(errno));
epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22 Invalid argument
return 0;
}
if(setNonBlocking(cs)==-1)
{
printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
return 0;
}
int keepAlive = 1;
int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
{
printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
return 0;
}
}
qssEPEvent=malloc(sizeof(QSSEPollEvent));
qssEPEvent->client_s=cs;
{
_epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
qssEPEvent->BSendEpollFDRelated=0;
_epEvent.data.ptr=qssEPEvent;/*榪欓噷鍙堝拰鏁欑鐨勪笉涓鏍峰摝錛岀湡姝g殑user data鐢╬tr,鑰屼笉鏄崟涓鐨刦d*/
if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
free(qssEPEvent);
return 0;
}else{
pthread_spin_lock(&qss->g_spinlock);
qss->CSocketsCounter++;
pthread_spin_unlock(&qss->g_spinlock);
if(qss->cslifecb)
qss->cslifecb(cs,0);
}
}
printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
return 1;
}
QSocketServer * qss;
QSSEPollEvent * event;
}InternalSenderBase_t;
InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
char * _sbuf=_buf;
int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;
struct epoll_event _epEvent;
*errno_ptr=0;
while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
sum+=ret,_sbuf+=ret;
if(sum==nbs||ret==0)
break;
else if(ret==-1){
if(errno==EAGAIN&&sum<nbs){
qssEPEvent=sb->event;
_epEvent.data.ptr=qssEPEvent;
_epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
if(qssEPEvent->BSendEpollFDRelated==0){
pthread_mutex_init(&qssEPEvent->writableLock,NULL);
pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
qssEPEvent->BSendEpollFDRelated=1;
curEpoll_ctl_opt=EPOLL_CTL_ADD;
}else{
curEpoll_ctl_opt=EPOLL_CTL_MOD;
}
int flag=0;
pthread_mutex_lock(&qssEPEvent->writableLock);
if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
if(err)
flag=-1;
}else
flag=-1;
pthread_mutex_unlock(&qssEPEvent->writableLock);
if(flag==-1)
break;
}
if(errno==EAGAIN&&sum==nbs)
ret=nbs;//it is ok;
break;
}
}
}//end while.
return ret;
}
void * blockingSendEpollerRoutine(void *_param){
QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
QSocketServer * qss=pParam->qss;
//pthread_t * curThread=&pParam->th;
struct epoll_event epEvents[qss->maxThreads];
QSSEPollEvent *qssEPEvent=NULL;
int pollRes,*errno_ptr=&errno;
while(1){
if(pollRes>=1){
int i=0;
for(;i<pollRes;i++)
if(epEvents[i].events&EPOLLOUT){//榪欎釜epollfd鍙簲璇ュ仛浠ヤ笅鐨勪簨鎯咃紝灝戝仛涓哄揩!
qssEPEvent=epEvents[i].data.ptr;
pthread_mutex_lock(&qssEPEvent->writableLock);
pthread_cond_signal(&qssEPEvent->writableMonitor);
pthread_mutex_unlock(&qssEPEvent->writableLock);
}
printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
break;
}
}
void * epollWorkerRoutine(void * _param){
QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
QSocketServer * qss=pParam->qss;
pthread_t * curThread=&pParam->th;
struct epoll_event _epEvent;
QSSEPollEvent *qssEPEvent=NULL;
InternalSenderBase_t _senderBase;
int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
_senderBase.qss=qss;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
while(!exitCode){
pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
if(pollRes==1){
qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;
{//Accepted Socket.
onAcceptRoutine(qss);
continue;
}else{
if(qss->protoHandler){
_senderBase.event=_epEvent.data.ptr;
pthread_spin_lock(&qss->g_spinlock);
qss->currentBusyWorkers++;
pthread_spin_unlock(&qss->g_spinlock);
handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);
qss->currentBusyWorkers--;
pthread_spin_unlock(&qss->g_spinlock);
_epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
SOErrOccurred=2;
}else{
SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
}
}
}
printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
{
pthread_spin_lock(&qss->g_spinlock);
if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
exitCode=2;
}
pthread_spin_unlock(&qss->g_spinlock);
}else if(qss->lifecycleStatus>=4)
exitCode=4;
printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
exitCode=1;
}
if(qss->cslifecb)
qss->cslifecb(qssEPEvent->client_s,-1);
/*if(qssEPEvent)*/{
epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
close(qssEPEvent->client_s);
if(qssEPEvent->BSendEpollFDRelated){
pthread_cond_destroy(&qssEPEvent->writableMonitor);
pthread_mutex_destroy(&qssEPEvent->writableLock);
}
free(qssEPEvent);
}
pthread_spin_lock(&qss->g_spinlock);
if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
//for qss workerSize,
qss->lifecycleStatus=4;
exitCode=3;
}
pthread_spin_unlock(&qss->g_spinlock);
}//SOErrOccurred handle;
int clearup=0;
pthread_spin_lock(&qss->g_spinlock);
if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
clearup=1;
}
pthread_spin_unlock(&qss->g_spinlock);
if(clearup){
close(qss->epollFD);
close(qss->BSendEpollFD);
pthread_spin_destroy(&qss->g_spinlock);
free(qss);
}
}//exitCode handle;
return NULL;
}
2.澶у鏈夎皝鎶婂畠璁劇疆涓鴻秴鍑篶pu涓暟鐨勫鹼紝褰撶劧涓嶅彧鏄痗pu涓暟鐨?鍊嶏紝鑰屾槸涓嬮潰鐨凪AX_THREADS 100鐢氳嚦鏇村ぇ銆?br>瀵逛簬榪欎釜鍊肩殑璁懼畾錛宮sdn騫舵病鏈夎闈炲緱璁炬垚cpu涓暟鐨?鍊嶏紝鑰屼笖涔熸病鏈夋妸鍑忓皯綰跨▼涔嬮棿涓婁笅鏂囦氦鎹㈣繖浜涘獎(jiǎng)鍝嶆壇鍒拌繖閲屾潵銆侷/O Completion Ports MSDN:"If your transaction required a lengthy computation, a larger concurrency value will allow more threads to run. Each completion packet may take longer to finish, but more completion packets will be processed at the same time. "銆?br> 瀵逛簬struct OVERLAPPED錛屾垜浠父浼?xì)濡備笅鎵╁睍锛?br>typedef struct {
WSAOVERLAPPED overlapped; //must be first member? 鏄殑錛屽繀欏繪槸絎竴涓傚鏋滀綘涓嶈偗瀹氾紝浣犲彲浠ヨ瘯璇曘?br> SOCKET client_s;
SOCKADDR_IN client_addr;
WORD optCode;//1--read,2--send. 鏈変漢甯鎬細(xì)瀹氫箟榪欎釜鏁版嵁鎴愬憳錛屼絾涔熸湁浜轟笉鐢紝浜夎鍦╯end/WSASend,姝ゆ椂鐨勫悓姝ュ拰寮傛鏄惁鏈夊繀瑕侊紵 鑷沖皯鎴戜笅闈㈢殑server鏇存湰灝辨病鐢ㄥ畠銆?br> char buf[MAX_BUF_SIZE];
WSABUF wsaBuf;//inited ? 榪欎釜涓嶈蹇樹簡(jiǎn)錛?br> DWORD numberOfBytesTransferred;
DWORD flags;
鎴戜笅闈㈢殑server妗嗘灦鐨勫熀鏈濇兂鏄?
One connection VS one thread in worker thread pool ,worker thread performs completionWorkerRoutine.
A Acceptor thread 涓撻棬鐢ㄦ潵accept socket,鍏寵仈鑷矷OCP,騫禬SARecv:post Recv Completion Packet to IOCP.
鍦╟ompletionWorkerRoutine涓湁浠ヤ笅鐨勮亴璐?
1.handle request,褰撳繖鏃跺鍔燾ompletionWorkerThread鏁伴噺浣嗕笉瓚呰繃maxThreads,post Recv Completion Packet to IOCP.
2.timeout鏃舵鏌ユ槸鍚︾┖闂插拰褰撳墠completionWorkerThread鏁伴噺,褰撶┖闂叉椂淇濇寔鎴栧噺灝戣嚦minThreads鏁伴噺.
3.瀵規(guī)墍鏈堿ccepted-socket綆$悊鐢熷懡鍛ㄦ湡,榪欓噷鍒╃敤緋葷粺鐨刱eepalive probes,鑻ユ兂瀹炵幇涓氬姟灞?蹇?jī)锜╂帰娴?鍙渶灝哘SS_SIO_KEEPALIVE_VALS_TIMEOUT 鏀瑰洖緋葷粺榛樿鐨?灝忔椂.
涓嬮潰緇撳悎婧愪唬鐮?嫻呮瀽涓涓婭OCP:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <winsock2.h>
#include <mstcpip.h>
#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60*1000
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5*1000
#define MAX_THREADS_MIN 10
#define MIN_WORKER_WAIT_TIMEOUT 20*1000
#define MAX_WORKER_WAIT_TIMEOUT 60*MIN_WORKER_WAIT_TIMEOUT
/*褰揂ccepted socket鍜宻ocket鍏抽棴鎴栧彂鐢熷紓甯告椂鍥炶皟CSocketLifecycleCallback*/
typedef void (*CSocketLifecycleCallback)(SOCKET cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose//娉ㄦ剰OnClose姝ゆ椂鐨剆ocket鏈繀鍙敤,鍙兘宸茬粡琚潪姝e父鍏抽棴鎴栧叾浠栧紓甯?
/*鍗忚澶勭悊鍥炶皟*/
typedef int (*InternalProtocolHandler)(LPWSAOVERLAPPED overlapped);//return -1:SOCKET_ERROR
DWORD initializeSocketServer(SocketServer ** ssp,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,long workerWaitTimeout);
DWORD startSocketServer(SocketServer *ss);
DWORD shutdownSocketServer(SocketServer *ss);
qsocketserver.c 綆縐?qss,鐩稿簲鐨凮VERLAPPED綆縐皅ssOl.
#include "socketserver.h"
#include "stdio.h"
typedef struct {
WORD passive;//daemon
WORD port;
WORD minThreads;
WORD maxThreads;
volatile long lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitKeyPosted,5-stopped
long workerWaitTimeout;//wait timeout
CRITICAL_SECTION QSS_LOCK;
volatile long workerCounter;
volatile long currentBusyWorkers;
volatile long CSocketsCounter;//Accepted-socket寮曠敤璁℃暟
CSocketLifecycleCallback cslifecb;
InternalProtocolHandler protoHandler;
WORD wsaVersion;//=MAKEWORD(2,0);
WSADATA wsData;
SOCKET server_s;
SOCKADDR_IN serv_addr;
HANDLE iocpHandle;
}QSocketServer;
WSAOVERLAPPED overlapped;
SOCKET client_s;
SOCKADDR_IN client_addr;
WORD optCode;
char buf[MAX_BUF_SIZE];
WSABUF wsaBuf;
DWORD numberOfBytesTransferred;
DWORD flags;
}QSSOverlapped;
DWORD completionWorkerRoutine(LPVOID);
/*adjust size and timeout.*/
/*if(qss->maxThreads <= 0) {
qss->maxThreads = MAX_THREADS;
} else if (qss->maxThreads < MAX_THREADS_MIN) {
qss->maxThreads = MAX_THREADS_MIN;
}
if(qss->minThreads > qss->maxThreads) {
qss->minThreads = qss->maxThreads;
}
if(qss->minThreads <= 0) {
if(1 == qss->maxThreads) {
qss->minThreads = 1;
} else {
qss->minThreads = qss->maxThreads/2;
}
}
if(qss->workerWaitTimeout<MIN_WORKER_WAIT_TIMEOUT)
qss->workerWaitTimeout=MIN_WORKER_WAIT_TIMEOUT;
if(qss->workerWaitTimeout>MAX_WORKER_WAIT_TIMEOUT)
qss->workerWaitTimeout=MAX_WORKER_WAIT_TIMEOUT; */
}
QSocketServer * qss;
HANDLE th;
}QSSWORKER_PARAM;
WORD res=0;
if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads)){
DWORD threadId;
QSSWORKER_PARAM * pParam=NULL;
int i=0;
EnterCriticalSection(&qss->QSS_LOCK);
if(qss->workerCounter+addCounter<=qss->maxThreads)
for(;i<addCounter;i++)
{
pParam=malloc(sizeof(QSSWORKER_PARAM));
if(pParam){
pParam->th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)completionWorkerRoutine,pParam,CREATE_SUSPENDED,&threadId);
pParam->qss=qss;
ResumeThread(pParam->th);
qss->workerCounter++,res++;
}
}
LeaveCriticalSection(&qss->QSS_LOCK);
}
return res;
}
perror(msg);
if(s>0)
closesocket(s);
if(clearup)
WSACleanup();
}
QSSOverlapped *qssOl=(QSSOverlapped *)overlapped;
printf("numOfT:%d,WSARecvd:%s,\n",qssOl->numberOfBytesTransferred,qssOl->buf);
//Sleep(500);
return send(qssOl->client_s,qssOl->buf,qssOl->numberOfBytesTransferred,0);
}
QSocketServer * qss=malloc(sizeof(QSocketServer));
qss->passive=passive>0?1:0;
qss->port=port;
qss->minThreads=minThreads;
qss->maxThreads=maxThreads;
qss->workerWaitTimeout=workerWaitTimeout;
qss->wsaVersion=MAKEWORD(2,0);
qss->lifecycleStatus=0;
InitializeCriticalSection(&qss->QSS_LOCK);
qss->workerCounter=0;
qss->currentBusyWorkers=0;
qss->CSocketsCounter=0;
qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
if(!qss->protoHandler)
qss->protoHandler=_InternalEchoProtocolHandler;
adjustQSSWorkerLimits(qss);
*ssp=(SocketServer *)qss;
return 1;
}
QSocketServer * qss=(QSocketServer *)ss;
if(qss==NULL||InterlockedCompareExchange(&qss->lifecycleStatus,1,0))
return 0;
qss->serv_addr.sin_family=AF_INET;
qss->serv_addr.sin_port=htons(qss->port);
qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");
if(WSAStartup(qss->wsaVersion,&qss->wsData)){
/*榪欓噷榪樻湁涓彃鏇插氨鏄繖涓猈SAStartup琚皟鐢ㄧ殑鏃跺?瀹冨眳鐒朵細(xì)鍚姩涓鏉¢澶栫殑綰跨▼,褰撶劧紼嶅悗榪欐潯綰跨▼浼?xì)鑷姩閫鍑虹殑.涓嶇煡WSAClearup鍙堜細(xì)濡備綍?......*/
return 0;
}
qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
if(qss->server_s==INVALID_SOCKET){
SOlogger("socket failed.\n",0,1);
return 0;
}
if(bind(qss->server_s,(LPSOCKADDR)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR){
SOlogger("bind failed.\n",qss->server_s,1);
return 0;
}
if(listen(qss->server_s,SOMAXCONN)==SOCKET_ERROR)/*榪欓噷鏉ヨ皥璋?strong>backlog,寰堝浜轟笉鐭ラ亾璁炬垚浣曞?鎴戣鍒拌繃1,5,50,100鐨?鏈変漢璇磋瀹氱殑瓚婂ぇ瓚婅楄祫婧?鐨勭‘,榪欓噷璁炬垚SOMAXCONN涓嶄唬琛╳indows浼?xì)鐪熺殑鋴社敤SOMAXCONN,鑰屾槸" If set to SOMAXCONN, the underlying service provider responsible for socket s will set the backlog to a maximum reasonable value. "錛屽悓鏃跺湪鐜板疄鐜涓紝涓嶅悓鎿嶄綔緋葷粺鏀寔TCP緙撳啿闃熷垪鏈夋墍涓嶅悓錛屾墍浠ヨ繕涓嶅璁╂搷浣滅郴緇熸潵鍐沖畾瀹冪殑鍊箋傚儚Apache榪欑鏈嶅姟鍣細(xì)
#ifndef DEFAULT_LISTENBACKLOG
#define DEFAULT_LISTENBACKLOG 511
#endif
*/
{
SOlogger("listen failed.\n",qss->server_s,1);
return 0;
}
qss->iocpHandle=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,/*NumberOfConcurrentThreads-->*/qss->maxThreads);
//initialize worker for completion routine.
addQSSWorker(qss,qss->minThreads);
qss->lifecycleStatus=2;
{
QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
pParam->qss=qss;
pParam->th=NULL;
if(qss->passive){
DWORD threadId;
pParam->th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)acceptorRoutine,pParam,0,&threadId);
}else
return acceptorRoutine(pParam);
}
return 1;
}
QSocketServer * qss=(QSocketServer *)ss;
if(qss==NULL||InterlockedCompareExchange(&qss->lifecycleStatus,3,2)!=2)
return 0;
closesocket(qss->server_s/*listen-socket*/);//..other accepted-sockets associated with the listen-socket will not be closed,except WSACleanup is called..
if(qss->CSocketsCounter==0)
qss->lifecycleStatus=4,PostQueuedCompletionStatus(qss->iocpHandle,0,-1,NULL);
WSACleanup();
return 1;
}
QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)ss;
QSocketServer * qss=pParam->qss;
HANDLE curThread=pParam->th;
QSSOverlapped *qssOl=NULL;
SOCKADDR_IN client_addr;
int client_addr_leng=sizeof(SOCKADDR_IN);
SOCKET cs;
free(pParam);
while(1){
printf("accept starting.....\n");
cs/*Accepted-socket*/=accept(qss->server_s,(LPSOCKADDR)&client_addr,&client_addr_leng);
if(cs==INVALID_SOCKET)
{
printf("accept failed:%d\n",GetLastError());
break;
}else{//SO_KEEPALIVE,SIO_KEEPALIVE_VALS 榪欓噷鏄埄鐢ㄧ郴緇熺殑"蹇?jī)锜╂帰娴?/strong>",keepalive probes.linux:setsockopt,SOL_TCP:TCP_KEEPIDLE,TCP_KEEPINTVL,TCP_KEEPCNT
struct tcp_keepalive alive,aliveOut;
int so_keepalive_opt=1;
DWORD outDW;
if(!setsockopt(cs,SOL_SOCKET,SO_KEEPALIVE,(char *)&so_keepalive_opt,sizeof(so_keepalive_opt))){
alive.onoff=TRUE;
alive.keepalivetime=QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
alive.keepaliveinterval=QSS_SIO_KEEPALIVE_VALS_INTERVAL;
if(WSAIoctl(cs,SIO_KEEPALIVE_VALS,&alive,sizeof(alive),&aliveOut,sizeof(aliveOut),&outDW,NULL,NULL)==SOCKET_ERROR){
printf("WSAIoctl SIO_KEEPALIVE_VALS failed:%d\n",GetLastError());
break;
}
printf("setsockopt SO_KEEPALIVE failed:%d\n",GetLastError());
break;
}
}
CreateIoCompletionPort((HANDLE)cs,qss->iocpHandle,cs,0);
if(qssOl==NULL){
qssOl=malloc(sizeof(QSSOverlapped));
}
qssOl->client_s=cs;
qssOl->wsaBuf.len=MAX_BUF_SIZE,qssOl->wsaBuf.buf=qssOl->buf,qssOl->numberOfBytesTransferred=0,qssOl->flags=0;//initialize WSABuf.
memset(&qssOl->overlapped,0,sizeof(WSAOVERLAPPED));
{
DWORD lastErr=GetLastError();
int ret=0;
SetLastError(0);
ret=WSARecv(cs,&qssOl->wsaBuf,1,&qssOl->numberOfBytesTransferred,&qssOl->flags,&qssOl->overlapped,NULL);
if(ret==0||(ret==SOCKET_ERROR&&GetLastError()==WSA_IO_PENDING)){
InterlockedIncrement(&qss->CSocketsCounter);//Accepted-socket璁℃暟閫掑.
if(qss->cslifecb)
qss->cslifecb(cs,0);
qssOl=NULL;
}
if(!GetLastError())
SetLastError(lastErr);
}
printf("accept flags:%d ,cs:%d.\n",GetLastError(),cs);
}//end while.
free(qssOl);
if(qss)
shutdownSocketServer((SocketServer *)qss);
if(curThread)
CloseHandle(curThread);
}
int SOErrOccurred=0;
DWORD lastErr=GetLastError();
SetLastError(0);
//SOCKET_ERROR:-1,WSA_IO_PENDING:997
if(WSARecv(qssOl->client_s,&qssOl->wsaBuf,1,&qssOl->numberOfBytesTransferred,&qssOl->flags,&qssOl->overlapped,NULL)==SOCKET_ERROR
&&GetLastError()!=WSA_IO_PENDING)//this case lastError maybe 64, 10054
{
SOErrOccurred=SOErrOccurredCode;
}
if(!GetLastError())
SetLastError(lastErr);
if(SOErrOccurred)
printf("worker[%d] postRecvCompletionPacket SOErrOccurred=%d,preErr:%d,postedErr:%d\n",GetCurrentThreadId(),SOErrOccurred,lastErr,GetLastError());
return SOErrOccurred;
}
QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)ss;
QSocketServer * qss=pParam->qss;
HANDLE curThread=pParam->th;
QSSOverlapped * qssOl=NULL;
DWORD numberOfBytesTransferred=0;
ULONG_PTR completionKey=0;
int postRes=0,handleCode=0,exitCode=0,SOErrOccurred=0;
free(pParam);
while(!exitCode){
SetLastError(0);
if(GetQueuedCompletionStatus(qss->iocpHandle,&numberOfBytesTransferred,&completionKey,(LPOVERLAPPED *)&qssOl,qss->workerWaitTimeout)){
if(completionKey==-1&&qss->lifecycleStatus>=4)
{
printf("worker[%d] completionKey -1:%d \n",GetCurrentThreadId(),GetLastError());
if(qss->workerCounter>1)
PostQueuedCompletionStatus(qss->iocpHandle,0,-1,NULL);
exitCode=1;
break;
}
if(numberOfBytesTransferred>0){
InterlockedIncrement(&qss->currentBusyWorkers);
addQSSWorker(qss,1);
handleCode=qss->protoHandler((LPWSAOVERLAPPED)qssOl);
InterlockedDecrement(&qss->currentBusyWorkers);
if(handleCode>=0){
SOErrOccurred=postRecvCompletionPacket(qssOl,1);
}else
SOErrOccurred=2;
}else{
printf("worker[%d] numberOfBytesTransferred==0 ***** closesocket servS or cs *****,%d,%d ,ol is:%d\n",GetCurrentThreadId(),GetLastError(),completionKey,qssOl==NULL?0:1);
SOErrOccurred=3;
}
}else{ //GetQueuedCompletionStatus rtn FALSE, lastError 64 ,995[timeout worker thread exit.] ,WAIT_TIMEOUT:258
if(qssOl){
SOErrOccurred=postRecvCompletionPacket(qssOl,4);
}else {
if(GetLastError()!=WAIT_TIMEOUT){
exitCode=2;
}else{//wait timeout
if(qss->lifecycleStatus!=4&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
EnterCriticalSection(&qss->QSS_LOCK);
if(qss->lifecycleStatus!=4&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
exitCode=3;
}
LeaveCriticalSection(&qss->QSS_LOCK);
}
}
}
}//end GetQueuedCompletionStatus.
if(qss->cslifecb)
qss->cslifecb(qssOl->client_s,-1);
/*if(qssOl)*/{
closesocket(qssOl->client_s);
free(qssOl);
}
if(InterlockedDecrement(&qss->CSocketsCounter)==0&&qss->lifecycleStatus>=3){
//for qss workerSize,PostQueuedCompletionStatus -1
qss->lifecycleStatus=4,PostQueuedCompletionStatus(qss->iocpHandle,0,-1,NULL);
exitCode=4;
}
}
qssOl=NULL,numberOfBytesTransferred=0,completionKey=0,SOErrOccurred=0;//for net while.
}//end while.
if(exitCode!=3){
int clearup=0;
EnterCriticalSection(&qss->QSS_LOCK);
if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
clearup=1;
}
LeaveCriticalSection(&qss->QSS_LOCK);
if(clearup){
DeleteCriticalSection(&qss->QSS_LOCK);
CloseHandle(qss->iocpHandle);
free(qss);
}
}
CloseHandle(curThread);
return 1;
}
------------------------------------------------------------------------------------------------------------------------
瀵逛簬IOCP鐨凩astError鐨勮鯨鍒拰澶勭悊鏄釜闅劇偣,鎵浠ヨ娉ㄦ剰鎴戠殑completionWorkerRoutine鐨剋hile緇撴瀯,
緇撴瀯濡備笅:
while(!exitCode){
if(completionKey==-1){...break;}
if(GetQueuedCompletionStatus){/*鍦ㄨ繖涓猧f浣撲腑鍙浣犳姇閫掔殑OVERLAPPED is not NULL,閭d箞榪欓噷浣犲緱鍒扮殑灝辨槸瀹?/strong>.*/
if(numberOfBytesTransferred>0){
/*鍦ㄨ繖閲宧andle request,璁板緱瑕佺戶緇姇閫掍綘鐨凮VERLAPPED鍝? */
}else{
/*榪欓噷鍙兘瀹㈡埛绔垨鏈嶅姟绔痗losesocket(the socket),浣嗘槸OVERLAPPED is not NULL,鍙浣犳姇閫掔殑涓嶄負(fù)NULL!*/
}
}else{/*鍦ㄨ繖閲岀殑if浣撲腑,铏界劧GetQueuedCompletionStatus return FALSE,浣嗘槸涓嶄唬琛∣VERLAPPED涓瀹氫負(fù)NULL.鐗瑰埆鏄疧VERLAPPED is not NULL鐨勬儏鍐典笅,涓嶈浠ヤ負(fù)LastError鍙戠敓浜?灝變唬琛ㄥ綋鍓嶇殑socket鏃犵敤鎴栧彂鐢熻嚧鍛界殑寮傚父,姣斿鍙戠敓lastError:995榪欑鎯呭喌涓嬫鏃剁殑socket鏈夊彲鑳芥槸涓鍒囨甯哥殑鍙敤鐨?浣犱笉搴旇鍏抽棴瀹?/strong>.*/
if(OVERLAPPED is not NULL){
/*榪欑鎯呭喌涓?璇蜂笉綆?7,21緇х畫鎶曢掑惂!鍦ㄦ姇閫掑悗鍐嶆嫻嬮敊璇?/strong>.*/
}else{
}
}
if(socket error occured){
}
prepare for next while.
}