青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

woaidongmao

文章均收錄自他人博客,但不喜標(biāo)題前加-[轉(zhuǎn)貼],因其丑陋,見(jiàn)諒!~
隨筆 - 1469, 文章 - 0, 評(píng)論 - 661, 引用 - 0
數(shù)據(jù)加載中……

Intel TBB::Pipeline,按序處理數(shù)據(jù)

在上一篇文章(TBB:pipeline,軟件流水線的威力)最后提出了幾個(gè)問(wèn)題,我們逐個(gè)來(lái)看看TBB::Pipeline是怎么解決的。

 

 

為什么Pipeline可以保證數(shù)據(jù)執(zhí)行的順序?既然TBB歸根到底是通過(guò)多線程執(zhí)行任務(wù),為什么不會(huì)在讀入先后兩個(gè)字符串后,后讀入的字符串先被下一個(gè)task處理?Pipeline里是不是有一個(gè)類似于FIFO 先進(jìn)先出隊(duì)列之類的東西?

 

 

之前曾經(jīng)質(zhì)疑過(guò)Pipeline的性能,甚至想自己用MultiThreading來(lái)模擬一個(gè)流水線,但很快就發(fā)現(xiàn)其中實(shí)現(xiàn)的難點(diǎn)。數(shù)據(jù)執(zhí)行的順序性就是其中之一。

 

假設(shè)以一個(gè)thread代表流水線上的一個(gè)節(jié)點(diǎn),如果某節(jié)點(diǎn)是并發(fā)執(zhí)行的,那么就需要2個(gè)以上的thread(AB),上一節(jié)點(diǎn)處理完畢的順序數(shù)據(jù)到底是先送給A還是B呢?處理完畢后后又該先將A還是B中的數(shù)據(jù)送到下一節(jié)點(diǎn)呢?即使可以人為的指定AB之間的優(yōu)先規(guī)則,由于thread本身被調(diào)度的不確定性,實(shí)際運(yùn)行中還是有很多不可預(yù)知的困難。

 

流水線的一個(gè)顯著特性就是保證每個(gè)數(shù)據(jù)均以相同的順序流過(guò)每個(gè)節(jié)點(diǎn)。因此,TBB::Pipeline中的一個(gè)首要任務(wù)就是在節(jié)點(diǎn)被并發(fā)執(zhí)行的同時(shí),仍能夠保證所處理的數(shù)據(jù)的次序而不需額外的處理代碼。此外,在要求串行處理的節(jié)點(diǎn),要保證即使排在前面的數(shù)據(jù)先被處理,即使排在后面的數(shù)據(jù)先到達(dá)。

 

Pipeline的中心思想就是以token來(lái)控制數(shù)據(jù)的處理順序和流水線的深度。Pipeline::run函數(shù)中指定了token的最大值:

 

void pipeline::run( size_t max_number_of_live_tokens ) {}

 

 

每一個(gè)數(shù)據(jù)在進(jìn)入Pipeline的時(shí)候都會(huì)按照先后順序依次分配一個(gè)token,如line1處:

 

task* stage_task::execute() {

    __TBB_ASSERT( !my_at_start || !my_object, NULL );

    if( my_at_start ) {

        if( my_filter->is_serial() ) {

            if( (my_object = (*my_filter)(my_object)) ) {

                my_token = my_pipeline.token_counter++; //line1

                my_token_ready = true;

                ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );

                if( --my_pipeline.input_tokens>0 )

                    spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );

            } else {

                my_pipeline.end_of_input = true; //line2

                return NULL;

            }

...

}

 

如果當(dāng)前流水線中的token全部用完了,那么暫時(shí)就不會(huì)處理新的數(shù)據(jù),直到已進(jìn)入Pipeline的數(shù)據(jù)被處理完畢有空閑的token(line2)

 

仍然以TBB中的例子text_filter為例考慮,流水線為 MyInputFilter->MyTransformFilter->MyOutputFilerMyInputFilter從磁盤(pán)上讀取數(shù)據(jù),MyTransformFilter轉(zhuǎn)換成大寫(xiě)字母,MyOutputFilter將轉(zhuǎn)換好的數(shù)據(jù)寫(xiě)入磁盤(pán)。因此,MyInputFilter節(jié)點(diǎn)和MyOutputFiler節(jié)點(diǎn)必須是串行執(zhí)行,而MyTransformFilter可以并發(fā)執(zhí)行。對(duì)于MyInputFilter讀入的一串順序數(shù)據(jù),token依次為1->2->3,如何保證經(jīng)過(guò)轉(zhuǎn)換后數(shù)據(jù)也是以相同的順序?qū)懭氪疟P(pán)?

 

秘密在于TBB中的一個(gè)類tbb::internal::ordered_bufferMyOutputFilter用它來(lái)保證按照token的順序執(zhí)行其隊(duì)列中的數(shù)據(jù),而不管數(shù)據(jù)進(jìn)入隊(duì)列的先后次序,換句話說(shuō),即使排在后面的數(shù)據(jù)token 2先被某個(gè)MyTransformFilter節(jié)點(diǎn)處理完畢送往MyOutputFilter,只要數(shù)據(jù)token 1沒(méi)到達(dá)沒(méi)被MyOutputFilter執(zhí)行,數(shù)據(jù)2就不會(huì)在數(shù)據(jù)1之前先寫(xiě)入磁盤(pán)。每一個(gè)需要被串行處理的節(jié)點(diǎn),都會(huì)有一個(gè)ordered_buffer類型的成員變量。

 

先看看ordered_buffer的定義:

 

//! A buffer of ordered items.

/** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */

class ordered_buffer {

    typedef  Token  size_type;

 

    //! Array of deferred tasks that cannot yet start executing.

    /** Element is NULL if unused. */

    task** array; //數(shù)組,以順序方式保存所有待處理的task

 

    //! Size of array

    /** Always 0 or a power of 2 */

    size_type array_size; //數(shù)組的尺寸

 

    //! Lowest token that can start executing.

    /** All prior Token have already been seen. */

    Token low_token; //當(dāng)前正在處理的token,

 

    //! Serializes updates.

    spin_mutex array_mutex; //用于保護(hù)array并發(fā)訪問(wèn)的鎖

};

 

仍然是在task* stage_task::execute() {

...

 if( ordered_buffer* input_buffer = my_filter->input_buffer ) {

            // The next filter must execute tokens in order.

            stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );

            clone.my_token = my_token;                       //token號(hào)

            clone.my_token_ready = my_token_ready;

            clone.my_object = my_object;                    //數(shù)據(jù)

            next = input_buffer->put_token(clone);//task放入隊(duì)列

        } else {

            /* A semi-hackish way to reexecute the same task object immediately without spawning.

               recycle_as_continuation marks the task for future execution,

               and then 'this' pointer is returned to bypass spawning. */

            recycle_as_continuation();

            next = this;

        }

    } else {

...

}

 

對(duì)于需要被串行處理的節(jié)點(diǎn),使用ordered_bufferput_token函數(shù)將相關(guān)的數(shù)據(jù)和task引用放入隊(duì)列。put_token的實(shí)現(xiàn)是關(guān)鍵:

 

    template<typename StageTask>

    task* put_token( StageTask& putter ) {

        task* result = &putter;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            Token token = putter.next_token_number();

            if( token!=low_token ) {

                // Trying to put token that is beyond low_token.

                // Need to wait until low_token catches up before dispatching.

                result = NULL;

                __TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );

                if( token-low_token>=array_size )

                    grow( token-low_token+1 );

                ITT_NOTIFY( sync_releasing, this );

                array[token&array_size-1] = &putter;

            }

        }

        return result;

    }

這個(gè)函數(shù)的實(shí)質(zhì)是,首先取得下一個(gè)要處理的token,然后把待執(zhí)行的task放到ordered_buffer的任務(wù)隊(duì)列中的"合適位置",而low_token指向當(dāng)前需要處理的token編號(hào)。

 

例如low_token=0,當(dāng)前需要處理0號(hào)token,下一個(gè)token1,因此task保存在array[1]處并處于阻塞狀態(tài),待0號(hào)token處理完畢后,low_token增加1,再?gòu)?/span>array數(shù)組中取出1號(hào)token對(duì)應(yīng)的task進(jìn)行處理。

 

Pipeline中是這樣通知串行節(jié)點(diǎn)以處理好一條數(shù)據(jù)的:

還是在task* stage_task::execute() {

...

if( ordered_buffer* input_buffer = my_filter->input_buffer )

            input_buffer->note_done(my_token,*this);

...

}

 

看看note_done的實(shí)現(xiàn)會(huì)有一種大徹大悟的感覺(jué)!如果剛完成的token就是次序最優(yōu)先的token(low_token),那取出下一個(gè)要執(zhí)行的task,以spawn的方式讓TBBtask scheduler來(lái)調(diào)度:

 

 

//! Note that processing of a token is finished.

    /** Fires up processing of the next token, if processing was deferred. */

    void note_done( Token token, task& spawner ) {

        task* wakee=NULL;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            if( token==low_token ) {

                // Wake the next task

                task*& item = array[++low_token & array_size-1];

                ITT_NOTIFY( sync_acquired, this );

                wakee = item;

                item = NULL;

            }

        }

        if( wakee ) {

            spawner.spawn(*wakee);

        }

    }

 

 

 

 

ordered_buffer是一個(gè)非常有趣的實(shí)現(xiàn),相比于常見(jiàn)的用FIFO queue來(lái)實(shí)現(xiàn)線程間的數(shù)據(jù)傳遞,ordered_buffer可謂精巧。我們可以好好利用ordered_buffer的原理來(lái)進(jìn)一步改進(jìn)我們的代碼。

 

posted on 2009-05-02 01:53 肥仔 閱讀(2536) 評(píng)論(0)  編輯 收藏 引用 所屬分類: 庫(kù) & 代碼段

青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            亚洲一区图片| 国内精品亚洲| 国产精品色婷婷| 欧美一区二区三区四区高清 | 免费日韩av片| 久久人人精品| 一本一本a久久| 欧美一区二区视频97| 黄色精品一区二区| 日韩午夜一区| 亚洲免费福利视频| 欧美极品在线观看| 亚洲欧美一区二区在线观看| 亚洲视频免费在线观看| 亚洲大胆av| 亚欧美中日韩视频| 中文av一区二区| 蜜桃av一区二区在线观看| 午夜精品婷婷| 欧美精品在线看| 美腿丝袜亚洲色图| 国产精品中文字幕在线观看| 亚洲韩国精品一区| 国产亚洲综合精品| 亚洲欧美中文在线视频| 亚洲视频在线观看| 欧美日韩亚洲在线| 99ri日韩精品视频| 亚洲天堂成人在线视频| 欧美日韩亚洲一区在线观看| 亚洲精品免费观看| a91a精品视频在线观看| 欧美成人在线免费观看| 亚洲黄页一区| 中文精品在线| 国产精品一区二区久久精品| 在线一区二区视频| 久久久国产精品一区二区中文| 黄色一区二区三区| 久久综合一区二区| 亚洲品质自拍| 久久都是精品| 亚洲国产激情| 欧美日韩第一页| 午夜精品视频| 亚洲美女在线视频| 久久精品99国产精品酒店日本| 亚洲国产美女精品久久久久∴| 在线精品福利| 久久综合狠狠综合久久激情| 欧美激情精品久久久久久久变态| 亚洲国产综合在线看不卡| 欧美激情在线狂野欧美精品| 99精品视频免费全部在线| 久久久久国产精品人| 亚洲成色最大综合在线| 国产精品久在线观看| 久久综合五月天婷婷伊人| 亚洲一级片在线观看| 免费亚洲一区二区| 午夜国产精品视频| 亚洲美女啪啪| 亚洲国产欧美国产综合一区| 国产区欧美区日韩区| 国产精品黄页免费高清在线观看| 免费永久网站黄欧美| 久久精品色图| 久久久精品视频成人| 久久精品国产第一区二区三区最新章节 | 日韩一二三区视频| 亚洲国产精品va| 在线播放日韩欧美| 亚洲欧洲精品一区二区三区不卡 | 羞羞视频在线观看欧美| 亚洲午夜久久久久久久久电影院 | 在线午夜精品| 一区二区三区日韩| 国产精品99久久久久久白浆小说| 亚洲国产精品va在看黑人| 亚洲黄色天堂| 午夜一区二区三区不卡视频| 久久精品一区二区三区中文字幕| 免费欧美高清视频| 日韩午夜电影av| 欧美在线电影| 欧美日韩一区二区国产| 国产九色精品成人porny| 一区二区亚洲精品国产| 亚洲最新在线视频| 久久精品官网| 日韩视频永久免费观看| 欧美亚洲免费| 国产精品国产三级国产aⅴ无密码| 国产视频在线观看一区二区三区 | 136国产福利精品导航网址| 亚洲精品视频二区| 欧美一区二区三区啪啪| 欧美高清视频| 久久在线免费观看| 黑人极品videos精品欧美裸| 亚洲欧美国产制服动漫| 亚洲剧情一区二区| 欧美精选一区| 99精品国产99久久久久久福利| 欧美sm视频| 亚洲电影第1页| 欧美亚洲在线观看| 亚洲欧美国产精品va在线观看| 欧美激情亚洲另类| 日韩视频欧美视频| 亚洲人体大胆视频| 欧美成人国产一区二区| 亚洲精品国久久99热| 欧美韩日视频| 久久精品男女| 国产精品入口日韩视频大尺度| 日韩亚洲欧美在线观看| 亚洲裸体在线观看| 欧美日韩国产精品| 亚洲欧美中文在线视频| 9l视频自拍蝌蚪9l视频成人| 欧美日韩在线一区| 欧美一区二区三区日韩| 亚洲欧美综合网| 亚洲欧洲日夜超级视频| 亚洲精品日产精品乱码不卡| 欧美揉bbbbb揉bbbbb| 久久不见久久见免费视频1| 久久漫画官网| 久久久在线视频| 亚洲午夜激情网页| 久久大逼视频| 亚洲一区二区在线视频| 久久亚洲私人国产精品va| 亚洲小说欧美另类社区| 久久字幕精品一区| 欧美在线视频a| 欧美久久影院| 亚洲精品色图| 亚洲激情图片小说视频| 亚洲已满18点击进入久久| 亚洲免费黄色| 久久riav二区三区| 亚洲欧美久久| 国产精品久久久久久久久久免费| 欧美色网一区二区| 亚洲成人在线视频播放 | 麻豆av福利av久久av| 亚洲欧美日韩国产中文在线| 欧美精选一区| 亚洲美女视频在线免费观看| 亚洲精品免费在线播放| 欧美自拍偷拍午夜视频| 久久精品成人一区二区三区蜜臀| 欧美偷拍一区二区| 这里只有精品在线播放| 亚洲综合清纯丝袜自拍| 国产精品成人一区二区三区吃奶| 日韩亚洲欧美在线观看| 亚洲婷婷在线| 国产精品欧美久久| 亚洲欧美中文字幕| 欧美在线播放高清精品| 国产一区二区三区久久悠悠色av| 亚洲欧美中文另类| 久久天天躁狠狠躁夜夜爽蜜月| 一区二区三区亚洲| 欧美极品在线播放| 一本色道久久综合亚洲精品按摩| 亚洲男人影院| 在线精品福利| 欧美日韩成人一区二区| 亚洲一区二区三区四区在线观看| 亚洲自拍偷拍福利| 悠悠资源网亚洲青| 欧美日韩精品在线| 久久成人免费视频| 欧美大片第1页| 亚洲天堂av图片| 精品88久久久久88久久久| 欧美精品日韩| 久久久久88色偷偷免费| 亚洲社区在线观看| 亚洲福利小视频| 久久先锋资源| 午夜精品久久| 亚洲天天影视| 一本一本a久久| 日韩亚洲在线观看| 亚洲人成在线免费观看| 中日韩高清电影网| 老司机一区二区三区| 欧美一区二区三区男人的天堂 | 另类欧美日韩国产在线| 亚洲欧美综合国产精品一区| 理论片一区二区在线| 久久精品首页| 久久久久久久性| 久久尤物视频| 欧美日韩成人精品|