diff --git a/lib/config.lua b/lib/config.lua index e9affec..262f0e1 100644 --- a/lib/config.lua +++ b/lib/config.lua @@ -28,6 +28,9 @@ local storeConfigs = { -- Wiola Runtime configuration local wiolaConf = { + socketTimeout = 100, + maxPayloadLen = 65536, + realms = {}, store = "redis", storeConfig = { host = "unix:///tmp/redis.sock", @@ -92,6 +95,18 @@ function _M.config(config) return wiolaConf end + if config.socketTimeout then + wiolaConf.socketTimeout = config.socketTimeout + end + + if config.maxPayloadLen then + wiolaConf.maxPayloadLen = config.maxPayloadLen + end + + if config.realms then + wiolaConf.realms = config.realms + end + if config.store then wiolaConf.store = config.store end diff --git a/lib/raw-handler.lua b/lib/raw-handler.lua index b36e96d..7e53bfc 100644 --- a/lib/raw-handler.lua +++ b/lib/raw-handler.lua @@ -3,32 +3,86 @@ -- User: Konstantin Burkalev -- Date: 16.03.14 -- -local wsServer = require "resty.websocket.server" +local WAMP_PAYLOAD_LENGTHS = { + [512] = 0, + [1024] = 1, + [2048] = 2, + [4096] = 3, + [8192] = 4, + [16384] = 5, + [32768] = 6, + [65536] = 7, + [131072] = 8, + [262144] = 9, + [524288] = 10, + [1048576] = 11, + [2097152] = 12, + [4194304] = 13, + [8388608] = 14, + [16777216] = 15 +} + local wiola = require "wiola" -local webSocket, wampServer, ok, err, bytes +local config = require("wiola.config").config() +local wiola_max_payload_len = WAMP_PAYLOAD_LENGTHS[config.maxPayloadLen] or 65536 +local bit = require "bit" +local tcpSocket, wampServer, cliMaxLength, serializer, serializerStr, data, err, ok, cliData -webSocket, err = wsServer:new({ - timeout = tonumber(ngx.var.wiola_socket_timeout, 10) or 100, - max_payload_len = tonumber(ngx.var.wiola_max_payload_len, 10) or 65535 -}) +tcpSocket, err = ngx.req.socket(true) -if not webSocket then +if not tcpSocket then return ngx.exit(444) end +tcpSocket:settimeout(config.socketTimeout) + wampServer, err = wiola:new() if not wampServer then return ngx.exit(444) end -local sessionId, dataType = wampServer:addConnection(ngx.var.connection, ngx.header["Sec-WebSocket-Protocol"]) +data, err = tcpSocket:receive(4) + +if data == nil then + return ngx.exit(444) -- tcpSocket:close() +end + +if string.byte(data) ~= 0x7F then + return ngx.exit(444) +elseif string.byte(data, 3) ~= 0x0 or string.byte(data, 4) ~= 0x0 then + cliData = string.char(0x7F, bit.bor(bit.lshift(3, 4), 0), 0, 0) + tcpSocket:send(cliData) + return ngx.exit(444) +end + +cliMaxLength = math.pow(2, 9 + bit.rshift(string.byte(data, 2), 4)) +serializer = bit.band(string.byte(data, 2), 0xf) + +if serializer == 1 then + serializerStr = "wamp.2.json" +elseif serializer == 2 then + serializerStr = "wamp.2.msgpack" +else + cliData = string.char(0x7F, bit.bor(bit.lshift(1, 4), 0), 0, 0) + tcpSocket:send(cliData) + return ngx.exit(444) +end + +local sessionId, dataType = wampServer:addConnection(ngx.var.connection, serializerStr) + +cliData = string.char(0x7F, bit.bor(bit.lshift(wiola_max_payload_len, 4), serializer), 0, 0) +data, err = tcpSocket:send(cliData) + +if not data then + return ngx.exit(444) +end local function removeConnection(_, sessId) - local config = require("wiola.config").config() + local wconfig = require("wiola.config").config() local store = require('wiola.stores.' .. config.store) - ok, err = store:init(config) + ok, err = store:init(wconfig) if not ok then else store:removeSession(sessId) @@ -44,21 +98,26 @@ if not ok then ngx.exit(444) end +local function getLenBytes(len) + local b3 = bit.band(len, 0xff) + len = bit.rshift(len, 8) + local b2 = bit.band(len, 0xff) + len = bit.rshift(len, 8) + local b1 = bit.band(len, 0xff) + return string.char(b1, b2, b3) +end + while true do - local cliData, data, typ, hflags + local hflags, msgType, msgLen hflags = wampServer:getHandlerFlags(sessionId) if hflags ~= nil then if hflags.sendLast == true then cliData = wampServer:getPendingData(sessionId, true) - if dataType == 'binary' then - bytes, err = webSocket:send_binary(cliData) - else - bytes, err = webSocket:send_text(cliData) - end + data, err = tcpSocket:send(cliData) - if not bytes then + if not data then end end @@ -70,57 +129,62 @@ while true do cliData = wampServer:getPendingData(sessionId) while cliData ~= ngx.null do - if dataType == 'binary' then - bytes, err = webSocket:send_binary(cliData) - else - bytes, err = webSocket:send_text(cliData) - end - if not bytes then + msgLen = string.len(cliData) + + if msgLen < cliMaxLength then + cliData = string.char(0) .. getLenBytes(msgLen) .. cliData + data, err = tcpSocket:send(cliData) + + if not data then + end end + -- TODO Handle exceeded message length situation + cliData = wampServer:getPendingData(sessionId) end - if webSocket.fatal then + data, err = tcpSocket:receive(4) + + if data == nil then ngx.timer.at(0, removeConnection, sessionId) return ngx.exit(444) end - data, typ = webSocket:recv_frame() + msgType = bit.band(string.byte(data), 0xff) + msgLen = bit.lshift(string.byte(data, 2), 16) + + bit.lshift(string.byte(data, 3), 8) + + string.byte(data, 4) - if not data then + if msgType == 0 then -- regular WAMP message - bytes, err = webSocket:send_ping() - if not bytes then + data, err = tcpSocket:receive(msgLen) + + if data == nil then ngx.timer.at(0, removeConnection, sessionId) return ngx.exit(444) end - elseif typ == "close" then - bytes, err = webSocket:send_close(1000, "Closing connection") - if not bytes then - return - end - ngx.timer.at(0, removeConnection, sessionId) - webSocket:send_close() - break + wampServer:receiveData(sessionId, data) - elseif typ == "ping" then + elseif msgType == 1 then -- PING - bytes, err = webSocket:send_pong() - if not bytes then + data, err = tcpSocket:receive(msgLen) + + if data == nil then ngx.timer.at(0, removeConnection, sessionId) return ngx.exit(444) end --- elseif typ == "pong" then + cliData = string.char(2) .. msgLen .. data + data, err = tcpSocket:send(cliData) - elseif typ == "text" then -- Received something texty - wampServer:receiveData(sessionId, data) + if not data then + end - elseif typ == "binary" then -- Received something binary - wampServer:receiveData(sessionId, data) +-- elseif msgType == 2 then -- PONG + -- TODO Implement server initiated ping end end diff --git a/lib/wiola.lua b/lib/wiola.lua index 1c22ea9..30a1072 100644 --- a/lib/wiola.lua +++ b/lib/wiola.lua @@ -7,7 +7,7 @@ local _M = { - _VERSION = '0.9.0', + _VERSION = '0.9.1', } _M.__index = _M @@ -336,9 +336,9 @@ end --- @param session table session object --- @param requestId number request Id --- @param rpcArgsL table Array-like payload ---- @param rpcArgsKw table Object-like payload +-- - @ param rpcArgsKw table Object-like payload --- -function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw) +function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL) --, rpcArgsKw) local data local details = setmetatable({}, { __jsontype = 'object' }) @@ -351,7 +351,7 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw) elseif rpcUri == 'wamp.session.list' then - local count, sessList = store:getSessionCount(session.realm, rpcArgsL) + local _, sessList = store:getSessionCount(session.realm, rpcArgsL) data = { WAMP_MSG_SPEC.RESULT, requestId, details, sessList } elseif rpcUri == 'wamp.session.get' then @@ -401,7 +401,13 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw) if sessList ~= nil then data = { WAMP_MSG_SPEC.RESULT, requestId, details, sessList } else - data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_subscription" } + data = { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.CALL, + requestId, + details, + "wamp.error.no_such_subscription" + } end elseif rpcUri == 'wamp.subscription.count_subscribers' then @@ -410,7 +416,13 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw) if sessCount ~= nil then data = { WAMP_MSG_SPEC.RESULT, requestId, details, { sessCount } } else - data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_subscription" } + data = { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.CALL, + requestId, + details, + "wamp.error.no_such_subscription" + } end elseif rpcUri == 'wamp.registration.list' then @@ -443,7 +455,13 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw) if rpcInfo then data = { WAMP_MSG_SPEC.RESULT, requestId, details, { rpcInfo.calleeSesId } } else - data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_registration" } + data = { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.CALL, + requestId, + details, + "wamp.error.no_such_registration" + } end elseif rpcUri == 'wamp.registration.count_callees' then @@ -454,7 +472,13 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw) if rpcInfo then data = { WAMP_MSG_SPEC.RESULT, requestId, details, { 1 } } else - data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_registration" } + data = { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.CALL, + requestId, + details, + "wamp.error.no_such_registration" + } end else @@ -557,7 +581,15 @@ function _M:receiveData(regId, data) self:_publishMetaEvent('session', 'wamp.session.on_leave', session) else local realm = dataObj[2] - if self:_validateURI(realm, false, false) then + + if not has(config.realms, realm) and config.realms[1] ~= "*" then + -- WAMP SPEC: [ABORT, Details|dict, Reason|uri] + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.no_such_realm" + }) + elseif self:_validateURI(realm, false, false) then if config.wampCRA.authType ~= "none" then diff --git a/lib/ws-handler.lua b/lib/ws-handler.lua index b36e96d..fb4f9ca 100644 --- a/lib/ws-handler.lua +++ b/lib/ws-handler.lua @@ -5,11 +5,12 @@ -- local wsServer = require "resty.websocket.server" local wiola = require "wiola" +local config = require("wiola.config").config() local webSocket, wampServer, ok, err, bytes webSocket, err = wsServer:new({ - timeout = tonumber(ngx.var.wiola_socket_timeout, 10) or 100, - max_payload_len = tonumber(ngx.var.wiola_max_payload_len, 10) or 65535 + timeout = config.socketTimeout, + max_payload_len = config.maxPayloadLen }) if not webSocket then @@ -25,10 +26,10 @@ local sessionId, dataType = wampServer:addConnection(ngx.var.connection, ngx.hea local function removeConnection(_, sessId) - local config = require("wiola.config").config() + local wconfig = require("wiola.config").config() local store = require('wiola.stores.' .. config.store) - ok, err = store:init(config) + ok, err = store:init(wconfig) if not ok then else store:removeSession(sessId)