轉(zhuǎn)載自http://blog.csdn.net/u010984552/article/details/51887108
為什么需要線程池
目前的大多數(shù)網(wǎng)絡(luò)服務(wù)器,包括Web服務(wù)器、Email服務(wù)器以及數(shù)據(jù)庫(kù)服務(wù)器等都具有一個(gè)共同點(diǎn),就是單位時(shí)間內(nèi)必須處理數(shù)目巨大的連接請(qǐng)求,但處理時(shí)間卻相對(duì)較短。
傳 統(tǒng)多線程方案中我們采用的服務(wù)器模型則是一旦接受到請(qǐng)求之后,即創(chuàng)建一個(gè)新的線程,由該線程執(zhí)行任務(wù)。任務(wù)執(zhí)行完畢后,線程退出,這就是是“即時(shí)創(chuàng)建,即 時(shí)銷毀”的策略。盡管與創(chuàng)建進(jìn)程相比,創(chuàng)建線程的時(shí)間已經(jīng)大大的縮短,但是如果提交給線程的任務(wù)是執(zhí)行時(shí)間較短,而且執(zhí)行次數(shù)極其頻繁,那么服務(wù)器將處于 不停的創(chuàng)建線程,銷毀線程的狀態(tài)。
我們將傳統(tǒng)方案中的線程執(zhí)行過(guò)程分為三個(gè)過(guò)程:T1、T2、T3。
- T1:線程創(chuàng)建時(shí)間
- T2:線程執(zhí)行時(shí)間,包括線程的同步等時(shí)間
- T3:線程銷毀時(shí)間
那么我們可以看出,線程本身的開(kāi)銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執(zhí)行的時(shí)間很短的話,這比開(kāi)銷可能占到20%-50%左右。如果任務(wù)執(zhí)行時(shí)間很頻繁的話,這筆開(kāi)銷將是不可忽略的。
除此之外,線程池能夠減少創(chuàng)建的線程個(gè)數(shù)。通常線程池所允許的并發(fā)線程是有上界的,如果同時(shí)需要并發(fā)的線程數(shù)超過(guò)上界,那么一部分線程將會(huì)等待。而傳統(tǒng)方案中,如果同時(shí)請(qǐng)求數(shù)目為2000,那么最壞情況下,系統(tǒng)可能需要產(chǎn)生2000個(gè)線程。盡管這不是一個(gè)很大的數(shù)目,但是也有部分機(jī)器可能達(dá)不到這種要求。
因此線程池的出現(xiàn)正是著眼于減少線程池本身帶來(lái)的開(kāi)銷。線程池采用預(yù)創(chuàng)建的技術(shù),在應(yīng)用程序啟動(dòng)之后,將立即創(chuàng)建一定數(shù)量的線程(N1),放入空閑隊(duì)列中。這些線程都是處于阻塞(Suspended)狀態(tài),不消耗CPU,但占用較小的內(nèi)存空間。當(dāng)任務(wù)到來(lái)后,緩沖池選擇一個(gè)空閑線程,把任務(wù)傳入此線程中運(yùn)行。當(dāng)N1個(gè)線程都在處理任務(wù)后,緩沖池自動(dòng)創(chuàng)建一定數(shù)量的新線程,用于處理更多的任務(wù)。在任務(wù)執(zhí)行完畢后線程也不退出,而是繼續(xù)保持在池中等待下一次的任務(wù)。當(dāng)系統(tǒng)比較空閑時(shí),大部分線程都一直處于暫停狀態(tài),線程池自動(dòng)銷毀一部分線程,回收系統(tǒng)資源。
基于這種預(yù)創(chuàng)建技術(shù),線程池將線程創(chuàng)建和銷毀本身所帶來(lái)的開(kāi)銷分?jǐn)偟搅烁鱾€(gè)具體的任務(wù)上,執(zhí)行次數(shù)越多,每個(gè)任務(wù)所分擔(dān)到的線程本身開(kāi)銷則越小,不過(guò)我們另外可能需要考慮進(jìn)去線程之間同步所帶來(lái)的開(kāi)銷
構(gòu)建線程池框架
一般線程池都必須具備下面幾個(gè)組成部分:
- 線程池管理器:用于創(chuàng)建并管理線程池
- 工作線程: 線程池中實(shí)際執(zhí)行的線程
- 任務(wù)接口: 盡管線程池大多數(shù)情況下是用來(lái)支持網(wǎng)絡(luò)服務(wù)器,但是我們將線程執(zhí)行的任務(wù)抽象出來(lái),形成任務(wù)接口,從而是的線程池與具體的任務(wù)無(wú)關(guān)。
- 任務(wù)隊(duì)列: 線程池的概念具體到實(shí)現(xiàn)則可能是隊(duì)列,鏈表之類的數(shù)據(jù)結(jié)構(gòu),其中保存執(zhí)行線程。
我們實(shí)現(xiàn)的通用線程池框架由五個(gè)重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括線程同步使用的類CThreadMutex和CCondition。
- CJob是所有的任務(wù)的基類,其提供一個(gè)接口Run,所有的任務(wù)類都必須從該類繼承,同時(shí)實(shí)現(xiàn)Run方法。該方法中實(shí)現(xiàn)具體的任務(wù)邏輯。
-
- CThread是linux中線程的包裝,其封裝了Linux線程最經(jīng)常使用的屬性和方法,它也是一個(gè)抽象類,是所有線程類的基類,具有一個(gè)接口Run。
-
- CWorkerThread是實(shí)際被調(diào)度和執(zhí)行的線程類,其從CThread繼承而來(lái),實(shí)現(xiàn)了CThread中的Run方法。
-
- CThreadPool是線程池類,其負(fù)責(zé)保存線程,釋放線程以及調(diào)度線程。
-
- CThreadManage是線程池與用戶的直接接口,其屏蔽了內(nèi)部的具體實(shí)現(xiàn)。
-
- CThreadMutex用于線程之間的互斥。
-
- CCondition則是條件變量的封裝,用于線程之間的同步。
CThreadManage直接跟客戶端打交道,其接受需要?jiǎng)?chuàng)建的線程初始個(gè)數(shù),并接受客戶端提交的任務(wù)。這兒的任務(wù)是具體的非抽象的任務(wù)。CThreadManage的內(nèi)部實(shí)際上調(diào)用的都是CThreadPool的相關(guān)操作。CThreadPool創(chuàng)建具體的線程,并把客戶端提交的任務(wù)分發(fā)給CWorkerThread,CWorkerThread實(shí)際執(zhí)行具體的任務(wù)。
理解系統(tǒng)組件
下面我們分開(kāi)來(lái)了解系統(tǒng)中的各個(gè)組件。
CThreadManage
CThreadManage的功能非常簡(jiǎn)單,其提供最簡(jiǎn)單的方法,其類定義如下:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| | class CThreadManage { private: CThreadPool *m_Pool; int m_NumOfThread;
protected:
public: CThreadManage(); CThreadManage(int num); virtual ~CThreadManage();
void SetParallelNum(int num); void Run(CJob *job, void *jobdata); void TerminateAll(void); }; |
其中m_Pool指向?qū)嶋H的線程池;m_NumOfThread是初始創(chuàng)建時(shí)候允許創(chuàng)建的并發(fā)的線程個(gè)數(shù)。另外Run和TerminateAll方法也非常簡(jiǎn)單,只是簡(jiǎn)單的調(diào)用CThreadPool的一些相關(guān)方法而已。其具體的實(shí)現(xiàn)如下:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| | CThreadManage::CThreadManage() { m_NumOfThread = 10; m_Pool = new CThreadPool(m_NumOfThread); }
CThreadManage::CThreadManage(int num) { m_NumOfThread = num; m_Pool = new CThreadPool(m_NumOfThread); }
CThreadManage::~CThreadManage() { if(NULL != m_Pool) delete m_Pool; }
void CThreadManage::SetParallelNum(int num) { m_NumOfThread = num; }
void CThreadManage::Run(CJob *job, void *jobdata) { m_Pool->Run(job, jobdata); }
void CThreadManage::TerminateAll(void) { m_Pool->TerminateAll(); } |
CThread
CThread 類實(shí)現(xiàn)了對(duì)Linux中線程操作的封裝,它是所有線程的基類,也是一個(gè)抽象類,提供了一個(gè)抽象接口Run,所有的CThread都必須實(shí)現(xiàn)該Run方法。CThread的定義如下所示:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| | class CThread { private: int m_ErrCode; Semaphore m_ThreadSemaphore; //the inner semaphore, which is used to realize unsigned long m_ThreadID; bool m_Detach; //The thread is detached bool m_CreateSuspended; //if suspend after creating char *m_ThreadName; ThreadState m_ThreadState; //the state of the thread
protected: void SetErrcode(int errcode) { m_ErrCode = errcode; } static void *ThreadFunction(void *);
public: CThread(); CThread(bool createsuspended, bool detach); virtual ~CThread();
virtual void Run(void) = 0; void SetThreadState(ThreadState state) { m_ThreadState = state; } bool Terminate(void); //Terminate the threa bool Start(void); //Start to execute the thread void Exit(void); bool Wakeup(void); ThreadState GetThreadState(void) { return m_ThreadState; } int GetLastError(void) { return m_ErrCode; } void SetThreadName(char *thrname) { strcpy(m_ThreadName, thrname); } char *GetThreadName(void) { return m_ThreadName; } int GetThreadID(void) { return m_ThreadID; } bool SetPriority(int priority); int GetPriority(void); int GetConcurrency(void); void SetConcurrency(int num); bool Detach(void); bool Join(void); bool Yield(void); int Self(void); }; |
線程的狀態(tài)可以分為四種,空閑、忙碌、掛起、終止(包括正常退出和非正常退出)。由于目前Linux線程庫(kù)不支持掛起操作,因此,我們的此處的掛起操作類似于暫停。如果線程創(chuàng)建后不想立即執(zhí)行任務(wù),那么我們可以將其“暫停”,如果需要運(yùn)行,則喚醒。有一點(diǎn)必須注意的是,一旦線程開(kāi)始執(zhí)行任務(wù),將不能被掛起,其將一直執(zhí)行任務(wù)至完畢。
線程類的相關(guān)操作均十分簡(jiǎn)單。線程的執(zhí)行入口是從Start()函數(shù)開(kāi)始,其將調(diào)用函數(shù)ThreadFunction,ThreadFunction再調(diào)用實(shí)際的Run函數(shù),執(zhí)行實(shí)際的任務(wù)。
CThreadPool
CThreadPool是線程的承載容器,一般可以將其實(shí)現(xiàn)為堆棧、單向隊(duì)列或者雙向隊(duì)列。在我們的系統(tǒng)中我們使用STL Vector對(duì)線程進(jìn)行保存。CThreadPool的實(shí)現(xiàn)代碼如下:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
| | class CThreadPool { friend class CWorkerThread;
private: unsigned int m_MaxNum; //the max thread num that can create at the same time unsigned int m_AvailLow; //The min num of idle thread that shoule kept unsigned int m_AvailHigh; //The max num of idle thread that kept at the same time unsigned int m_AvailNum; //the normal thread num of idle num; unsigned int m_InitNum; //Normal thread num;
protected: CWorkerThread *GetIdleThread(void); void AppendToIdleList(CWorkerThread *jobthread); void MoveToBusyList(CWorkerThread *idlethread); void MoveToIdleList(CWorkerThread *busythread); void DeleteIdleThread(int num); void CreateIdleThread(int num);
public: CThreadMutex m_BusyMutex; //when visit busy list,use m_BusyMutex to lock and unlock CThreadMutex m_IdleMutex; //when visit idle list,use m_IdleMutex to lock and unlock CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock CThreadMutex m_VarMutex; CCondition m_BusyCond; //m_BusyCond is used to sync busy thread list CCondition m_IdleCond; //m_IdleCond is used to sync idle thread list CCondition m_IdleJobCond; //m_JobCond is used to sync job list CCondition m_MaxNumCond;
vector<CWorkerThread *> m_ThreadList; vector<CWorkerThread *> m_BusyList; //Thread List vector<CWorkerThread *> m_IdleList; //Idle List
CThreadPool(); CThreadPool(int initnum); virtual ~CThreadPool();
void SetMaxNum(int maxnum) { m_MaxNum = maxnum; } int GetMaxNum(void) { return m_MaxNum; } void SetAvailLowNum(int minnum) { m_AvailLow = minnum; } int GetAvailLowNum(void) { return m_AvailLow; } void SetAvailHighNum(int highnum) { m_AvailHigh = highnum; } int GetAvailHighNum(void) { return m_AvailHigh; } int GetActualAvailNum(void) { return m_AvailNum; } int GetAllNum(void) { return m_ThreadList.size(); } int GetBusyNum(void) { return m_BusyList.size(); } void SetInitNum(int initnum) { m_InitNum = initnum; } int GetInitNum(void) { return m_InitNum; } void TerminateAll(void); void Run(CJob *job, void *jobdata); };
CWorkerThread *CThreadPool::GetIdleThread(void)
{
while(m_IdleList.size() == 0 )
m_IdleCond.Wait();
m_IdleMutex.Lock();
if(m_IdleList.size() > 0 )
{
CWorkerThread *thr = (CWorkerThread *)m_IdleList.front();
printf("Get Idle thread %d\n", thr->GetThreadID());
m_IdleMutex.Unlock();
return thr;
}
m_IdleMutex.Unlock();
return NULL; }
//create num idle thread and put them to idlelist
void CThreadPool::CreateIdleThread(int num)
{
for(int i = 0; i < num; i++) {
CWorkerThread *thr = new CWorkerThread();
thr->SetThreadPool(this);
AppendToIdleList(thr);
m_VarMutex.Lock();
m_AvailNum++;
m_VarMutex.Unlock();
thr->Start(); //begin the thread,the thread wait for job
}
}
void CThreadPool::Run(CJob *job, void *jobdata)
{
assert(job != NULL);
//if the busy thread num adds to m_MaxNum,so we should wait
if(GetBusyNum() == m_MaxNum)
m_MaxNumCond.Wait();
if(m_IdleList.size() < m_AvailLow)
{
if(GetAllNum() + m_InitNum - m_IdleList.size() < m_MaxNum )
CreateIdleThread(m_InitNum - m_IdleList.size());
else
CreateIdleThread(m_MaxNum - GetAllNum());
}
CWorkerThread *idlethr = GetIdleThread();
if(idlethr != NULL)
{
idlethr->m_WorkMutex.Lock();
MoveToBusyList(idlethr);
idlethr->SetThreadPool(this);
job->SetWorkThread(idlethr);
printf("Job is set to thread %d \n", idlethr->GetThreadID());
idlethr->SetJob(job, jobdata);
}
} |
在CThreadPool中存在兩個(gè)鏈表,一個(gè)是空閑鏈表,一個(gè)是忙碌鏈表。Idle鏈表中存放所有的空閑進(jìn)程,當(dāng)線程執(zhí)行任務(wù)時(shí)候,其狀態(tài)變?yōu)槊β禒顟B(tài),同時(shí)從空閑鏈表中刪除,并移至忙碌鏈表中。在CThreadPool的構(gòu)造函數(shù)中,我們將執(zhí)行下面的代碼:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| | for(int i = 0; i < m_InitNum; i++)
{
CWorkerThread *thr = new CWorkerThread();
AppendToIdleList(thr);
thr->SetThreadPool(this);
thr->Start(); //begin the thread,the thread wait for job
} |
在該代碼中,我們將創(chuàng)建m_InitNum個(gè)線程,創(chuàng)建之后即調(diào)用AppendToIdleList放入Idle鏈表中,由于目前沒(méi)有任務(wù)分發(fā)給這些線程,因此線程執(zhí)行Start后將自己掛起。
事實(shí)上,線程池中容納的線程數(shù)目并不是一成不變的,其會(huì)根據(jù)執(zhí)行負(fù)載進(jìn)行自動(dòng)伸縮。為此在CThreadPool中設(shè)定四個(gè)變量:
m_InitNum:處世創(chuàng)建時(shí)線程池中的線程的個(gè)數(shù)。
m_MaxNum:當(dāng)前線程池中所允許并發(fā)存在的線程的最大數(shù)目。
m_AvailLow:當(dāng)前線程池中所允許存在的空閑線程的最小數(shù)目,如果空閑數(shù)目低于該值,表明負(fù)載可能過(guò)重,此時(shí)有必要增加空閑線程池的數(shù)目。實(shí)現(xiàn)中我們總是將線程調(diào)整為m_InitNum個(gè)。
m_AvailHigh:當(dāng)前線程池中所允許的空閑的線程的最大數(shù)目,如果空閑數(shù)目高于該值,表明當(dāng)前負(fù)載可能較輕,此時(shí)將刪除多余的空閑線程,刪除后調(diào)整數(shù)也為m_InitNum個(gè)。
m_AvailNum:目前線程池中實(shí)際存在的線程的個(gè)數(shù),其值介于m_AvailHigh和m_AvailLow之間。如果線程的個(gè)數(shù)始終維持在m_AvailLow和m_AvailHigh之間,則線程既不需要?jiǎng)?chuàng)建,也不需要?jiǎng)h除,保持平衡狀態(tài)。因此如何設(shè)定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態(tài),是線程池設(shè)計(jì)必須考慮的問(wèn)題。
線程池在接受到新的任務(wù)之后,線程池首先要檢查是否有足夠的空閑池可用。檢查分為三個(gè)步驟:
(1)檢查當(dāng)前處于忙碌狀態(tài)的線程是否達(dá)到了設(shè)定的最大值m_MaxNum,如果達(dá)到了,表明目前沒(méi)有空閑線程可用,而且也不能創(chuàng)建新的線程,因此必須等待直到有線程執(zhí)行完畢返回到空閑隊(duì)列中。
(2)如果當(dāng)前的空閑線程數(shù)目小于我們?cè)O(shè)定的最小的空閑數(shù)目m_AvailLow,則我們必須創(chuàng)建新的線程,默認(rèn)情況下,創(chuàng)建后的線程數(shù)目應(yīng)該為m_InitNum,因此創(chuàng)建的線程數(shù)目應(yīng)該為( 當(dāng)前空閑線程數(shù)與m_InitNum);但是有一種特殊情況必須考慮,就是現(xiàn)有的線程總數(shù)加上創(chuàng)建后的線程數(shù)可能超過(guò)m_MaxNum,因此我們必須對(duì)線程的創(chuàng)建區(qū)別對(duì)待。
C++ Code
1 2 3 4 5 6 7 8 9
| | if(GetAllNum() + m_InitNum - m_IdleList.size() < m_MaxNum )
CreateIdleThread(m_InitNum - m_IdleList.size());
else
CreateIdleThread(m_MaxNum - GetAllNum());
|
如果創(chuàng)建后總數(shù)不超過(guò)m_MaxNum,則創(chuàng)建后的線程為m_InitNum;如果超過(guò)了,則只創(chuàng)建( m_MaxNum-當(dāng)前線程總數(shù) )個(gè)。
(3)調(diào)用GetIdleThread方法查找空閑線程。如果當(dāng)前沒(méi)有空閑線程,則掛起;否則將任務(wù)指派給該線程,同時(shí)將其移入忙碌隊(duì)列。
當(dāng)線程執(zhí)行完畢后,其會(huì)調(diào)用MoveToIdleList方法移入空閑鏈表中,其中還調(diào)用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。
CJob
CJob類相對(duì)簡(jiǎn)單,其封裝了任務(wù)的基本的屬性和方法,其中最重要的是Run方法,代碼如下:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| | class CJob {
private:
int m_JobNo; //The num was assigned to the job
char *m_JobName; //The job name
CThread *m_pWorkThread; //The thread associated with the job
public:
CJob( void );
virtual ~CJob();
int GetJobNo(void) const { return m_JobNo; }
void SetJobNo(int jobno) { m_JobNo = jobno; }
char *GetJobName(void) const { return m_JobName; }
void SetJobName(char *jobname);
CThread *GetWorkThread(void) { return m_pWorkThread; }
void SetWorkThread ( CThread *pWorkThread ) {
m_pWorkThread = pWorkThread;
}
virtual void Run ( void *ptr ) = 0;
}; |
線程池使用示例
至此我們給出了一個(gè)簡(jiǎn)單的與具體任務(wù)無(wú)關(guān)的線程池框架。使用該框架非常的簡(jiǎn)單,我們所需要的做的就是派生CJob類,將需要完成的任務(wù)實(shí)現(xiàn)在Run方法中。然后將該Job交由CThreadManage去執(zhí)行。下面我們給出一個(gè)簡(jiǎn)單的示例程序
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| | class CXJob: public CJob {
public:
CXJob() { i = 0; }
~CXJob() {}
void Run(void *jobdata) {
printf("The Job comes from CXJOB\n");
sleep(2);
}
};
class CYJob: public CJob
{
public:
CYJob() { i = 0; }
~CYJob() {}
void Run(void *jobdata) {
printf("The Job comes from CYJob\n");
}
};
main()
{
CThreadManage *manage = new CThreadManage(10);
for(int i = 0; i < 40; i++)
{
CXJob *job = new CXJob();
manage->Run(job, NULL);
}
sleep(2);
CYJob *job = new CYJob();
manage->Run(job, NULL);
manage->TerminateAll();
} |
CXJob和CYJob都是從Job類繼承而來(lái),其都實(shí)現(xiàn)了Run接口。CXJob只是簡(jiǎn)單的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒鐘。在主程序中我們初始創(chuàng)建10個(gè)工作線程。然后分別執(zhí)行40次CXJob和一次CYJob。
C++ 線程池的封裝實(shí)現(xiàn)
為了充分利用多核的優(yōu)勢(shì),我們利用多線程來(lái)進(jìn)行任務(wù)處理,但線程也同樣不能濫用,會(huì)帶來(lái)一下幾個(gè)問(wèn)題:
1)線程本身存在開(kāi)銷,系統(tǒng)必須為每個(gè)線程分配如棧,TLS(線程局部存儲(chǔ)),寄存器等。
2)線程管理會(huì)給系統(tǒng)帶來(lái)開(kāi)銷,context切換同樣會(huì)給系統(tǒng)帶來(lái)成本。
3)線程本身是可以重用的資源,不需要每次都進(jìn)行初始化。
所以往往在使用中,我們無(wú)需把線程與task任務(wù)進(jìn)行一對(duì)一對(duì)應(yīng),只需要預(yù)先初始化有限的線程個(gè)數(shù)來(lái)處理無(wú)限的task任務(wù)即可,線程池應(yīng)運(yùn)而生,原理也就是如此。

主要含有三個(gè)隊(duì)列
- 工作隊(duì)列
- 工作線程隊(duì)列
- 忙碌線程隊(duì)列
工作隊(duì)列是一個(gè)阻塞隊(duì)列,任務(wù)(仿函數(shù))任務(wù)不算被push進(jìn)來(lái)(notify阻塞獲取的工作線程),工作線程隊(duì)列(一直不變)則從該隊(duì)列中獲取任務(wù)執(zhí)行(wait獲取,當(dāng)任務(wù)隊(duì)列為空時(shí)阻塞等待通知),如果獲取到任務(wù),則將線程會(huì)進(jìn)入忙碌線程隊(duì)列中,執(zhí)行任務(wù)的仿函數(shù),當(dāng)工作完成,重新移出工作線程隊(duì)列。
定義線程池專屬異常:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
| | struct TC_ThreadPool_Exception : public TC_Exception { TC_ThreadPool_Exception(const string &buffer) : TC_Exception(buffer) {}; TC_ThreadPool_Exception(const string &buffer, int err) : TC_Exception(buffer, err) {}; ~TC_ThreadPool_Exception () throw () {}; };
/** * @brief 用通線程池類, 與tc_functor, tc_functorwrapper配合使用. * * 使用方式說(shuō)明: * 1 采用tc_functorwrapper封裝一個(gè)調(diào)用 * 2 用tc_threadpool對(duì)調(diào)用進(jìn)行執(zhí)行 * 具體示例代碼請(qǐng)參見(jiàn):test/test_tc_thread_pool.cpp */
/**線程池本身繼承自鎖,可以幫助鎖定**/ class TC_ThreadPool : public TC_ThreadLock { public:
/** * @brief 構(gòu)造函數(shù) * */ TC_ThreadPool ();
/** * @brief 析構(gòu), 會(huì)停止所有線程 */ ~TC_ThreadPool ();
/** * @brief 初始化. * * @param num 工作線程個(gè)數(shù) */ void init(size_t num);
/** * @brief 獲取線程個(gè)數(shù). * * @return size_t 線程個(gè)數(shù) */ size_t getThreadNum() { Lock sync(* this); return _jobthread. size(); }
/** * @brief 獲取線程池的任務(wù)數(shù)( exec添加進(jìn)去的). * * @return size_t 線程池的任務(wù)數(shù) */ size_t getJobNum() { return _jobqueue. size(); }
/** * @brief 停止所有線程 */ void stop();
/** * @brief 啟動(dòng)所有線程 */ void start();
/** * @brief 啟動(dòng)所有線程并, 執(zhí)行初始化對(duì)象. * * @param ParentFunctor * @param tf */ template<class ParentFunctor> void start(const TC_FunctorWrapper< ParentFunctor> &tf) { for(size_t i = 0; i < _jobthread .size(); i++) { _startqueue. push_back(new TC_FunctorWrapper<ParentFunctor >(tf)); }
start(); }
/** * @brief 添加對(duì)象到線程池執(zhí)行,該函數(shù)馬上返回, * 線程池的線程執(zhí)行對(duì)象 */ template<class ParentFunctor> void exec(const TC_FunctorWrapper< ParentFunctor> &tf) { _jobqueue.push_back(new TC_FunctorWrapper<ParentFunctor >(tf)); }
/** * @brief 等待所有工作全部結(jié)束(隊(duì)列無(wú)任務(wù), 無(wú)空閑線程). * * @param millsecond 等待的時(shí)間( ms), -1:永遠(yuǎn)等待 * @return true, 所有工作都處理完畢 * false,超時(shí)退出 */ bool waitForAllDone(int millsecond = -1);
public:
/** * @brief 線程數(shù)據(jù)基類,所有線程的私有數(shù)據(jù)繼承于該類 */ class ThreadData { public: /** * @brief 構(gòu)造 */ ThreadData() {}; /** * @brief 析夠 */ virtual ~ThreadData() {};
/** * @brief 生成數(shù)據(jù). * * @ param T * @return ThreadData* */ template<typename T> static T *makeThreadData() { return new T; } };
/** * @brief 設(shè)置線程數(shù)據(jù). * * @param p 線程數(shù)據(jù) */ static void setThreadData(ThreadData *p);
/** * @brief 獲取線程數(shù)據(jù). * * @return ThreadData* 線程數(shù)據(jù) */ static ThreadData *getThreadData();
/** * @brief 設(shè)置線程數(shù)據(jù), key需要自己維護(hù). * * @param pkey 線程私有數(shù)據(jù)key * @param p 線程指針 */ static void setThreadData(pthread_key_t pkey, ThreadData *p);
/** * @brief 獲取線程數(shù)據(jù), key需要自己維護(hù). * * @param pkey 線程私有數(shù)據(jù)key * @return 指向線程的ThreadData*指針 */ static ThreadData *getThreadData(pthread_key_t pkey);
protected:
/** * @brief 釋放資源. * * @param p */ static void destructor(void *p);
/** * @brief 初始化key */ class KeyInitialize { public: /** * @brief 初始化key */ KeyInitialize() { int ret = pthread_key_create(&TC_ThreadPool::g_key, TC_ThreadPool::destructor); if(ret != 0) { throw TC_ThreadPool_Exception("[TC_ThreadPool::KeyInitialize] pthread_key_create error", ret); } }
/** * @brief 釋放key */ ~KeyInitialize() { pthread_key_delete(TC_ThreadPool::g_key); } };
/** * @brief 初始化key的控制 */ static KeyInitialize g_key_initialize;
/** * @brief 數(shù)據(jù)key */ static pthread_key_t g_key;
protected: /** * @brief 線程池中的工作線程 */ class ThreadWorker : public TC_Thread { public: /** * @brief 工作線程構(gòu)造函數(shù). * * @ param tpool */ ThreadWorker(TC_ThreadPool *tpool);
/** * @brief 通知工作線程結(jié)束 */ void terminate();
protected: /** * @brief 運(yùn)行 */ virtual void run();
protected: /** * 線程池指針 */ TC_ThreadPool *_tpool;
/** * 是否結(jié)束線程 */ bool _bTerminate; };
protected:
/** * @brief 清除 */ void clear();
/** * @brief 獲取任務(wù), 如果沒(méi)有任務(wù), 則為NULL. * * @return TC_FunctorWrapperinterface* */ TC_FunctorWrapperInterface *get(ThreadWorker *ptw);
/** * @brief 獲取啟動(dòng)任務(wù). * * @return TC_FunctorWrapperInterface* */ TC_FunctorWrapperInterface *get();
/** * @brief 空閑了一個(gè)線程. * * @param ptw */ void idle(ThreadWorker *ptw);
/** * @brief 通知等待在任務(wù)隊(duì)列上的工作線程醒來(lái) */ void notifyT();
/** * @brief 是否處理結(jié)束. * * @return bool */ bool finish();
/** * @brief 線程退出時(shí)調(diào)用 */ void exit();
friend class ThreadWorker; protected:
/** * 任務(wù)隊(duì)列 */ TC_ThreadQueue< TC_FunctorWrapperInterface *> _jobqueue;
/** * 啟動(dòng)任務(wù) */ TC_ThreadQueue< TC_FunctorWrapperInterface *> _startqueue;
/** * 工作線程 */ std::vector<ThreadWorker *> _jobthread;
/** * 繁忙線程 */ std::set<ThreadWorker *> _busthread;
/** * 任務(wù)隊(duì)列的鎖 */ TC_ThreadLock _tmutex;
/** * 是否所有任務(wù)都執(zhí)行完畢 */ bool _bAllDone; }; |
工作線程設(shè)計(jì)如下:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| | TC_ThreadPool ::ThreadWorker::ThreadWorker(TC_ThreadPool *tpool) : _tpool (tpool) , _bTerminate ( false) { }
void TC_ThreadPool ::ThreadWorker::terminate() { _bTerminate = true; _tpool->notifyT(); }
void TC_ThreadPool ::ThreadWorker::run() { //調(diào)用初始化部分 TC_FunctorWrapperInterface *pst = _tpool->get(); if(pst) { try { (*pst)(); } catch ( ... ) { } delete pst; pst = NULL; }
//調(diào)用處理部分 while (! _bTerminate) { TC_FunctorWrapperInterface *pfw = _tpool->get( this); if(pfw != NULL) { auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);
try { (*pfw)(); } catch ( ... ) { }
_tpool->idle( this); } }
//結(jié)束 _tpool->exit(); }
每個(gè)工作線程在剛開(kāi)始時(shí)都會(huì)執(zhí)行一下初始化操作,并進(jìn)入一個(gè)無(wú)限循環(huán)的部分//調(diào)用處理部分 while (! _bTerminate) { TC_FunctorWrapperInterface *pfw = _tpool->get( this); if(pfw != NULL) { auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);
try { (*pfw)(); } catch ( ... ) { }
_tpool->idle( this); } } |
該工作主要是無(wú)限的從線程池的工作隊(duì)列中獲取任務(wù)并執(zhí)行,如果成功獲取任務(wù),則會(huì)將線程移進(jìn)忙碌隊(duì)列:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| | TC_FunctorWrapperInterface *TC_ThreadPool:: get(ThreadWorker *ptw) {
TC_FunctorWrapperInterface *pFunctorWrapper = NULL; if(! _jobqueue. pop_front(pFunctorWrapper, 1000)) { return NULL; }
{ Lock sync( _tmutex); _busthread. insert(ptw); } return pFunctorWrapper; } |
執(zhí)行完,移回工作線程隊(duì)列:_tpool->idle( this);
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| | void TC_ThreadPool:: idle(ThreadWorker *ptw) { Lock sync( _tmutex); _busthread. erase(ptw);
//無(wú)繁忙線程, 通知等待在線程池結(jié)束的線程醒過(guò)來(lái) if( _busthread. empty()) { _bAllDone = true; _tmutex.notifyAll(); } }
|
此處jobThread隊(duì)列初始化后不會(huì)改變(因?yàn)闆](méi)有實(shí)現(xiàn)自增長(zhǎng)功能),所以非線程安全的vector隊(duì)列即可,busthread的忙碌線程隊(duì)列會(huì)被移進(jìn)移出,但是操作會(huì)自帶Lock sync( _tmutex),該互斥量是線程池本身繼承的,所以是共有的,也無(wú)需另外使用線程安全的TC_ThreadQueue,使用vector即可。
TC_ThreadPool:: idle中的
C++ Code
1 2 3 4 5 6
| | if( _busthread. empty()) { _bAllDone = true; _tmutex.notifyAll(); } |
主要用于當(dāng)線程池工作起來(lái)后的waitForAllDone方法:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| | bool TC_ThreadPool:: waitForAllDone( int millsecond) { Lock sync( _tmutex);
start1: //任務(wù)隊(duì)列和繁忙線程都是空的 if (finish()) { return true; }
//永遠(yuǎn)等待 if(millsecond < 0) { _tmutex.timedWait(1000); goto start1; }
int64_t iNow = TC_Common:: now2ms(); int m = millsecond; start2:
bool b = _tmutex.timedWait(millsecond); //完成處理了 if(finish()) { return true; }
if(!b) { return false; }
millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow)); goto start2;
return false; }
_tmutex.timedWait(millsecond)方法喚醒。反復(fù)判斷是否所有的工作是否完成:
bool TC_ThreadPool:: finish() { return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone; }
|
整體cpp實(shí)現(xiàn)如下:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
| | TC_ThreadPool ::KeyInitialize TC_ThreadPool::g_key_initialize; pthread_key_t TC_ThreadPool::g_key ;
void TC_ThreadPool::destructor( void *p) { ThreadData *ttd = ( ThreadData *)p; if(ttd) { delete ttd; } }
void TC_ThreadPool::exit() { TC_ThreadPool:: ThreadData *p = getThreadData(); if(p) { delete p; int ret = pthread_setspecific( g_key, NULL ); if(ret != 0) { throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret); } }
_jobqueue. clear(); }
void TC_ThreadPool::setThreadData( TC_ThreadPool:: ThreadData *p) { TC_ThreadPool:: ThreadData *pOld = getThreadData(); if(pOld != NULL && pOld != p) { delete pOld; }
int ret = pthread_setspecific( g_key, ( void *)p); if(ret != 0) { throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret); } }
TC_ThreadPool ::ThreadData *TC_ThreadPool::getThreadData () { return ( ThreadData *) pthread_getspecific( g_key); }
void TC_ThreadPool::setThreadData( pthread_key_t pkey, ThreadData *p) { TC_ThreadPool:: ThreadData *pOld = getThreadData(pkey); if(pOld != NULL && pOld != p) { delete pOld; }
int ret = pthread_setspecific(pkey, ( void *)p); if(ret != 0) { throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret); } }
TC_ThreadPool ::ThreadData *TC_ThreadPool::getThreadData( pthread_key_t pkey) { return ( ThreadData *) pthread_getspecific(pkey); }
TC_ThreadPool::TC_ThreadPool() : _bAllDone ( true) { }
TC_ThreadPool::~TC_ThreadPool() { stop(); clear(); }
void TC_ThreadPool::clear() { std::vector< ThreadWorker *>::iterator it = _jobthread. begin(); while(it != _jobthread. end()) { delete (*it); ++it; }
_jobthread. clear(); _busthread. clear(); }
void TC_ThreadPool::init( size_t num) { stop();
Lock sync(* this);
clear();
for( size_t i = 0; i < num; i++) { _jobthread. push_back( new ThreadWorker( this)); } }
void TC_ThreadPool::stop() { Lock sync(* this);
std::vector< ThreadWorker *>::iterator it = _jobthread. begin(); while(it != _jobthread. end()) { if ((*it)-> isAlive()) { (*it)-> terminate(); (*it)-> getThreadControl().join (); } ++it; } _bAllDone = true; }
void TC_ThreadPool::start() { Lock sync(* this);
std::vector< ThreadWorker *>::iterator it = _jobthread. begin(); while(it != _jobthread. end()) { (*it)-> start(); ++it; } _bAllDone = false; }
bool TC_ThreadPool:: finish() { return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone; }
bool TC_ThreadPool::waitForAllDone( int millsecond) { Lock sync( _tmutex);
start1: //任務(wù)隊(duì)列和繁忙線程都是空的 if (finish ()) { return true; }
//永遠(yuǎn)等待 if(millsecond < 0) { _tmutex.timedWait(1000); goto start1; }
int64_t iNow = TC_Common:: now2ms(); int m = millsecond; start2:
bool b = _tmutex.timedWait(millsecond); //完成處理了 if(finish ()) { return true; }
if(!b) { return false; }
millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow)); goto start2;
return false; }
TC_FunctorWrapperInterface *TC_ThreadPool::get( ThreadWorker *ptw) {
TC_FunctorWrapperInterface *pFunctorWrapper = NULL; if(! _jobqueue. pop_front(pFunctorWrapper, 1000)) { return NULL; }
{ Lock sync( _tmutex); _busthread. insert(ptw); } return pFunctorWrapper; }
TC_FunctorWrapperInterface *TC_ThreadPool::get() { TC_FunctorWrapperInterface *pFunctorWrapper = NULL; if(! _startqueue. pop_front(pFunctorWrapper)) { return NULL; }
return pFunctorWrapper; }
void TC_ThreadPool::idle( ThreadWorker *ptw) { Lock sync( _tmutex); _busthread. erase(ptw);
//無(wú)繁忙線程, 通知等待在線程池結(jié)束的線程醒過(guò)來(lái) if( _busthread. empty()) { _bAllDone = true; _tmutex.notifyAll(); } }
void TC_ThreadPool::notifyT() { _jobqueue. notifyT(); } |
線程池使用后記
線程池適合場(chǎng)合
事 實(shí)上,線程池并不是萬(wàn)能的。它有其特定的使用場(chǎng)合。線程池致力于減少線程本身的開(kāi)銷對(duì)應(yīng)用所產(chǎn)生的影響,這是有前提的,前提就是線程本身開(kāi)銷與線程執(zhí)行任 務(wù)相比不可忽略。如果線程本身的開(kāi)銷相對(duì)于線程任務(wù)執(zhí)行開(kāi)銷而言是可以忽略不計(jì)的,那么此時(shí)線程池所帶來(lái)的好處是不明顯的,比如對(duì)于FTP服務(wù)器以及Telnet服務(wù)器,通常傳送文件的時(shí)間較長(zhǎng),開(kāi)銷較大,那么此時(shí),我們采用線程池未必是理想的方法,我們可以選擇“即時(shí)創(chuàng)建,即時(shí)銷毀”的策略。
總之線程池通常適合下面的幾個(gè)場(chǎng)合:
(1) 單位時(shí)間內(nèi)處理任務(wù)頻繁而且任務(wù)處理時(shí)間短
(2) 對(duì)實(shí)時(shí)性要求較高。如果接受到任務(wù)后在創(chuàng)建線程,可能滿足不了實(shí)時(shí)要求,因此必須采用線程池進(jìn)行預(yù)創(chuàng)建。
(3) 必須經(jīng)常面對(duì)高突發(fā)性事件,比如Web服務(wù)器,如果有足球轉(zhuǎn)播,則服務(wù)器將產(chǎn)生巨大的沖擊。此時(shí)如果采取傳統(tǒng)方法,則必須不停的大量產(chǎn)生線程,銷毀線程。此時(shí)采用動(dòng)態(tài)線程池可以避免這種情況的發(fā)生。