• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            一直想體驗下動態(tài)消息隊列的特性,我一看見“ACE目前提供兩種動態(tài)消息隊列:基于最終期限(deadline)的和基于松弛度(laxity)的(參見[IX])動態(tài)消息隊列”這句話,無限的好奇立即被激起,整了好久,結(jié)果我郁悶地發(fā)現(xiàn)不管我怎么設(shè)置,一點動態(tài)的特性都沒有看見,和靜態(tài)消息隊列沒有任何不同。調(diào)試跟進去才發(fā)現(xiàn),原來在Windows和Linux的config文件中都沒有定義"ACE_HAS_TIMED_MESSAGE_BLOCKS"這個宏,所以msg_deadline_time和msg_execution_time都不起任何作用。懶得去重編ACE庫了,就學(xué)習(xí)靜態(tài)隊列的使用吧 ^_^

                    ACE_Message_Queue_Factory這個工廠提供三個靜態(tài)函數(shù)分別用來創(chuàng)建靜態(tài)消息隊列和兩種類型的動態(tài)消息隊列。靜態(tài)消息隊列的消息也支持優(yōu)先級,但是消息的優(yōu)先級是靜態(tài)的,不需要通過動態(tài)計算而來。消息隊列有兩個術(shù)語讓我困惑很久。水位用來控制消息隊列中數(shù)據(jù)的大小,高水位(high_water_mark)用于控制消息隊列的上限,它用于控制生產(chǎn)者往里面放數(shù)據(jù)的量,如果消息隊列中數(shù)據(jù)量已經(jīng)達到高水位,而用使用了鎖,既使用“ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();”創(chuàng)建消息隊列,那么生產(chǎn)者將被阻塞。高水位很容易理解,但是低水位是用來做什么的呢?我本來地以為所謂的低水位嘛,肯定是用來控制消費者的,比如低水位為20,那么當(dāng)數(shù)據(jù)量少于20時,消費者將被阻塞,直到生產(chǎn)者往隊列中放入數(shù)據(jù),數(shù)據(jù)量大于20時喚醒消費者即可,消費者此時可以將低水位調(diào)低比如到0,一次將這20個單位的數(shù)據(jù)消費掉,然后再次將低水位調(diào)至20,下一次循環(huán)將自己阻塞,如此這般就可以很好地避免“傻窗口綜合癥”( silly window syndrome )。帶著這個理解去測試靜態(tài)隊列時讓我郁悶了好一陣子,看了一陣子源代碼之后才發(fā)現(xiàn)事實并不是這么回事。只要消息隊列中還有數(shù)據(jù)消費者就不會被阻塞的,而當(dāng)數(shù)據(jù)量超過高水位時,生產(chǎn)者會被阻塞,既然會被阻塞,那么它肯定需要被喚醒,那么什么時候由誰來喚醒生產(chǎn)者呢?這就是低水位的作用,消費者一直消費數(shù)據(jù),當(dāng)數(shù)據(jù)低于低水位時它就喚醒生產(chǎn)者。我感覺這似乎并不能解決傻窗口綜合癥,可能我是誤解了傻窗口綜合癥吧。

                    下面的代碼很好的展示了靜態(tài)消息隊列的使用。

            #include "ace/Message_Queue.h"
            #include "ace/Get_Opt.h"
            #include "ace/OS.h"
            #include  <ace/Thread_Manager.h>
            #include <ace/Synch.h>

            //消息隊列指針
            ACE_Message_Queue<ACE_MT_SYNCH>* mq;

            const char S1[] = "C++";
            const char S2[] = "Java";
            const char S3[] = "PHP";
            const char S4[] = "C#";

            //四個消息指針
            ACE_Message_Block* mb1, * mb2, * mb3, * mb4;

            //生產(chǎn)者
            static void* produce(void *arg)
            {
             static int loop = 1;

                while(true)
                {
              ACE_OS::sleep(2);

              ACE_DEBUG((LM_DEBUG, "(%P : %t) producer...\n"));

              while(true)
              {
               if(loop == 1)
               {
                //將高水位設(shè)置為10, S1+S2的長度為3+4+2=9<10,因此可以將S3放進去
                //但是再放入S4時生產(chǎn)者將會被阻塞
                //需要注意的是水位的大小并不是消息的個數(shù),而是消息隊列中消息里面的數(shù)據(jù)量之和
                //如果也能以消息的個數(shù)作為高低水位的值就好了
                mq->high_water_mark(10);

                mq->enqueue_prio (mb1);
                mq->enqueue_prio (mb2);
                mq->enqueue_prio (mb3);

                ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending!!\n"));

                //因為消費者在睡眠6秒之后才會調(diào)用deactivate,因此生產(chǎn)者會在這兒阻塞幾秒鐘
                //可以不斷地將msg_bytes打印出來觀察觀察
                int ret = mq->enqueue_prio (mb4);

                ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by  deactivate, ret = %d!!\n", ret));

                ++loop;
               }

               if(loop == 2)
               {
                ACE_OS::sleep(6);

                //將低水位設(shè)置為5,因為高水位仍然為10,當(dāng)前的數(shù)據(jù)量又超過了10,
                //所以下面的入隊操作仍會將生產(chǎn)者阻塞
                //這樣消費者消費消息,當(dāng)數(shù)據(jù)量小于5時,將喚醒生產(chǎn)者
                //生產(chǎn)者在此處等待被消費者喚醒
                mq->low_water_mark(5);

                ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending again!!\n"));

                mq->enqueue_prio (mb4);

                ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by consumer!!\n"));

                ++loop;
               }
              }
                }

                return NULL;
            }

            //消費者
            void* consume(void *arg)
            {
             static int loop = 1;

                while(true)
                {
              ACE_OS::sleep(2);

              ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer...\n"));

              if(loop == 1)
              {
               //等待6秒,此時生產(chǎn)者和消費者都將被阻塞
               ACE_OS::sleep(6);

               //deactivate會喚醒所有的線程,將消息隊列設(shè)置為不可用
               //以后所存取操作都會返回-1
               //這個操作會喚醒生產(chǎn)者
               mq->deactivate();

               ++loop;
              }

              if(loop == 2)
              {
               ACE_OS::sleep(2);

               //將消息隊列的狀態(tài)設(shè)置成ACTIVATED
               //消息又可以使用了
               mq->activate();

               ++loop;
              }

              if(loop == 3)
              {
               ACE_OS::sleep(10);

               //消費兩個消息之后,數(shù)據(jù)量就小于5了,低于低水位將喚醒生產(chǎn)者
               ACE_Message_Block *mb;
               mq->dequeue_head (mb);
               mq->dequeue_head (mb);

               ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer wake up producer!!\n"));

               ++loop;
              }
                }

                return NULL;
            }

            int main(int argc, char* argv[])
            {
             mq = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();

             int priority;

             //使用隨機數(shù)作為消息的優(yōu)先級
             //數(shù)字越高,優(yōu)先級越高
             priority = ACE_OS::rand() % 100;
             mb1 = new ACE_Message_Block(S1, sizeof S1, priority);

             priority = ACE_OS::rand() % 100;
             mb2 = new ACE_Message_Block(S2, sizeof S2, priority);

             priority = ACE_OS::rand() % 100;
             mb3 = new ACE_Message_Block(S3, sizeof S3, priority);

             priority = ACE_OS::rand() % 100;
             mb4 = new ACE_Message_Block(S4, sizeof S4, priority);

             //將消息壓入隊列中,enqueue_prio根據(jù)消息的優(yōu)先級將消息放到適當(dāng)?shù)奈恢蒙?br /> //enqueue_head只是簡單地將數(shù)據(jù)存入隊列中,而不考慮消息的優(yōu)先級
             //使用enqueue_prio壓入消息后,可以簡單通過dequeue_head和dequeue_tail
             //分別按優(yōu)先級從高到低和從低到高取消息
             //如果使用enqueue_head和enqueue_tail壓入消息
             //則需要通過dequeue_prio來按照消息的優(yōu)先級依次將消息出隊列
             //沒有必要既使用enqueue_prio壓入消息,又實用dequeue_prio來取消息
             mq->enqueue_prio (mb1);
             mq->enqueue_prio (mb2);
             mq->enqueue_prio (mb3);
             mq->enqueue_prio (mb4);

             //輸出靜態(tài)消息隊列的相關(guān)信息
             //高低水位默認(rèn)值均為16384
             ACE_DEBUG((LM_DEBUG, "count : %d, bytes : %d, length : %d, high_water_mark : %d, low_water_mark : %d, status : %d\n",
              mq->message_count(), mq->message_bytes(), mq->message_length(),
              mq->high_water_mark(), mq->low_water_mark(),
              mq->state()));

             ACE_Message_Block *mb;

             //使用next遍歷消息,遍歷的順序為高優(yōu)先級到底優(yōu)先級
             ACE_DEBUG((LM_DEBUG, "===========next=============\n"));
             //peek一下,并不彈出消息,類似Windows的PeekMessage
             mq->peek_dequeue_head(mb);
             do 
             {
              ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));
             }while(mb = mb->next());

             //使用迭代器遍歷消息隊列,遍歷的順序為高優(yōu)先級到底優(yōu)先級
             ACE_DEBUG((LM_DEBUG, "=========iterator=============\n"));
             ACE_Message_Queue<ACE_MT_SYNCH>::ITERATOR iterator (*mq);

                for (ACE_Message_Block *entry = 0;
                     iterator.next (entry) != 0;
                     iterator.advance ())
             {
              ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", entry->rd_ptr(), entry->msg_priority()));
             }


             ACE_DEBUG((LM_DEBUG, "============dequeue_head==========\n"));
             while(mq->dequeue_head (mb) != -1)
             {
              ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));

              //這里如果不判斷的話,消息隊列空時會導(dǎo)致主線程被阻塞
              if(mq->is_empty())
               break;
             }

             ACE_DEBUG((LM_DEBUG, "\n\n"));

             //////////////////////////////////測試高低水位和隊列的state使用,進行測試之前mq隊列已空///////////////////////////
             //產(chǎn)生一個生產(chǎn)者線程
             ACE_Thread_Manager::instance()->spawn_n
                     (
                      1,
                      (ACE_THR_FUNC) produce
                );

             ////產(chǎn)生兩個消費者線程
             ACE_Thread_Manager::instance()->spawn_n
                     (
                      1,
                      (ACE_THR_FUNC) consume
                );

             //掛起主線程
             ACE_Thread_Manager::instance()->wait();
             
             return 0;
            }


            轉(zhuǎn)自http://iknow.4ucode.com/Study/Topic/895400
            posted on 2012-12-26 13:20 arrow8209 閱讀(1871) 評論(0)  編輯 收藏 引用

            只有注冊用戶登錄后才能發(fā)表評論。
            網(wǎng)站導(dǎo)航: 博客園   IT新聞   BlogJava   博問   Chat2DB   管理


            統(tǒng)計

            97久久精品人人做人人爽| 思思久久99热免费精品6| 色综合久久中文字幕无码| 日产精品久久久久久久性色| 国内精品久久久久伊人av| 成人精品一区二区久久| 亚洲а∨天堂久久精品9966| 久久水蜜桃亚洲av无码精品麻豆 | 精品国产一区二区三区久久| 久久综合欧美成人| 怡红院日本一道日本久久 | 少妇被又大又粗又爽毛片久久黑人| 久久人人添人人爽添人人片牛牛 | 精品伊人久久大线蕉色首页| 精品九九久久国内精品| 久久人人添人人爽添人人片牛牛| 国产精品久久波多野结衣| 久久涩综合| 亚洲一本综合久久| 久久人人妻人人爽人人爽| 无码国内精品久久人妻麻豆按摩| 久久精品亚洲日本波多野结衣| 亚洲国产成人久久综合碰| 91精品国产高清久久久久久91| 性色欲网站人妻丰满中文久久不卡| 久久国产视频99电影| 久久香蕉一级毛片| 久久久久久久久无码精品亚洲日韩| 人妻中文久久久久| 99久久国产综合精品成人影院| 成人国内精品久久久久一区| 欧洲人妻丰满av无码久久不卡| 日本久久中文字幕| 国产精品综合久久第一页| 7777久久亚洲中文字幕| 久久天堂AV综合合色蜜桃网| 久久SE精品一区二区| 久久久久99这里有精品10 | 久久久久亚洲AV无码专区网站| 久久美女人爽女人爽| 日本三级久久网|