Linux中處理來自共享對象的同步事件
怎么利用設(shè)計模式來更有效的使用共享內(nèi)存
級別:中等
Sachin Agrawal (sachin_agrawal@in.ibm.com), Senior Software Engineer, IBM Software Labs, India
Swati P. Udas (swatudas@in.ibm.com), Software Engineer, IBM
10 Nov 2005
在高級語言例如C++中有效的使用共享內(nèi)存并不是一件淺顯易懂的事情,但是它也能克服這些內(nèi)在的困難。這篇文章描述了在Linux上使用共享內(nèi)存的兩個C++設(shè)計模式并包含了樣例代碼,以及給讀者打開了更有效的進程間通信的一扇門。
在面向?qū)ο笙到y(tǒng)中,當一個對象接收到一個消息時它能夠發(fā)送一套事件。這些事件主要在同步模式下被處理。這個調(diào)用進程或者線程在發(fā)送消息調(diào)用完成之前,發(fā)送給對象一個消息和處理事件。然而,如果這個對象送出這些被更多的進程共享以及駐留在內(nèi)存里的事件,情況就稍微的發(fā)生了一些變化。
這篇文章用兩個C++的設(shè)計模式詳細的描述了上述的情況,并且用一些例子程序闡明了解決方案。
- 我們首先描述了沒有使用共享內(nèi)存的例子程序。
- 其次作了一些改動去使用共享內(nèi)存,這里使用的是第一種設(shè)計模式
- 最后,闡述了怎么完成進程間通信,使用的是第二中設(shè)計模式
你能應(yīng)用這些設(shè)計模式中的全部概念悼任何的機器構(gòu)架,操作系統(tǒng)和編譯器上。我們使用的是32位Intel®構(gòu)架的RedHat Linux 7.1發(fā)行版 ,使用GNU C++編譯器的版本是3.2.3。
沒有共享內(nèi)存
讓我們開始一個沒有使用共享內(nèi)存的例子程序:
1
Listing 1. common.h
2
3
#ifndef __COMMON_H__
4
#define __COMMON_H__
5
6
7
class IObjectWithEvents
8

{
9
public:
10
class IEventSink
11
{
12
public:
13
virtual void OnEvent(pid_t pid, const char * msg) = 0;
14
};
15
16
static IObjectWithEvents * getInstance();
17
virtual bool AddEventHandler(IEventSink * pEI) = 0;
18
virtual void SendMessage() = 0;
19
20
};
21
22
#endif //__COMMON_H__
在
I
O
bjectWithEvents
類接口中包含了定義了
OnEvent()
方法的
I
E
ventSink
這個嵌入類,這是一個接受發(fā)送者
pid
和字符串消息的事件處理器。
getInstance()
方法返回一個共享內(nèi)存中的對象的引用,
AddEventHandler()
是注冊一個事件處理器,
SendMessage()
發(fā)送一個消息到對象上,沒有任何共享內(nèi)存的引用,下面的
Listing 2
中列出了
IobjectWithEvents
的程序代碼:
1 Listing 2. shm-client1.cpp
2
3 #include <iostream>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include "common.h"
7
8 #define HERE __FILE__ << ":" << __LINE__ << " "
9
10 using namespace std;
11
12 class EventSink : public IObjectWithEvents::IEventSink
13 {
14 public:
15 void OnEvent(pid_t pid, const char * msg)
16 {
17 cout << HERE << "Message from pid(" << pid << ")\t : " << msg << endl;
18 }
19 };
20
21
22 int main()
23 {
24 IObjectWithEvents * powe = IObjectWithEvents::getInstance();
25
26 EventSink sink;
27 powe->AddEventHandler(&sink);
28
29 powe->SendMessage();
30 return 0;
31 }
EventSink
類提供了事件處理器的實現(xiàn),在主函數(shù)中顯示了發(fā)送消息和處理事件的標準順序。
Listing3
中列出了
ObjectWithEvents
的典型實現(xiàn)代碼:
1 Listing 3. ObjectWithEvents.h
2
3 #include "common.h"
4
5 class ObjectWithEvents : public IObjectWithEvents
6 {
7 public:
8 // We assume singleton design pattern for illustration
9 static ObjectWithEvents * ms_pObjectWithEvents;
10
11 ObjectWithEvents();
12
13 //the implementation for IObjectWithEvents
14 void FireEvent();
15 virtual bool AddEventHandler(IEventSink * pEI);
16 virtual void SendMessage();
17
18 //Collection for maintaining events
19 enum { MAX_EVENT_HANDLERS = 16, };
20 long m_npEI;
21 IEventSink * m_apEI[MAX_EVENT_HANDLERS];
22 pid_t m_alPID[MAX_EVENT_HANDLERS];
23 };
24
25
26 Listing 4. ObjectWithEvents.cpp
27
28 #include <iostream>
29 #include <sys/types.h>
30 #include <sys/shm.h>
31 #include <unistd.h>
32 #include <pthread.h>
33 #include "ObjectWithEvents.h"
34
35 using namespace std;
36
37 ObjectWithEvents * ObjectWithEvents::ms_pObjectWithEvents = NULL;
38
39 IObjectWithEvents * IObjectWithEvents::getInstance()
40 {
41 // the following commented code is for illustration only.
42
43 /*
44 if (NULL == ObjectWithEvents::ms_pObjectWithEvents)
45 {
46 ObjectWithEvents::ms_pObjectWithEvents = new ObjectWithEvents();
47 }
48 */
49
50 return ObjectWithEvents::ms_pObjectWithEvents;
51 }
52
53 ObjectWithEvents::ObjectWithEvents() : m_npEI(0)
54 {
55
56 }
57
58
59 void ObjectWithEvents::FireEvent()
60 {
61 // iterate through the collection
62 for (long i = 0; i < m_npEI; i++)
63 {
64 //Recheck for NULL
65 if (0 != m_apEI[i])
66 {
67 // Fire the event
68 m_apEI[i]->OnEvent(m_alPID[i], "");
69 }
70 }
71
72 return;
73 }
74
75 bool ObjectWithEvents::AddEventHandler(IEventSink * pEI)
76 {
77 // NULL check
78 if (NULL == pEI)
79 {
80 return false;
81 }
82
83 // check if there is space for this event handler
84 if (MAX_EVENT_HANDLERS == m_npEI)
85 {
86 return false;
87 }
88
89 // Add this event handler to the collection
90 m_alPID[m_npEI] = getpid();
91 m_apEI[m_npEI++] = pEI;
92
93 return true;
94 }
95
96 void ObjectWithEvents::SendMessage()
97 {
98 //Some processing
99 //And then fire the event
100
101 FireEvent();
102
103 return;
104 }
你能使用下面的命令行來編譯這些例子程序:
g++ -g -o shm_client shm_client1.cpp ObjectWithEvents.cpp
當你運行shm_client時,將得到下面的輸出:
$ ./shm_client shm_client1.cpp:16 Message from pid(3920)
:
使用共享內(nèi)存:沒有事件緩存
現(xiàn)在,對于在共享內(nèi)存中實例ObjectWithEvents的實現(xiàn)作了以下的修改。
1 Listing 5. Changes to ObjectWithEvents.cpp
2
3
4 // To add a declaration for the "new" operator:
5 class ObjectWithEvents : public IObjectWithEvents{
6 public: void * operator new(unsigned int);
7 };
8
9 // To include an additional header for the Initializer class:
10
11 #include "Initializer.h"
12
13
14 // To overload the operator "new":
15
16 void * ObjectWithEvents::operator new(unsigned int)
17 {
18 return ms_pObjectWithEvents;
19 }
20
21
22 // Then, FireEvent is completely changed:
23
24
25 void ObjectWithEvents::FireEvent()
26 {
27 // We need to serialize all access to the collection by more than one process
28 int iRetVal = Initializer::LockMutex();
29
30 if (0 != iRetVal)
31 {
32 return;
33 }
34
35 pid_t pid = getpid();
36
37 // iterate through the collection and fire only events belonging to the current process
38 for (long i = 0; i < m_npEI; i++)
39 {
40 // Check whether the handler belongs to the current process.
41 if (pid != m_alPID[i])
42 {
43 continue;
44 }
45
46 //Recheck for NULL
47 if (0 != m_apEI[i])
48 {
49 m_apEI[i]->OnEvent(pid, "");
50 }
51 }
52
53 // release the mutex
54 if ((0 == iRetVal) && (0 != Initializer::UnlockMutex()))
55 {
56 // Deal with error.
57 }
58
59 return;
60 }
61
62
63 // The following are changes to ObjectWithEvents::AddEventHandler():
64
65 // 1. Before accessing the collection, we lock the mutex:
66
67 int bRetVal = Initializer::LockMutex();
68
69 if (0 != bRetVal)
70 {
71 return false;
72 }
73
74
75 // 2. After accessing the collection, we release the mutex:
76
77 if ((0 == bRetVal) && (0 != Initializer::UnlockMutex()))
78 {
79 // Deal with error.
80 }
在共享內(nèi)存中的示例化對象,定義了一個叫做Initializer
的額外的類
1 Listing 6. Initializer.h
2
3 #ifndef __Initializer_H__
4 #define __Initializer_H__
5
6 class Initializer
7 {
8 public :
9 int m_shmid;
10 static Initializer ms_Initializer;
11 Initializer();
12
13 static pthread_mutex_t ms_mutex;
14 static int LockMutex();
15 static int UnlockMutex();
16 };
17
18 #endif // __Initializer_H__
Initializer
定義了共享內(nèi)存
id
的變量
m_shmid
和對于同步事件處理器的一個信號量變量
ms_mutex.
LockMutex()
鎖定互斥體,
UnlockMutex()
則解鎖互斥體。
Listing7
列出了
Initializer
的實現(xiàn)代碼:
1 Listing 7. Initializer.cpp
2
3 #include <iostream>
4 #include <sys/types.h>
5 #include <sys/shm.h>
6 #include <unistd.h>
7 #include <pthread.h>
8 #include "Initializer.h"
9 #include "ObjectWithEvents.h"
10
11 using namespace std;
12
13 Initializer Initializer::ms_Initializer;
14
15 pthread_mutex_t Initializer::ms_mutex = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
16
17 Initializer::Initializer() : m_shmid(-1)
18 {
19 bool bCreated = false;
20 key_t key = 0x1234;
21
22 m_shmid = shmget(key,sizeof(ObjectWithEvents), 0666);
23
24 if (-1 == m_shmid)
25 {
26 if(ENOENT != errno)
27 {
28 cerr<<"Critical Error"<<endl;
29 return;
30 }
31
32 m_shmid = shmget(key, sizeof(ObjectWithEvents), IPC_CREAT|0666);
33
34 if (-1 == m_shmid )
35 {
36 cout << " Critical Error " << errno<< endl;
37 return;
38 }
39
40 bCreated = true;
41 }
42
43 ObjectWithEvents::ms_pObjectWithEvents = (ObjectWithEvents*)shmat(m_shmid,NULL,0);
44
45 if (NULL == ObjectWithEvents::ms_pObjectWithEvents)
46 {
47 cout << " Critical Error " << errno << endl;
48 return;
49 }
50
51 if (true == bCreated)
52 {
53 ObjectWithEvents * p = new ObjectWithEvents();
54 }
55
56 // Create a mutex with no initial owner.
57
58 pthread_mutex_init(&ms_mutex, NULL);
59 }
60
61
62
63 int Initializer::LockMutex()
64 {
65 // Request ownership of mutex.
66
67 pthread_mutex_lock(&ms_mutex);
68
69 if(EDEADLK == errno)
70 {
71 cout << "DeadLock" << endl;
72 return -1;
73 }
74
75 return 0;
76 }
77
78 int Initializer::UnlockMutex()
79 {
80 return pthread_mutex_unlock(&ms_mutex);
81 }
如果共享內(nèi)存不存在的話則創(chuàng)建它,并在共享內(nèi)存里做成共享對象。如果共享內(nèi)存已經(jīng)存在的話,則略過構(gòu)造共享對象。Initializer::m_shmid紀錄標識符ObjectWithEvents::ms_pObjectWithEvents
并記錄共享對象的引用。
即使在所有的進程從這個共享內(nèi)存分離(detach)也不釋放共享內(nèi)存。讓你用ipcrm命令顯示的銷毀它并能用ipcs命令快速察看共享內(nèi)存的信息。用下面的命令編譯成可執(zhí)行程序:
g++ -g -o shm_client shm_client1.cpp ObjectWithEvents.cpp Initializer.cpp
控制臺上會有下面那樣的輸出信息:
Listing 8. The console dump
$ ./shm_client
shm_client1.cpp:16 Message from pid(4332) :
$ ipcs
------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
0x00001234 327686 sachin 666 136 0
$ ./shm_client
shm_client1.cpp:16 Message from pid(4333) :
$ ipcrm -m 327686
ObjectWithEvents
實例中有一個能從很多進程中收集事件的收集器。它僅僅能發(fā)送當前進程注冊的事件。設(shè)計模式中表明了以下的兩點:
- 任何訪問事件收集器都要被互斥體對象保護
- 在發(fā)送之前事件都要通過進程ID進行過濾。
IPC的共享內(nèi)存和事件緩存
現(xiàn)在,讓我們看看進程間通信的共享內(nèi)存和事件緩存。如果事件被緩存到共享對象里,則他們在稍后會被過濾。接收的進程將根據(jù)事件查詢共享對象。然后,通過一個同步模型,進程間的通信能被接受到,這是開發(fā)下面的設(shè)計模式的主要動機。
在
IobjectWithEvents
中,
像下面那樣增加一對兒方法:
1 Listing 9. Adding methods to IobjectWithEvents
2
3 class IObjectWithEvents
4 {
5 public:
6
7 virtual bool EnqueueEvent(const char * msg) = 0;
8 virtual bool PollForEvents() = 0;
9 };
10
EnqueueEvent()
簡單的增加到共享對象中的事件緩存中,并且PollForEvents()
將接收這個緩存。
shm_client1將像下面那樣被使用EnqueueEvent
()
:
powe->EnqueueEvent("Message from shm_client1");
shm_client2(實質(zhì)上是shm_client1的拷貝)將像下面那樣使用PollForEvents()
:
powe->EnqueueEvent("Message from shm_client2"); powe->PollForEvents();
ObjectWithEvents
的實現(xiàn)代碼是下面那樣:
Listing 10. Additions to ObjectWithEvents
1 class ObjectWithEvents : public IObjectWithEvents
2 {
3 public:
4 virtual bool EnqueueEvent(const char * msg);
5 virtual bool PollForEvents();
6
7 //The event cache
8 enum { MAX_EVENTS = 16, MAX_EVENT_MSG = 256, };
9 long m_nEvents;
10 pid_t m_alPIDEvents[MAX_EVENTS];
11 char m_aaMsgs[MAX_EVENTS][MAX_EVENT_MSG];
12 };
13
14
15
新的構(gòu)造函數(shù)變成:
ObjectWithEvents::ObjectWithEvents() : m_npEI(0), m_nEvents(0) { }
EnqueueEvent()
存儲事件(例如,每一個被發(fā)送的事件的消息和進程號)到一個隊列中,PollForEvents()
則迭代整個隊列并為隊列中的事件一個一個的調(diào)用OnEvent().
Listing 11. EnqueueEvent
1 bool ObjectWithEvents::EnqueueEvent(const char * msg)
2 {
3 if (NULL == msg)
4 {
5 return false;
6 }
7
8 if (MAX_EVENTS == m_nEvents)
9 {
10 //IEventSink collection full
11 return false;
12 }
13
14 int bRetVal = Initializer::LockMutex();
15
16 if (0 != bRetVal)
17 {
18 return false;
19 }
20
21 m_alPIDEvents[m_nEvents] = getpid();
22 strncpy(m_aaMsgs[m_nEvents++], msg, MAX_EVENT_MSG - 1);
23
24 if ((0 == bRetVal) && (0 != Initializer::UnlockMutex()))
25 {
26 // Deal with error.
27 }
28
29 return true;
30 }
31
32
33 bool ObjectWithEvents::PollForEvents()
34 {
35 if (0 == m_nEvents)
36 {
37 return true;
38 }
39
40 int bRetVal = Initializer::LockMutex();
41
42 if (0 != bRetVal)
43 {
44 return false;
45 }
46
47 pid_t pid = getpid();
48
49 for (long i = 0; i < m_npEI; i++)
50 {
51 // Does the handler belongs to current process ?
52
53 if (pid != m_alPID[i])
54 {
55 continue;
56 }
57
58 //Recheck for NULL
59
60 if (0 == m_apEI[i])
61 {
62 continue;
63 }
64
65 for (long j = 0; j < m_nEvents; j++)
66 {
67 m_apEI[i]->OnEvent(m_alPIDEvents[j], m_aaMsgs[j]);
68 }
69 }
70
71 if ((0 == bRetVal) && (0 != Initializer::UnlockMutex()))
72 {
73 // Deal with error.
74 }
75
76 return true;
77 }
現(xiàn)在使用下面的命令變成新命令:
g++ -g -o shm_client1 shm_client1.cpp ObjectWithEvents.cpp Initializer.cpp
g++ -g -o shm_client2 shm_client2.cpp ObjectWithEvents.cpp Initializer.cpp
在你的控制臺上將像下面那樣輸出消息:
Listing 12. Output from shm_client1 and shm_client2
$ ./shm_client1
$ ./ipcs
------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
0x00001234 360454 sachin 666 4300 0
$ ./shm_client2
shm_client2.cpp:16 Message from pid(4454) : Message from shm_client1
shm_client2.cpp:16 Message from pid(4456) : Message from shm_client2
下載
Description
|
Name
|
Size
|
Download method
|
Shared memory sample code
|
sync_code.zip
|
4KB
|
FTP
|