相關(guān)UML:
網(wǎng)絡(luò)引擎整體結(jié)構(gòu):

SocketItem細(xì)節(jié):

先來(lái)看幾個(gè)底層結(jié)構(gòu):
//重疊結(jié)構(gòu)類(lèi)
class COverLapped
{
//變量定義
public:
WSABUF m_WSABuffer; //數(shù)據(jù)指針
OVERLAPPED m_OverLapped; //重疊結(jié)構(gòu)
const enOperationType m_OperationType; //操作類(lèi)型
//函數(shù)定義
public:
//構(gòu)造函數(shù)
COverLapped(enOperationType OperationType);
//析構(gòu)函數(shù)
virtual ~COverLapped();
//信息函數(shù)
public:
//獲取類(lèi)型
enOperationType GetOperationType() { return m_OperationType; }
};
//接收重疊結(jié)構(gòu)
class COverLappedSend : public COverLapped
{
//數(shù)據(jù)變量
public:
BYTE m_cbBuffer[SOCKET_BUFFER]; //數(shù)據(jù)緩沖
//函數(shù)定義
public:
//構(gòu)造函數(shù)
COverLappedSend();
//析構(gòu)函數(shù)
virtual ~COverLappedSend();
};
//重疊結(jié)構(gòu)模板
template <enOperationType OperationType> class CATLOverLapped : public COverLapped
{
//函數(shù)定義
public:
//構(gòu)造函數(shù)
CATLOverLapped() : COverLapped(OperationType) {}
//析構(gòu)函數(shù)
virtual ~CATLOverLapped() {}
};
先復(fù)習(xí)下基礎(chǔ),Windows下的網(wǎng)絡(luò)模型有很多種,這里只拿出三種來(lái)說(shuō):
EventSelect:基于信號(hào)機(jī)制,以socket為單位綁定信號(hào)量,當(dāng)socket上有指定的事件發(fā)生時(shí)激發(fā)信號(hào),然后查詢(xún)事件處理事件重設(shè)事件,繼續(xù)在信號(hào)量上等待。其實(shí)也是在伯克利select模型上的換不換藥的加強(qiáng)。
OverLapped:分兩種工作模式完成回調(diào),和完成事件。重疊IO監(jiān)視每次操作,每次IO都綁定一個(gè)重疊對(duì)象,當(dāng)操作完成以后激發(fā)信號(hào)或者調(diào)用回調(diào)。
IOCP:和overlapped類(lèi)似,不過(guò)結(jié)果經(jīng)過(guò)了Windows的預(yù)處理以隊(duì)列的形式掛在完成端口上
根據(jù)上面的復(fù)習(xí),可以得出一個(gè)結(jié)論,IOCP環(huán)境中每一次IO操作都需要一個(gè)重疊結(jié)構(gòu),那么一個(gè)CServerSocketItem至少需要如些這些東東:
他要接受數(shù)據(jù),所以必須有一個(gè)接受數(shù)據(jù)的 OverLapped
它要發(fā)送數(shù)據(jù),說(shuō)以必須有一個(gè)發(fā)送數(shù)據(jù)的 OverLapped
netFox對(duì)OverLapped做了使用了類(lèi)似池的的管理手段,他的Send都是不等待上一次完成就直接投遞下一個(gè)請(qǐng)求了,,,這是很操蛋的做法,,,
然后繼續(xù)復(fù)習(xí)下基礎(chǔ):
在EventSelect模型中獲處理件類(lèi)型流程是這樣:
event受信,使用::WSAEnumNetworkEvents查詢(xún)和這個(gè)event關(guān)聯(lián)的socket發(fā)生的事件,根據(jù)查詢(xún)到的事件類(lèi)型去處理事件
在以每一次IO為查詢(xún)對(duì)象重疊IO、IOCP模型中是這樣:
使用GetOverlappedResult 或者 GetQueuedCompletionStatus然后根據(jù)重疊結(jié)構(gòu)去查詢(xún)投遞的是什么類(lèi)型的操作,然后找到關(guān)聯(lián)的socket去操作,,,
這樣必然要給OverLapped做個(gè)擴(kuò)展,提供一種通過(guò)OverLapped查詢(xún)操作類(lèi)型和socket的能力。
通過(guò)分析代碼,netFox關(guān)聯(lián)socket是通過(guò)在創(chuàng)建完成端口的時(shí)候綁定SocketItem對(duì)象指針完成的,操作類(lèi)型是通過(guò)對(duì)OverLapped結(jié)構(gòu)加強(qiáng)完成的。
通過(guò)GetQueuedCompletionStatus獲取到完成OverLapped以后使用一個(gè)宏:
(這是COverLapped類(lèi)型) pSocketLapped=CONTAINING_RECORD(pOverLapped,COverLapped,m_OverLapped);
來(lái)獲取包裝后的OverLapped,然后獲取操作類(lèi)型,然后執(zhí)行具體操作。
其實(shí)宏的展開(kāi)如下:
(COverLapped*)((BYTE*)pOverLapped - (COverLapped*)(0)->m_OverLapped);
pOverLapped是獲取到的某個(gè)COverLapped中的成員變量,(COverLapped*)(0)->m_OverLapped是到在COverLapped中的偏移,((BYTE*)pOverLapped - (COverLapped*)(0)->m_OverLapped) 就是根據(jù)pOverLapped推算出來(lái)的包含地址為pOverLapped作為成員變量m_OverLapped的COverLapped對(duì)象的地址。
然后就分別調(diào)用:
//發(fā)送完成函數(shù)
bool CServerSocketItem::OnSendCompleted(COverLappedSend * pOverLappedSend, DWORD dwThancferred);
//接收完成函數(shù)
bool CServerSocketItem::OnRecvCompleted(COverLappedRecv * pOverLappedRecv, DWORD dwThancferred);
為毛要區(qū)分Send OverLapped 和 Recv OverLapped呢,,,
應(yīng)為投遞一次Send不一定是瞬間完成的,在處理的過(guò)程中存儲(chǔ)數(shù)據(jù)的內(nèi)存應(yīng)該是鎖定的,也就是不允許修改的,,,所以O(shè)verLapped應(yīng)該自己管理內(nèi)存。
而recv應(yīng)該也是需要有一片內(nèi)存直接接受數(shù)據(jù)的,很奇怪netFox沒(méi)有提供,,,
recv居然是在投遞接受請(qǐng)求的時(shí)候給了一個(gè)空的buffer,然后在完成回調(diào)中自己再次調(diào)用recv方法接受數(shù)據(jù)。
接受有關(guān)的成員變量如下:
//狀態(tài)變量
protected:
bool m_bNotify; //通知標(biāo)志
bool m_bRecvIng; //接收標(biāo)志
bool m_bCloseIng; //關(guān)閉標(biāo)志
bool m_bAllowBatch; //接受群發(fā)
WORD m_wRecvSize; //接收長(zhǎng)度
BYTE m_cbRecvBuf[SOCKET_BUFFER*5]; //接收緩沖
int iRetCode=recv(m_hSocket,(char *)m_cbRecvBuf+m_wRecvSize,sizeof(m_cbRecvBuf)-m_wRecvSize,0);
難道這么蠢的做法只是為了躲開(kāi)分包算法?
具體的看看接受代碼:
//接收完成函數(shù)
bool CServerSocketItem::OnRecvCompleted(COverLappedRecv * pOverLappedRecv, DWORD dwThancferred)
{
//效驗(yàn)數(shù)據(jù)
ASSERT(m_bRecvIng==true);
//設(shè)置變量
m_bRecvIng=false;
m_dwRecvTickCount=GetTickCount();
//判斷關(guān)閉
if (m_hSocket==INVALID_SOCKET)
{
CloseSocket(m_wRountID);
return true;
}
//接收數(shù)據(jù)
int iRetCode=recv(m_hSocket,(char *)m_cbRecvBuf+m_wRecvSize,sizeof(m_cbRecvBuf)-m_wRecvSize,0);
if (iRetCode<=0)
{
CloseSocket(m_wRountID);
return true;
}
//接收完成
m_wRecvSize+=iRetCode;
BYTE cbBuffer[SOCKET_BUFFER];
CMD_Head * pHead=(CMD_Head *)m_cbRecvBuf;
//處理數(shù)據(jù)
try
{
while (m_wRecvSize>=sizeof(CMD_Head))
{
//效驗(yàn)數(shù)據(jù)
WORD wPacketSize=pHead->CmdInfo.wDataSize;
if (wPacketSize>SOCKET_BUFFER) throw TEXT("數(shù)據(jù)包超長(zhǎng)");
if (wPacketSize<sizeof(CMD_Head)) throw TEXT("數(shù)據(jù)包非法");
if (pHead->CmdInfo.cbMessageVer!=SOCKET_VER) throw TEXT("數(shù)據(jù)包版本錯(cuò)誤");
if (m_wRecvSize<wPacketSize) break;
//提取數(shù)據(jù)
CopyMemory(cbBuffer,m_cbRecvBuf,wPacketSize);
WORD wRealySize=CrevasseBuffer(cbBuffer,wPacketSize);
ASSERT(wRealySize>=sizeof(CMD_Head));
m_dwRecvPacketCount++;
//解釋數(shù)據(jù)
WORD wDataSize=wRealySize-sizeof(CMD_Head);
void * pDataBuffer=cbBuffer+sizeof(CMD_Head);
CMD_Command Command=((CMD_Head *)cbBuffer)->CommandInfo;
//內(nèi)核命令
if (Command.wMainCmdID==MDM_KN_COMMAND)
{
switch (Command.wSubCmdID)
{
case SUB_KN_DETECT_SOCKET: //網(wǎng)絡(luò)檢測(cè)
{
break;
}
default: throw TEXT("非法命令碼");
}
}
else
{
//消息處理
m_pIServerSocketItemSink->OnSocketReadEvent(Command,pDataBuffer,wDataSize,this);
}
//刪除緩存數(shù)據(jù)
m_wRecvSize-=wPacketSize;
MoveMemory(m_cbRecvBuf,m_cbRecvBuf+wPacketSize,m_wRecvSize);
}
}
catch (
)
{
CloseSocket(m_wRountID);
return false;
}
return RecvData();
} 這是還是有分包算法的,總的來(lái)說(shuō)接受流程如下:
直接使用recv把數(shù)據(jù)接受到SocketItem的緩沖區(qū)中,當(dāng)長(zhǎng)度大于CMD_HEAD之后,進(jìn)入處理階段,處理head數(shù)據(jù)各種判斷,然后將數(shù)據(jù)扔出去,再調(diào)整緩沖區(qū),,,
簡(jiǎn)單的說(shuō):
Send完全不考慮同步問(wèn)題,不管一個(gè)勁的網(wǎng)隊(duì)列投遞Send請(qǐng)求,,,這邊處理隊(duì)列也是直接Send完事,完全不考慮上一次是否send成功,,,
Recv更是莫名其妙的使用完成端口繞一圈還回到recv直接接受了,,,
很狗血的做法,,,
更正下我自己狗血的不理解:
如果一個(gè)服務(wù)器提交了非常多的重疊的receive在每一個(gè)連接上,那么限制會(huì)隨著連接數(shù)的增長(zhǎng)而變化。如果一個(gè)服務(wù)器能夠預(yù)先估計(jì)可能會(huì)產(chǎn)生的最大并發(fā)連接數(shù),服務(wù)器可以投遞一個(gè)使用零緩沖區(qū)的receive在每一個(gè)連接上。因?yàn)楫?dāng)你提交操作沒(méi)有緩沖區(qū)時(shí),那么也不會(huì)存在內(nèi)存被鎖定了。使用這種辦法后,當(dāng)你的receive操作事件完成返回時(shí),該socket底層緩沖區(qū)的數(shù)據(jù)會(huì)原封不動(dòng)的還在其中而沒(méi)有被讀取到receive操作的緩沖區(qū)來(lái)。此時(shí),服務(wù)器可以簡(jiǎn)單的調(diào)用非阻塞式的recv將存在socket緩沖區(qū)中的數(shù)據(jù)全部讀出來(lái),一直到recv返回 WSAEWOULDBLOCK 為止。 這種設(shè)計(jì)非常適合那些可以犧牲數(shù)據(jù)吞吐量而換取巨大 并發(fā)連接數(shù)的服務(wù)器。當(dāng)然,你也需要意識(shí)到如何讓客戶(hù)端的行為盡量避免對(duì)服務(wù)器造成影響。在上一個(gè)例子中,當(dāng)一個(gè)零緩沖區(qū)的receive操作被返回后使 用一個(gè)非阻塞的recv去讀取socket緩沖區(qū)中的數(shù)據(jù),如果服務(wù)器此時(shí)可預(yù)計(jì)到將會(huì)有爆發(fā)的數(shù)據(jù)流,那么可以考慮此時(shí)投遞一個(gè)或者多個(gè)receive 來(lái)取代非阻塞的recv來(lái)進(jìn)行數(shù)據(jù)接收。(這比你使用1個(gè)缺省的8K緩沖區(qū)來(lái)接收要好的多。)
源碼中提供了一個(gè)簡(jiǎn)單實(shí)用的解決WSAENOBUF錯(cuò)誤的辦法。我們執(zhí)行了一個(gè)零字節(jié)緩沖的異步WSARead(...)(參見(jiàn) OnZeroByteRead(..))。當(dāng)這個(gè)請(qǐng)求完成,我們知道在TCP/IP棧中有數(shù)據(jù),然后我們通過(guò)執(zhí)行幾個(gè)有MAXIMUMPACKAGESIZE緩沖的異步WSARead(...)去讀,解決了WSAENOBUFS問(wèn)題。但是這種解決方法降低了服務(wù)器的吞吐量。
總結(jié):
解決方法一:
投遞使用空緩沖區(qū)的 receive操作,當(dāng)操作返回后,使用非阻塞的recv來(lái)進(jìn)行真實(shí)數(shù)據(jù)的讀取。因此在完成端口的每一個(gè)連接中需要使用一個(gè)循環(huán)的操作來(lái)不斷的來(lái)提交空緩沖區(qū)的receive操作。
解決方法二:
在投遞幾個(gè)普通含有緩沖區(qū)的receive操作后,進(jìn)接著開(kāi)始循環(huán)投遞一個(gè)空緩沖區(qū)的receive操作。這樣保證它們按照投遞順序依次返回,這樣我們就總能對(duì)被鎖定的內(nèi)存進(jìn)行解鎖。
///////////
如果一個(gè)服務(wù)器同時(shí)連接了許多客戶(hù)端, 對(duì)每個(gè)客戶(hù)端又調(diào)用了許多 WSARecv, 那么大量的內(nèi)存將會(huì)被鎖定到非分頁(yè)內(nèi)存池. 鎖定這些內(nèi)存時(shí)是按照頁(yè)面邊界來(lái)鎖定的, 也就是說(shuō)即使你 WSARecv 的緩存大小是 1 字節(jié), 被鎖定的內(nèi)存也將會(huì)是 4k. 非分頁(yè)內(nèi)存池是由整個(gè)系統(tǒng)共用的, 如果用完的話最壞的情況就是系統(tǒng)崩潰. 一個(gè)解決辦法是, 使用大小為 0 的緩沖區(qū)調(diào)用 WSARecv. 等到調(diào)用成功時(shí)再換用非阻塞的 recv 接收到來(lái)的數(shù)據(jù), 直到它返回 WSAEWOULDBLOCK 表明數(shù)據(jù)已經(jīng)全部讀完. 在這個(gè)過(guò)程中沒(méi)有任何內(nèi)存需要被鎖定, 但壞處是效率稍低.