http://www.tuicool.com/articles/zYfAnea
最近實現一個二維碼掃描登錄的功能,當用戶用移動設備掃描PC端頁面的二維碼之后,移動設備通過常規HTTP短連接向服務器獲取認證數據,認證通過后,服務器向PC瀏覽器主動推送帳號相關信息以完成PC端頁面的登錄。
服務器主動向瀏覽器推送數據,基本上就是ajax輪詢、iframe stream、websocket等等,可以參見 《Comet (web技術)》
推送服務器有很多種,當然用強大穩定又順手的nginx了。 nginx相關的推送插件模塊有nginx-push-stream-module、nginx_http_push_module,但是很遺憾,可配置不可編程。又到我們的主角 OpenResty (OpenResty (aka. ngx_openresty) is a full-fledged web application server by bundling the standard Nginx core, lots of 3rd-party Nginx modules, as well as most of their external dependencies)出馬的時候了。
1、PC瀏覽器向php發起ajax請求,獲取一個與當前session相關的唯一的二維碼URL,和一個唯一的sub訂閱URL。
2、PC瀏覽器顯示二維碼,并對sub訂閱URL發起ajax長連接或者websocket連接,這個請求將直接由nginx來hold住,超時時間由配置參數push_free_timeout決定。
3、手機端掃描并解析二維碼,向php發起認證。
4、php收到移動設備的請求后,解出sessionid,向nginx的pub接口發布數據,該數據將被直接投遞到對應的sub接口,并回傳到瀏覽器。
5、如果sub接口在push_free_timeout指定的時間內一直沒有收到數據,將主動斷開與瀏覽器端的連接。此時,瀏覽器可以根據業務場景決定是否重新發起連接。
出于性能考慮,使用ngx.shared共享內存存儲消息,只能共享于一個ngx實例內,對于10k級別的聊天室并發連接應該是夠用了。
使用redis作為外部存儲,也是可以的,如果100k的并發,需要注意ngx對nosql發起連接時耗盡socket,當然這個是可以解決的。
更大規模的并發,值得自研推送服務器。
在生產環境某個LB節點上試運行過,openresty跑著waf\fastcgi proxy\http proxy\comet。常態1k并發連接數,load 0.01,40k并發時,load只有0.15。
下面是相關測試代碼,如果有空完善了再托管到github上,考慮寫一個聊天室的完整demo。
resty.push基礎模塊(需要使用到ngx.shared共享內存來存儲消息,在nginx.conf的http段配置lua_shared_dict push 10m;)
--[[
-- /usr/local/openresty/lualib/resty/push.lua
-- push.lua ,resty.push 基于nginx_lua的push推送方案
-- 支持多對多頻道
-- 支持long-pooling, stream, websocket
--
-- Author: chuyinfeng.com <Liujiaxiong@kingsoft.com>
-- 2014.03.12
--]]
local _M = {_VERSION = '0.01'}
local function debug(msg)
--ngx.say(msg)
--ngx.flush(true)
end
-- 配置信息
_M.config = {
-- 推送間隔,1s
['push_interval'] = 1,
-- 消息隊列最大長度
['msglist_len'] = 100,
-- 消息生存周期
['msg_lefttime'] = 3,
-- 頻道空閑超時
['channel_timeout'] = 30,
-- 推送空閑超時,在改時間段內無消息則關閉當前推送連接
['push_free_timeout'] = 10,
-- 共享內存名
['store_name'] = 'push',
-- 頻道號
['channels'] = {1, 2},
}
-- 頻道數量
_M.channels_len = 0
-- 當前讀位置
_M.idx_read = 0
-- 共享內存
_M.store = nil
-- cjson 模塊
local cjson = require "cjson"
--[[
-- 設置
--]]
_M.opt = function(self, k, v)
local t = type(k)
if t == 'table' then
for key, val in pairs(k) do
self.config[key] = val
end
end
if t == 'string' then
self.config[k] = v
end
self.channels_len = table.maxn(self.config['channels'])
self.store = ngx.shared[self.config['store_name']]
end
--[[
-- 向頻道寫入消息
--
-- @param ngx.shared.dict, 共享內存
-- @param string channel_id,可用ngx.crc32_long生成
-- @param int channel_timeout, 頻道空閑超時時間
-- @param string msg,消息內容 必須為字符串
-- @param int msg_lefttime, 消息生存周期
-- @param int msglist_len, 消息隊列長度
-- @return boolean
--]]
local function _write(store, channel_id, channel_timeout, msg, msg_lefttime, msglist_len)
local idx, ok, err
-- 消息當前讀取位置計數器+1
idx, err = store:incr(channel_id, 1)
-- 如果異常,則新建頻道
if err then
ok, err = store:set(channel_id, 1, channel_timeout)
if err then return 0 end
idx = 1
else
store:replace(channel_id, idx, channel_timeout)
end
-- 寫入消息
debug("write " .. channel_id .. idx .. " , lefttime: " .. msg_lefttime.. " , msg: " .. msg)
ok, err = store:set('m' .. channel_id .. idx, msg, msg_lefttime)
if err then return 0 end
-- 清除隊列之前的舊消息
if idx > msglist_len then
store:delete('m' .. channel_id .. (idx - msglist_len))
end
return idx
end
--[[
-- 從頻道讀取消息
--
-- @param int channel_id, 必須為整形,可用ngx.crc32_long生成
-- @param int offset,歷史偏移量,最小為0
-- @return int len, 剩余消息數量
-- @return string msg, 消息
--]]
local _read = function (store, channel_id, msglist_len, idx_read)
local idx_msg, err, msg
-- 獲取最新消息的位置
idx_msg, _ = store:get(channel_id)
idx_msg = idx_msg or 0
if idx_msg == 0 then
idx_read = 0
end
if idx_msg - idx_read > msglist_len then
idx_read = idx_msg - msglist_len
end
if idx_read < idx_msg then
idx_read = idx_read + 1
msg, _ = store:get('m' .. channel_id .. idx_read)
end
-- 返回讀的位置和消息的最大位置,以及消息
return idx_read, idx_msg, msg
end
--[[
-- 推送消息
-- @param callback wrapper, 消息包裝回調函數
--]]
_M.push = function(self, wrapper)
local flag_work = true
local flag_read = true
local idx_read, idx_msg, msg, err
local time_last_msg = ngx.time()
while flag_work do
for i = 1, self.channels_len do
-- 循環讀取當前頻道,直到EOF
flag_read = true
while flag_read do
debug("read from idx_read: " .. self.idx_read)
self.idx_read, idx_msg, msg = _read(self.store, self.config['channels'][i], self.config['msglist_len'], self.idx_read)
if msg ~= nil then
debug("got msg and wrapper msg: " .. msg)
time_last_msg = ngx.time()
wrapper(msg)
end
debug("idx_read: " .. self.idx_read .. ", idx_msg: " .. idx_msg)
if self.idx_read == idx_msg then flag_read = false end
end
end
debug("push_free: " .. ngx.time() - time_last_msg)
if ngx.time() - time_last_msg >= self.config['push_free_timeout'] then
debug("push_timeout: " .. " last: " .. time_last_msg .. " , now: " .. ngx.time())
flag_work = false
end
debug("sleep: " .. self.config['push_interval'])
ngx.sleep(self.config['push_interval'])
end
end
--[[
-- 發送消息到指定頻道
--]]
_M.send = function(self, msg)
local idx = 0
for i = 1, self.channels_len do
idx = _write(self.store, self.config['channels'][i], self.config['channel_timeout'], msg, self.config['msg_lefttime'], self.config['msglist_len'])
end
return true
end
--[[
-- jsonp格式化
--]]
_M.jsonp = function(self, data, cb)
if cb then
return cb .. "(" .. cjson.encode(data) .. ");"
else
return cjson.encode(data)
end
end
--[[
-- 公開成員
--]]
_M.new = function(self)
return setmetatable({}, { __index = _M })
end
return _M
public發布
local push = require "resty.push"
local function exit(is_ws)
if is_ws == nil then ngx.eof() end
ngx.exit(444)
end
local ok, err = ngx.on_abort(exit)
if err then return end
local pub = push:new()
pub:opt({
['channels'] = 123,
['push_interval'] = 0.1,
['push_free_timeout'] = 27,
})
ngx.req.read_body()
local body, err = ngx.req.get_post_args()
if err then exit() end
if pub:send(body['data'] or '') then
ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
ngx.status = ngx.HTTP_OK
ngx.say(pub:jsonp({['status'] = 1}, args['callback']))
end
subscribe 訂閱接口
local push = require "resty.push"
local function exit(is_ws)
if is_ws == nil then ngx.eof() end
ngx.exit(444)
end
local ok, err = ngx.on_abort(exit)
if err then return end
local args = ngx.req.get_uri_args()
local sub = push:new()
sub:opt({
['channels'] = 123,
['push_interval'] = 0.1,
['push_free_timeout'] = 27,
})
local wrapper = function(msg)
ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
ngx.status = ngx.HTTP_OK
ngx.say(sub:jsonp(msg, args['callback']))
exit()
end
sub:push(wrapper)
wrapper(sub:jsonp({['status'] = 1, ['tips'] = 'timeout'}))
posted on 2016-08-04 08:55
思月行云 閱讀(4356)
評論(0) 編輯 收藏 引用 所屬分類:
Nginx\Openresty