#ifndef _ThreadPool_H_
#define _ThreadPool_H_
#pragma warning(disable: 4530)
#pragma warning(disable: 4786)
#include <algorithm>
#include <cassert>
#include <queue>
#include <vector>
#include <windows.h>
// Base class for job objects that can be handed to ThreadPool::Call().
// Derive from it and override DoJob().
class ThreadJob
{
public:
    // Virtual destructor so a derived job can be destroyed safely through a
    // ThreadJob pointer.
    virtual ~ThreadJob() {}

    // Invoked by the thread pool on a worker thread.
    // pPara is the user parameter that was passed to ThreadPool::Call().
    virtual void DoJob(void *pPara) = 0;
};
class ThreadPool
{
public:
//dwNum 線程池規(guī)模
ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0)
{
InitializeCriticalSection(&_csThreadVector);
InitializeCriticalSection(&_csWorkQueue);
_EventComplete = CreateEvent(0, false, false, NULL);
_EventEnd = CreateEvent(0, true, false, NULL);
_SemaphoreCall = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);
_SemaphoreDel = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);
assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
assert(_EventComplete != INVALID_HANDLE_VALUE);
assert(_EventEnd != INVALID_HANDLE_VALUE);
assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
AdjustSize(dwNum <= 0 ? 4 : dwNum);
}
~ThreadPool()
{
DeleteCriticalSection(&_csWorkQueue);
CloseHandle(_EventEnd);
CloseHandle(_EventComplete);
CloseHandle(_SemaphoreCall);
CloseHandle(_SemaphoreDel);
vector<ThreadItem*>::iterator iter;
for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
{
if(*iter)
delete *iter;
}
DeleteCriticalSection(&_csThreadVector);
}
//調(diào)整線程池規(guī)模
int AdjustSize(int iNum)
{
if(iNum > 0)
{
ThreadItem *pNew;
EnterCriticalSection(&_csThreadVector);
for(int _i=0; _i<iNum; _i++)
{
_ThreadVector.push_back(pNew = new ThreadItem(this));
assert(pNew);
pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
assert(pNew->_Handle);
}
LeaveCriticalSection(&_csThreadVector);
}
else
{
iNum *= -1;
ReleaseSemaphore(_SemaphoreDel, iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
}
return (int)_lThreadNum;
}
//調(diào)用線程池
void Call(void (*pFunc)(void *), void *pPara = NULL)
{
assert(pFunc);
EnterCriticalSection(&_csWorkQueue);
_JobQueue.push(new JobItem(pFunc, pPara));
LeaveCriticalSection(&_csWorkQueue);
ReleaseSemaphore(_SemaphoreCall, 1, NULL);
}
//調(diào)用線程池
inline void Call(ThreadJob * p, void *pPara = NULL)
{
Call(CallProc, new CallProcPara(p, pPara));
}
//結(jié)束線程池, 并同步等待
bool EndAndWait(DWORD dwWaitTime = INFINITE)
{
SetEvent(_EventEnd);
return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
}
//結(jié)束線程池
inline void End()
{
SetEvent(_EventEnd);
}
inline DWORD Size()
{
return (DWORD)_lThreadNum;
}
inline DWORD GetRunningSize()
{
return (DWORD)_lRunningNum;
}
bool IsRunning()
{
return _lRunningNum > 0;
}
protected:
//工作線程
static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
{
ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
assert(pThread);
ThreadPool *pThreadPoolObj = pThread->_pThis;
assert(pThreadPoolObj);
InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
HANDLE hWaitHandle[3];
hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
hWaitHandle[2] = pThreadPoolObj->_EventEnd;
JobItem *pJob;
bool fHasJob;
for(;;)
{
DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
//響應(yīng)刪除線程信號(hào)
if(wr == WAIT_OBJECT_0 + 1)
break;
//從隊(duì)列里取得用戶作業(yè)
EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
{
pJob = pThreadPoolObj->_JobQueue.front();
pThreadPoolObj->_JobQueue.pop();
assert(pJob);
}
LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
//受到結(jié)束線程信號(hào)確定是否結(jié)束線程(結(jié)束線程信號(hào)&& 是否還有工作)
if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)
break;
if(fHasJob && pJob)
{
InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
pThread->_dwLastBeginTime = GetTickCount();
pThread->_dwCount++;
pThread->_fIsRunning = true;
pJob->_pFunc(pJob->_pPara); //運(yùn)行用戶作業(yè)
delete pJob;
pThread->_fIsRunning = false;
InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
}
}
//刪除自身結(jié)構(gòu)
EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
delete pThread;
InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
if(!pThreadPoolObj->_lThreadNum) //所有線程結(jié)束
SetEvent(pThreadPoolObj->_EventComplete);
return 0;
}
//調(diào)用用戶對(duì)象虛函數(shù)
static void CallProc(void *pPara)
{
CallProcPara *cp = static_cast<CallProcPara *>(pPara);
assert(cp);
if(cp)
{
cp->_pObj->DoJob(cp->_pPara);
delete cp;
}
}
//用戶對(duì)象結(jié)構(gòu)
struct CallProcPara
{
ThreadJob* _pObj;//用戶對(duì)象
void *_pPara;//用戶參數(shù)
CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
};
//用戶函數(shù)結(jié)構(gòu)
struct JobItem
{
void (*_pFunc)(void *);//函數(shù)
void *_pPara; //參數(shù)
JobItem(void (*pFunc)(void *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
};
//線程池中的線程結(jié)構(gòu)
struct ThreadItem
{
HANDLE _Handle; //線程句柄
ThreadPool *_pThis; //線程池的指針
DWORD _dwLastBeginTime; //最后一次運(yùn)行開始時(shí)間
DWORD _dwCount; //運(yùn)行次數(shù)
bool _fIsRunning;
ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
~ThreadItem()
{
if(_Handle)
{
CloseHandle(_Handle);
_Handle = NULL;
}
}
};
std::queue<JobItem *> _JobQueue; //工作隊(duì)列
std::vector<ThreadItem *> _ThreadVector; //線程數(shù)據(jù)
CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作隊(duì)列臨界, 線程數(shù)據(jù)臨界
HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//結(jié)束通知, 完成事件, 工作信號(hào),刪除線程信號(hào)
long _lThreadNum, _lRunningNum; //線程數(shù), 運(yùn)行的線程數(shù)
};
#endif //_ThreadPool_H_
使用說明1:
調用方法
// Example job function. p is the pointer that was passed as the second
// argument of ThreadPool::Call() (NULL when none was given).
void threadfunc(void *p)
{
YourClass* yourObject = (YourClass*) p;
// ... do the work here, using yourObject ...

}
// Variant A: pool with the default size (4 workers), queue threadfunc 100 times.
ThreadPool tp;
for(i=0; i<100; i++)
tp.Call(threadfunc);
// Variant B: pool with an explicit initial size, job called with a parameter.
ThreadPool tp(20);// 20 is the initial pool size
tp.Call(threadfunc, lpPara);
使用時注意幾點:
1. ThreadJob 沒什麼用,直接寫線程函數吧。
2. 線程函數(threadfunc)的入口參數void* 可以轉成自定義的類型對象,這個對象可以記錄下線程運行中的數據,並設置線程當前狀態,以此與線程進行交互。
3. 線程池有一個EndAndWait函數,用於讓線程池中所有計算正常結束。有時線程池中的一個線程可能要運行很長時間,怎麼辦?可以通過線程函數threadfunc的入口參數對象來處理,比如:
// Example of a cooperative stop flag shared between the controlling thread
// and a long-running job.
class YourClass
{
public:
    YourClass() : cmd(0) {}
    // cmd == 1 asks the job to stop computing and return normally.
    // volatile so that the compiler re-reads the flag on every loop iteration.
    volatile int cmd;
};

// Job function: p points to the YourClass object that controls this job.
// Loops until the controlling thread sets cmd to 1.
void threadfunc(void* p) {
    YourClass* yourObject = static_cast<YourClass*>(p);
    while (true) {
        // do some calculation
        if (yourObject->cmd == 1)
            break;
    }
}
在主線程中設置 yourObject->cmd = 1,該線程就會自然結束。
使用說明2:


Code
// Example job function. p is the pointer that was passed as the second
// argument of ThreadPool::Call() (NULL when none was given).
void threadfunc(void *p)
{
// ... do the work here ...

}
// Variant A: pool with the default size (4 workers), queue threadfunc 100 times.
ThreadPool tp;
for(i=0; i<100; i++)
tp.Call(threadfunc);
// Variant B: explicit initial size, a job parameter, and resizing at run time.
ThreadPool tp(20);// 20 is the initial pool size
tp.Call(threadfunc, lpPara);
tp.AdjustSize(50);// add 50 worker threads
tp.AdjustSize(-30);// ask 30 worker threads to retire
// Example job object: derive from ThreadJob and override DoJob().
class MyThreadJob : public ThreadJob
{
public:
    // Called by the pool on a worker thread; p is the parameter that was
    // passed to ThreadPool::Call() (NULL when none was given).
    virtual void DoJob(void *p)
    {
        // ... user work goes here ...
    }
};
MyThreadJob mt[10];
ThreadPool tp;
for(i=0; i<100 i++)
tp.Call(mt + i);//tp.Call(mt + i, para);