• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            Fork me on GitHub
            隨筆 - 215  文章 - 13  trackbacks - 0
            <2016年1月>
            272829303112
            3456789
            10111213141516
            17181920212223
            24252627282930
            31123456


            專注即時通訊及網游服務端編程
            ------------------------------------
            Openresty 官方模塊
            Openresty 標準模塊(Opm)
            Openresty 三方模塊
            ------------------------------------
            本博收藏大部分文章為轉載,并在文章開頭給出了原文出處,如有再轉,敬請保留相關信息,這是大家對原創作者勞動成果的自覺尊重!!如為您帶來不便,請于本博下留言,謝謝配合。

            常用鏈接

            留言簿(1)

            隨筆分類

            隨筆檔案

            相冊

            Awesome

            Blog

            Book

            GitHub

            Link

            搜索

            •  

            積分與排名

            • 積分 - 215745
            • 排名 - 118

            最新評論

            閱讀排行榜

            http://www.tuicool.com/articles/zYfAnea


            最近實現一個二維碼掃描登錄的功能,當用戶用移動設備掃描PC端頁面的二維碼之后,移動設備通過常規HTTP短連接向服務器獲取認證數據,認證通過后,服務器向PC瀏覽器主動推送帳號相關信息以完成PC端頁面的登錄。

            服務器主動向瀏覽器推送數據,基本上就是ajax輪詢、iframe stream、websocket等等,可以參見 《Comet (web技術)》

            推送服務器有很多種,當然用強大穩定又順手的nginx了。 nginx相關的推送插件模塊有nginx-push-stream-module、nginx_http_push_module,但是很遺憾,可配置不可編程。又到我們的主角 OpenResty (OpenResty (aka. ngx_openresty) is a full-fledged web application server by bundling the standard Nginx core, lots of 3rd-party Nginx modules, as well as most of their external dependencies)出馬的時候了。

            1、PC瀏覽器向php發起ajax請求,獲取一個與當前session相關的唯一的二維碼URL,和一個唯一的sub訂閱URL。

            2、PC瀏覽器顯示二維碼,并對sub訂閱URL發起ajax長連接或者websocket連接,這個請求將直接由nginx來hold住,超時時間由配置參數push_free_timeout決定。

            3、手機端掃描并解析二維碼,向php發起認證。

            4、php收到移動設備的請求后,解出sessionid,向nginx的pub接口發布數據,該數據將被直接投遞到對應的sub接口,并回傳到瀏覽器。

            5、如果sub接口在push_free_timeout指定的時間內一直沒有收到數據,將主動斷開與瀏覽器端的連接。此時,瀏覽器可以根據業務場景決定是否重新發起連接。

            出于性能考慮,使用ngx.shared共享內存存儲消息,只能共享于一個ngx實例內,對于10k級別的聊天室并發連接應該是夠用了。
            使用redis作為外部存儲,也是可以的,如果100k的并發,需要注意ngx對nosql發起連接時耗盡socket,當然這個是可以解決的。
            更大規模的并發,值得自研推送服務器。

            在生產環境某個LB節點上試運行過,openresty跑著waf\fastcgi proxy\http proxy\comet。常態1k并發連接數,load 0.01,40k并發時,load只有0.15。

            下面是相關測試代碼,如果有空完善了再托管到github上,考慮寫一個聊天室的完整demo。

            resty.push基礎模塊(需要使用到ngx.shared共享內存來存儲消息,在nginx.conf的http段配置lua_shared_dict push 10m;)

            --[[
            -- /usr/local/openresty/lualib/resty/push.lua
            -- push.lua ,resty.push 基于nginx_lua的push推送方案
            -- 支持多對多頻道 
            -- 支持long-pooling, stream, websocket
            --
            -- Author: chuyinfeng.com <Liujiaxiong@kingsoft.com> 
            -- 2014.03.12
            --]]

            local _M = {_VERSION = '0.01'}

            local function debug(msg)
              --ngx.say(msg)
              --ngx.flush(true)
            end

            -- 配置信息
            _M.config = {
              -- 推送間隔,1s
              ['push_interval'] = 1,
              -- 消息隊列最大長度
              ['msglist_len'] = 100,
              -- 消息生存周期
              ['msg_lefttime'] = 3,
              -- 頻道空閑超時
              ['channel_timeout'] = 30,
              -- 推送空閑超時,在改時間段內無消息則關閉當前推送連接
              ['push_free_timeout'] = 10,
              -- 共享內存名
              ['store_name'] = 'push',
              -- 頻道號
              ['channels'] = {1, 2},
            }


            -- 頻道數量
            _M.channels_len = 0
            -- 當前讀位置
            _M.idx_read = 0
            -- 共享內存
            _M.store = nil


            -- cjson 模塊
            local cjson = require "cjson"

            --[[ 
            -- 設置
            --]]
            _M.opt = function(self, k, v)
              local t = type(k)
              if t == 'table' then
                for key, val in pairs(k) do
                  self.config[key] = val
                end
              end

              if t == 'string' then
                self.config[k] = v
              end

              self.channels_len = table.maxn(self.config['channels'])
              self.store = ngx.shared[self.config['store_name']]

            end


            --[[
            -- 向頻道寫入消息
            --
            -- @param ngx.shared.dict, 共享內存
            -- @param string channel_id,可用ngx.crc32_long生成
            -- @param int channel_timeout, 頻道空閑超時時間
            -- @param string msg,消息內容 必須為字符串
            -- @param int msg_lefttime, 消息生存周期
            -- @param int msglist_len, 消息隊列長度
            -- @return boolean
            --]]
            local function _write(store, channel_id, channel_timeout, msg, msg_lefttime, msglist_len)
              local idx, ok, err

              -- 消息當前讀取位置計數器+1
              idx, err = store:incr(channel_id, 1)

              -- 如果異常,則新建頻道
              if err then
                ok, err = store:set(channel_id, 1, channel_timeout)
                if err then return 0 end
                idx = 1
              else
                store:replace(channel_id, idx, channel_timeout)
              end


              -- 寫入消息
              debug("write " .. channel_id .. idx .. " , lefttime: " .. msg_lefttime.. " , msg: " .. msg)
              ok, err = store:set('m' .. channel_id .. idx, msg,  msg_lefttime)
              if err then return 0 end

              -- 清除隊列之前的舊消息
              if idx > msglist_len then
                store:delete('m' .. channel_id .. (idx - msglist_len))
              end

              return idx
            end

            --[[
            -- 從頻道讀取消息
            --
            -- @param int channel_id, 必須為整形,可用ngx.crc32_long生成
            -- @param int offset,歷史偏移量,最小為0
            -- @return int len, 剩余消息數量
            -- @return string msg, 消息  
            --]]
            local _read = function (store, channel_id, msglist_len, idx_read)
              local idx_msg, err, msg

               

              -- 獲取最新消息的位置
              idx_msg, _ = store:get(channel_id)
              idx_msg = idx_msg or 0

              if idx_msg == 0 then
                idx_read = 0
              end
              
              if idx_msg - idx_read > msglist_len then
                idx_read = idx_msg - msglist_len
              end
              
              if idx_read < idx_msg then
                idx_read = idx_read + 1
                msg, _ = store:get('m' .. channel_id .. idx_read) 
              end

              -- 返回讀的位置和消息的最大位置,以及消息
              return idx_read, idx_msg, msg
            end

            --[[
            -- 推送消息
            -- @param callback wrapper, 消息包裝回調函數
            --]]
            _M.push = function(self, wrapper)
              local flag_work = true
              local flag_read = true
              local idx_read, idx_msg, msg, err
              local time_last_msg = ngx.time()
              
              while flag_work do
                 for i = 1, self.channels_len do
                  -- 循環讀取當前頻道,直到EOF
                  flag_read = true
                  while flag_read do
                    debug("read from idx_read: " .. self.idx_read)
                    self.idx_read, idx_msg, msg = _read(self.store, self.config['channels'][i], self.config['msglist_len'], self.idx_read)
                    if msg ~= nil then
                      debug("got msg and wrapper  msg: " .. msg)
                      time_last_msg = ngx.time()
                      wrapper(msg)
                    end
                    debug("idx_read: " .. self.idx_read .. ", idx_msg: " .. idx_msg)
                    
                    if self.idx_read == idx_msg then flag_read = false end
                  end
                  
                end

                debug("push_free: " .. ngx.time() - time_last_msg)
                if ngx.time() - time_last_msg  >= self.config['push_free_timeout'] then
                  debug("push_timeout: " .. " last: " .. time_last_msg .. " , now: " .. ngx.time())
                  flag_work = false
                end
                

                debug("sleep: " .. self.config['push_interval'])
                ngx.sleep(self.config['push_interval'])
              end

            end

            --[[
            -- 發送消息到指定頻道
            --]]
            _M.send = function(self, msg)
              local idx = 0
              for i = 1, self.channels_len do
                idx = _write(self.store, self.config['channels'][i], self.config['channel_timeout'], msg, self.config['msg_lefttime'], self.config['msglist_len'])
              end
              return true
            end

            --[[
            -- jsonp格式化
            --]]
            _M.jsonp = function(self, data, cb)
                if cb then
                    return cb .. "(" .. cjson.encode(data)  .. ");"
                else
                    return cjson.encode(data)
                end
            end


            --[[
            -- 公開成員
            --]]

            _M.new = function(self)
              return setmetatable({}, { __index = _M })
            end

            return _M

            public發布

            local push = require "resty.push"

            local function exit(is_ws)
              if is_ws == nil then ngx.eof() end
              ngx.exit(444)
            end
            local ok, err = ngx.on_abort(exit)
            if err then return end

            local pub = push:new()

            pub:opt({
              ['channels'] = 123,
              ['push_interval'] = 0.1,
              ['push_free_timeout'] = 27,
            })

            ngx.req.read_body()
            local body, err = ngx.req.get_post_args()
            if err then exit() end
            if pub:send(body['data'] or '') then
              ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
              ngx.status = ngx.HTTP_OK
              ngx.say(pub:jsonp({['status'] = 1}, args['callback']))
            end

            subscribe 訂閱接口

            local push = require "resty.push"

            local function exit(is_ws)
              if is_ws == nil then ngx.eof() end
              ngx.exit(444)
            end
            local ok, err = ngx.on_abort(exit)
            if err then return end

            local args = ngx.req.get_uri_args()

            local sub = push:new()

            sub:opt({
              ['channels'] = 123,
              ['push_interval'] = 0.1,
              ['push_free_timeout'] = 27,
            })

            local wrapper = function(msg)
              ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
              ngx.status = ngx.HTTP_OK
              ngx.say(sub:jsonp(msg, args['callback']))
              exit()
            end

            sub:push(wrapper)
            wrapper(sub:jsonp({['status'] = 1, ['tips'] = 'timeout'}))
            posted on 2016-08-04 08:55 思月行云 閱讀(4356) 評論(0)  編輯 收藏 引用 所屬分類: Nginx\Openresty
            国产欧美久久久精品| 国产精品久久久99| 久久久这里有精品| 性色欲网站人妻丰满中文久久不卡| 久久人人爽人人人人爽AV| 综合人妻久久一区二区精品| 久久精品国产亚洲av日韩| 国产国产成人精品久久| 久久精品国产欧美日韩| 欧洲成人午夜精品无码区久久| 精品久久久噜噜噜久久久| 欧美日韩精品久久久久| 久久人人爽人人爽人人片AV不 | 久久久久亚洲av成人无码电影 | 久久九色综合九色99伊人| 欧美久久一区二区三区| 成人久久精品一区二区三区| 久久久久无码精品国产app| 久久久久成人精品无码中文字幕 | 精品多毛少妇人妻AV免费久久 | 精品久久久久久亚洲| 亚洲欧美精品一区久久中文字幕| 无码国内精品久久人妻| 久久99国产精品成人欧美| 久久精品亚洲中文字幕无码麻豆 | 亚洲欧美日韩精品久久| 99久久夜色精品国产网站| 国产精品成人精品久久久 | 久久婷婷人人澡人人| 久久国产精品99国产精| 午夜精品久久久久久影视riav| 中文字幕亚洲综合久久2| 国产Av激情久久无码天堂| 青青草原综合久久大伊人| 三级片免费观看久久| 精品久久久久久久中文字幕| 久久免费视频观看| 久久久久久九九99精品| 亚洲中文字幕无码久久2017| 亚洲AⅤ优女AV综合久久久| 久久99精品免费一区二区|