Skip to content

Commit

Permalink
Created prod version
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Burkalev authored and Konstantin Burkalev committed May 22, 2018
1 parent be54be4 commit 900ce5a
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 57 deletions.
15 changes: 15 additions & 0 deletions lib/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ local storeConfigs = {

-- Wiola Runtime configuration
local wiolaConf = {
socketTimeout = 100,
maxPayloadLen = 65536,
realms = {},
store = "redis",
storeConfig = {
host = "unix:///tmp/redis.sock",
Expand Down Expand Up @@ -92,6 +95,18 @@ function _M.config(config)
return wiolaConf
end

if config.socketTimeout then
wiolaConf.socketTimeout = config.socketTimeout
end

if config.maxPayloadLen then
wiolaConf.maxPayloadLen = config.maxPayloadLen
end

if config.realms then
wiolaConf.realms = config.realms
end

if config.store then
wiolaConf.store = config.store
end
Expand Down
152 changes: 108 additions & 44 deletions lib/raw-handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,86 @@
-- User: Konstantin Burkalev
-- Date: 16.03.14
--
local wsServer = require "resty.websocket.server"
local WAMP_PAYLOAD_LENGTHS = {
[512] = 0,
[1024] = 1,
[2048] = 2,
[4096] = 3,
[8192] = 4,
[16384] = 5,
[32768] = 6,
[65536] = 7,
[131072] = 8,
[262144] = 9,
[524288] = 10,
[1048576] = 11,
[2097152] = 12,
[4194304] = 13,
[8388608] = 14,
[16777216] = 15
}

local wiola = require "wiola"
local webSocket, wampServer, ok, err, bytes
local config = require("wiola.config").config()
local wiola_max_payload_len = WAMP_PAYLOAD_LENGTHS[config.maxPayloadLen] or 65536
local bit = require "bit"
local tcpSocket, wampServer, cliMaxLength, serializer, serializerStr, data, err, ok, cliData

webSocket, err = wsServer:new({
timeout = tonumber(ngx.var.wiola_socket_timeout, 10) or 100,
max_payload_len = tonumber(ngx.var.wiola_max_payload_len, 10) or 65535
})
tcpSocket, err = ngx.req.socket(true)

if not webSocket then
if not tcpSocket then
return ngx.exit(444)
end

tcpSocket:settimeout(config.socketTimeout)

wampServer, err = wiola:new()
if not wampServer then
return ngx.exit(444)
end

local sessionId, dataType = wampServer:addConnection(ngx.var.connection, ngx.header["Sec-WebSocket-Protocol"])
data, err = tcpSocket:receive(4)

if data == nil then
return ngx.exit(444) -- tcpSocket:close()
end

if string.byte(data) ~= 0x7F then
return ngx.exit(444)
elseif string.byte(data, 3) ~= 0x0 or string.byte(data, 4) ~= 0x0 then
cliData = string.char(0x7F, bit.bor(bit.lshift(3, 4), 0), 0, 0)
tcpSocket:send(cliData)
return ngx.exit(444)
end

cliMaxLength = math.pow(2, 9 + bit.rshift(string.byte(data, 2), 4))
serializer = bit.band(string.byte(data, 2), 0xf)

if serializer == 1 then
serializerStr = "wamp.2.json"
elseif serializer == 2 then
serializerStr = "wamp.2.msgpack"
else
cliData = string.char(0x7F, bit.bor(bit.lshift(1, 4), 0), 0, 0)
tcpSocket:send(cliData)
return ngx.exit(444)
end

local sessionId, dataType = wampServer:addConnection(ngx.var.connection, serializerStr)

cliData = string.char(0x7F, bit.bor(bit.lshift(wiola_max_payload_len, 4), serializer), 0, 0)
data, err = tcpSocket:send(cliData)

if not data then
return ngx.exit(444)
end

local function removeConnection(_, sessId)

local config = require("wiola.config").config()
local wconfig = require("wiola.config").config()
local store = require('wiola.stores.' .. config.store)

ok, err = store:init(config)
ok, err = store:init(wconfig)
if not ok then
else
store:removeSession(sessId)
Expand All @@ -44,21 +98,26 @@ if not ok then
ngx.exit(444)
end

local function getLenBytes(len)
local b3 = bit.band(len, 0xff)
len = bit.rshift(len, 8)
local b2 = bit.band(len, 0xff)
len = bit.rshift(len, 8)
local b1 = bit.band(len, 0xff)
return string.char(b1, b2, b3)
end

while true do
local cliData, data, typ, hflags
local hflags, msgType, msgLen

hflags = wampServer:getHandlerFlags(sessionId)
if hflags ~= nil then
if hflags.sendLast == true then
cliData = wampServer:getPendingData(sessionId, true)

if dataType == 'binary' then
bytes, err = webSocket:send_binary(cliData)
else
bytes, err = webSocket:send_text(cliData)
end
data, err = tcpSocket:send(cliData)

if not bytes then
if not data then
end
end

Expand All @@ -70,57 +129,62 @@ while true do
cliData = wampServer:getPendingData(sessionId)

while cliData ~= ngx.null do
if dataType == 'binary' then
bytes, err = webSocket:send_binary(cliData)
else
bytes, err = webSocket:send_text(cliData)
end

if not bytes then
msgLen = string.len(cliData)

if msgLen < cliMaxLength then
cliData = string.char(0) .. getLenBytes(msgLen) .. cliData
data, err = tcpSocket:send(cliData)

if not data then
end
end

-- TODO Handle exceeded message length situation

cliData = wampServer:getPendingData(sessionId)
end

if webSocket.fatal then
data, err = tcpSocket:receive(4)

if data == nil then
ngx.timer.at(0, removeConnection, sessionId)
return ngx.exit(444)
end

data, typ = webSocket:recv_frame()
msgType = bit.band(string.byte(data), 0xff)
msgLen = bit.lshift(string.byte(data, 2), 16) +
bit.lshift(string.byte(data, 3), 8) +
string.byte(data, 4)

if not data then
if msgType == 0 then -- regular WAMP message

bytes, err = webSocket:send_ping()
if not bytes then
data, err = tcpSocket:receive(msgLen)

if data == nil then
ngx.timer.at(0, removeConnection, sessionId)
return ngx.exit(444)
end

elseif typ == "close" then
bytes, err = webSocket:send_close(1000, "Closing connection")
if not bytes then
return
end
ngx.timer.at(0, removeConnection, sessionId)
webSocket:send_close()
break
wampServer:receiveData(sessionId, data)

elseif typ == "ping" then
elseif msgType == 1 then -- PING

bytes, err = webSocket:send_pong()
if not bytes then
data, err = tcpSocket:receive(msgLen)

if data == nil then
ngx.timer.at(0, removeConnection, sessionId)
return ngx.exit(444)
end

-- elseif typ == "pong" then
cliData = string.char(2) .. msgLen .. data
data, err = tcpSocket:send(cliData)

elseif typ == "text" then -- Received something texty
wampServer:receiveData(sessionId, data)
if not data then
end

elseif typ == "binary" then -- Received something binary
wampServer:receiveData(sessionId, data)
-- elseif msgType == 2 then -- PONG
-- TODO Implement server initiated ping

end
end
50 changes: 41 additions & 9 deletions lib/wiola.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


local _M = {
_VERSION = '0.9.0',
_VERSION = '0.9.1',
}

_M.__index = _M
Expand Down Expand Up @@ -336,9 +336,9 @@ end
--- @param session table session object
--- @param requestId number request Id
--- @param rpcArgsL table Array-like payload
--- @param rpcArgsKw table Object-like payload
-- - @ param rpcArgsKw table Object-like payload
---
function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw)
function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL) --, rpcArgsKw)
local data
local details = setmetatable({}, { __jsontype = 'object' })

Expand All @@ -351,7 +351,7 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw)

elseif rpcUri == 'wamp.session.list' then

local count, sessList = store:getSessionCount(session.realm, rpcArgsL)
local _, sessList = store:getSessionCount(session.realm, rpcArgsL)
data = { WAMP_MSG_SPEC.RESULT, requestId, details, sessList }

elseif rpcUri == 'wamp.session.get' then
Expand Down Expand Up @@ -401,7 +401,13 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw)
if sessList ~= nil then
data = { WAMP_MSG_SPEC.RESULT, requestId, details, sessList }
else
data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_subscription" }
data = {
WAMP_MSG_SPEC.ERROR,
WAMP_MSG_SPEC.CALL,
requestId,
details,
"wamp.error.no_such_subscription"
}
end

elseif rpcUri == 'wamp.subscription.count_subscribers' then
Expand All @@ -410,7 +416,13 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw)
if sessCount ~= nil then
data = { WAMP_MSG_SPEC.RESULT, requestId, details, { sessCount } }
else
data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_subscription" }
data = {
WAMP_MSG_SPEC.ERROR,
WAMP_MSG_SPEC.CALL,
requestId,
details,
"wamp.error.no_such_subscription"
}
end

elseif rpcUri == 'wamp.registration.list' then
Expand Down Expand Up @@ -443,7 +455,13 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw)
if rpcInfo then
data = { WAMP_MSG_SPEC.RESULT, requestId, details, { rpcInfo.calleeSesId } }
else
data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_registration" }
data = {
WAMP_MSG_SPEC.ERROR,
WAMP_MSG_SPEC.CALL,
requestId,
details,
"wamp.error.no_such_registration"
}
end

elseif rpcUri == 'wamp.registration.count_callees' then
Expand All @@ -454,7 +472,13 @@ function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw)
if rpcInfo then
data = { WAMP_MSG_SPEC.RESULT, requestId, details, { 1 } }
else
data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_registration" }
data = {
WAMP_MSG_SPEC.ERROR,
WAMP_MSG_SPEC.CALL,
requestId,
details,
"wamp.error.no_such_registration"
}
end

else
Expand Down Expand Up @@ -557,7 +581,15 @@ function _M:receiveData(regId, data)
self:_publishMetaEvent('session', 'wamp.session.on_leave', session)
else
local realm = dataObj[2]
if self:_validateURI(realm, false, false) then

if not has(config.realms, realm) and config.realms[1] ~= "*" then
-- WAMP SPEC: [ABORT, Details|dict, Reason|uri]
self:_putData(session, {
WAMP_MSG_SPEC.ABORT,
setmetatable({}, { __jsontype = 'object' }),
"wamp.error.no_such_realm"
})
elseif self:_validateURI(realm, false, false) then

if config.wampCRA.authType ~= "none" then

Expand Down
9 changes: 5 additions & 4 deletions lib/ws-handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
--
local wsServer = require "resty.websocket.server"
local wiola = require "wiola"
local config = require("wiola.config").config()
local webSocket, wampServer, ok, err, bytes

webSocket, err = wsServer:new({
timeout = tonumber(ngx.var.wiola_socket_timeout, 10) or 100,
max_payload_len = tonumber(ngx.var.wiola_max_payload_len, 10) or 65535
timeout = config.socketTimeout,
max_payload_len = config.maxPayloadLen
})

if not webSocket then
Expand All @@ -25,10 +26,10 @@ local sessionId, dataType = wampServer:addConnection(ngx.var.connection, ngx.hea

local function removeConnection(_, sessId)

local config = require("wiola.config").config()
local wconfig = require("wiola.config").config()
local store = require('wiola.stores.' .. config.store)

ok, err = store:init(config)
ok, err = store:init(wconfig)
if not ok then
else
store:removeSession(sessId)
Expand Down

0 comments on commit 900ce5a

Please sign in to comment.