第四節:事件對象(Event Objects)
本節介紹如下內容:
1. 同步與異步
2. 為何需要同步
3. 什么是事件對象(Event Object)
4. 事件對象類接口定義
5. 示例程序
6. 事件對象類的UNIX和Windows實現
1. 同步(Synchronization)與異步(Asynchronization)
首先對同步與異步的概念做一個簡單的說明。
當程序1調用程序2時,程序1停下不動,直到程序2完成回到程序1來,程序1才繼續去, 這就是所謂的同步。如果程序1調用程序2后,徑自繼續自己的下一個動作,那么兩者之就是所謂的異步。
舉個例子,在WIN32
API中,SendMessage()就是同步行為,而PostMessage()就是異步行 為。在Windows系統中,PostMessage()是把消息放到對方的消息隊列中,然后回到原調用 點繼續執行,所以這就是異步(asynchronous)行為。而SendMessage()根本就像是“直 接調用窗口的窗口函數”,直到該窗口函數結束,然后才回到原調用點,所以它是同步( synchronous)行為。
2. 為何需要同步
撰寫多線程程序的一個最具挑戰性的問題就是:如何讓一個線程和另一個線程合作。除非 你讓他們同心協力,否則必然會出現如第三節所說的競爭條件(race condition)和數據 被破壞(data
corruption)的情況。
當多個線程共享同一內存區域的時候,我們需要確保每一個線程所看到的數據的一致性。假如對于每一個線程所使用的變量來說,其它任何線程都不會讀取或使用該 變量,那么根 本不存在數據一致性的問題。同樣地,對于一個有著只讀屬性的變量來說,多個線程同時 讀取它的值的話,也不會有數據一致性的問題存在。然而,當一個線程可以修改一個變量 ,同時其它線程也能夠讀取或修改該變量的話,我們就需要同步這些線程,以確保每一個 線程在訪問該變量的內存內容時所用到的值是有效的。
舉個例子,假設有一塊未初始化的內存塊和兩個線程,一個讀線程,一個寫線程。我們應 該保證讀線程在讀取該內存塊時,它已經被寫線程初始化好了,否則讀線程只能讀到一塊 未初始化完成的無效數據。這就需要用到線程的同步機制(synchronous
mechanism)。 線程間的協調工作是由同步機制來完成的。同步機制相當于線程之間的紅綠燈。程序員可 以設計讓一組線程使用同一個紅綠燈系統。這個紅綠燈系統負責給某個線程綠燈而給其他 線程紅燈。這一組紅綠燈系統必須確保每一個線程都有機會獲得綠燈。 有好多種同步機制可以運用。使用哪一種完全視欲解決的問題而定。這些同步機制常常以 各種方式組合在一起,以產生出更精密的機制。
3. 什么是事件對象(Event Object)
事件對象(Event
Object)是一種最具彈性的同步機制,它的唯一目的就是成為激發(
Signaled)狀態或未激發(Unsignaled)狀態。這兩種狀態完全由程序控制。
我們通過上面介紹的讀寫線程的例子來說明事件對象的激發狀態和未激發狀態的含義。讀 線程和寫線程擁有同一個事件對象。該事件對象的初始狀態為非激發狀態。當讀線程需要 讀共享的那塊內存時,它需要判斷該事件對象的狀態。如果該事件對象處于非激發狀態,
則讀線程等待,直到該事件對象處于激發狀態為止。寫線程會在那塊共享的內存被初始化 好之后將該事件對象的狀態設為激發狀態。這時讀線程得知了該事件對象的狀態已經由非 激發狀態變為激發狀態,于是它開始讀取那塊共享的內存,并執行后續的操作。 事件對象之所以有大用途,正是因為它們的狀態完全在程序員的掌控之下。因此,程序員 可以精確的告訴一個事件對象做什么事,以及什么時候去做。
事件對象可以分為自動重置的事件對象(Automatic-Reset
Event Object)和手動重置的 事件對象(Manual-Reset Event Object)。自動重置的事件對象會在事件對象變成激發 狀態(因而喚醒一個線程)之后,自動重置為非激發狀態。而手動重置的事件對象,不會 自動重置,必須靠程序操作才能將激發狀態的事件對象重置為非激發狀態。 事件對象所能完成的一切功能都可以通過互斥來完成。下面我們通過比較使用事件對象來 實現讀寫線程的例子和使用互斥來實現讀寫線程的例子,以說明事件對象的作用和它存在 的必要性。
例一:使用事件對象來實現讀寫線程
void threadRead(事件對象類型 *事件對象)
{
阻塞事件對象;
讀取共享內存的數據;
}
void threadWrite(事件對象類型 *事件對象)
{
將適當的數據寫入共享內存;
激發事件對象;
}
例二:使用互斥來實現讀寫線程
bool globalIsWritten = false;
void threadRead(通行證類型 *通行證)
{
獲取通行證;
while (!globalIsWritten)
{
歸還通行證;
sleep(sometime);
獲取通行證;
}
歸還通行證;
讀取共享內存的數據;
}
void threadWrite(通行證類型 *通行證)
{
將適當的數據寫入共享內存;
獲取通行證;
globalIsWritten = true;
歸還通行證;
}
很明顯,使用事件對象來實現讀寫線程的代碼要比使用互斥來實現讀寫線程的代碼優雅許 多。使用事件對象來實現讀寫線程的代碼顯得更加干凈整潔,而且可讀性更高。使用互斥 來實現讀寫線程時,在讀線程中,需要輪詢地互斥訪問讀寫線程間的共享變量
globalIsWritten,因此其效率一定不如使用事件對象來實現讀寫線程的效率高。我將后 面的“手動重置的事件對象”的示例程序改為完全使用互斥來實現后,發現其運行時間是 使用事件對象來實現的1.21倍。這個測試結果和我們的預期相一致。 因此,對于類似于讀寫線程這樣的例子,事件對象相對于互斥提供了更加優雅和高效的解 決方案。
4.
事件對象類接口定義
文件event.h
#ifndef __EVENT_H__
#define __EVENT_H__
#include <windows.h>
class Event
{
public:
Event(bool bManualUnsignal, bool bSignaled);
virtual ~Event();
virtual bool block();
virtual bool signal();
virtual bool unsignal();
private:
// 依賴于具體實現,后面再說。
};
#endif
其中,
Event::Event(bool bManualUnsignal, bool bSignaled),事件對象類的構造函數。
bManualUnsignal用于指定事件對象的類型。如果其值為true,則該事件對象是手動重置 的事件對象;如果其值為false,則該事件對象是自動重置的事件對象。bSignaled用于指 定事件對象的初始狀態。如果其值為true,則該事件對象的初始狀態為激發狀態;如果其 值為false,則該事件對象的初始狀態為非激發狀態。
Event::~Event(),事件對象類的析構函數。用于摧毀事件對象。
Event::block(),根據事件對象的狀態,對擁有該事件對象的線程進行控制。如果事件對 象處于非激發狀態,則擁有該事件對象的線程開始等待,直到該事件對象的狀態變為激發 狀態。如果事件對象處于激發狀態或者當事件對象的狀態由非激發狀態變為激發狀態的時 候,首先判斷該事件對象是那種類型的,如果該事件對象是自動重置的,那么需要將該事 件對象的狀態設為非激發狀態,然后喚醒等待該事件對象的線程。
Event::signal(),將事件對象的狀態設為激發狀態。如果事件對象是手動重置的事件對 象,那么該事件對象會一直保持激發狀態,直到Event::unsignal()被調用,該事件對象 才會由激發狀態變為非激發狀態。在手動設置的事件對象保持激發狀態的時候,所有等待 該事件對象的線程都將被喚醒。如果事件對象是自動重置的事件對象,那么該事件對象會 一直保持激發狀態,直到一個等待該事件對象的線程被喚醒,這時該事件對象會由激發狀 態變為非激發狀態(由Event::block()來完成)。
Event::unsignal(),將事件對象的狀態設為非激發狀態。該方法主要用于手動重置的事 件對象,它必須顯式地調用該方法以使得自己的狀態變為非激發狀態。而對于自動重置的 事件對象來說,當一個等待線程被喚醒時,它會自動地將自己的狀態由激發狀態變為非激 發狀態。
在Windows操作系統中,還有一種對事件對象的操作,叫做PulseEvent()。在我們的事件 對象模型中并沒有引入該接口,因為PulseEvent()是一個不穩定的操作。Windows只是為 了向后兼容才保留了PulseEvent()。
下面對PulseEvent()函數做一個簡單的介紹,并且說明為什么該操作不穩定。
如果一個事件對象是手動重置的,那么對該事件對象進行PulseEvent()操作后,該事件對 象會被設為激發狀態,所有的等待該事件對象的線程都會被喚醒,之后該事件對象恢復為 非激發狀態。如果一個事件對象是自動重置的,那么對該事件對象進行PulseEvent()操作 后,該事件對象會被設為激發狀態,一個等待該事件對象的線程會被喚醒,之后該事件對 象恢復為非激發狀態。
注意,如果沒有任何線程在等待事件對象(不管是手動重置的還是自動重置的),或者沒 有任何線程可以立即被喚醒的話,對該事件對象進行PulseEvent()操作后,唯一的結果是 該事件對象的狀態被設置為非激發狀態。在這種情況下,這個事件對象會被遺失。這時,可能會引起死鎖。
舉個例子,假設一個程序由兩個線程(線程A和線程B)組成。線程A累加一個計數器,
后調用Event::block()等待一個事件對象。如果在這兩個操作之間發生了上下文切換(
context switch),線程B開始執行,它檢查計數器內容然后對著同一個事件對象進行
PulseEvent()操作。這時候這個要求蘇醒的請求會被遺失掉。而線程A會因為它等待的事 件對象永遠不會被設置為激發狀態而永遠等待下去,程序進入死鎖狀態。這時,線程A被 稱作饑餓線程。
因此,PulseEvent()是一個不穩定的操作,在我們的事件對象模型中將不包括該操作。
5.
示例程序
自動重置的事件對象
文件common.h
#ifndef __COMMON_H__
#define __COMMON_H__
struct Param
{
long threadID;
int *count;
};
const int TCOUNT = 10;
const int COUNT_LIMIT = 12;
#endif
文件watchcount.h
#ifndef __WATCH_COUNT_H__
#define __WATCH_COUNT_H__
#include "thread.h"
class Event;
class Mutex;
class WatchCount : public Thread
{
public:
WatchCount(Event& e, Mutex& m);
protected:
void* run(void *param);
private:
Event& event;
Mutex& mutex;
};
#endif
文件watchcount.cpp
#include "watchcount.h"
#include "common.h"
#include "mutex.h"
#include "event.h"
#include <iostream>
using std::cout;
using std::endl;
WatchCount::WatchCount(Event& e, Mutex& m) : event(e), mutex(m)
{
}
void* WatchCount::run(void *param)
{
Param *prm = static_cast<Param *>(param);
long id = prm->threadID;
int *count = prm->count;
mutex.acquire();
cout << "Starting WatchCount: thread "
<< id
<< "."
<< endl;
cout << "WatchCount: thread "
<< id
<< " going into wait..."
<< endl;
mutex.release();
event.block();
mutex.acquire();
cout << "WatchCount: thread "
<< id
<< " Event signaled."
<< endl;
*count += 125;
cout << "WatchCount: thread "
<< id
<< " count now = "
<< *count
<< "."
<< endl;
mutex.release();
return NULL;
}
文件inccount.h
#ifndef __INC_COUNT_H__
#define __INC_COUNT_H__
#include "thread.h"
class Event;
class Mutex;
class IncCount : public Thread
{
public:
IncCount(Event& e, Mutex& m);
protected:
void* run(void *param);
private:
Event& event;
Mutex& mutex;
};
#endif
文件inccount.cpp
#include "inccount.h"
#include "common.h"
#include "mutex.h"
#include "event.h"
#include <iostream>
using std::cout;
using std::endl;
IncCount::IncCount(Event& e, Mutex& m) : event(e), mutex(m)
{
}
void* IncCount::run(void *param)
{
Param *prm = static_cast<Param *>(param);
long id = prm->threadID;
int *count = prm->count;
for (int i = 0; i < TCOUNT; ++i)
{
mutex.acquire();
++(*count);
/*
* Check the value of count and signal waiting
thread when condition
is
* reached.
*/
if (*count == COUNT_LIMIT)
{
cout << "IntCount: thread
"
<< id
<< ",
count = "
<< *count
<< "
Threshold reached. ";
event.signal();
cout << "Just sent
signal."
<< endl;
}
cout << "IncCount: thread "
<< id
<< ", count = "
<< *count
<< ", unlocking
mutex."
<< endl;
mutex.release();
/* Do some work so threads can alternate on mutex
lock */
sleep(1000);
}
return NULL;
}
文件mainautounsignal.cpp
#include "inccount.h"
#include "watchcount.h"
#include "common.h"
#include "mutex.h"
#include "event.h"
#include <iostream>
using std::cout;
using std::endl;
int main(int argc, char* argv[])
{
Event event(false, false);
Mutex mutex;
int count = 0;
Param prm1 = {1, &count};
Param prm2 = {2, &count};
Param prm3 = {3, &count};
WatchCount wc(event, mutex);
IncCount ic1(event, mutex);
IncCount ic2(event, mutex);
wc.start(&prm1);
ic1.start(&prm2);
ic2.start(&prm3);
/* Wait for all thread to complete */
wc.wait();
ic1.wait();
ic2.wait();
cout << "Main(): Waited on 3 thread. Final value of
count = "
<< count
<< ". Done."
<< endl;
return 0;
}
在此示例程序中,主線程創造了三個線程。其中,兩個線程(IncCount)對一個“count ”變量執行遞增操作,第三個線程(WatchCount)觀察那個“count”變量的值。當“ count”變量達到一個預定義的值(COUNT_LIMIT)時,等待線程(WatchCount)被兩個遞 增線程(IncCount)中的一個喚醒。等待線程(WatchCount)被喚醒后會立即修改“ count”變量的值。兩個遞增線程(IncCount)會一直執行,直到達到TCOUNT為止。最后 ,主線程會打印出“count”變量的最終值。
手動重置的事件對象
文件common.h
#ifndef __COMMON_H__
#define __COMMON_H__
#include <string>
using std::string;
struct Param
{
long threadID;
string *data;
};
#endif
文件readfrombuffer.h
#ifndef __READ_FROM_BUFFER_H__
#define __READ_FROM_BUFFER_H__
#include "thread.h"
class Event;
class Mutex;
class ReadFromBuffer : public Thread
{
public:
ReadFromBuffer(Event& e, Mutex& m);
protected:
void* run(void *param);
private:
Event& event;
Mutex& mutex;
};
#endif
文件readfrombuffer.cpp
#include "readfrombuffer.h"
#include "common.h"
#include "event.h"
#include "mutex.h"
#include <iostream>
using std::cout;
using std::endl;
ReadFromBuffer::ReadFromBuffer(Event& e, Mutex& m) : event(e), mutex(m)
{
}
void* ReadFromBuffer::run(void *param)
{
Param *prm = static_cast<Param *>(param);
long id = prm->threadID;
string *data = prm->data;
mutex.acquire();
cout << "ReadFromBuffer: thread "
<< id
<< " waiting for event
signaled..."
<< endl;
mutex.release();
event.block();
mutex.acquire();
cout << "ReadFromBuffer: thread "
<< id
<< " reading from buffer ("
<< *data
<< ")"
<< endl;
mutex.release();
return NULL;
}
文件writetobuffer.h
#ifndef __WRITE_TO_BUFFER__
#define __WRITE_TO_BUFFER__
#include "thread.h"
class Event;
class Mutex;
class WriteToBuffer : public Thread
{
public:
WriteToBuffer(Event& e, Mutex& m);
protected:
void* run(void *param);
private:
Event& event;
Mutex& mutex;
};
#endif
文件writetobuffer.cpp
#include "writetobuffer.h"
#include "common.h"
#include "event.h"
#include "mutex.h"
#include <iostream>
using std::cout;
using std::endl;
WriteToBuffer::WriteToBuffer(Event& e, Mutex& m) : event(e), mutex(m)
{
}
void* WriteToBuffer::run(void *param)
{
Param *prm = static_cast<Param *>(param);
long id = prm->threadID;
string *data = prm->data;
*data = "Hello World!";
mutex.acquire();
cout << "WriteToBuffer: thread "
<< id
<< " writing to the shared
buffer..."
<< endl;
mutex.release();
event.signal();
return NULL;
}
文件mainmanualunsignal.cpp
#include "writetobuffer.h"
#include "readfrombuffer.h"
#include "common.h"
#include "event.h"
#include "mutex.h"
#include <iostream>
using std::cout;
using std::endl;
int main(int argc, char* argv[])
{
Event event(true, false);
Mutex mutex;
string data;
Param prm1 = {1, &data};
Param prm2 = {2, &data};
Param prm3 = {3, &data};
Param prm4 = {4, &data};
Param prm5 = {5, &data};
ReadFromBuffer read1(event, mutex);
ReadFromBuffer read2(event, mutex);
ReadFromBuffer read3(event, mutex);
ReadFromBuffer read4(event, mutex);
WriteToBuffer write(event, mutex);
read1.start(&prm1);
read2.start(&prm2);
read3.start(&prm3);
read4.start(&prm4);
write.start(&prm5);
mutex.acquire();
cout << "Main thread waiting for threads to
exit..."
<< endl;
mutex.release();
read1.wait();
read2.wait();
read3.wait();
read4.wait();
write.wait();
cout << "All threads ended, cleaning up for
application exit..."
<< endl;
return 0;
}
在此示例程序中,主線程創造了五個線程。其中,四個線程(ReadFromBuffer)讀取“ data”變量的內容,第五個線程(WriteToBuffer)初始化“data”變量。四個讀線程(
ReadFromBuffer)會在寫線程(WriteToBuffer)完成對“data”變量的初始化之前一直 保持等待狀態。當寫線程(WriteToBuffer)將“data”變量初始化好之后,四個讀線程 (ReadFromBuffer)才會被一一喚醒。最后,主線程會在這四個讀線程(ReadFromBuffer
)和一個寫線程(WriteToBuffer)都執行完成后退出,從而結束整個程序。
6.
事件對象類的UNIX和Windows實現
UNIX實現
文件event.h
#ifndef __EVENT_H__
#define __EVENT_H__
#include <pthread.h>
class Event
{
public:
Event(bool bManualUnsignal, bool bSignaled);
virtual ~Event();
virtual bool block();
virtual bool signal();
virtual bool unsignal();
private:
const bool bManUnsig;
pthread_cond_t cv;
pthread_mutex_t mutex;
bool bSig;
};
#endif
文件event.cpp
#include "event.h"
Event::Event(bool bManualUnsignal, bool bSignaled) : bManUnsig(bManualUnsignal
), bSig(bSignaled)
{
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cv, NULL);
}
Event::~Event()
{
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cv);
}
bool Event::block()
{
int ret = 0;
ret += pthread_mutex_lock(&mutex);
if (bSig)
{
if (!bManUnsig)
{
bSig = false;
}
}
else
{
pthread_cond_wait(&cv, &mutex);
if (!bManUnsig)
{
bSig = false;
}
}
ret += pthread_mutex_unlock(&mutex);
return ret == 0;
}
bool Event::signal()
{
int ret = 0;
ret += pthread_mutex_lock(&mutex);
if (!bSig)
{
if (bManUnsig)
{
ret +=
pthread_cond_broadcast(&cv);
}
else
{
ret += pthread_cond_signal(&cv);
}
bSig = true;
}
ret += pthread_mutex_unlock(&mutex);
return ret == 0;
}
bool Event::unsignal()
{
int ret = 0;
ret += pthread_mutex_lock(&mutex);
if (bSig)
{
bSig = false;
}
ret += pthread_mutex_unlock(&mutex);
return ret == 0;
}
Windows實現
文件event.h
#ifndef __EVENT_H__
#define __EVENT_H__
#include <windows.h>
class Event
{
public:
Event(bool bManualUnsignal, bool bSignaled);
virtual ~Event();
virtual bool block();
virtual bool signal();
virtual bool unsignal();
private:
HANDLE handle;
};
#endif
文件event.cpp
#include "event.h"
Event::Event(bool bManualUnsignal, bool bSignaled)
{
handle = CreateEvent(NULL, bManualUnsignal, bSignaled, NULL);
}
Event::~Event()
{
CloseHandle(handle);
}
bool Event::block()
{
return WaitForSingleObject(handle, INFINITE) == WAIT_OBJECT_0;
}
bool Event::signal()
{
return SetEvent(handle) == TRUE;
}
bool Event::unsignal()
{
return ResetEvent(handle) == TRUE;
}
小結
本節首先介紹了同步與異步的基本概念,進而說明了同步在多線程編程中的作用。
事件對象(Event
Object)是一種最具彈性的同步機制。事件對象在某些條件滿足之前將 一直保持非激發狀態。程序員可以完全控制事件對象的狀態(激發狀態和非激發狀態)。 事件對象使得程序員可以以最大的靈活性來定義復雜的同步對象。有兩種類型的事件對象 (自動重置的事件對象和手動重置的事件對象)。一個手動重置的事件對象需要程序員顯 式地將其狀態從激發狀態返回到非激發狀態。然而一個自動重置的事件對象會在一個
Event::block()操作完成后自動地返回到非激發狀態。 雖然事件對象所能完成的一切功能都可以通過互斥來完成,但是使用事件對象的解決方案 顯得更加優雅,并且效率更高。