一個簡單線程池的實現
以前寫線程池是在網絡編程的時候,一個線程池處理一個網絡套接字,隨著連接的增多,效率很低,最近主要是在封裝一個ipc程序(進程間通信機制) 主要
涉及的技術:
Winsock, 線程池
因為光是基于線程池的技術效率還是很低,打算重新把其代碼整理重新封裝通過,在此基礎上通過完成端口來封裝一個簡單的高并發(fā)服務器。
可能涉及的技術
Winsock: windows網絡通信
完成端口: Windows上服務器的大規(guī)模連接機制。
線程池: 高效高利用率的線程機制。
本文主要實現一個線程池的例子,從基本原理入手,一個線程池會記錄每個線程的信息,以及每個線程的處理。
一般一個簡單線程池至少包含下列組成部分。
1.線程池管理器(ThreadPoolManager):用于創(chuàng)建并管理線程池
2.工作線程(WorkThread): 線程池中線程
3.任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執(zhí)行。
4.任務隊列:用于存放沒有處理的任務。提供一種緩沖機制。
#pragma once2

3
#include "lock.h"4

5

/**//// 線程池6
namespace tp_ipc_peer_namespace7


{8

/**//// 接口9
class task_object10

{11
public:12

virtual ~task_object()
{}13
virtual void exec() = 0;14
};15

16
template< typename Functor>17
class task 18
: public task_object19

{20

21

/**//// 禁止操作22
private:23
task( const task &);24
task & operator =( const task & );25

26
public:27
typedef Functor functor_type;28

29
task( const functor_type & functor)30
: functor_( functor )31

{ }32

33

/**//// 執(zhí)行34
virtual void exec()35

{36
functor_();37
}38

39
private:40
Functor functor_;41
42
};43

44
class ctpool45

{46
typedef ctpool self_type;47
48
public:49
ctpool(void)50
:tpool_running_( true )51

{ 52
_m_start_threads( 1 );53
}54
ctpool ( unsigned threadsize )55
:tpool_running_(true)56

{57
_m_start_threads( threadsize );58
}59

60
template< typename Function>61
void push( const Function & f)62

{63

/**//// 枷鎖64
task_lock_.enter();65

66
task_container_.push_back( new tp_ipc_peer_namespace::task<Function>( f ) );67
68
task_lock_.leave();69

70
}71
72

~ctpool(void)
{}73

74
private:75

76

/**//// 創(chuàng)建線程池77
void _m_start_threads( unsigned size )78

{79
if ( size == 0 )80
size = 4;81

82
for ( unsigned i = 0 ; i < size ; i++)83

{84
tinfo_type tinfo;85
tinfo.state = 0;86
tinfo.handle = (HANDLE)::_beginthreadex( 0 , 0 , _m_work_thread , NULL , NULL ,&(tinfo.tid) );87
threads_.push_back( tinfo );88
}89
}90
91

/**//// 喚醒92
void _m_wakeup()93

{94
HANDLE handle = 0;95

96

/**//// 對共享區(qū) 枷鎖97
tlock_.enter();98
std::vector<tinfo_type>::iterator it = threads_.begin(), end = threads_.end();99

100
for ( ; it != end ; ++it )101

{102

if ( it->state == 0 ) /**//// 在等待狀態(tài)103

{104
handle = it->handle ;105
it->state = 1;106
break;107
}108
}109
tlock_.leave();110

111
while ( ::ResumeThread( handle ) != 1)112
;113
}114

115

/**//// 掛起某個線程116
void _m_suspend()117

{118
unsigned tid = ::GetCurrentThreadId();119
HANDLE handle = 0;120

121
tlock_.enter();122

123

/**//// 對共享區(qū) 枷鎖124
tlock_.enter();125
std::vector<tinfo_type>::iterator it = threads_.begin(), end = threads_.end();126

127
for ( ; it != end ; ++it )128

{129

if ( it->tid == tid ) /**//// 運行ID130

{131
handle = it->handle ;132
it->state = 0;133
break;134
}135
}136
tlock_.leave();137

138

/**//// 掛起139
if ( handle)140

{141
::SuspendThread( handle );142
}143
}144

145

/**//// 獲取task146
tp_ipc_peer_namespace::task_object * _m_read_task()147

{148
while( tpool_running_ )149

{150
tp_ipc_peer_namespace::task_object * task = NULL;151
152

/**//// 對共享區(qū) 枷鎖153
task_lock_.enter();154
if ( task_container_.size() )155

{156
task = *( task_container_.begin() );157
task_container_.erase( task_container_.begin() );158
}159
task_lock_.leave();160

161
if ( task )162

{163
return task;164
}165
else166
_m_suspend();167
}168
return NULL;169
}170

171
private:172
static unsigned __stdcall _m_work_thread(void * arg)173

{174
175
self_type & self = *reinterpret_cast<self_type*>(arg);176
tp_ipc_peer_namespace::task_object * task = 0;177

178
::SuspendThread(::GetCurrentThread());179

180
while( true )181

{182
task = self._m_read_task();183
if ( task )184

{185
task->exec();186

187
delete task ;188
task = 0;189
}190
else191
break;192
}193
194
::_endthreadex( 0 );195
return 0;196
}197

198
private:199

/**//// 線程信息200
struct tinfo_type201

{202
HANDLE handle;203
unsigned tid;204
unsigned long state; // 0 = sleep;205
};206

207

/**//// user define var208
private:209

/**//// 線程運行狀態(tài)210
volatile bool tpool_running_;211

/**//// 一個臨界區(qū)類212
sync::csectionlock tlock_; 213

/**//// 線程信息214
std::vector<tinfo_type> threads_; 215

/**//// 216
sync::csectionlock task_lock_;217

/**//// 一個回調函數218
std::vector<task_object* > task_container_;219

220
};221

222

223
}
備注:在設計ipc的時候參考 http://man.chinaunix.net/tech/lyceum/linuxK/ipc/ipc.html
線程池設計 http://www.ibm.com/developerworks/cn/java/l-threadPool/

