引言
在筆者參與的四川省重點污染源企業(yè)環(huán)境遠程監(jiān)控系統(tǒng)中,有一項非常重要的工作:將多達80臺的遠程DVS(視頻服務器)的監(jiān)測數(shù)據(jù)通過因特網(wǎng)傳輸,由上位機收集上來,寫入SQL Server 2005數(shù)據(jù)庫中。遠程數(shù)據(jù)每隔一分鐘發(fā)送一次實時數(shù)據(jù)。如果數(shù)據(jù)在一分鐘內(nèi)傳送不成功,那么DVS將認為網(wǎng)絡已經(jīng)斷開,又要不斷的發(fā)起新的連接。因此,上位機能不能及時的準確的收集、寫入,是系統(tǒng)成敗的關鍵。
項目分析
80多臺遠程DVS正在不間斷的采集數(shù)據(jù),在網(wǎng)絡正常的情況下,會不間斷的向上位機發(fā)送數(shù)據(jù)。如果采用傳統(tǒng)的單線程結(jié)構(gòu),上位機接受連接請求,接收處理數(shù)據(jù),將數(shù)據(jù)寫入數(shù)據(jù)庫,然后再接受新的連接請求,接收處理數(shù)據(jù),……,這樣,上位機程序異常繁忙,CPU利用率幾乎將達100%。由于服務器不能迅速處理請求,DVS只好等待。
更為重要的是,為了減少上位機發(fā)送響應連接的次數(shù),設備采用的是長連接,即發(fā)送一次連接請求并得到響應后,發(fā)送數(shù)據(jù)時不再發(fā)送連接請求。因此,要求上位機能夠保存客戶端的Socket。
為了避免這種情形發(fā)生。筆者采用了異步、多線程來處理。所謂異步,是程序調(diào)用一個方法后立即返回,總體而言,主線程與方法線程并行執(zhí)行。而同步即程序執(zhí)行一個方法,等該方法返回之后,繼續(xù)往下走,本系統(tǒng)從功能上分成3個模塊,即3個前后關聯(lián)的線程:主線程、數(shù)據(jù)接收線程、存入數(shù)據(jù)庫線程,它們異步執(zhí)行。
主線程
主線程工作流程如圖一所示。其主要功能是:初始化參數(shù),如連接端口號、IP地址等,偵聽連接請求,將傳入的連接保留到TcpClient對象數(shù)組sockets,而這個數(shù)組sockets恰恰是我們后面線程中要用到的全局變量。 為了不使線程間爭用這個數(shù)組變量,這里用到了VB.net提供的Monitor類,它提供同步對象的訪問的機制。
當主線程偵聽到遠程DVS有連接請求時,立即執(zhí)行AcceptTcpClient方法,創(chuàng)建一個TcpClient實例,并將它放入sockets數(shù)組。同時創(chuàng)建線程對象serverthread。
聲明創(chuàng)建線程時,使用 ThreadStart 委托作為其唯一參數(shù)的構(gòu)造函數(shù)創(chuàng)建 Thread 類的新實例,創(chuàng)建線程時需要傳遞處理連接的過程或函數(shù)的地址以被線程調(diào)用。創(chuàng)建線程委托,傳遞需要操作的過程的地址,這部分的代碼如下所示:
Public Sub WaitData()
Try
Dim ipHostInfo As IPHostEntry = Dns.Resolve(Dns.GetHostName())
Dim localAddr As IPAddress = ipHostInfo.AddressList(0)
s = New TcpListener(localAddr, ListenPort)
s.Start()’開始偵聽連接請求
Dim Recdatathread As New Thread(New ThreadStart(AddressOf RecDataProc)) ’創(chuàng)建數(shù)據(jù)接收線程
Recdatathread.IsBackground = True
Recdatathread.Start()’啟動線程
While True
Dim client As TcpClient = s.AcceptTcpClient()
Monitor.Enter(sockets) '在指定對象上獲取排他鎖
sockets(socketcount) = client
socketcount = socketcount + 1
Monitor.Exit(sockets) '釋放指定對象上的排他鎖
End While
Catch e As SocketException
s.Stop()
saveErrLog(Date.Now, CType(s.AcceptTcpClient.Client.RemoteEndPoint, IPEndPoint).Address.ToString(), e.Message)’寫入錯誤日志
Catch e As ThreadAbortException
t.Abort()
saveErrLog(Date.Now, CType(s.AcceptTcpClient.Client.RemoteEndPoint, IPEndPoint).Address.ToString(), e.Message) ’寫入錯誤日志
Finally
t.Abort()
End
End Try
End Sub
數(shù)據(jù)接收線程
數(shù)據(jù)接收線程的工作流程如圖二所示。主要功能是:將掛起連接的DVS上傳數(shù)據(jù)從流中讀取出來,創(chuàng)建數(shù)據(jù)寫入線程,并在listbox中顯示。
從保存的socket數(shù)組中讀取字節(jié)流時,必須考慮以下問題:
一、有些DVS可能會在工作一段時間后發(fā)生設備故障或者網(wǎng)絡中斷,但服務器保存的是其歷史socket,因此,必須判斷其connect屬性,即設備是否在線。
二、為了減少服務器的空等時間,必須判斷流對象(stream)的DataAvailable屬性。
三、創(chuàng)建線程saveToDb時,必須考慮傳入?yún)?shù)的問題。通常的線程創(chuàng)建是不可提供參數(shù)的。我們將線程saveToDb的執(zhí)行體封裝到一個類中,通過初始化類的成員變量的方法,來達到傳送參數(shù)的目的。
四、由于本線程是長駐內(nèi)存并循環(huán)執(zhí)行的。因此,應當在適當?shù)牡胤阶柚梗駝t,CPU的利用率將達幾乎100%。
這部分的代碼如下:
Public Sub RecDataProc()
Dim i As Integer
Dim c As TcpClient
While (True)
Try
For i = 0 To socketList.Count - 1
If socketList.Item(i).client.connected Then '如果該連接在線
Dim dh1 As DelegateHandler = New DelegateHandler(AddressOf displayStatusBarPanel2)
'New 出一個委托并指定委托方法
Me.Invoke(dh1, New Object() {CStr(i)}) '調(diào)用invoke方法
c = socketList.Item(i)
Dim stream As NetworkStream = c.GetStream()
If stream.DataAvailable Then
Dim dh As DelegateHandler1 = New DelegateHandler1(AddressOf ShowInBox)
Dim readbuff As New ReadBuffClass(c, stream, Connection, dh) '由構(gòu)造函數(shù)來初始化成員變量
ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf readbuff.ReadBuff), readbuff)’把具體從流中讀取數(shù)據(jù)的工作交給線程池的線程來進行
Dim workerThreads, portThreads As Integer
ThreadPool.GetAvailableThreads(workerThreads, portThreads)
Dim dh2 As DelegateHandler = New DelegateHandler(AddressOf displayStatusBarPanel4)
'New 出一個委托并指定委托方法
Me.Invoke(dh2, New Object() {workerThreads.ToString}) '調(diào)用invoke方法
End If
Thread.Sleep(20) '如果不阻止,則CPU利用率將為100%
End If
Next
Catch ex As System.ArgumentOutOfRangeException
Catch ex As System.InvalidOperationException
Catch ex As ObjectDisposedException 'TcpClient 已關閉
Catch ex As SocketException
Catch ex As ThreadAbortException
Catch ex As System.IO.IOException
Catch ex As System.AccessViolationException
Finally
End Try
End While
End Sub
數(shù)據(jù)處理線程
這部份線程每個都由線程池來調(diào)度運行。由于要接收線程參數(shù),因此,線程本身被封裝到一個類中,限于篇幅的原因,只描述類的結(jié)構(gòu)。
Public Class ReadBuffClass
Private sck As TcpClient
Private ns As NetworkStream
Private sqlcnn As SqlConnection
Private delg As frmServerMain.DelegateHandler1
Dim sqlcmd As SqlCommand
Dim sqlda As SqlDataAdapter
Public Sub New(ByVal sc As TcpClient, ByVal n As NetworkStream, ByVal cn As SqlConnection, ByVal dh As frmServerMain.DelegateHandler1) '由構(gòu)造函數(shù)來初始化成員變量
Me.sck = sc
Me.ns = n
Me.sqlcnn = cn
Me.delg = dh
End Sub
Public Sub ReadBuff(ByVal state As Object) ' 線程的入口函數(shù)
Dim datastring As String = ""
ns.ReadTimeout = 100 '讀取失敗前經(jīng)歷的毫秒數(shù)
Try
While (True)
Dim bytes(2048) As Byte
ns.Read(bytes, 0, 2048)
datastring = datastring + Encoding.ASCII.GetString(bytes)
If datastring.IndexOf(vbCrLf) > 0 Then
Exit While
End If
End While
delg.Invoke(datastring, sck) '通過委托的方式,將參數(shù)傳給UI
Dim tmparr() As String = datastring.Split("##")
Dim i As Integer
For i = 0 To tmparr.Length - 1
If tmparr(i) <> "" Then
ProcessInfo(tmparr(i))
End If
Next
Catch ex As System.AccessViolationException
Catch ex As NotSupportedException
Catch ex As ArgumentNullException
Catch ex As ArgumentOutOfRangeException
Catch ex As ObjectDisposedException
Catch ex As IO.IOException '
Catch ex As SocketException
Catch ex As ThreadAbortException
Finally
End Try
End Sub
Private Sub ProcessInfo(ByVal tmpString As String) '對收到的數(shù)據(jù)進行解析、處理
……
End Sub
……
End Class
結(jié)束語
本文著重論述的是在VB2005的環(huán)境下,運用多線程異步實現(xiàn)遠程DVS數(shù)據(jù)收集的原理,重點考慮的是怎樣提高程序的反應速度,特別討論了程序開發(fā)中的一些細節(jié)問題,對有志于從事遠程臨控系統(tǒng)開發(fā)的軟件人員有一定的參考意義。
文中代碼在windows2003+VB2005+SqlServer2005的環(huán)境下調(diào)試通過,現(xiàn)在正在使用。