青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

posts - 311, comments - 0, trackbacks - 0, articles - 0
  C++博客 :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

(搬運工)boost之ThreadPool

Posted on 2012-07-17 16:24 點點滴滴 閱讀(3597) 評論(0)  編輯 收藏 引用 所屬分類: 02 編程語言

threadpool是基于boost庫實現的一個線程池子庫,但線程池實現起來不是很復雜。我們從threadpool中又能學到什么東西呢?

它是基于boost庫實現的,如果大家對boost庫有興趣,看看一個簡單的實現還是可以學到點東西的。

threadpool基本功能

1、任務封裝,包括普通任務(task_func)和優先級任務(prio_task_func)。

2、調度策略,包括fifo_scheduler(先進先出)、lifo_scheduler(后進先出)、prio_scheduler(優先級)。

3、結束策略,包括wait_for_all_tasks(全部任務等待)、wait_for_active_tasks(激活任務等待)、immediately(立即結束)。

4、動態修改線程池個數功能。

5、基于future封裝的異步返回值獲取功能。

在sorceforge上有一個用boost編寫的線程池。該線程池和boost結合的比較好,并且提供了多種任務執行策略,使用也非常簡單。 下載地址: http://threadpool.sourceforge.net/ 這個線程池不需要編譯,只要在項目中包含其頭文件就可以了。

一、源代碼分析

quickstart分析(/threadpool/libs/threadpool/quickstart

這個例子的代碼很簡單,但已經全部展示了線程池的核心內容,包括建立、調度、同步等操作。

view plaincopy to clipboardprint?

// Create fifo thread pool container with two threads.

pool tp(2);

// Add some tasks to the pool.

tp.schedule(&first_task);

tp.schedule(&second_task);

// Wait until all tasks are finished.

tp.wait();

// Create fifo thread pool container with two threads.

pool tp(2);

// Add some tasks to the pool.

tp.schedule(&first_task);

tp.schedule(&second_task);

// Wait until all tasks are finished.

tp.wait();

pool的定義具體見pool.hpp,但使用了pimpl模式,核心代碼見pool_core.hpp文件。

下面是pool的定義

typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;

typedef fifo_pool pool;

從上面可以知道,pool實際就是fifo_pool,從模板參數可以看到,使用了fifo_schedulerwait_for_all_tasks

對于線程池有點理解的都知道,一般都是那幾樣東西,線程的封裝,條件變量,隊列數據結構。

所以簡單的能做的很簡單,復雜的的就看你的策略需求了。

對基于boost庫的threadpool子庫來說,上面的三樣東西都是現成的,線程封裝和條件變量直接使用thread子庫就行,隊列使用stl的標準容器。

task_adaptors.hpp

對線程任務的封裝,所謂task,我們可以理解成需要運行的函數。

threadpool最大限度的使用了functionbind功能來封裝函數,這點和thread子庫類似。

文件中涉及的內容主要有三個:task_func、prio_task_funclooped_task_func

對普通task的封裝

typedef function0<void> task_func;

如果對bindfunction熟悉的應該很好理解。

對優先級任務的封裝

class prio_task_func

這個類很簡單,重載了兩個方法,

operator()是仿函數的用法,

operator<是用于優先級比較使用的,用于stl容器的元素比較。

size_policies.hpp

size的封裝,包括empty_controllerresize_controllerstatic_size

shutdown_policies.hpp

對線程池結束的策略封裝,包括wait_for_all_tasks、wait_for_active_tasksimmediately。

這幾個類很簡單,具體操作封裝在pool中。

線程池運行過程中,包括隊列中等待的task,線程正在運行的task。

所以結束的時候,對這些task的策略操作是有選擇的。

scheduling_policies.hpp

對任務調度測試的封裝,包括fifo_scheduler、lifo_schedulerprio_scheduler。

實際上,這三個類的相似程度很高,大家可能更喜歡用繼承和虛函數實現。

前面說到保存task的隊列數據結構,在這里就看的很清楚了。

fifolifo使用的是std::deque,prio使用的是std::priority_queue,其他部分代碼沒什么好說的了。

pool_adaptors.hpp

對全局schedule函數的幾種封裝。

future.hpp

好像thread子庫也有future,但不清楚是否是一樣的內容。

threadpoolfuture是為了封裝異步函數調用返回值實現的。

簡單點理解,就是schedule任務的時候,把一個指針在兩者間綁定起來,后面就可以通過future來獲取返回值了。

當然,獲取返回值的過程應該是阻塞的,任務未完成時只能wait

locking_ptr.hpp

LockingPtr的簡單封裝,具體可googlevolatile - Multithreaded Programmer's Best Friend》。

threadpool大量使用了volatile關鍵字,所以需要LockingPtr保護。

scope_guard.hpp

對函數對象的封裝,利用C++析構函數時調用一個在構造函數時綁定的函數對象。

worker_thread.hpp

對工作線程的封裝,這個封裝不是指底層線程api封裝,因為這部分是由boostthread子庫提供的。

封裝針對的是循環執行task的邏輯函數(線程跑起來就loop run某個函數,從隊列中獲取task執行,空閑時等待。)

我們重點看的是runcreate_and_attach。

這兩個函數連起來看,就很清楚了,create_and_attach通過bind方式生成一個thread執行run方法。

run方法中的這條語句就是一個簡單的loop操作,

while(m_pool->execute_task()) {}

所以,當execute_task返回值為false時,run函數就結束了,bind該函數的thread也就結束了。

ok,來到這里,有必要簡單的把整個調用過程說明一下。

// Create fifo thread pool container with two threads.

pool tp(2);

該操作會調用pool的構造函數

view plaincopy to clipboardprint?

thread_pool(size_t initial_threads = 0)

: m_core(new pool_core_type)

, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

{

size_policy_type::init(*m_core, initial_threads);

}

thread_pool(size_t initial_threads = 0)

: m_core(new pool_core_type)

, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

{

size_policy_type::init(*m_core, initial_threads);

}

由于pimpl模式,所以所有代碼都封裝在m_core內實現的。

pool默認的線程個數為0,通過size_policy_type::init來初始化。

size_policy_type是一個模板參數,pool對應的是fifo,所以也就是static_size類型了。

//static_size類的init函數

view plaincopy to clipboardprint?

static void init(Pool& pool, size_t const worker_count)

{

pool.resize(worker_count);

}

static void init(Pool& pool, size_t const worker_count)

{

pool.resize(worker_count);

}

//pool_coreresize函數

這個函數有點長,主要是做動態配置線程個數的邏輯操作,create_and_attach也是在這里調用的。

view plaincopy to clipboardprint?

//worker_threadcreate_and_attach函數

static void create_and_attach(shared_ptr<pool_type> const & pool)

{

shared_ptr<worker_thread> worker(new worker_thread(pool));

if(worker)

{

//run是線程的loop函數

worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

}

}

//worker_threadcreate_and_attach函數

static void create_and_attach(shared_ptr<pool_type> const & pool)

{

shared_ptr<worker_thread> worker(new worker_thread(pool));

if(worker)

{

//run是線程的loop函數

worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

}

}

view plaincopy to clipboardprint?

//worker_threadrun函數

void run()

{

scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

while(m_pool->execute_task()) {} //loop直到返回值為false

notify_exception.disable();

m_pool->worker_destructed(this->shared_from_this());

}

//worker_threadrun函數

void run()

{

scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

while(m_pool->execute_task()) {} //loop直到返回值為false

notify_exception.disable();

m_pool->worker_destructed(this->shared_from_this());

}

//pool_coreexecute_task函數

這個函數有點長,簡單點說,就是從隊列中獲取task然后執行,如果隊列為空,則線程需要wait操作。

由于threadpool支持動態resize線程個數,從該函數我們也是可以看出來是如何做到的。

view plaincopy to clipboardprint?

// decrease number of threads if necessary

if(m_worker_count > m_target_worker_count)

{

return false; // terminate worker

}

// decrease number of threads if necessary

if(m_worker_count > m_target_worker_count)

{

return false; // terminate worker

}

pool內部使用了多個整數來記錄現在個數,譬如m_worker_countm_target_worker_count。

m_worker_count是當前激活運行中的線程個數。

m_target_worker_count是最新動態配置的線程個數。

當個數不匹配時,通過返回false方式結束線程。

// Add some tasks to the pool.

tp.schedule(&first_task);

view plaincopy to clipboardprint?

//thread_poolschedule函數

bool schedule(task_type const & task)

{

return m_core->schedule(task);

}

//pool_coreschedule函數(和execute_task函數強相關)

bool schedule(task_type const & task) volatile

{

locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

if(lockedThis->m_scheduler.push(task))

{

//task成功入隊列后,notify_one一個線程。

lockedThis->m_task_or_terminate_workers_event.notify_one();

return true;

}

else

{

return false;

}

}

//thread_poolschedule函數

bool schedule(task_type const & task)

{

return m_core->schedule(task);

}

//pool_coreschedule函數(和execute_task函數強相關)

bool schedule(task_type const & task) volatile

{

locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

if(lockedThis->m_scheduler.push(task))

{

//task成功入隊列后,notify_one一個線程。

lockedThis->m_task_or_terminate_workers_event.notify_one();

return true;

}

else

{

return false;

}

}

// Wait until all tasks are finished.

tp.wait();

//pool_corewait函數

void wait(size_t const task_threshold = 0) const volatile

bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile

wait函數是一個阻塞操作,內部邏輯實現使用了一個條件變量,提供超時等待方式。

二、boost線程池使用實例

線程池可以減少創建和切換線程的額外開銷,利用已經存在的線程多次循環執行多個任務從而提高系統的處理能力,有關線程池的概念可google搜索,下面將其使用實例:

#include <iostream>
#include <sstream>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>

#include <boost/threadpool.hpp>

using namespace std;
using namespace boost::threadpool;


//
// Helpers
boost::mutex m_io_monitor;

void print(string text)
{
boost::mutex::scoped_lock lock(m_io_monitor);//每個線程使用全局互斥來保證每次只有一個線程執行
cout << text;
}

template<typename T>
string to_string(T const & value)
{
ostringstream ost;
ost << value;
ost.flush();
return ost.str();
}

//
// An example task functions
void task_1()
{
print(" task_1()/n");
}

void task_2()
{
print(" task_2()/n");
}

void task_3()
{
print(" task_3()/n");
}

int task_4()
{
print(" task_4()/n");
return 4;
}

void task_with_parameter(int value)
{
print(" task_with_parameter(" + to_string(value) + ")/n");
}

int loops = 0;
bool looped_task()
{
print(" looped_task()/n");
return ++loops < 5;
}


int task_int()
{
print(" task_int()/n");
return 23;
}


void fifo_pool_test()
{
pool tp;

tp.schedule(&task_1);
tp.schedule(boost::bind(task_with_parameter, 4));

if(!tp.empty())
{
tp.clear(); // remove all tasks -> no output in this test
}

size_t active_threads = tp.active();
size_t pending_threads = tp.pending();
size_t total_threads = tp.size();

size_t dummy = active_threads + pending_threads + total_threads;
dummy++;

tp.size_controller().resize(5);
tp.wait();
}

void lifo_pool_test()
{
lifo_pool tp;
tp.size_controller().resize(0);
schedule(tp, &task_1);
tp.size_controller().resize(10);
tp.wait();
}

void prio_pool_test()
{
prio_pool tp(2);
schedule(tp, prio_task_func(1, &task_1));
schedule(tp, prio_task_func(10,&task_2));
}


void future_test()
{
fifo_pool tp(5);
future<int> fut = schedule(tp, &task_4);
int res = fut();
}


int main (int , char * const [])
{
fifo_pool_test();
lifo_pool_test();
prio_pool_test();
future_test();
return 0;
}

任務返回值的獲取:

一般異步調用中,返回值的獲取有同步獲取和異步獲取兩種形式。

同步獲取返回值:

int task_int_23()
{
cout<<"task_int_23()/n";
return 23;
}

future<int> res = schedule(tp, &task_int_23);
res.wait();

cout<<"get res value:"<<res.get()<<endl;

異步獲取返回值:

不知道是設計者就不打算使用異步回調獲取返回值還是我看的不夠仔細,異步獲取返回值的方式還真沒有找著,只好自己簡單的寫了一個回調的仿函數來實現異步返回值的獲取。

//R為任務函數的返回值類型
template
<class R>
class callback_task
{
typedef boost::function<void (R)> callback;
typedef boost::function<R ()> function;

private:
callback c_;
function f_;

public:
//F:
任務執行函數
C:結果回調函數
template<class F,class C>
callback_task(F f,C c)
{
f_ = f;
c_ = c;
}

void operator()()
{
c_(f_());
}
};

通過這個對象可以很容易的實現異步結果的回調。

//task_int_23的結果回調函數
void callback(int k)
{
cout<<"get callback value:"<<k<<endl;
}

//通過回調的形式獲取任務的返回值
tp.schedule(callback_task<int>(&task_int_23,&callback));

青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            欧美影院成年免费版| 亚洲国产成人在线播放| 久久久久久久久蜜桃| 亚洲欧美美女| 欧美综合国产精品久久丁香| 亚洲免费在线观看| 校园春色综合网| 久久久久久久网| 欧美激情精品久久久久久大尺度 | 亚洲视频在线播放| 亚洲午夜三级在线| 久久不见久久见免费视频1| 欧美一区二区三区视频在线| 久久精品视频在线播放| 欧美刺激性大交免费视频| 亚洲经典自拍| 99re成人精品视频| 羞羞色国产精品| 免费亚洲一区二区| 国产精品欧美久久| 亚洲国产福利在线| 午夜精品久久久久久久99樱桃| 久久久精品国产99久久精品芒果| 欧美va天堂| 亚洲免费网站| 欧美精品v日韩精品v国产精品| 国产精品高潮呻吟久久av黑人| 激情欧美一区二区| 亚洲一区二区三区涩| 免费观看久久久4p| 久久国产一区二区三区| 影音先锋成人资源站| 亚洲免费av片| 久久久久免费| av成人老司机| 久热精品在线视频| 国产午夜精品一区二区三区视频 | 欧美国产日产韩国视频| 中文在线资源观看视频网站免费不卡| 久久久国产91| 国产欧美精品一区二区三区介绍| 日韩一级网站| 欧美 日韩 国产在线| 性久久久久久久久| 国产精品免费一区豆花| 在线一区二区三区四区五区| 嫩草影视亚洲| 久久精品国产96久久久香蕉| 国产精品狼人久久影院观看方式| 亚洲三级免费| 欧美激情一区二区三区在线视频观看 | 久久成人免费视频| 国产亚洲一级| 久久国产免费看| 亚洲欧美国产视频| 国产精品美女久久久| 亚洲视频精选| av不卡在线看| 欧美日韩一区免费| 一区二区三区偷拍| 99精品欧美一区| 国产精品videosex极品| 亚洲一区在线观看视频| 99在线精品视频| 欧美日韩综合在线| 亚洲一区二区三区精品在线| 9久草视频在线视频精品| 欧美视频在线观看 亚洲欧| 亚洲特色特黄| 亚洲一区久久久| 国产一区二区0| 蜜臀av性久久久久蜜臀aⅴ| 另类人畜视频在线| 99精品视频免费在线观看| 日韩午夜电影| 国产日本欧美在线观看| 久久综合999| 免费亚洲电影在线观看| 亚洲无毛电影| 欧美一区二区三区四区夜夜大片 | 亚洲欧美三级伦理| 欧美在线关看| 亚洲人成绝费网站色www| 亚洲精品一区二区三区蜜桃久| 久久久国产精彩视频美女艺术照福利| 久久国产毛片| 亚洲欧洲另类国产综合| 日韩一级免费| 国产视频丨精品|在线观看| 久久琪琪电影院| 欧美激情综合在线| 欧美在线观看一区| 久久综合中文| 国产精品99久久久久久有的能看| 亚洲欧美日韩成人| 亚洲国产一二三| 亚洲欧美精品在线| 亚洲欧洲综合| 亚洲专区一区二区三区| 亚洲国产一区二区三区a毛片 | 91久久精品视频| 国产欧美日韩视频| 最新国产成人在线观看| 国产欧美精品在线播放| 91久久国产综合久久| 国产综合久久| 夜夜夜久久久| 91久久久久| 性欧美激情精品| 中文国产亚洲喷潮| 久久日韩粉嫩一区二区三区 | 亚洲国产cao| 国产香蕉97碰碰久久人人| 日韩一区二区精品| 亚洲国产另类精品专区| 亚洲综合日韩在线| 亚洲天堂免费观看| 欧美国产高清| 欧美搞黄网站| 精久久久久久久久久久| 亚洲综合首页| 亚洲欧美在线x视频| 欧美成人dvd在线视频| 久久一综合视频| 国产在线欧美日韩| 午夜欧美精品| 欧美在线观看天堂一区二区三区| 欧美日韩免费高清一区色橹橹| 亚洲国产99精品国自产| 亚洲风情在线资源站| 久久久xxx| 免费在线观看日韩欧美| 国精产品99永久一区一区| 亚洲欧美中文日韩在线| 欧美一区二区在线视频| 国产精品呻吟| 欧美亚洲在线播放| 久久婷婷久久| 伊人狠狠色j香婷婷综合| 久久精品91| 免费在线亚洲欧美| 最新亚洲激情| 欧美精品一区二区三区很污很色的| 亚洲国产高清在线观看视频| 91久久中文字幕| 欧美另类69精品久久久久9999| 亚洲美女中文字幕| 亚洲自拍偷拍一区| 欧美一区综合| 久久精品一区二区三区不卡| 国产精品美女主播在线观看纯欲| 亚洲已满18点击进入久久| 欧美一区二区在线播放| 国产一区二区三区四区五区美女 | 久久国产精品久久久| 国产亚洲精品久久久久动| 久久超碰97人人做人人爱| 欧美国产精品中文字幕| 亚洲另类自拍| 国产精品麻豆欧美日韩ww| 欧美亚洲综合在线| 欧美高清成人| 亚洲字幕一区二区| 在线观看国产精品淫| 欧美日韩国产va另类| 亚洲欧美网站| 亚洲国产婷婷综合在线精品| 亚洲欧美成人| 欲色影视综合吧| 欧美日一区二区在线观看 | 久久亚洲精品中文字幕冲田杏梨| 91久久精品久久国产性色也91| 亚洲男人天堂2024| 在线欧美视频| 国产精品电影在线观看| 久久经典综合| 亚洲视频网站在线观看| 欧美成人一区二区三区| 亚洲欧美日韩国产成人精品影院| 国产偷国产偷亚洲高清97cao | 免费短视频成人日韩| 一本大道久久a久久精品综合| 国产精品少妇自拍| 欧美老女人xx| 久久亚洲一区二区| 亚洲欧美影院| 亚洲精品国精品久久99热一| 欧美在线综合视频| 一区二区三区精品视频| 在线不卡视频| 国产午夜精品福利| 国产精品mm| 欧美日韩福利视频| 久久在线免费视频| 欧美在线黄色| 亚洲天堂网站在线观看视频| 亚洲激情在线观看| 欧美成人免费小视频| 久久久久久**毛片大全| 午夜视频精品|