#
系統中,用戶的消息在移動設備與接入服務器建立的Tcp長連接上傳遞。這些消息包括:文本,復合文本,位置信息,音頻剪輯,圖像等等。 發送者傳送消息到平臺系統內部并將消息寫入gridfs,待接收者上線時平臺將消息推送至接收者。 考慮到帶寬利用,接收者得到的消息將不包含二進制數據,例如: 音頻,圖像等等。 這要求接收者對平臺發起一次獲取消息包內指定的音頻和圖像數據的請求。 移動端向平臺請求二進制數據的情況還包含 【離線文件傳送】場景 。 二進制數據往往是指那些數據量比較大的對象,這些對象在移動兩端交換時,交互通道將不占用與接入服務器的連接通道,而是通過nginx傳送到平臺內部; 同樣接收者獲取二進制數據也是通過nginx獲取。這種請求是HTTP的。 這里整理的是如何在平臺部署 【負載均衡的集群的分布式的文件服務】 nginx : http服務,提供反向代理和負載均衡服務(集群可用DNS或考慮LVS方案) mongodb+gridfs : 用于文件服務提供,其內置gridfs提供了分布式,海量存儲的方案 gevent+webpy : nginx直接讀取gridfs是不合適的,配置了cgi才能完成特定功能,這里使用webpy,比django更輕更好用。 webpy的作用是接收到上傳和下傳文件的請求,讀寫gridfs文件內容給移動端。 gevent是高效的通信框架,雖然單線程工作,但性能非常的好; 用好gevent關鍵在與外部的io必須全部都是異步的,例如: 數據庫,文件磁盤訪問等等。 mongodb對gevent已經支持,gevent對webpy,django,psycopg2支持也相當的好,所以要提供webservice服務那就考慮用gevent+webpy或django把,性能是杠杠的,比 apache+mod_wsgi要好很多 ,而且gevent是進程內的不同的HTTP REQUEST可以是共享數據的,這一點非常誘惑(apache+mod_wsgi的REQUEST可是隔離的哦!除非您通過redis的PUB/SUB實現兩個REQUEST的通信) 關注的問題: 1.下傳大文件時的處理 如果直接用nginx當然沒有這個問題 ,但用webpy讀取文件返回HttpResponse時問題來了,總不至于讀取整個文件,然后再return。 這種方式在php有flush方法,python只能用yield來做 2.上傳大文件時的處理 當接收到http的文件POST請求時,文件已經全部緩存到web服務器,如果同時幾千個文件上傳在進行,服務器就會被擠爆,這也是很多網站不允許大文件上傳的緣故吧。關于這個問題,我想就需要修改一下webpy關于文件上傳的處理代碼了,將接收到的文件數據以流的形式寫入到gridfs里去作為臨時文件被緩存,等完全接收文件時,才通知到handler代碼,這樣必定高效很多(新的問題又來了,會不會把gridfs搞爆掉! 處理時考慮延時緩存提交gridfs把)。 BUF_SIZE = 262144 class download: def GET(self): file_name = 'file_name' file_path = os.path.join('file_path', file_name) f = None try: f = open(file_path, "rb") webpy.header('Content-Type','application/octet-stream') webpy.header('Content-disposition', 'attachment; filename=%s.dat' % file_name) while True: c = f.read(BUF_SIZE) if c: yield c else: break except Exception, e: print e yield 'Error' finally: if f: f.close()
links: http://api.mongodb.org/python http://webpy.org/cookbook/storeupload.zh-cn http://webpy.org/cookbook/streaming_large_files http://gevent.org 下份代碼 demo很值得看哦 gevent 1.0 由libev 替換了libevent
摘要: 貼代碼 Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/--> 1 #--coding:utf-8-- 2 3 ... 閱讀全文
多樣的文本消息 ----------------- struct MimeText_t{ int type; string text; }; MimeText_t 可以包含普通的文本、圖像和音頻文件的id 圖像和音頻數據發送到服務器,服務器并不直接將數據發送到接收者,而是發送 音頻和圖像的描述uri信息 接收者解釋json,顯示text文本,讀取emoticon編號,顯示表情圖片; image,audio則顯示占位(如果當前wifi可用,則自己自動加載image和audio資源) ,如果非wifi信號則待用戶點擊此占位,然后從服務器請求image和audio資源到本地。 文本描述: 字體大小,顏色,文本link,表情符號 文本用json組織 , { set:[ text:{text:'this is',bg-color:#ff0000,color:#ffffff,font-name:'arial',font-size:20,bold:true,italic:true}, text:{text:'shanghai',color:#ff0000,font-name:'arial',font-size:20,bold:true,italic:true,link:'http://sw2us.com/images/shanghai.png'}, image:{id:1001,width:200,height:200,uri:'http://sw2us.com/images/bear.png'}, audio:{id:2001,duration:5,uri:'http://sw2us.com/clips/a001.mp3'}, location:{lon:121.221,lat,time,speed,direction,text:'立月路2001號浦星公路口'}, emoticon:{id:201} ], } 屬性名簡化: --------------------- ----------------------- 1 - text [ 1: text , 2: bg-color , 3: color , 4: font-name, 5:font-size, 6:bold, 7:italic ] 2 - image [ 1: id , 2:width , 3:height , 4:uri] 3 - audio [ 1:id , 2:duration,3:uri] 4 - location [ 1:lon, 2:lat, 3:time, 4:speed, 5:direction, 6:text] 5 - emoticon [ 1: id ] ----------------------- 0 - false 1 - true
接口定義: 1 interface IAuthServer{ 2 CallReturn_t userAuth(string user,string passwd,int device_type); 3 CallReturn_t registerUser(UserRegisterInfo_t reginfo); // tested 4 }; 定義認證服務器接口,userAuth()返回認證用戶的token 接口服務實現: 1 import os,os.path,sys,struct,time,traceback,signal,threading,copy,base64 2 import datetime,base64 3 4 from datetime import datetime 5 from base import * 6 import tcelib as tce 7 from showbox import * 8 import utils.misc 9 import utils.config 10 import utils.cipher 11 12 13 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "database.showbox.showbox.settings") 14 15 from django.db import connection 16 from django.db.models import Sum 17 from django.db import transaction 18 # import sns.core.models as cm 19 import database.showbox.core.models as core 20 import database.showbox.nosql.models as nosql 21 22 class AuthServerImpl(IAuthServer): 23 def __init__(self,app): 24 IAuthServer.__init__(self) 25 self.app = app 26 27 def userAuth(self, user, passwd, device_type, ctx): 28 cr = CallReturn_t() 29 try: 30 r = core.User.objects.get(user=user,passwd=passwd) 31 userinfo = { 32 "id":r.id, 33 "user":user, 34 "name":r.name, 35 "login_time":int(time.time()), 36 "user_type":SnsConsts.Authorized_User 37 } 38 token = utils.cipher.encryptToken(userinfo) 39 cr.value = token 40 except: 41 print traceback.format_exc() 42 cr = CallReturn_Error() 43 return cr 44 45 def registerUser(self, reginfo, ctx): 46 return IAuthServer.registerUser(self, reginfo, ctx) 47 48 49 50 class ServerApp: 51 def __init__(self): 52 pass 53 54 def getConfig(self): 55 #return self.app.getConfig() 56 pass 57 58 _handle = None 59 @classmethod 60 def instance(cls): 61 if cls._handle == None: 62 cls._handle = cls() 63 return cls._handle 64 65 def run(self): 66 tce.RpcCommunicator.instance().init('authserver').initMessageRoute('./services.xml') 67 server = tce.RpcCommunicator.instance().currentServer().findEndPointByName('mq_authserver').impl 68 adapter = tce.RpcAdapterEasyMQ.create('server',server) 69 #沒有主動發送消息的情形 70 servant = AuthServerImpl(self) 71 adapter.addServant(servant) 72 tce.RpcCommunicator.instance().waitForShutdown() 73 74 if __name__ == '__main__': 75 ServerApp.instance().run() 服務器很簡單,實現接口IAuthService的功能函數,定義一個ServerApp,然后運行 客戶調用測試: 1 from datetime import datetime 2 from base import * 3 import tcelib as tce 4 from showbox import * 5 import utils.misc 6 import utils.config 7 import utils.cipher 8 9 10 def userAuthResult(result,prx): 11 print result 12 13 # queue:client 必須在調用服務器的write 隊列mq 14 communicator =tce.RpcCommunicator.instance().init() 15 conn = tce.RpcConnectionEasyMQ.create('127.0.0.1',12301,'queue:mq_authserver') 16 local = tce.RpcConnectionEasyMQ.create('127.0.0.1',12301,'queue:mq_test_client',tce.AF_READ) 17 conn.setLoopbackMQ(local) 18 19 20 prx = IAuthServerPrx(conn) 21 prx.userAuth_async('test','111111',1,userAuthResult) #異步調用 22 print prx.userAuth('test','111111',1) #同步調用 21,22行分別測試兩種調用模式 client與server通過EasyMQ進行傳遞 easyMQ是個最簡單的消息隊列實現
在tce構架的平臺系統中,采集的用戶位置gps信息從網關gatewayserver接收并通過mq_gps消息隊列存儲到多個位置單元服務器 LocationUnit, 系統中存在若干個LocationServer提供查詢功能,當一次位置查詢時,LocationServer對集群的LocationUnit進行Map Reduce計算
idl的保留關鍵字:'byte','bool','short','int','long','float','double','string' ,均不能用于定義module,class,interface和變量名稱 定義的變量名稱如果包含以下單詞:'def','import','from','type','str','int','float','class' , tce生成python代碼時自動給添加'_'后綴,比如: struct xx{ string name; string from; } xx結構的from變量名將生成from_ 接口定義: module test{ dictionary<string,string> Properties_t; sequence<string> IpAddressList_t; interface ITerminal{ void onGetServerMessage(string text); } interface Server{ IpAddressList_t getIpAddresses(); Properties_t getProperties(); void ping(string fromhost); string login(string user,string passwd,ctx); }; } struct: tce將結構struct映射為class對象 ,初始化成員變量并創建散列函數 marshall/unmarshall sequence<T>: tce將數組類型直接映射為[] 例如 : dictionary<K,V> tce將字典映射為 {} python實現Server接口的getIpAddresses()方法: def getIpAddresses(): return ['192.168.14.101','192.168.12.50'] 定義服務器接口實現: tce為interface生成接口基類: class Server 我們提供一個實現類 : class ServerImpl(Server): def __init__(self): Server.__init__(self) def getIpAddresses(self,ctx): return [] 在這里我們提供了ServerImpl類,然后編寫實現函數getIpAddresses. 每個接口函數都攜帶ctx參數,ctx攜帶rpc請求的附屬信息,比如: 外帶數據(dict),底部的連接對象 等等 。 服務接口被稱為一個服務類servant ,接下來演示如何將這個servant裝配并提供客戶。 tce.RpcCommunicator.instance().init() ep = tce.RpcEndPoint(host='127.0.0.1',port=16005) 定義一個通信端點 adapter = tce.RpcCommunicator.instance().createAdapter('first_server',ep) 創建一個通信適配器 servant = ServerImpl() 創建服務接口對象 adapter.addServant(servant) 添加進適配器 tce.RpcCommunicator.instance().waitForShutdown() 進入通信循環 調用服務: tce.RpcCommunicator.instance().init() prx = test.ServerProxy.create(127.0.0.1,16005) ips = prx.getIpAddresses() 多種呼叫模式: tce將接口函數自動生成 normal,oneway,async三種調用接口方法 ,rpc調用出現異常,底部將拋出異常,所以用戶需要異常捕獲。 1.normal: 原型: fun_name(參數..,timeout=0,extra=None) 調用函數自動添加timeout,extra參數。timeout默認為0,將自動采用tce默認的30s等待調用返回時間; extra 指此次調用攜帶的附屬數據,extra ={'name':'scott','age':100} extra數據在服務端接口函數的ctx中獲?。?ctx.msg.extra 函數調用時將阻塞客戶線程,直到timeout超時或者服務器數據返回 2. oneway fun_name_oneway(參數...,extra=None) 只有類型void的接口函數才會生成oneway調用方法.oneway調用不會阻塞用戶線程,通常用于單向傳輸的場景,例如 Server接口的ping()函數 3. async fun_name_async(參數,async_callback,extra=None) 異步調用模式不會阻塞客戶線程,async_callback指定了rpc調用的返回接收函數 接收函數原型: void fun_name_CallBack(result,proxy) 例如: def getIpAddressesResult(result,proxy): print result #result - IpAddressList_t prx.getIpAddresses_async(getIpAddressesResult) *連接復用 在互聯網應用場景,服務器將接入大量的客戶端設備,客戶端是不能被尋址,所以服務器要完成推送消息給客戶端,必須在客戶端建立的連接上反向傳輸。 tce使這個工作變得相當簡單: 1. 客戶端定義接收消息的接口 ITerminal,定義接收函數onGetServerMessage() class TermnialImpl(ITerminal): ... 2. 創建到服務器的連接代理 tce.RpcCommunicator.instance().init() prx = test.ServerProxy.create(127.0.0.1,16005) 創建但并不馬上連接 3. 添加服務類實現 adapter = tce.RpcCommAdapter('adapter') impl = TerminalImpl() adapter.addConnection(prx.conn) adapter.addServant(impl) 加到通信器對象 3. 請求一次調用 prx.login('scott','1234') 4. 服務器端反向調用ITerminal的onGetServerMessage() def login(self,user,passwd,ctx): prx = ITerminalProxy(ctx.conn) prx.onGetServerMessage('server message..') 完成一次對設備端的接口調用
同樣在函數中連接pgsql,然后執行500次查詢, 測試gevent模式、串行查詢、多線程查詢 數據如下: multithread_test cost time: 2.45199990273 normal_test cost time: 4.04299998283 gevent_test cost time: 2.12800002098 結果 串行最慢4.4s, 多線程 2.45s ,gevent最快2.12 ,yes! 測試代碼: 1 import gevent 2 import gevent.queue 3 4 import psycopg2 5 import psycopg2.extensions 6 7 import psycogreen.gevent 8 9 psycogreen.gevent.patch_psycopg() 10 11 sys.path.insert(0,'../') 12 13 import easymq 14 15 ''' 16 在同一線程中,同一個連接conn上兩次創建的cur將會是一樣滴,因為是異步wait_read()緣故 17 所以要么每次創建數據庫連接,要么使用dbpool 18 ''' 19 20 21 def readThread(): 22 conn = psycopg2.connect(database='postgres',user='postgres',password='111111') 23 24 # cur = conn.cursor(cursor_factory=psycopg2.extensions.DictCursor) 25 cur = conn.cursor(cursor_factory=psycopg2.extensions.cursor) 26 27 # cur.execute("select pg_sleep(%s)", (2,)) 28 for n in range(10): 29 cur.execute("select CURRENT_DATE") 30 # print cur.fetchone() 31 # print 'read end..' 32 conn = None 33 34 35 def gevent_test(): 36 jobs=[] 37 for n in range(100): 38 jobs.append(gevent.spawn(readThread)) 39 gevent.joinall(jobs) 40 41 def normal_test(): 42 for n in range(100): 43 readThread() 44 45 def multithread_test(): 46 threads=[] 47 for n in range(100): 48 thread = threading.Thread(target=readThread) 49 threads.append(thread) 50 thread.start() 51 for thread in threads: 52 thread.join() 53 54 start = time.time() 55 normal_test() 56 end = time.time() 57 print 'normal_test cost time:',end-start 58 59 start = time.time() 60 gevent_test() 61 end = time.time() 62 print 'gevent_test cost time:',end-start 63 64 # start = time.time() 65 # multithread_test() 66 # end = time.time() 67 # print 'multithread_test cost time:',end-start 68
搞了這么久的RPC通信框架TCE,完成java,c++,python,javascript,actionscript之間的互相調來調去,感覺很舒服。
作為移動應用平臺,海量并發和高效傳輸是首要考慮要點。 市面上充值著都差不多的解決技術方案,無非那些 webserver+db ngnix+webserver+mq+logic-server ngnix+gevent-wsgi+db webapi已經被高舉到不可超越的地步
而我,不走尋常路,我得另辟捷徑 -http的效率根本無法跟socket的長連接媲美 -服務器是需要反向推送消息到移動設備的 -操作接口是簡單的易擴展的,屏蔽掉通信細節 -支持htm5的websocket,支持java,支持python,支持python客戶端調用
那我的方案是tce為基礎的RPC框架平臺,拋棄那些xmls,json,讓開發者從無盡的網絡編解碼工作中脫離出來,不用考慮多種通信模式,同步和異步。 font-gate : 前端接入服務器 easymq : 平臺服務總線消息隊列 logic-service : 不同的邏輯服務器
設想,在android手機上java代碼調用函數 whats_yourname(), 這個函數并不在本地,而是存在遠端平臺內部的一個服務器上,調用并被執行返回'scott'到手機終端,這是多么令人快樂的事情,用戶不用關心消息如何被列集,如何被分派,這一切都是透明的。 同樣,服務器主動推送商品打折信息到手機上,服務器僅僅需要調用手機接收函數,并填寫要傳輸的參數即可。 其實,這些就是RPC的實現,這樣的東東到處都是,DCOM,CORBA,ICE,只是我做得更加靈活
總是想做些令人輕松并快樂的事情!
gevent作為一款優秀的網絡通信框架,其出色的性能得到大家一致認可,但在處理并行任務的時候也要注意很多問題,不然您的服務器將變得異常緩慢。 http://blog.163.com/lxl_1995/blog/static/677173392012724103742746/ 這篇博文講的非常清楚,建議讀一下 gevent的特點如下: 1. 單線程執行,所有協程都在同一進程中被模擬和調度分派 2. 可以創建成千上萬的 協程,而不會受任何性能影響 3. 由于spawn的協程不是os分配和管理,所以不會有額外的線程資源分配,cpu也不用在這些線程之間調度切換 4. 單線程執行,無需考慮資源互斥 5. 協程之間切換是通過gevent的io阻塞完成,例如 gevent.sleep(0), queue.get/put,event,socket.... 每調用一次gevent 的api,gevent就能獲得一次schedule的機會(這很類似操作系統的用戶調用中斷,由用戶態切換到內核態) 以上特點保證gevent的性能非常出色,但當我們的server用到第三方軟件包的時候那要非常小心了,特別是這些包內部涉及了io操作。 如果第三方軟件包是純python的那很簡單,只需要gevent.monkey_patch(xxx)就okay; 但如果包是擴展clib,那要當心了,monkey_patch 并不能將其相關io操作打上補丁,為了使用這些第三方軟件包,要求這些軟件包必須支持 協程異步 接口(調用其同步io接口,將阻塞住gevent的執行線程,那gevent就完蛋了)。 gevent的patch對psycopg2無效,因為psycopg2的通信部分是c接口的函數庫,還好psycopg2內部支持協程,需要使用 到 psycogreen 這個東東 psycogreen.gevent.patch_psycopg() 支持協程
之后的在gevent的線程中執行sql并等待數據返回時,gevent立馬將執行切換到另外的線程 gvent項目中會用到各種諸多的第三方庫,必須要求這些庫的io接口不能是阻塞的,也就是能支持到gevent異步模式 應用邏輯代碼在被執行時(無系統api呼叫),單線程比多線程執行速度更快。循環執行一段計算二次函數代碼,由于期間沒有系統api調用,os不能進行內核tasklet切換,所以導致cpu的峰值可以攀升到90%,直到硬件、時鐘等中斷產生,強行切換到其他線程。 多核心cpu表現為單個核始終異常的忙碌,其他幾個比較空閑。
easymq 用于替代qpid的消息中間件。通信基礎采用tce引擎,提供topic和queue兩種隊列。 mq服務器啟動加載mq條目,建立mq內存對象,提供認證,客戶程序連接時指定mq名稱和認證口令, 管理程序可以動態增加、刪除和監視隊列。 mq持久化支持,根據創建參數控制durable。 easymq第一個版利用可以用python實現,之后考慮資源利用和系統會用c++實現 easymq是tce一個很好的應用。 定位夠輕,夠簡單,暫不考慮負載均橫和自動路由。 實例化mq服務器 1 def start(self): 2 tce.RpcCommunicator.instance().init('easymq.server') 3 ep = tce.RpcEndPoint(host=self.default_host,port=self.default_port) 4 adapter = tce.RpcCommunicator.instance().createAdapter('first',ep) 5 servant = self 6 adapter.addServant(servant) 7 print 'wait for shutdown..' 8 tce.RpcCommunicator.instance().waitForShutdown() 1 server = Server.instance() 2 print 'easymq server launched..' 3 server.init().start() 接收消息 1 import easymq 2 3 def readThread(conn): 4 while True: 5 m = conn.read( ) 6 print 'got one:',m 7 8 if __name__=='__main__': 9 easymq.init() 10 conn = easymq.Connection(('127.0.0.1',12301),'test',mode=easymq.READWRITE) 11 conn.open() 12 readThread(conn) 發送消息到接收者 1 import easymq 2 3 if __name__=='__main__': 4 easymq.init() 5 6 conn = easymq.Connection(('127.0.0.1',12301),'test',mode=easymq.WRITE) 7 conn.open() 8 for n in range(100): 9 conn.write(str(n)*10) 10 # waitForShutdown() 11 gevent.sleep(2)
|