眾所皆知,完成端口是在WINDOWS平臺(tái)下效率最高,擴(kuò)展性最好的IO模型,特別針對(duì)于WINSOCK的海量連接時(shí),更能顯示出其威力。其實(shí)建立一個(gè)完成端口的服務(wù)器也很簡(jiǎn)單,只要注意幾個(gè)函數(shù),了解一下關(guān)鍵的步驟也就行了。
這是篇完成端口入門級(jí)的文章,分為以下幾步來(lái)說(shuō)明完成端口:
函數(shù)
常見(jiàn)問(wèn)題以及解答
步驟
例程
1、函數(shù):
我們?cè)谕瓿啥丝谀P拖聲?huì)使用到的最重要的兩個(gè)函數(shù)是:
CreateIoCompletionPort、GetQueuedCompletionStatus
CreateIoCompletionPort 的作用是創(chuàng)建一個(gè)完成端口和把一個(gè)IO句柄和完成端口關(guān)聯(lián)起來(lái):
// 創(chuàng)建完成端口
HANDLE CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
// 把一個(gè)IO句柄和完成端口關(guān)聯(lián)起來(lái),這里的句柄是一個(gè)socket 句柄
CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (DWORD)PerHandleData, 0);
其中第一個(gè)參數(shù)是句柄,可以是文件句柄、SOCKET句柄。
第二個(gè)就是我們上面創(chuàng)建出來(lái)的完成端口,這里就把兩個(gè)東西關(guān)聯(lián)在一起了。
第三個(gè)參數(shù)很關(guān)鍵,叫做PerHandleData,就是對(duì)應(yīng)于每個(gè)句柄的數(shù)據(jù)塊。我們可以使用這個(gè)參數(shù)在后面取到與這個(gè)SOCKET對(duì)應(yīng)的數(shù)據(jù)。
最后一個(gè)參數(shù)給0,意思就是根據(jù)CPU的個(gè)數(shù),允許盡可能多的線程并發(fā)執(zhí)行。
GetQueuedCompletionStatus 的作用就是取得完成端口的結(jié)果:
// 從完成端口中取得結(jié)果
GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE)
第一個(gè)參數(shù)是完成端口
第二個(gè)參數(shù)是表明這次的操作傳遞了多少個(gè)字節(jié)的數(shù)據(jù)
第三個(gè)參數(shù)是OUT類型的參數(shù),就是前面CreateIoCompletionPort傳進(jìn)去的單句柄數(shù)據(jù),這里就是前面的SOCKET句柄以及與之相對(duì)應(yīng)的數(shù)據(jù),這里操作系統(tǒng)給我們返回,讓我們不用自己去做列表查詢等操作了。
第四個(gè)參數(shù)就是進(jìn)行IO操作的結(jié)果,是我們?cè)谕哆f WSARecv / WSASend 等操作時(shí)傳遞進(jìn)去的,這里操作系統(tǒng)做好準(zhǔn)備后,給我們返回了。非常省事!!
個(gè)人感覺(jué)完成端口就是操作系統(tǒng)為我們包裝了很多重疊IO的不爽的地方,讓我們可以更方便的去使用,下篇我將會(huì)嘗試去講述完成端口的原理。
2、常見(jiàn)問(wèn)題和解答
a、什么是單句柄數(shù)據(jù)(PerHandle)和單IO數(shù)據(jù)(PerIO)
單句柄數(shù)據(jù)就是和句柄對(duì)應(yīng)的數(shù)據(jù),像socket句柄,文件句柄這種東西。
單IO數(shù)據(jù),就是對(duì)應(yīng)于每次的IO操作的數(shù)據(jù)。例如每次的WSARecv/WSASend等等
其實(shí)我覺(jué)得PER是每次的意思,翻譯成每個(gè)句柄數(shù)據(jù)和每次IO數(shù)據(jù)還比較清晰一點(diǎn)。
在完成端口中,單句柄數(shù)據(jù)直接通過(guò)GetQueuedCompletionStatus 返回,省去了我們自己做容器去管理。單IO數(shù)據(jù)也容許我們自己擴(kuò)展OVERLAPPED結(jié)構(gòu),所以,在這里所有與應(yīng)用邏輯有關(guān)的東西都可以在此擴(kuò)展。
b、如何判斷客戶端的斷開(kāi)
我們要處理幾種情況
1) 如果客戶端調(diào)用了closesocket,我們就可以這樣判斷他的斷開(kāi):
if(0 == GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, 。。。)
{
}
if(BytesTransferred == 0)
{
// 客戶端斷開(kāi),釋放資源
}
2) 如果是客戶端直接退出,那就會(huì)出現(xiàn)64錯(cuò)誤,指定的網(wǎng)絡(luò)名不可再用。這種情況我們也要處理的:
if(0 == GetQueuedCompletionStatus(。。。))
{
if( (GetLastError() == WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED) )
{
// 客戶端斷開(kāi),釋放資源
}
}
3、步驟
編寫完成端口服務(wù)程序,無(wú)非就是以下幾個(gè)步驟:
1、創(chuàng)建一個(gè)完成端口
2、根據(jù)CPU個(gè)數(shù)創(chuàng)建工作者線程,把完成端口傳進(jìn)去線程里
3、創(chuàng)建偵聽(tīng)SOCKET,把SOCKET和完成端口關(guān)聯(lián)起來(lái)
4、創(chuàng)建PerIOData,向連接進(jìn)來(lái)的SOCKET投遞WSARecv操作
5、線程里所做的事情:
a、GetQueuedCompletionStatus,在退出的時(shí)候就可以使用PostQueudCompletionStatus使線程退出
b、取得數(shù)據(jù)并處理
4、例程
下面是服務(wù)端的例程,可以使用《WinSocket模型的探討——Overlapped模型(一)》中的客戶端程序來(lái)測(cè)試次服務(wù)端。稍微研究一下,也就會(huì)對(duì)完成端口模型有個(gè)大概的了解了。
/*
完成端口服務(wù)器
接收到客戶端的信息,直接顯示出來(lái)
*/
#include "winerror.h"
#include "Winsock2.h"
#pragma comment(lib, "ws2_32")
#include "windows.h"
#include <iostream>
using namespace std;
/// 宏定義
#define PORT 5050
#define DATA_BUFSIZE 8192
#define OutErr(a) cout << (a) << endl \
<< "出錯(cuò)代碼:" << WSAGetLastError() << endl \
<< "出錯(cuò)文件:" << __FILE__ << endl \
<< "出錯(cuò)行數(shù):" << __LINE__ << endl \
#define OutMsg(a) cout << (a) << endl;
/// 全局函數(shù)定義
///////////////////////////////////////////////////////////////////////
//
// 函數(shù)名 : InitWinsock
// 功能描述 : 初始化WINSOCK
// 返回值 : void
//
///////////////////////////////////////////////////////////////////////
void InitWinsock()
{
// 初始化WINSOCK
WSADATA wsd;
if( WSAStartup(MAKEWORD(2, 2), &wsd) != 0)
{
OutErr("WSAStartup()");
}
}
///////////////////////////////////////////////////////////////////////
//
// 函數(shù)名 : BindServerOverlapped
// 功能描述 : 綁定端口,并返回一個(gè) Overlapped 的Listen Socket
// 參數(shù) : int nPort
// 返回值 : SOCKET
//
///////////////////////////////////////////////////////////////////////
SOCKET BindServerOverlapped(int nPort)
{
// 創(chuàng)建socket
SOCKET sServer = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
// 綁定端口
struct sockaddr_in servAddr;
servAddr.sin_family = AF_INET;
servAddr.sin_port = htons(nPort);
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(sServer, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
{
OutErr("bind Failed!");
return NULL;
}
// 設(shè)置監(jiān)聽(tīng)隊(duì)列為200
if(listen(sServer, 200) != 0)
{
OutErr("listen Failed!");
return NULL;
}
return sServer;
}
/// 結(jié)構(gòu)體定義
typedef struct
{
OVERLAPPED Overlapped;
WSABUF DataBuf;
CHAR Buffer[DATA_BUFSIZE];
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;
typedef struct
{
SOCKET Socket;
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
DWORD WINAPI ProcessIO(LPVOID lpParam)
{
HANDLE CompletionPort = (HANDLE)lpParam;
DWORD BytesTransferred;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
while(true)
{
if(0 == GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE))
{
if( (GetLastError() == WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED) )
{
cout << "closing socket" << PerHandleData->Socket << endl;
closesocket(PerHandleData->Socket);
delete PerIoData;
delete PerHandleData;
continue;
}
else
{
OutErr("GetQueuedCompletionStatus failed!");
}
return 0;
}
// 說(shuō)明客戶端已經(jīng)退出
if(BytesTransferred == 0)
{
cout << "closing socket" << PerHandleData->Socket << endl;
closesocket(PerHandleData->Socket);
delete PerIoData;
delete PerHandleData;
continue;
}
// 取得數(shù)據(jù)并處理
cout << PerHandleData->Socket << "發(fā)送過(guò)來(lái)的消息:" << PerIoData->Buffer << endl;
// 繼續(xù)向 socket 投遞WSARecv操作
DWORD Flags = 0;
DWORD dwRecv = 0;
ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
PerIoData->DataBuf.buf = PerIoData->Buffer;
PerIoData->DataBuf.len = DATA_BUFSIZE;
WSARecv(PerHandleData->Socket, &PerIoData->DataBuf, 1, &dwRecv, &Flags, &PerIoData->Overlapped, NULL);
}
return 0;
}
void main()
{
InitWinsock();
HANDLE CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
// 根據(jù)系統(tǒng)的CPU來(lái)創(chuàng)建工作者線程
SYSTEM_INFO SystemInfo;
GetSystemInfo(&SystemInfo);
for(int i = 0; i < SystemInfo.dwNumberOfProcessors * 2; i++)
{
HANDLE hProcessIO = CreateThread(NULL, 0, ProcessIO, CompletionPort, 0, NULL);
if(hProcessIO)
{
CloseHandle(hProcessIO);
}
}
// 創(chuàng)建偵聽(tīng)SOCKET
SOCKET sListen = BindServerOverlapped(PORT);
SOCKET sClient;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
while(true)
{
// 等待客戶端接入
//sClient = WSAAccept(sListen, NULL, NULL, NULL, 0);
sClient = accept(sListen, 0, 0);
cout << "Socket " << sClient << "連接進(jìn)來(lái)" << endl;
PerHandleData = new PER_HANDLE_DATA();
PerHandleData->Socket = sClient;
// 將接入的客戶端和完成端口聯(lián)系起來(lái)
CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (DWORD)PerHandleData, 0);
// 建立一個(gè)Overlapped,并使用這個(gè)Overlapped結(jié)構(gòu)對(duì)socket投遞操作
PerIoData = new PER_IO_OPERATION_DATA();
ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
PerIoData->DataBuf.buf = PerIoData->Buffer;
PerIoData->DataBuf.len = DATA_BUFSIZE;
// 投遞一個(gè)WSARecv操作
DWORD Flags = 0;
DWORD dwRecv = 0;
WSARecv(sClient, &PerIoData->DataBuf, 1, &dwRecv, &Flags, &PerIoData->Overlapped, NULL);
}
DWORD dwByteTrans;
PostQueuedCompletionStatus(CompletionPort, dwByteTrans, 0, 0);
closesocket(sListen);
}
好了,本篇文章就到此為止,
回復(fù) 更多評(píng)論