簡述: Overlapped I/O也稱
Asynchronous
I/O,異步I/O模型。異步I/O和同步I/O不同,同步I/O時,程序被掛起,一直到I/O處理完,程序才能獲得控制。異步I/O,調用一個函數(shù)告訴
OS,進行I/O操作,不等I/O結束就立即返回,繼續(xù)程序執(zhí)行,操作系統(tǒng)完成I/O之后,通知消息給你。Overlapped
I/O只是一種模型,它可以由內核對象(hand),事件內核對象(hEvent), 異步過程調用(apcs) 和完成端口(I/O
completion)實現(xiàn)。
Overlapped I/O的設計的目的:
取代多線程功能,(多線程存在同步機制,錯誤處理,在成千上萬個線程
I/O中,線程上下文切換是十分消耗CPU資源的)。
Overlapped I/O模型是
OS為你傳遞數(shù)據,完成上下文切換,在處理完之后通知你。由程序中的處理,變?yōu)镺S的處理。內部也是用線程處理的。
Overlapped數(shù)據結構:
typedef struct _OVERLAPPED { // o
DWORD Internal; //通常被保留,當GetOverlappedResult()傳回False并且GatLastError()并非傳回ERROR_IO_PENDINO時,該狀態(tài)置為系統(tǒng)定的狀態(tài)。
DWORD InternalHigh; //通常被保留,當GetOverlappedResult()傳回False時,為被傳輸數(shù)據的長度。
DWORD Offset; //指定文件的位置,從該位置傳送數(shù)據,文件位置是相對文件開始處的字節(jié)偏移量。調用 ReadFile或WriteFile函數(shù)之前調用進程設置這個成員,讀寫命名管道及通信設備時調用進程忽略這個成員;
DWORD OffsetHigh; //指定開始傳送數(shù)據的字節(jié)偏移量的高位字,讀寫命名管道及通信設備時調用進程忽略這個成員;
HANDLE hEvent; //標識事件,數(shù)據傳送完成時把它設為信號狀態(tài),調用ReadFil,eWriteFile,ConnectNamedPipe TransactNamedPipe函數(shù)前,調用進程設置這個成員. 相關函數(shù)CreateEvent ResetEvent GetOverlappedResult WaitForSingleObject CWinThread GetLastError
} OVERLAPPED, *LPOVERLAPPED;
二個重要功能:1.
標識每個正在
overlapped 的操作。
2. 程序和系統(tǒng)之間提供了共享區(qū)域。參數(shù)可以在區(qū)域內雙向傳遞。
OVERLAPPED和數(shù)據緩沖區(qū)釋放問題:
在請求時,不能釋放,只有在
I/O請求完成之后,才可以釋放。如果發(fā)出多個overlapped請求,每個overlapped讀寫操作,都必須包含文件位置(socket),另外,如果有多個磁盤,I/O執(zhí)行次序無法保證。(每個overlapped都是獨立的請求操作)。
內核對象(hand)實現(xiàn):
例子:用
overlapped模型讀一個磁盤文件內容。
1.把設備句柄看作同步對象,
ReadFile將設備句柄設為無信號。ReadFile 異步I/O字節(jié)位置必須在OVERLAPPED結構中指定。
2.完成
I/O,設置信息狀態(tài)。為有信號。
3.WaitForSingleObject或WaitForMultipleObject判斷或者異步設備調用
GetOverLappedResult函數(shù)。
int main()
{
BOOL rc;
HANDLE hFile;
DWORD numread;
OVERLAPPED overlap;
char buf[READ_SIZE];
char szPath[MAX_PATH];
CheckOsVersion();
GetWindowsDirectory(szPath, sizeof(szPath));
strcat(szPath, "\\WINHLP32.EXE");
hFile = CreateFile( szPath,
GENERIC_READ,
FILE_SHARE_READ|FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL
);
if (hFile == INVALID_HANDLE_VALUE)
{
printf("Could not open %s\n", szPath);
return -1;
}
memset(&overlap, 0, sizeof(overlap));
overlap.Offset = 1500;
rc = ReadFile(
hFile,
buf,
READ_SIZE,
&numread,
&overlap
);
printf("Issued read request\n");
if (rc)
{
printf("Request was returned immediately\n");
}
else
{
if (GetLastError() == ERROR_IO_PENDING)
{
printf("Request queued, waiting
\n");
WaitForSingleObject(hFile, INFINITE);
printf("Request completed.\n");
rc = GetOverlappedResult(
hFile,
&overlap,
&numread,
FALSE
);
printf("Result was %d\n", rc);
}
else
{
printf("Error reading file\n");
}
}
CloseHandle(hFile);
return EXIT_SUCCESS;
}
事件內核對象(hEvent):
內核對象(hand)實現(xiàn)的問題:
不能區(qū)分那一個
overlapped操作,對同一個文件handle,系統(tǒng)有多個異步操作時(一邊讀文件頭,一邊寫文件尾, 有一個完成,就會有信號,不能區(qū)分是那種操作。),為每個進行中的overlapped調用GetOverlappedResult是不好的作法。
事件內核對象(hEvent)實現(xiàn)方案: Overlapped成員
hEven標識事件內核對象。CreateEvent,為每個請求創(chuàng)建一個事件,初始化每個請求的hEvent成員(對同一文件多個讀寫請求,每個操作綁定一個event對象)。調用WaitForMultipleObject來等等其中一個(或全部)完成。
另外
Event對象必須是手動重置。使用自動重置(在等待event之前設置,WaitForSingleObject()和 WaitForMultipleObjects()函數(shù)永不返回)。
自動重置事件
WaitForSingleObject()和 WaitForMultipleObjects()會等待事件到信號狀態(tài),隨后又自動將其重置為非信號狀態(tài),這樣保證了等待此事件的線程中只有一個會被喚醒。
手動重置事件
需要用戶調用ResetEvent()才會重置事件。可能有若干個線程在等待同一事件,
這樣當事件變?yōu)樾盘枲顟B(tài)時,所有等待線程都可以運行了。
SetEvent()函數(shù)用來把事件對象設置成信號狀態(tài),ResetEvent()把事件對象重置成非信號狀態(tài),兩者均需事件對象句柄作參數(shù)。
相關例子如下:
int main()
{
BOOL rc;
HANDLE hFile;
DWORD numread;
OVERLAPPED overlap;
char buf[READ_SIZE];
char szPath[MAX_PATH];
CheckOsVersion();
GetWindowsDirectory(szPath, sizeof(szPath));
strcat(szPath, "\\WINHLP32.EXE");
hFile = CreateFile( szPath,
GENERIC_READ,
FILE_SHARE_READ|FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL
);
if (hFile == INVALID_HANDLE_VALUE)
{
printf("Could not open %s\n", szPath);
return -1;
}
memset(&overlap, 0, sizeof(overlap));
overlap.Offset = 1500;
rc = ReadFile(
hFile,
buf,
READ_SIZE,
&numread,
&overlap
);
printf("Issued read request\n");
if (rc)
{
printf("Request was returned immediately\n");
}
else
{
if (GetLastError() == ERROR_IO_PENDING)
{
printf("Request queued, waiting
\n");
WaitForSingleObject(hFile, INFINITE);
printf("Request completed.\n");
rc = GetOverlappedResult(
hFile,
&overlap,
&numread,
FALSE
);
printf("Result was %d\n", rc);
}
else
{
printf("Error reading file\n");
}
}
CloseHandle(hFile);
return EXIT_SUCCESS;
}
異步過程調用(apcs):
事件內核對象(hEvent)的問題:
事件內核對象在使用
WaitForMultipleObjects時,只能等待64個對象。需要另建兩個數(shù)據組,并gOverlapped[nIndex].hEvent = ghEvents[nIndex]綁定起來。
異步過程調用(apcs)實現(xiàn)方案:
異步過程調用,
callback回調函數(shù),在一個Overlapped I/O完成之后,系統(tǒng)調用該回調函數(shù)。OS在有信號狀態(tài)下(設備句柄),才會調用回調函數(shù)(可能有很多APCS等待處理了),傳給它完成I/O請求的錯誤碼,傳輸字節(jié)數(shù)和Overlapped結構的地址。
五個函數(shù)可以設置信號狀態(tài):
1.SleepEx
2.
WaitForSingleObjectEx
3.
WaitForMultipleObjectEx
4.SingalObjectAndWait
5.
MsgWaitForMultipleObjectsEx
Main函數(shù)調用
WaitForSingleObjectEx, APCS被處理,調用回調函數(shù)FileIOCompletionRoutine
VOID WINAPI FileIOCompletionRoutine(
DWORD dwErrorCode, // completion code
DWORD dwNumberOfBytesTransfered, // number of bytes transferred
LPOVERLAPPED lpOverlapped // pointer to structure with I/O information
)
{
int nIndex = (int)(lpOverlapped->hEvent);
printf("Read #%d returned %d. %d bytes were read.\n",
nIndex,
dwErrorCode,
dwNumberOfBytesTransfered);
if (++nCompletionCount == MAX_REQUESTS)
SetEvent(ghEvent); // Cause the wait to terminate
}
int main()
{
int i;
char szPath[MAX_PATH];
CheckOsVersion();
MTVERIFY(
ghEvent = CreateEvent(
NULL, // No security
TRUE, // Manual reset - extremely important!
FALSE, // Initially set Event to non-signaled state
NULL // No name
)
);
GetWindowsDirectory(szPath, sizeof(szPath));
strcat(szPath, "\\WINHLP32.EXE");
ghFile = CreateFile( szPath,
GENERIC_READ,
FILE_SHARE_READ|FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL
);
if (ghFile == INVALID_HANDLE_VALUE)
{
printf("Could not open %s\n", szPath);
return -1;
}
for (i=0; i<MAX_REQUESTS; i++)
{
QueueRequest(i, i*16384, READ_SIZE);
}
printf("QUEUED!!\n");
for (;;)
{
DWORD rc;
rc = WaitForSingleObjectEx(ghEvent, INFINITE, TRUE );
if (rc == WAIT_OBJECT_0)
break;
MTVERIFY(rc == WAIT_IO_COMPLETION);
}
CloseHandle(ghFile);
return EXIT_SUCCESS;
}
int QueueRequest(int nIndex, DWORD dwLocation, DWORD dwAmount)
{
int i;
BOOL rc;
DWORD err;
gOverlapped[nIndex].hEvent = (HANDLE)nIndex;
gOverlapped[nIndex].Offset = dwLocation;
for (i=0; i<MAX_TRY_COUNT; i++)
{
rc = ReadFileEx(
ghFile,
gBuffers[nIndex],
dwAmount,
&gOverlapped[nIndex],
FileIOCompletionRoutine
);
if (rc)
{
printf("Read #%d queued for overlapped I/O.\n", nIndex);
return TRUE;
}
err = GetLastError();
if ( err == ERROR_INVALID_USER_BUFFER ||
err == ERROR_NOT_ENOUGH_QUOTA ||
err == ERROR_NOT_ENOUGH_MEMORY )
{
Sleep(50); // Wait around and try later
continue;
}
break;
}
printf("ReadFileEx failed.\n");
return -1;
}
完成端口(I/O completion):
異步過程調用
(apcs)問題:
只有發(fā)
overlapped請求的線程才可以提供callback函數(shù)(需要一個特定的線程為一個特定的I/O請求服務)。
完成端口
(I/O completion)的優(yōu)點:
不會限制
handle個數(shù),可處理成千上萬個連接。I/O completion port允許一個線程將一個請求暫時保存下來,由另一個線程為它做實際服務。
并發(fā)模型與線程池:
在典型的并發(fā)模型中,服務器為每一個客戶端創(chuàng)建一個線程,如果很多客戶同時請求,則這些線程都是運行的,那么
CPU就要一個個切換,CPU花費了更多的時間在線程切換,線程確沒得到很多CPU時間。到底應該創(chuàng)建多少個線程比較合適呢,微軟件幫助文檔上講應該是2*CPU個。但理想條件下最好線程不要切換,而又能象線程池一樣,重復利用。I/O完成端口就是使用了線程池。
理解與使用:
第一步:
在我們使用完成端口之前,要調用
CreateIoCompletionPort函數(shù)先創(chuàng)建完成端口對象。定義如下:
HANDLE CreateIoCompletionPort(
HANDLE FileHandle,
HANDLE ExistingCompletionPort,
DWORD CompletionKey,
DWORD NumberOfConcurrentThreads
);
FileHandle:
文件或設備的
handle, 如果值為INVALID_HANDLE_VALUE則產生一個沒有和任何文件handle有關系的port.( 可以用來和完成端口聯(lián)系的各種句柄,文件,套接字
)
ExistingCompletionPort:
NULL時生成一個新
port, 否則handle會加到此port上。
CompletionKey:
用戶自定義數(shù)值,被交給服務的線程。GetQueuedCompletionStatus函數(shù)時我們可以完全得到我們在此聯(lián)系函數(shù)中的完成鍵(申請的內存塊)。在
GetQueuedCompletionStatus中可以完封不動的得到這個內存塊,并且使用它。
NumberOfConcurrentThreads:
參數(shù)
NumberOfConcurrentThreads用來指定在一個完成端口上可以并發(fā)的線程數(shù)量。理想的情況是,一個處理器上只運行一
個線程,這樣可以避免線程上下文切換的開銷。如果這個參數(shù)的值為0,那就是告訴系統(tǒng)線程數(shù)與處理器數(shù)相同。我們可以用下面的代碼來創(chuàng)建I/O完成端口。
隱藏在之創(chuàng)建完成端口的秘密:
1.創(chuàng)建一個完成端口
CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, dwNumberOfConcurrentThreads);
2.設備列表,完成端口把它同一個或多個設備相關聯(lián)。
CreateIoCompletionPort(hDevice, hCompPort, dwCompKey, 0) ;
第二步:
根據處理器個數(shù),創(chuàng)建
cpu*2個工作線程:CreateThread(NULL, 0, ServerWorkerThread, CompletionPort,0, &ThreadID))與此同時,服務器調用
WSASocket,bind, listen, WSAAccept,之后,調用CreateIoCompletionPort((HANDLE) Accept, CompletionPort... )把一個套接字句柄和一個完成端口綁定到一起。完成端口又同一個或多個設備相關聯(lián)著,所以以套接字為基礎,投遞發(fā)送和請求,對
I/O處理。接著,可以依賴完成端口,接收有關I/O操作完成情況的通知。再看程序里:WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,&(PerIoData->Overlapped), NULL)開始調用,這里象前面講過的一樣,既然是異步
I/O,所以WSASend和WSARecv的調用會立即返回。
系統(tǒng)處理:
當一個設備的異步
I/O請求完成之后,系統(tǒng)會檢查該設備是否關聯(lián)了一個完成端口,如果是,系統(tǒng)就向該完成端口的I/O完成隊列中加入完成的I/O請求列。
然后我們需要從這個完成隊列中,取出調用后的結果
(需要通過一個Overlapped結構來接收調用的結果)。怎么知道這個隊列中已經有處理后的結果呢,調用GetQueuedCompletionStatus函數(shù)。
工作線程與完成端口: 和異步過程調用不同
(在一個
Overlapped I/O完成之后,系統(tǒng)調用該回調函數(shù)。OS在有信號狀態(tài)下(設備句柄),才會調用回調函數(shù)(可能有很多APCS等待處理了))
GetQueuedCompletionStatus在工作線程內調用
GetQueuedCompletionStatus函數(shù)。
GetQueuedCompletionStatus(
HANDLE CompletionPort,
LPDWORD lpNumberOfBytesTransferred,
LPDWORD lpCompletionKey,
LPOVERLAPPED *lpOverlapped,
DWORD dwMilliseconds
);
CompletionPort:指出了線程要監(jiān)視哪一個完成端口。很多服務應用程序只是使用一個I/O完成端口,所有的I/O請求完成以后的通知都將發(fā)給該端口。
lpNumberOfBytesTransferred:傳輸?shù)臄?shù)據字節(jié)數(shù)
lpCompletionKey:
完成端口的單句柄數(shù)據指針,這個指針將可以得到我們在CreateIoCompletionPort中申請那片內存。
lpOverlapped:
重疊I/O請求結構,這個結構同樣是指向我們在重疊請求時所申請的內存塊,同時和lpCompletionKey,一樣我們也可以利用這個內存塊來存儲我們要保存的任意數(shù)據。
dwMilliseconds:
等待的最長時間(毫秒),如果超時,lpOverlapped被設為NULL,函數(shù)返回False.
GetQueuedCompletionStatus功能及隱藏的秘密:
GetQueuedCompletionStatus使調用線程掛起,直到指定的端口的I/O完成隊列中出現(xiàn)了一項或直到超時。(I/0完成隊列中出現(xiàn)了記錄)調用GetQueuedCompletionStatus時,調用線程的ID(cpu*2個線程,每個ServerWorkerThread的線程ID)就被放入該等待線程隊列中。
等待線程隊列很簡單,只是保存了這些線程的ID。完成端口會按照后進先出的原則將一個線程隊列的ID放入到釋放線程列表中。
這樣,I/O完成端口內核對象就知道哪些線程正在等待處理完成的I/O請求。當端口的I/O完成隊列出現(xiàn)一項時,完成端口就喚醒(睡眠狀態(tài)中變?yōu)榭烧{度狀態(tài))等待線程隊列中的一個線程。線程將得到完成I/O項中的信息:傳輸?shù)淖止?jié)數(shù),完成鍵(單句柄數(shù)據結構)和Overlapped結構地址,線程是通過GetQueuedCompletionStatus返回這些信息,等待CPU的調度。
GetQueuedCompletionStatus返回可能有多種原因,如果傳遞無效完成端口句柄,函數(shù)返回False,GetLastError返回一個錯誤(ERROR_INVALID_HANDLE),如果超時,返回False, GetLastError返回WAIT_TIMEOUT, i/o完成隊列刪除一項,該表項是一個成功完成的I/O請求,則返回True。
調用GetQueuedCompletionStatus的線程是后進先出的方式喚醒的,比如有4個線程等待,如果有一個I/O,最后一個調用GetQueuedCompletionStatus的線程被喚醒來處理。處理完之后,再調用 GetQueuedCompletionStatus進入等待線程隊列中。
深入分析完成端口線程池調度原理:
假設我們運行在2CPU的機器上。創(chuàng)建完成端口時指定2個并發(fā),創(chuàng)建了4個工作線程加入線程池中等待完成I/O請求,且完成端口隊列(先入先出)中有3個完成I/O的請求的情況:
工作線程運行, 創(chuàng)建了4個工作線程,調用GetQueuedCompletionStatus時,該調用線程就進入了睡眠狀態(tài),假設這個時候,I/O完成隊列出現(xiàn)了三項,調用線程的ID就被放入該等待線程隊列中,
(如圖):
等待線程隊列(先進后出)
進隊列
出隊列
線程A
線程B
線程C
線程D I/O完成端口內核對象(第3個參數(shù)等級線程隊列),因此知道哪些線程正在等待處理完成的I/O請求。當端口的I/O完成隊列出現(xiàn)一項時,完成端口就喚醒(睡眠狀態(tài)中變?yōu)榭烧{度狀態(tài))等待線程隊列中的一個線程(前面講過等待線程隊列是后進先出)。所以線程D將得到完成I/O項中的信息:傳輸?shù)淖止?jié)數(shù),完成鍵(單句柄數(shù)據結構)和Overlapped結構地址,線程是通過GetQueuedCompletionStatus返回這些信息。
在前面我們指定了并發(fā)線程的數(shù)目是2,所以I/O完成端口喚醒2個線程,線程D和線程C,另兩個繼續(xù)休眠(線程B,線程A),直到線程D處理完了,發(fā)現(xiàn)表項里還有要處理的,就喚醒同一線程繼續(xù)處理。
等待線程隊列(后進先出
)
進隊列
出隊列
線程A
線程B
釋放線程隊列
線程C
線程D
線程并發(fā)量:
并發(fā)量限制了與該完成端口相關聯(lián)的可運行線程的數(shù)目, 它類似閥門的作用。 當與該完成端口相關聯(lián)的可運行線程的總數(shù)目達到了該并發(fā)量,系統(tǒng)就會阻塞任何與該完成端口相關聯(lián)的后續(xù)線程的執(zhí)行, 直到與該完成端口相關聯(lián)的可運行線程數(shù)目下降到小于該并發(fā)量為止。所以解釋了線程池中的運行線程可能會比設置的并發(fā)線程多的原因。
它的作用:
最有效的假想是發(fā)生在有完成包在隊列中等待,而沒有等待被滿足,因為此時完成端口達到了其并發(fā)量的極限。此時,一個正在運行中的線程調用 GetQueuedCompletionStatus時,它就會立刻從隊列中取走該完成包。這樣就不存在著環(huán)境的切換,因為該處于運行中的線程就會連續(xù)不斷地從隊列中取走完成包,而其他的線程就不能運行了。
注意:如果池中的所有線程都在忙,客戶請求就可能拒絕,所以要適當調整這個參數(shù),獲得最佳性能。
線程并發(fā):D線程掛起,加入暫停線程,醒來后又加入釋放線程隊列。
線程C
線程B
線程A
出隊列
進隊列
等待的線程隊列(后進先出)
釋放線程隊列
暫停線程
線程D
線程的安全退出:
PostQueudCompletionStatus函數(shù),我們可以用它發(fā)送一個自定義的包含了OVERLAPPED成員變量的結構地址,里面包含一個狀態(tài)變量,當狀態(tài)變量為退出標志時,線程就執(zhí)行清除動作然后退出。
完成端口使用需要注意的地方:
1.在執(zhí)行wsasend和wsarecv操作前,請先將overlapped結構體使用memset進行清零。