青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

線程池

轉載自http://blog.csdn.net/u010984552/article/details/51887108

為什么需要線程池
目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,但處理時間卻相對較短。 
傳 統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就是是“即時創建,即 時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處于 不停的創建線程,銷毀線程的狀態。
我們將傳統方案中的線程執行過程分為三個過程:T1、T2、T3。

  1. T1:線程創建時間
  2. T2:線程執行時間,包括線程的同步等時間
  3.  T3:線程銷毀時間

那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執行的時間很短的話,這比開銷可能占到20%-50%左右。如果任務執行時間很頻繁的話,這筆開銷將是不可忽略的。

 
除此之外,線程池能夠減少創建的線程個數。通常線程池所允許的并發線程是有上界的,如果同時需要并發的線程數超過上界,那么一部分線程將會等待。而傳統方案中,如果同時請求數目為2000,那么最壞情況下,系統可能需要產生2000個線程。盡管這不是一個很大的數目,但是也有部分機器可能達不到這種要求。
 
因此線程池的出現正是著眼于減少線程池本身帶來的開銷。線程池采用預創建的技術,在應用程序啟動之后,將立即創建一定數量的線程(N1),放入空閑隊列中。這些線程都是處于阻塞(Suspended)狀態,不消耗CPU,但占用較小的內存空間。當任務到來后,緩沖池選擇一個空閑線程,把任務傳入此線程中運行。當N1個線程都在處理任務后,緩沖池自動創建一定數量的新線程,用于處理更多的任務。在任務執行完畢后線程也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閑時,大部分線程都一直處于暫停狀態,線程池自動銷毀一部分線程,回收系統資源。
 
基于這種預創建技術,線程池將線程創建和銷毀本身所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每個任務所分擔到的線程本身開銷則越小,不過我們另外可能需要考慮進去線程之間同步所帶來的開銷

構建線程池框架
一般線程池都必須具備下面幾個組成部分:

  • 線程池管理器:用于創建并管理線程池
  • 工作線程: 線程池中實際執行的線程
  • 任務接口: 盡管線程池大多數情況下是用來支持網絡服務器,但是我們將線程執行的任務抽象出來,形成任務接口,從而是的線程池與具體的任務無關。
  • 任務隊列: 線程池的概念具體到實現則可能是隊列,鏈表之類的數據結構,其中保存執行線程。

我們實現的通用線程池框架由五個重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括線程同步使用的類CThreadMutex和CCondition。
 

  • CJob是所有的任務的基類,其提供一個接口Run,所有的任務類都必須從該類繼承,同時實現Run方法。該方法中實現具體的任務邏輯。
  •  
  • CThread是linux中線程的包裝,其封裝了Linux線程最經常使用的屬性和方法,它也是一個抽象類,是所有線程類的基類,具有一個接口Run。
  •  
  • CWorkerThread是實際被調度和執行的線程類,其從CThread繼承而來,實現了CThread中的Run方法。
  •  
  • CThreadPool是線程池類,其負責保存線程,釋放線程以及調度線程。
  •  
  • CThreadManage是線程池與用戶的直接接口,其屏蔽了內部的具體實現。
  •  
  • CThreadMutex用于線程之間的互斥。
  •  
  • CCondition則是條件變量的封裝,用于線程之間的同步。

CThreadManage直接跟客戶端打交道,其接受需要創建的線程初始個數,并接受客戶端提交的任務。這兒的任務是具體的非抽象的任務。CThreadManage的內部實際上調用的都是CThreadPool的相關操作。CThreadPool創建具體的線程,并把客戶端提交的任務分發給CWorkerThread,CWorkerThread實際執行具體的任務。
 
理解系統組件
下面我們分開來了解系統中的各個組件。
 
CThreadManage
CThreadManage的功能非常簡單,其提供最簡單的方法,其類定義如下:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class CThreadManage
{
private:
    CThreadPool  *m_Pool;
    int     m_NumOfThread;

protected:

public:
    CThreadManage();
    CThreadManage(int num);
    virtual ~CThreadManage();

    void   SetParallelNum(int num);
    void  Run(CJob *job, void *jobdata);
    void  TerminateAll(void);
};

其中m_Pool指向實際的線程池;m_NumOfThread是初始創建時候允許創建的并發的線程個數。另外Run和TerminateAll方法也非常簡單,只是簡單的調用CThreadPool的一些相關方法而已。其具體的實現如下: 

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
CThreadManage::CThreadManage()
{
    m_NumOfThread = 10;
    m_Pool = new CThreadPool(m_NumOfThread);
}

CThreadManage::CThreadManage(int num)
{
    m_NumOfThread = num;
    m_Pool = new CThreadPool(m_NumOfThread);
}

CThreadManage::~CThreadManage()
{
    if(NULL != m_Pool)
        delete m_Pool;
}

void CThreadManage::SetParallelNum(int num)
{
    m_NumOfThread = num;
}

void CThreadManage::Run(CJob *job, void *jobdata)
{
    m_Pool->Run(job, jobdata);
}

void CThreadManage::TerminateAll(void)
{
    m_Pool->TerminateAll();
}

CThread
CThread 類實現了對Linux中線程操作的封裝,它是所有線程的基類,也是一個抽象類,提供了一個抽象接口Run,所有的CThread都必須實現該Run方法。CThread的定義如下所示:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class CThread
{
private:
    int     m_ErrCode;
    Semaphore  m_ThreadSemaphore; //the inner semaphore, which is used to realize
    unsigned   long m_ThreadID;
    bool     m_Detach;    //The thread is detached
    bool     m_CreateSuspended; //if suspend after creating
    char    *m_ThreadName;
    ThreadState m_ThreadState;   //the state of the thread

protected:
    void   SetErrcode(int errcode)
    {
        m_ErrCode = errcode;
    }
    static void *ThreadFunction(void *);

public:
    CThread();
    CThread(bool createsuspended, bool detach);
    virtual ~CThread();

    virtual void Run(void) = 0;
    void   SetThreadState(ThreadState state)
    {
        m_ThreadState = state;
    }
    bool   Terminate(void);  //Terminate the threa
    bool   Start(void);    //Start to execute the thread
    void   Exit(void);
    bool   Wakeup(void);
    ThreadState GetThreadState(void)
    {
        return m_ThreadState;
    }
    int   GetLastError(void)
    {
        return m_ErrCode;
    }
    void   SetThreadName(char *thrname)
    {
        strcpy(m_ThreadName, thrname);
    }
    char  *GetThreadName(void)
    {
        return m_ThreadName;
    }
    int   GetThreadID(void)
    {
        return m_ThreadID;
    }
    bool   SetPriority(int priority);
    int   GetPriority(void);
    int   GetConcurrency(void);
    void   SetConcurrency(int num);
    bool   Detach(void);
    bool   Join(void);
    bool   Yield(void);
    int   Self(void);
};

線程的狀態可以分為四種,空閑、忙碌、掛起、終止(包括正常退出和非正常退出)。由于目前Linux線程庫不支持掛起操作,因此,我們的此處的掛起操作類似于暫停。如果線程創建后不想立即執行任務,那么我們可以將其“暫停”,如果需要運行,則喚醒。有一點必須注意的是,一旦線程開始執行任務,將不能被掛起,其將一直執行任務至完畢。
 
線程類的相關操作均十分簡單。線程的執行入口是從Start()函數開始,其將調用函數ThreadFunction,ThreadFunction再調用實際的Run函數,執行實際的任務。
 
CThreadPool
CThreadPool是線程的承載容器,一般可以將其實現為堆棧、單向隊列或者雙向隊列。在我們的系統中我們使用STL Vector對線程進行保存。CThreadPool的實現代碼如下:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
class CThreadPool
{
    friend class CWorkerThread;

private:
    unsigned int m_MaxNum;  //the max thread num that can create at the same time
    unsigned int m_AvailLow; //The min num of idle thread that shoule kept
    unsigned int m_AvailHigh;  //The max num of idle thread that kept at the same time
    unsigned int m_AvailNum; //the normal thread num of idle num;
    unsigned int m_InitNum; //Normal thread num;

protected:
    CWorkerThread *GetIdleThread(void);
    void  AppendToIdleList(CWorkerThread *jobthread);
    void  MoveToBusyList(CWorkerThread *idlethread);
    void  MoveToIdleList(CWorkerThread *busythread);
    void  DeleteIdleThread(int num);
    void  CreateIdleThread(int num);

public:
    CThreadMutex m_BusyMutex;  //when visit busy list,use m_BusyMutex to lock and unlock
    CThreadMutex m_IdleMutex;  //when visit idle list,use m_IdleMutex to lock and unlock
    CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock
    CThreadMutex m_VarMutex;
    CCondition    m_BusyCond; //m_BusyCond is used to sync busy thread list
    CCondition    m_IdleCond; //m_IdleCond is used to sync idle thread list
    CCondition    m_IdleJobCond; //m_JobCond is used to sync job list
    CCondition    m_MaxNumCond;

    vector<CWorkerThread *>  m_ThreadList;
    vector<CWorkerThread *>  m_BusyList;  //Thread List
    vector<CWorkerThread *>  m_IdleList; //Idle List

    CThreadPool();
    CThreadPool(int initnum);
    virtual ~CThreadPool();

    void  SetMaxNum(int maxnum)
    {
        m_MaxNum = maxnum;
    }
    int   GetMaxNum(void)
    {
        return m_MaxNum;
    }
    void  SetAvailLowNum(int minnum)
    {
        m_AvailLow = minnum;
    }
    int   GetAvailLowNum(void)
    {
        return m_AvailLow;
    }
    void  SetAvailHighNum(int highnum)
    {
        m_AvailHigh = highnum;
    }
    int   GetAvailHighNum(void)
    {
        return m_AvailHigh;
    }
    int   GetActualAvailNum(void)
    {
        return m_AvailNum;
    }
    int   GetAllNum(void)
    {
        return m_ThreadList.size();
    }
    int   GetBusyNum(void)
    {
        return m_BusyList.size();
    }
    void  SetInitNum(int initnum)
    {
        m_InitNum = initnum;
    }
    int   GetInitNum(void)
    {
        return m_InitNum;
    }
    void  TerminateAll(void);
    void  Run(CJob *job, void *jobdata);
};


CWorkerThread *CThreadPool::GetIdleThread(void)

{

    while(m_IdleList.size() == 0 )

        m_IdleCond.Wait();



    m_IdleMutex.Lock();

    if(m_IdleList.size() > 0 )

    {

        CWorkerThread *thr = (CWorkerThread *)m_IdleList.front();

        printf("Get Idle thread %d\n", thr->GetThreadID());

        m_IdleMutex.Unlock();

        return thr;

    }

    m_IdleMutex.Unlock();

    return NULL;
}


//create num idle thread and put them to idlelist

void CThreadPool::CreateIdleThread(int num)

{

    for(int i = 0; i < num; i++)
    {

        CWorkerThread *thr = new CWorkerThread();

        thr->SetThreadPool(this);

        AppendToIdleList(thr);

        m_VarMutex.Lock();

        m_AvailNum++;

        m_VarMutex.Unlock();

        thr->Start();    //begin the thread,the thread wait for job

    }

}


void CThreadPool::Run(CJob *job, void *jobdata)

{

    assert(job != NULL);



    //if the busy thread num adds to m_MaxNum,so we should wait

    if(GetBusyNum() == m_MaxNum)

        m_MaxNumCond.Wait();



    if(m_IdleList.size() < m_AvailLow)

    {

        if(GetAllNum() + m_InitNum - m_IdleList.size() < m_MaxNum )

            CreateIdleThread(m_InitNum - m_IdleList.size());

        else

            CreateIdleThread(m_MaxNum - GetAllNum());

    }



    CWorkerThread *idlethr = GetIdleThread();

    if(idlethr != NULL)

    {

        idlethr->m_WorkMutex.Lock();

        MoveToBusyList(idlethr);

        idlethr->SetThreadPool(this);

        job->SetWorkThread(idlethr);

        printf("Job is set to thread %d \n", idlethr->GetThreadID());

        idlethr->SetJob(job, jobdata);

    }

}

在CThreadPool中存在兩個鏈表,一個是空閑鏈表,一個是忙碌鏈表。Idle鏈表中存放所有的空閑進程,當線程執行任務時候,其狀態變為忙碌狀態,同時從空閑鏈表中刪除,并移至忙碌鏈表中。在CThreadPool的構造函數中,我們將執行下面的代碼:
 

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
for(int i = 0; i < m_InitNum; i++)

{

    CWorkerThread *thr = new CWorkerThread();

    AppendToIdleList(thr);

    thr->SetThreadPool(this);

    thr->Start();    //begin the thread,the thread wait for job

}

在該代碼中,我們將創建m_InitNum個線程,創建之后即調用AppendToIdleList放入Idle鏈表中,由于目前沒有任務分發給這些線程,因此線程執行Start后將自己掛起。
 
事實上,線程池中容納的線程數目并不是一成不變的,其會根據執行負載進行自動伸縮。為此在CThreadPool中設定四個變量:
 

m_InitNum:處世創建時線程池中的線程的個數。

m_MaxNum:當前線程池中所允許并發存在的線程的最大數目。

m_AvailLow:當前線程池中所允許存在的空閑線程的最小數目,如果空閑數目低于該值,表明負載可能過重,此時有必要增加空閑線程池的數目。實現中我們總是將線程調整為m_InitNum個。

m_AvailHigh:當前線程池中所允許的空閑的線程的最大數目,如果空閑數目高于該值,表明當前負載可能較輕,此時將刪除多余的空閑線程,刪除后調整數也為m_InitNum個。

m_AvailNum:目前線程池中實際存在的線程的個數,其值介于m_AvailHigh和m_AvailLow之間。如果線程的個數始終維持在m_AvailLow和m_AvailHigh之間,則線程既不需要創建,也不需要刪除,保持平衡狀態。因此如何設定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態,是線程池設計必須考慮的問題。
 
線程池在接受到新的任務之后,線程池首先要檢查是否有足夠的空閑池可用。檢查分為三個步驟:
 
(1)檢查當前處于忙碌狀態的線程是否達到了設定的最大值m_MaxNum,如果達到了,表明目前沒有空閑線程可用,而且也不能創建新的線程,因此必須等待直到有線程執行完畢返回到空閑隊列中。
 
(2)如果當前的空閑線程數目小于我們設定的最小的空閑數目m_AvailLow,則我們必須創建新的線程,默認情況下,創建后的線程數目應該為m_InitNum,因此創建的線程數目應該為( 當前空閑線程數與m_InitNum);但是有一種特殊情況必須考慮,就是現有的線程總數加上創建后的線程數可能超過m_MaxNum,因此我們必須對線程的創建區別對待。 

 C++ Code 
1
2
3
4
5
6
7
8
9

if(GetAllNum() + m_InitNum - m_IdleList.size() < m_MaxNum )

    CreateIdleThread(m_InitNum - m_IdleList.size());

else

    CreateIdleThread(m_MaxNum - GetAllNum());

如果創建后總數不超過m_MaxNum,則創建后的線程為m_InitNum;如果超過了,則只創建( m_MaxNum-當前線程總數 )個。
 
(3)調用GetIdleThread方法查找空閑線程。如果當前沒有空閑線程,則掛起;否則將任務指派給該線程,同時將其移入忙碌隊列。
 
當線程執行完畢后,其會調用MoveToIdleList方法移入空閑鏈表中,其中還調用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。
 
CJob
CJob類相對簡單,其封裝了任務的基本的屬性和方法,其中最重要的是Run方法,代碼如下:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class CJob
{

private:

    int   m_JobNo;    //The num was assigned to the job

    char  *m_JobName;   //The job name

    CThread *m_pWorkThread;   //The thread associated with the job

public:

    CJob( void );

    virtual ~CJob();

    int   GetJobNo(voidconst
    {
        return m_JobNo;
    }

    void   SetJobNo(int jobno)
    {
        m_JobNo = jobno;
    }

    char  *GetJobName(voidconst
    {
        return m_JobName;
    }

    void   SetJobName(char *jobname);

    CThread *GetWorkThread(void)
    {
        return m_pWorkThread;
    }

    void   SetWorkThread ( CThread *pWorkThread )
    {

        m_pWorkThread = pWorkThread;

    }

    virtual void Run ( void *ptr ) = 0;

};

線程池使用示例
至此我們給出了一個簡單的與具體任務無關的線程池框架。使用該框架非常的簡單,我們所需要的做的就是派生CJob類,將需要完成的任務實現在Run方法中。然后將該Job交由CThreadManage去執行。下面我們給出一個簡單的示例程序 

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class CXJob: public CJob
{

public:

    CXJob()
    {
        i = 0;
    }

    ~CXJob() {}

    void Run(void *jobdata)
    {

        printf("The Job comes from CXJOB\n");

        sleep(2);

    }

};



class CYJob: public CJob

{

public:

    CYJob()
    {
        i = 0;
    }

    ~CYJob() {}

    void Run(void *jobdata)
    {

        printf("The Job comes from CYJob\n");

    }

};



main()

{

    CThreadManage *manage = new CThreadManage(10);

    for(int i = 0; i < 40; i++)

    {

        CXJob  *job = new CXJob();

        manage->Run(job, NULL);

    }

    sleep(2);

    CYJob *job = new CYJob();

    manage->Run(job, NULL);

    manage->TerminateAll();

}

CXJob和CYJob都是從Job類繼承而來,其都實現了Run接口。CXJob只是簡單的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒鐘。在主程序中我們初始創建10個工作線程。然后分別執行40次CXJob和一次CYJob。
 

C++ 線程池的封裝實現
為了充分利用多核的優勢,我們利用多線程來進行任務處理,但線程也同樣不能濫用,會帶來一下幾個問題:
1)線程本身存在開銷,系統必須為每個線程分配如棧,TLS(線程局部存儲),寄存器等。
2)線程管理會給系統帶來開銷,context切換同樣會給系統帶來成本。
3)線程本身是可以重用的資源,不需要每次都進行初始化。

所以往往在使用中,我們無需把線程與task任務進行一對一對應,只需要預先初始化有限的線程個數來處理無限的task任務即可,線程池應運而生,原理也就是如此。

20151123145708426.png (508×534)

主要含有三個隊列

  1. 工作隊列
  2. 工作線程隊列
  3. 忙碌線程隊列

工作隊列是一個阻塞隊列,任務(仿函數)任務不算被push進來(notify阻塞獲取的工作線程),工作線程隊列(一直不變)則從該隊列中獲取任務執行(wait獲取,當任務隊列為空時阻塞等待通知),如果獲取到任務,則將線程會進入忙碌線程隊列中,執行任務的仿函數,當工作完成,重新移出工作線程隊列。


定義線程池專屬異常:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
struct TC_ThreadPool_Exception : public TC_Exception
{
    TC_ThreadPool_Exception(const string &buffer) : TC_Exception(buffer) {};
    TC_ThreadPool_Exception(const string &buffer, int err) : TC_Exception(buffer, err) {};
    ~TC_ThreadPool_Exception () throw () {};
};


/**
 * @brief 用通線程池類, 與tc_functor, tc_functorwrapper配合使用.
 *
 * 使用方式說明:
 * 1 采用tc_functorwrapper封裝一個調用
 * 2 用tc_threadpool對調用進行執行
 * 具體示例代碼請參見:test/test_tc_thread_pool.cpp
 */


/**線程池本身繼承自鎖,可以幫助鎖定**/
class TC_ThreadPool : public TC_ThreadLock
{
public:

    /**
     * @brief 構造函數
     *
     */

    TC_ThreadPool ();

    /**
     * @brief 析構, 會停止所有線程
     */

    ~TC_ThreadPool ();

    /**
     * @brief 初始化.
     *
     * @param num 工作線程個數
     */

    void init(size_t num);

    /**
     * @brief 獲取線程個數.
     *
     * @return size_t 線程個數
     */

    size_t getThreadNum()
    {
        Lock sync(* this);
        return _jobthread. size();
    }

    /**
     * @brief 獲取線程池的任務數( exec添加進去的).
     *
     * @return size_t 線程池的任務數
     */

    size_t getJobNum()
    {
        return _jobqueue. size();
    }

    /**
     * @brief 停止所有線程
     */

    void stop();

    /**
     * @brief 啟動所有線程
     */

    void start();

    /**
     * @brief 啟動所有線程并, 執行初始化對象.
     *
     * @param ParentFunctor
     * @param tf
     */

    template<class ParentFunctor>
    void start(const TC_FunctorWrapper< ParentFunctor> &tf)
    {
        for(size_t i = 0; i < _jobthread .size(); i++)
        {
            _startqueue. push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
        }

        start();
    }

    /**
     * @brief 添加對象到線程池執行,該函數馬上返回,
     *   線程池的線程執行對象
     */

    template<class ParentFunctor>
    void exec(const TC_FunctorWrapper< ParentFunctor> &tf)
    {
        _jobqueue.push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
    }

    /**
     * @brief 等待所有工作全部結束(隊列無任務, 無空閑線程).
     *
     * @param millsecond 等待的時間( ms), -1:永遠等待
     * @return      true, 所有工作都處理完畢
     *            false,超時退出
     */

    bool waitForAllDone(int millsecond = -1);

public:

    /**
     * @brief 線程數據基類,所有線程的私有數據繼承于該類
     */

    class ThreadData
    {
    public:
        /**
         * @brief 構造
         */

        ThreadData() {};
        /**
         * @brief 析夠
         */

        virtual ~ThreadData() {};

        /**
          * @brief 生成數據.
          *
          * @ param T
         * @return ThreadData*
         */

        template<typename T>
        static T *makeThreadData()
        {
            return new T;
        }
    };

    /**
     * @brief 設置線程數據.
     *
     * @param p 線程數據
     */

    static void setThreadData(ThreadData *p);

    /**
     * @brief 獲取線程數據.
     *
     * @return ThreadData* 線程數據
     */

    static ThreadData *getThreadData();

    /**
     * @brief 設置線程數據, key需要自己維護.
     *
     * @param pkey 線程私有數據key
     * @param p  線程指針
     */

    static void setThreadData(pthread_key_t pkey, ThreadData *p);

    /**
     * @brief 獲取線程數據, key需要自己維護.
     *
     * @param pkey 線程私有數據key
     * @return   指向線程的ThreadData*指針
     */

    static ThreadData *getThreadData(pthread_key_t pkey);

protected:

    /**
     * @brief 釋放資源.
     *
     * @param p
     */

    static void destructor(void *p);

    /**
     * @brief 初始化key
     */

    class KeyInitialize
    {
    public:
        /**
         * @brief 初始化key
         */

        KeyInitialize()
        {
            int ret = pthread_key_create(&TC_ThreadPool::g_key, TC_ThreadPool::destructor);
            if(ret != 0)
            {
                throw TC_ThreadPool_Exception("[TC_ThreadPool::KeyInitialize] pthread_key_create error", ret);
            }
        }

        /**
         * @brief 釋放key
         */

        ~KeyInitialize()
        {
            pthread_key_delete(TC_ThreadPool::g_key);
        }
    };

    /**
     * @brief 初始化key的控制
     */

    static KeyInitialize g_key_initialize;

    /**
     * @brief 數據key
     */

    static pthread_key_t g_key;

protected:
    /**
     * @brief 線程池中的工作線程
     */

    class ThreadWorker : public TC_Thread
    {
    public:
        /**
          * @brief 工作線程構造函數.
          *
         * @ param tpool
         */

        ThreadWorker(TC_ThreadPool *tpool);

        /**
         * @brief 通知工作線程結束
         */

        void terminate();

    protected:
        /**
         * @brief 運行
         */

        virtual void run();

    protected:
        /**
         * 線程池指針
         */

        TC_ThreadPool   *_tpool;

        /**
         * 是否結束線程
         */

        bool      _bTerminate;
    };

protected:

    /**
     * @brief 清除
     */

    void clear();

    /**
     * @brief 獲取任務, 如果沒有任務, 則為NULL.
     *
     * @return TC_FunctorWrapperinterface*
     */

    TC_FunctorWrapperInterface *get(ThreadWorker *ptw);

    /**
     * @brief 獲取啟動任務.
     *
     * @return TC_FunctorWrapperInterface*
     */

    TC_FunctorWrapperInterface *get();

    /**
     * @brief 空閑了一個線程.
     *
     * @param ptw
     */

    void idle(ThreadWorker *ptw);

    /**
     * @brief 通知等待在任務隊列上的工作線程醒來
     */

    void notifyT();

    /**
     * @brief 是否處理結束.
     *
     * @return bool
     */

    bool finish();

    /**
     * @brief 線程退出時調用
     */

    void exit();

    friend class ThreadWorker;
protected:

    /**
     * 任務隊列
     */

    TC_ThreadQueue< TC_FunctorWrapperInterface *> _jobqueue;

    /**
     * 啟動任務
     */

    TC_ThreadQueue< TC_FunctorWrapperInterface *> _startqueue;

    /**
     * 工作線程
     */

    std::vector<ThreadWorker *>         _jobthread;

    /**
     * 繁忙線程
     */

    std::set<ThreadWorker *>           _busthread;

    /**
     * 任務隊列的鎖
     */

    TC_ThreadLock                _tmutex;

    /**
    * 是否所有任務都執行完畢
    */

    bool                    _bAllDone;
};

工作線程設計如下:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
TC_ThreadPool ::ThreadWorker::ThreadWorker(TC_ThreadPool *tpool)
    : _tpool (tpool)
    , _bTerminate ( false)
{
}

void TC_ThreadPool ::ThreadWorker::terminate()
{
    _bTerminate = true;
    _tpool->notifyT();
}

void TC_ThreadPool ::ThreadWorker::run()
{
    //調用初始化部分
    TC_FunctorWrapperInterface *pst = _tpool->get();
    if(pst)
    {
        try
        {
            (*pst)();
        }
        catch ( ... )
        {
        }
        delete pst;
        pst = NULL;
    }

    //調用處理部分
    while (! _bTerminate)
    {
        TC_FunctorWrapperInterface *pfw = _tpool->get( this);
        if(pfw != NULL)
        {
            auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);

            try
            {
                (*pfw)();
            }
            catch ( ... )
            {
            }

            _tpool->idle( this);
        }
    }

    //結束
    _tpool->exit();
}

每個工作線程在剛開始時都會執行一下初始化操作,并進入一個無限循環的部分//調用處理部分
while (! _bTerminate)
{
    TC_FunctorWrapperInterface *pfw = _tpool->get( this);
    if(pfw != NULL)
    {
        auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);

        try
        {
            (*pfw)();
        }
        catch ( ... )
        {
        }

        _tpool->idle( this);
    }
}

該工作主要是無限的從線程池的工作隊列中獲取任務并執行,如果成功獲取任務,則會將線程移進忙碌隊列:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TC_FunctorWrapperInterface *TC_ThreadPool:: get(ThreadWorker *ptw)
{

    TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
    if(! _jobqueue. pop_front(pFunctorWrapper, 1000))
    {
        return NULL;
    }

    {
        Lock sync( _tmutex);
        _busthread. insert(ptw);
    }
    return pFunctorWrapper;
}

執行完,移回工作線程隊列:_tpool->idle( this);

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TC_ThreadPool:: idle(ThreadWorker *ptw)
{
    Lock sync( _tmutex);
    _busthread. erase(ptw);

    //無繁忙線程, 通知等待在線程池結束的線程醒過來
    if( _busthread. empty())
    {
        _bAllDone = true;
        _tmutex.notifyAll();
    }
}

此處jobThread隊列初始化后不會改變(因為沒有實現自增長功能),所以非線程安全的vector隊列即可,busthread的忙碌線程隊列會被移進移出,但是操作會自帶Lock sync( _tmutex),該互斥量是線程池本身繼承的,所以是共有的,也無需另外使用線程安全的TC_ThreadQueue,使用vector即可。

TC_ThreadPool:: idle中的

 C++ Code 
1
2
3
4
5
6
if( _busthread. empty())
{
    _bAllDone = true;
    _tmutex.notifyAll();
}

主要用于當線程池工作起來后的waitForAllDone方法:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
bool TC_ThreadPool:: waitForAllDone( int millsecond)
{
    Lock sync( _tmutex);

start1:
    //任務隊列和繁忙線程都是空的
    if (finish())
    {
        return true;
    }

    //永遠等待
    if(millsecond < 0)
    {
        _tmutex.timedWait(1000);
        goto start1;
    }

    int64_t iNow = TC_Common:: now2ms();
    int m    = millsecond;
start2:

    bool b = _tmutex.timedWait(millsecond);
    //完成處理了
    if(finish())
    {
        return true;
    }

    if(!b)
    {
        return false;
    }

    millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));
    goto start2;

    return false;
}

_tmutex.timedWait(millsecond)方法喚醒。反復判斷是否所有的工作是否完成:

bool TC_ThreadPool:: finish()
{
    return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;
}

整體cpp實現如下:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
TC_ThreadPool ::KeyInitialize TC_ThreadPool::g_key_initialize;
pthread_key_t TC_ThreadPool::g_key ;

void TC_ThreadPool::destructor( void *p)
{
    ThreadData *ttd = ( ThreadData *)p;
    if(ttd)
    {
        delete ttd;
    }
}

void TC_ThreadPool::exit()
{
    TC_ThreadPool:: ThreadData *p = getThreadData();
    if(p)
    {
        delete p;
        int ret = pthread_setspecific( g_key, NULL );
        if(ret != 0)
        {
            throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
        }
    }

    _jobqueue. clear();
}

void TC_ThreadPool::setThreadData( TC_ThreadPool:: ThreadData *p)
{
    TC_ThreadPool:: ThreadData *pOld = getThreadData();
    if(pOld != NULL && pOld != p)
    {
        delete pOld;
    }

    int ret = pthread_setspecific( g_key, ( void *)p);
    if(ret != 0)
    {
        throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
    }
}

TC_ThreadPool ::ThreadData *TC_ThreadPool::getThreadData ()
{
    return ( ThreadData *) pthread_getspecific( g_key);
}

void TC_ThreadPool::setThreadData( pthread_key_t pkey, ThreadData *p)
{
    TC_ThreadPool:: ThreadData *pOld = getThreadData(pkey);
    if(pOld != NULL && pOld != p)
    {
        delete pOld;
    }

    int ret = pthread_setspecific(pkey, ( void *)p);
    if(ret != 0)
    {
        throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
    }
}

TC_ThreadPool ::ThreadData *TC_ThreadPool::getThreadData( pthread_key_t pkey)
{
    return ( ThreadData *) pthread_getspecific(pkey);
}

TC_ThreadPool::TC_ThreadPool()
    : _bAllDone ( true)
{
}

TC_ThreadPool::~TC_ThreadPool()
{
    stop();
    clear();
}

void TC_ThreadPool::clear()
{
    std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
    while(it != _jobthread. end())
    {
        delete (*it);
        ++it;
    }

    _jobthread. clear();
    _busthread. clear();
}

void TC_ThreadPool::init( size_t num)
{
    stop();

    Lock sync(* this);

    clear();

    for( size_t i = 0; i < num; i++)
    {
        _jobthread. push_back( new ThreadWorker( this));
    }
}

void TC_ThreadPool::stop()
{
    Lock sync(* this);

    std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
    while(it != _jobthread. end())
    {
        if ((*it)-> isAlive())
        {
            (*it)-> terminate();
            (*it)-> getThreadControl().join ();
        }
        ++it;
    }
    _bAllDone = true;
}

void TC_ThreadPool::start()
{
    Lock sync(* this);

    std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
    while(it != _jobthread. end())
    {
        (*it)-> start();
        ++it;
    }
    _bAllDone = false;
}

bool TC_ThreadPool:: finish()
{
    return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;
}

bool TC_ThreadPool::waitForAllDone( int millsecond)
{
    Lock sync( _tmutex);

start1:
    //任務隊列和繁忙線程都是空的
    if (finish ())
    {
        return true;
    }

    //永遠等待
    if(millsecond < 0)
    {
        _tmutex.timedWait(1000);
        goto start1;
    }

    int64_t iNow = TC_Common:: now2ms();
    int m    = millsecond;
start2:

    bool b = _tmutex.timedWait(millsecond);
    //完成處理了
    if(finish ())
    {
        return true;
    }

    if(!b)
    {
        return false;
    }

    millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));
    goto start2;

    return false;
}

TC_FunctorWrapperInterface *TC_ThreadPool::get( ThreadWorker *ptw)
{

    TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
    if(! _jobqueue. pop_front(pFunctorWrapper, 1000))
    {
        return NULL;
    }

    {
        Lock sync( _tmutex);
        _busthread. insert(ptw);
    }
    return pFunctorWrapper;
}

TC_FunctorWrapperInterface *TC_ThreadPool::get()
{
    TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
    if(! _startqueue. pop_front(pFunctorWrapper))
    {
        return NULL;
    }

    return pFunctorWrapper;
}

void TC_ThreadPool::idle( ThreadWorker *ptw)
{
    Lock sync( _tmutex);
    _busthread. erase(ptw);

    //無繁忙線程, 通知等待在線程池結束的線程醒過來
    if( _busthread. empty())
    {
        _bAllDone = true;
        _tmutex.notifyAll();
    }
}

void TC_ThreadPool::notifyT()
{
    _jobqueue. notifyT();
}

線程池使用后記
線程池適合場合
事 實上,線程池并不是萬能的。它有其特定的使用場合。線程池致力于減少線程本身的開銷對應用所產生的影響,這是有前提的,前提就是線程本身開銷與線程執行任 務相比不可忽略。如果線程本身的開銷相對于線程任務執行開銷而言是可以忽略不計的,那么此時線程池所帶來的好處是不明顯的,比如對于FTP服務器以及Telnet服務器,通常傳送文件的時間較長,開銷較大,那么此時,我們采用線程池未必是理想的方法,我們可以選擇“即時創建,即時銷毀”的策略。
 總之線程池通常適合下面的幾個場合:
 
(1)  單位時間內處理任務頻繁而且任務處理時間短
 
(2)  對實時性要求較高。如果接受到任務后在創建線程,可能滿足不了實時要求,因此必須采用線程池進行預創建。
 
(3)  必須經常面對高突發性事件,比如Web服務器,如果有足球轉播,則服務器將產生巨大的沖擊。此時如果采取傳統方法,則必須不停的大量產生線程,銷毀線程。此時采用動態線程池可以避免這種情況的發生。

posted on 2016-08-26 16:46 sheng 閱讀(416) 評論(0)  編輯 收藏 引用


只有注冊用戶登錄后才能發表評論。
網站導航: 博客園   IT新聞   BlogJava   博問   Chat2DB   管理


導航

<2025年11月>
2627282930311
2345678
9101112131415
16171819202122
23242526272829
30123456

統計

常用鏈接

留言簿(1)

隨筆檔案

收藏夾

同行

搜索

最新評論

閱讀排行榜

評論排行榜

青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            亚洲欧洲一区二区三区久久| 一区二区成人精品| 一本色道久久加勒比88综合| 亚洲激情一区二区| 亚洲精品欧美日韩专区| 99精品国产99久久久久久福利| 亚洲欧洲视频| 亚洲一区二区在线播放| 亚洲欧美中文另类| 久久久欧美精品sm网站| 欧美成人自拍| 亚洲无毛电影| 久久国产精品免费一区| 欧美成人精品福利| 国产精品久久久久久av下载红粉| 国产欧美精品| 最新国产成人av网站网址麻豆| 亚洲一区二区三区免费观看| 久久国产手机看片| 亚洲激情第一页| 亚洲深夜福利视频| 久久亚洲国产精品一区二区| 欧美视频在线不卡| 在线精品一区| 小嫩嫩精品导航| 免费欧美日韩国产三级电影| 亚洲免费电影在线| 久久视频在线视频| 国产精品扒开腿爽爽爽视频| 在线日韩av片| 久久国产精品久久w女人spa| 91久久久精品| 久久久国产精品亚洲一区| 国产精品极品美女粉嫩高清在线| 在线电影一区| 久久久久久久久久久久久久一区 | 久久精视频免费在线久久完整在线看| 免费在线看一区| 亚洲一级电影| 欧美日韩性视频在线| 亚洲春色另类小说| 久久久久九九九九| 亚洲综合首页| 亚洲第一成人在线| 久久精品国产99国产精品| 欧美三级网址| 一区二区福利| 亚洲精品偷拍| 欧美另类一区二区三区| 亚洲毛片在线观看.| 欧美国内亚洲| 久久婷婷国产综合尤物精品| 国产在线欧美日韩| 久久精品免费电影| 欧美一进一出视频| 国产欧美精品一区二区色综合| 亚洲一区二区三区在线看| 亚洲精品视频在线看| 欧美精品激情| 亚洲网友自拍| 亚洲一区二区精品在线观看| 国产精品国产| 欧美影院在线播放| 欧美在线观看一区二区| 国内精品一区二区三区| 久久亚洲一区| 久热精品视频在线观看| 亚洲国产精品成人一区二区| 欧美顶级艳妇交换群宴| 欧美国产一区二区| 亚洲视屏一区| 午夜精品视频一区| 一区二区三区在线观看视频| 浪潮色综合久久天堂| 老色鬼久久亚洲一区二区 | 亚洲免费影院| 亚洲欧美日韩天堂| 激情久久综合| 亚洲精品国产精品国产自| 国产精品jizz在线观看美国 | 亚洲精品久久久久久久久| 欧美另类变人与禽xxxxx| 亚洲一区在线免费| 久久精品综合一区| 一区二区三区欧美| 一本色道久久88综合亚洲精品ⅰ | 毛片一区二区三区| 欧美成人乱码一区二区三区| 国产精品99久久久久久宅男| 亚洲永久免费| 亚洲品质自拍| 亚洲在线中文字幕| 亚洲国产综合视频在线观看| 一本色道久久加勒比88综合| 激情婷婷亚洲| 99伊人成综合| 亚洲福利电影| 亚洲综合色在线| 亚洲乱码国产乱码精品精可以看 | 欧美成人一区二区三区片免费| 99精品热6080yy久久| 午夜精品区一区二区三| 日韩视频在线免费观看| 香蕉久久夜色精品国产使用方法| 亚洲精品乱码| 久久国产综合精品| 亚洲欧美成aⅴ人在线观看| 免费在线日韩av| 久久久久国产精品麻豆ai换脸| 欧美日韩1080p| 欧美成人视屏| 国产午夜亚洲精品不卡| 一区二区高清视频在线观看| 91久久精品日日躁夜夜躁国产| 欧美一区二区三区在线观看视频 | 欧美日韩久久久久久| 久久综合色88| 国产亚洲欧美中文| 一区二区三区久久| 日韩一区二区精品在线观看| 久久夜色精品| 久久视频在线免费观看| 欧美色欧美亚洲另类二区 | 久久gogo国模啪啪人体图| 欧美日韩精品免费观看视频完整 | 亚洲国产美女精品久久久久∴| 亚洲午夜三级在线| 在线亚洲电影| 欧美日韩国产麻豆| 亚洲欧洲久久| 亚洲精品国偷自产在线99热| 久久人91精品久久久久久不卡| 久久精品国产69国产精品亚洲| 国产精品影院在线观看| 亚洲影视在线| 久久久久国产精品www| 狠狠88综合久久久久综合网| 欧美亚洲一级| 久久久久久久999精品视频| 国产一区二区在线观看免费播放| 亚洲欧美激情精品一区二区| 亚洲欧美成人网| 国产精品视频网| 亚洲欧美日韩国产综合精品二区 | 亚洲黑丝在线| 欧美激情视频网站| 99视频精品在线| 午夜欧美精品| 国产亚洲永久域名| 久久久久99| 亚洲国产精品成人一区二区 | 欧美精品一区二区三| 91久久久久久国产精品| 一区二区高清在线| 国产精品一区二区久激情瑜伽| 午夜精品福利视频| 免费视频一区二区三区在线观看| 91久久黄色| 欧美视频在线观看免费网址| 亚洲嫩草精品久久| 欧美va亚洲va日韩∨a综合色| 99亚洲伊人久久精品影院红桃| 国产精品国产福利国产秒拍| 午夜电影亚洲| 亚洲国产美女久久久久| 亚洲伊人观看| 在线观看亚洲精品视频| 欧美日韩亚洲一区二区| 欧美在线观看网站| 99精品热视频| 免费一区二区三区| 中日韩美女免费视频网址在线观看 | 欧美国产日韩一区| 亚洲自拍偷拍一区| 嫩草国产精品入口| 午夜国产精品影院在线观看| 激情成人综合网| 国产精品草莓在线免费观看| 久久精品国产亚洲aⅴ| 99re8这里有精品热视频免费| 久久嫩草精品久久久精品| 正在播放亚洲| 韩国av一区二区三区| 欧美视频一区二区| 狂野欧美激情性xxxx欧美| 一区二区三区四区五区精品视频 | 欧美高清在线播放| 久久av一区二区三区漫画| 一区二区三区国产精华| 国产一区二区精品久久91| 欧美日韩蜜桃| 欧美国产一区二区| 久久精品亚洲精品| 亚洲图片欧美一区| 亚洲人体大胆视频| 欧美国产91| 免费观看30秒视频久久| 久久精品成人一区二区三区 | 羞羞漫画18久久大片| 亚洲图片欧洲图片av|