diff --git a/.gitignore b/.gitignore index 9ee6cde35..f40078bf9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,10 @@ *.o *.a -./skynet -./skynet.pid +/skynet +/skynet.pid 3rd/lua/lua 3rd/lua/luac -./cservice -./luaclib +/cservice +/luaclib *.so *.dSYM diff --git a/HISTORY.md b/HISTORY.md index 0ca753325..5926bc02a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,15 @@ +v0.4.2 (2014-7-14) +----------- +* Bugfix : invalid negative socket id +* Add optional TCP_NODELAY support +* Add worker thread weight +* Add skynet.queue +* Bugfix: socketchannel +* cluster can throw error +* Add readline and writeline to clientsocket lib +* Add cluster.reload to reload config file +* Add datacenter.wait + v0.4.1 (2014-7-7) ----------- * Add SERVICE_NAME in loader diff --git a/README.md b/README.md index 08eaf8770..e2d604f1f 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ For linux, install autoconf first for jemalloc ``` -git clone git@github.com:cloudwu/skynet.git +git clone https://github.com/cloudwu/skynet.git cd skynet make 'PLATFORM' # PLATFORM can be linux, macosx, freebsd now ``` @@ -38,5 +38,4 @@ You can also use the offical lua version , edit the makefile by yourself . * http://blog.codingnow.com/2012/09/the_design_of_skynet.html * http://blog.codingnow.com/2012/08/skynet.html -* http://blog.codingnow.com/2012/08/skynet_harbor_rpc.html * http://blog.codingnow.com/eo/skynet/ diff --git a/examples/client.lua b/examples/client.lua index 9197a1c34..278922e63 100644 --- a/examples/client.lua +++ b/examples/client.lua @@ -39,7 +39,7 @@ end while true do dispatch() - local cmd = socket.readline() + local cmd = socket.readstdin() if cmd then local args = {} string.gsub(cmd, '[^ ]+', function(v) table.insert(args, v) end ) diff --git a/examples/main.lua b/examples/main.lua index 8019c2bf9..abbd27511 100644 --- a/examples/main.lua +++ b/examples/main.lua @@ -12,6 +12,7 @@ skynet.start(function() port = 8888, maxclient = max_client, }) + print("Watchdog listen on ", 8888) skynet.exit() end) diff --git a/examples/watchdog.lua b/examples/watchdog.lua index 8ead2253e..e28ab9fb1 100644 --- a/examples/watchdog.lua +++ b/examples/watchdog.lua @@ -33,6 +33,7 @@ function SOCKET.data(fd, msg) end function CMD.start(conf) + skynet.call(gate, "lua", "nodelay", true) skynet.call(gate, "lua", "open" , conf) end diff --git a/lualib-src/lua-clientsocket.c b/lualib-src/lua-clientsocket.c index 1e499f2d3..2676c267a 100644 --- a/lualib-src/lua-clientsocket.c +++ b/lualib-src/lua-clientsocket.c @@ -125,14 +125,19 @@ unpack(lua_State *L, uint8_t *buffer, int sz, int n) { boolean (true: data, false: block, nil: close) string last */ + +struct socket_buffer { + void * buffer; + int sz; +}; + static int -lrecv(lua_State *L) { +recv_socket(lua_State *L, char *tmp, struct socket_buffer *result) { int fd = luaL_checkinteger(L,1); size_t sz = 0; const char * last = lua_tolstring(L,2,&sz); luaL_checktype(L, 3, LUA_TTABLE); - char tmp[CACHE_SIZE]; char * buffer; int r = recv(fd, tmp, CACHE_SIZE, 0); if (r == 0) { @@ -163,10 +168,64 @@ lrecv(lua_State *L) { lua_pushnil(L); lua_rawseti(L, 3, i); } + result->buffer = buffer; + result->sz = r + sz; + return -1; +} - return unpack(L, (uint8_t *)buffer, r+sz, 0); +static int +lrecv(lua_State *L) { + struct socket_buffer sb; + char tmp[CACHE_SIZE]; + int ret = recv_socket(L, tmp, &sb); + if (ret < 0) { + return unpack(L, sb.buffer, sb.sz, 0); + } else { + return ret; + } } +static int +unpack_line(lua_State *L, uint8_t *buffer, int sz, int n) { + if (sz == 0) + goto _block; + if (buffer[0] == '\n') { + return unpack_line(L, buffer+1, sz-1, n); + } + int i; + for (i=1;ihead == q->tail) { @@ -236,6 +295,18 @@ lreadline(lua_State *L) { return 1; } +static int +lwriteline(lua_State *L) { + size_t sz = 0; + int fd = luaL_checkinteger(L,1); + const char * msg = luaL_checklstring(L, 2, &sz); + block_send(L, fd, msg, sz); + char nl[1] = { '\n' }; + block_send(L, fd, nl, 1); + + return 0; +} + int luaopen_clientsocket(lua_State *L) { luaL_checkversion(L); @@ -245,14 +316,16 @@ luaopen_clientsocket(lua_State *L) { { "send", lsend }, { "close", lclose }, { "usleep", lusleep }, + { "readline", lreadline }, + { "writeline", lwriteline }, { NULL, NULL }, }; luaL_newlib(L, l); struct queue * q = lua_newuserdata(L, sizeof(*q)); memset(q, 0, sizeof(*q)); - lua_pushcclosure(L, lreadline, 1); - lua_setfield(L, -2, "readline"); + lua_pushcclosure(L, lreadstdin, 1); + lua_setfield(L, -2, "readstdin"); pthread_t pid ; pthread_create(&pid, NULL, readline_stdin, q); diff --git a/lualib-src/lua-cluster.c b/lualib-src/lua-cluster.c index 92134ced8..29ebb08a3 100644 --- a/lualib-src/lua-cluster.c +++ b/lualib-src/lua-cluster.c @@ -15,7 +15,7 @@ uint32_t next_session */ -#define TEMP_LENGTH 0x10002 +#define TEMP_LENGTH 0x10007 static void fill_uint32(uint8_t * buf, uint32_t n) { @@ -146,6 +146,7 @@ lunpackrequest(lua_State *L) { /* int session + boolean ok lightuserdata msg int sz return string response @@ -155,15 +156,27 @@ lpackresponse(lua_State *L) { uint32_t session = luaL_checkunsigned(L,1); // clusterd.lua:command.socket call lpackresponse, // and the msg/sz is return by skynet.rawcall , so don't free(msg) - void * msg = lua_touserdata(L,2); - size_t sz = luaL_checkunsigned(L, 3); + int ok = lua_toboolean(L,2); + void * msg; + size_t sz; + + if (lua_type(L,3) == LUA_TSTRING) { + msg = (void *)lua_tolstring(L, 3, &sz); + if (sz > 0x1000) { + sz = 0x1000; + } + } else { + msg = lua_touserdata(L,3); + sz = luaL_checkunsigned(L, 4); + } uint8_t buf[TEMP_LENGTH]; - fill_header(L, buf, sz+4, msg); + fill_header(L, buf, sz+5, msg); fill_uint32(buf+2, session); - memcpy(buf+6,msg,sz); + buf[6] = ok; + memcpy(buf+7,msg,sz); - lua_pushlstring(L, (const char *)buf, sz+6); + lua_pushlstring(L, (const char *)buf, sz+7); return 1; } @@ -178,13 +191,13 @@ static int lunpackresponse(lua_State *L) { size_t sz; const char * buf = luaL_checklstring(L, 1, &sz); - if (sz < 4) { + if (sz < 5) { return 0; } uint32_t session = unpack_uint32((const uint8_t *)buf); lua_pushunsigned(L, session); - lua_pushboolean(L, 1); - lua_pushlstring(L, buf+4, sz-4); + lua_pushboolean(L, buf[4]); + lua_pushlstring(L, buf+5, sz-5); return 3; } diff --git a/lualib-src/lua-mongo.c b/lualib-src/lua-mongo.c index 3e4666dc6..855cc385f 100644 --- a/lualib-src/lua-mongo.c +++ b/lualib-src/lua-mongo.c @@ -506,7 +506,7 @@ op_insert(lua_State *L) { int i; for (i=1;i<=s;i++) { lua_rawgeti(L,3,i); - document doc = lua_touserdata(L,3); + document doc = lua_touserdata(L,-1); luaL_addlstring(&b, (const char *)doc, get_length(doc)); lua_pop(L,1); } diff --git a/lualib-src/lua-socket.c b/lualib-src/lua-socket.c index ad717f941..8bfefc0ed 100644 --- a/lualib-src/lua-socket.c +++ b/lualib-src/lua-socket.c @@ -14,6 +14,7 @@ #define BACKLOG 32 // 2 ** 12 == 4096 #define LARGE_PAGE_NODE 12 +#define BUFFER_LIMIT (256 * 1024) struct buffer_node { char * msg; @@ -22,6 +23,7 @@ struct buffer_node { }; struct socket_buffer { + int limit; int size; int offset; struct buffer_node *head; @@ -64,6 +66,7 @@ lnewpool(lua_State *L, int sz) { static int lnewbuffer(lua_State *L) { struct socket_buffer * sb = lua_newuserdata(L, sizeof(*sb)); + sb->limit = luaL_optint(L,1,BUFFER_LIMIT); sb->size = 0; sb->offset = 0; sb->head = NULL; @@ -126,6 +129,9 @@ lpushbuffer(lua_State *L) { sb->size += sz; lua_pushinteger(L, sb->size); + if (sb->limit > 0 && sb->size > sb->limit) { + return luaL_error(L, "buffer overflow (limit = %d, size = %d)", sb->limit, sb->size); + } return 1; } @@ -476,6 +482,14 @@ lstart(lua_State *L) { return 0; } +static int +lnodelay(lua_State *L) { + struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1)); + int id = luaL_checkinteger(L, 1); + skynet_socket_nodelay(ctx,id); + return 0; +} + int luaopen_socketdriver(lua_State *L) { luaL_checkversion(L); @@ -502,6 +516,7 @@ luaopen_socketdriver(lua_State *L) { { "lsend", lsendlow }, { "bind", lbind }, { "start", lstart }, + { "nodelay", lnodelay }, { NULL, NULL }, }; lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context"); diff --git a/lualib/cluster.lua b/lualib/cluster.lua index bc4312b37..d53e6d9b3 100644 --- a/lualib/cluster.lua +++ b/lualib/cluster.lua @@ -16,6 +16,10 @@ function cluster.open(port) end end +function cluster.reload() + skynet.call(clusterd, "lua", "reload") +end + skynet.init(function() clusterd = skynet.uniqueservice("clusterd") end) diff --git a/lualib/datacenter.lua b/lualib/datacenter.lua index 347c140e5..ebf68ba18 100644 --- a/lualib/datacenter.lua +++ b/lualib/datacenter.lua @@ -10,5 +10,9 @@ function datacenter.set(...) return skynet.call("DATACENTER", "lua", "UPDATE", ...) end +function datacenter.wait(...) + return skynet.call("DATACENTER", "lua", "WAIT", ...) +end + return datacenter diff --git a/lualib/mqueue.lua b/lualib/mqueue.lua index 11931200c..f21921cad 100644 --- a/lualib/mqueue.lua +++ b/lualib/mqueue.lua @@ -1,3 +1,5 @@ +-- This is a deprecated module, use skynet.queue instead. + local skynet = require "skynet" local c = require "skynet.c" diff --git a/lualib/skynet.lua b/lualib/skynet.lua index f9d934c33..2b984e2a6 100644 --- a/lualib/skynet.lua +++ b/lualib/skynet.lua @@ -22,7 +22,7 @@ local skynet = { PTYPE_HARBOR = 5, PTYPE_SOCKET = 6, PTYPE_ERROR = 7, - PTYPE_QUEUE = 8, + PTYPE_QUEUE = 8, -- use in deprecated mqueue, use skynet.queue instead PTYPE_DEBUG = 9, PTYPE_LUA = 10, PTYPE_SNAX = 11, @@ -229,7 +229,10 @@ function skynet.self() end function skynet.localname(name) - return string_to_handle(c.command("QUERY", name)) + local addr = c.command("QUERY", name) + if addr then + return string_to_handle(addr) + end end function skynet.launch(...) @@ -257,7 +260,7 @@ function skynet.exit() local address = session_coroutine_address[co] local self = skynet.self() if session~=0 and address then - skynet.redirect(self, address, "error", session, "") + skynet.redirect(address, self, "error", session, "") end end c.command("EXIT") @@ -325,6 +328,9 @@ end function skynet.rawcall(addr, typename, msg, sz) local p = proto[typename] + if watching_service[addr] == false then + error("Service is dead") + end local session = assert(c.send(addr, p.id , nil , msg, sz), "call to invalid address") return yield_call(addr, session) end diff --git a/lualib/skynet/queue.lua b/lualib/skynet/queue.lua new file mode 100644 index 000000000..9f40e2473 --- /dev/null +++ b/lualib/skynet/queue.lua @@ -0,0 +1,33 @@ +local skynet = require "skynet" +local coroutine = coroutine +local pcall = pcall +local table = table + +function skynet.queue() + local current_thread + local ref = 0 + local thread_queue = {} + return function(f, ...) + local thread = coroutine.running() + if ref == 0 then + current_thread = thread + elseif current_thread ~= thread then + table.insert(thread_queue, thread) + skynet.wait() + assert(ref == 0) + end + ref = ref + 1 + local ok, err = pcall(f, ...) + ref = ref - 1 + if ref == 0 then + current_thread = nil + local co = table.remove(thread_queue,1) + if co then + skynet.wakeup(co) + end + end + assert(ok,err) + end +end + +return skynet.queue \ No newline at end of file diff --git a/lualib/socket.lua b/lualib/socket.lua index d7bc49d53..282687479 100644 --- a/lualib/socket.lua +++ b/lualib/socket.lua @@ -2,6 +2,7 @@ local driver = require "socketdriver" local skynet = require "skynet" local assert = assert +local buffer_limit = -1 local socket = {} -- api local buffer_pool = {} -- store all message buffer object local socket_pool = setmetatable( -- store all socket object @@ -47,7 +48,13 @@ socket_message[1] = function(id, size, data) return end - local sz = driver.push(s.buffer, buffer_pool, data, size) + local ok , sz = pcall(driver.push, s.buffer, buffer_pool, data, size) + if not ok then + skynet.error("socket: error on ", id , sz) + driver.clear(s.buffer,buffer_pool) + driver.close(id) + return + end local rr = s.read_required local rrt = type(rr) if rrt == "number" then @@ -123,7 +130,7 @@ skynet.register_protocol { local function connect(id, func) local newbuffer if func == nil then - newbuffer = driver.buffer() + newbuffer = driver.buffer(buffer_limit) end local s = { id = id, @@ -318,4 +325,8 @@ function socket.abandon(id) socket_pool[id] = nil end +function socket.limit(limit) + buffer_limit = limit +end + return socket diff --git a/lualib/socketchannel.lua b/lualib/socketchannel.lua index 32615ac0c..e6a061f00 100644 --- a/lualib/socketchannel.lua +++ b/lualib/socketchannel.lua @@ -135,6 +135,9 @@ local function dispatch_by_order(self) if result ~= socket_error then errmsg = result_ok end + self.__result[co] = socket_error + self.__result_data[co] = errmsg + skynet.wakeup(co) wakeup_all(self, errmsg) end end diff --git a/service/clusterd.lua b/service/clusterd.lua index 405e28ab7..f7a9e90ff 100644 --- a/service/clusterd.lua +++ b/service/clusterd.lua @@ -5,7 +5,14 @@ local cluster = require "cluster.c" local config_name = skynet.getenv "cluster" local node_address = {} -assert(loadfile(config_name, "t", node_address))() + +local function loadconfig() + local f = assert(io.open(config_name)) + local source = f:read "*a" + f:close() + assert(load(source, "@"..config_name, "t", node_address))() +end + local node_session = {} local command = {} @@ -30,6 +37,11 @@ end local node_channel = setmetatable({}, { __index = open_channel }) +function command.reload() + loadconfig() + skynet.ret(skynet.pack(nil)) +end + function command.listen(source, addr, port) local gate = skynet.newservice("gate") if port == nil then @@ -53,8 +65,13 @@ local request_fd = {} function command.socket(source, subcmd, fd, msg) if subcmd == "data" then local addr, session, msg = cluster.unpackrequest(msg) - local msg, sz = skynet.rawcall(addr, "lua", msg) - local response = cluster.packresponse(session, msg, sz) + local ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg) + local response + if ok then + response = cluster.packresponse(session, true, msg, sz) + else + response = cluster.packresponse(session, false, msg) + end socket.write(fd, response) elseif subcmd == "open" then skynet.error(string.format("socket accept from %s", msg)) @@ -65,7 +82,8 @@ function command.socket(source, subcmd, fd, msg) end skynet.start(function() - skynet.dispatch("lua", function(_, source, cmd, ...) + loadconfig() + skynet.dispatch("lua", function(session , source, cmd, ...) local f = assert(command[cmd]) f(source, ...) end) diff --git a/service/datacenterd.lua b/service/datacenterd.lua index ad431ae28..1a0fee65b 100644 --- a/service/datacenterd.lua +++ b/service/datacenterd.lua @@ -2,6 +2,8 @@ local skynet = require "skynet" local command = {} local database = {} +local wait_queue = {} +local mode = {} local function query(db, key, ...) if key == nil then @@ -22,7 +24,7 @@ local function update(db, key, value, ...) if select("#",...) == 0 then local ret = db[key] db[key] = value - return ret + return ret, value else if db[key] == nil then db[key] = {} @@ -31,13 +33,82 @@ local function update(db, key, value, ...) end end +local function wakeup(db, key1, key2, value, ...) + if key1 == nil then + return + end + local q = db[key1] + if q == nil then + return + end + if q[mode] == "queue" then + db[key1] = nil + if value then + -- throw error because can't wake up a branch + for _,v in ipairs(q) do + local session = v[1] + local source = v[2] + skynet.redirect(source, 0, "error", session, "") + end + else + return q + end + else + -- it's branch + return wakeup(q , key2, value, ...) + end +end + function command.UPDATE(...) - return update(database, ...) + local ret, value = update(database, ...) + if ret or value == nil then + return ret + end + local q = wakeup(wait_queue, ...) + if q then + for _, v in ipairs(q) do + local session = v[1] + local source = v[2] + skynet.redirect(source, 0, "response", session, skynet.pack(value)) + end + end +end + +local function waitfor(session, source, db, key1, key2, ...) + if key2 == nil then + -- push queue + local q = db[key1] + if q == nil then + q = { [mode] = "queue" } + db[key1] = q + else + assert(q[mode] == "queue") + end + table.insert(q, { session, source }) + else + local q = db[key1] + if q == nil then + q = { [mode] = "branch" } + db[key1] = q + else + assert(q[mode] == "branch") + end + return waitfor(session, source, q, key2, ...) + end end skynet.start(function() - skynet.dispatch("lua", function (_, source, cmd, ...) - local f = assert(command[cmd]) - skynet.ret(skynet.pack(f(...))) + skynet.dispatch("lua", function (session, source, cmd, ...) + if cmd == "WAIT" then + local ret = command.QUERY(...) + if ret then + skynet.ret(skynet.pack(ret)) + else + waitfor(session, source, wait_queue, ...) + end + else + local f = assert(command[cmd]) + skynet.ret(skynet.pack(f(...))) + end end) end) diff --git a/service/gate.lua b/service/gate.lua index 54bb3e914..772f135a6 100644 --- a/service/gate.lua +++ b/service/gate.lua @@ -8,6 +8,7 @@ local watchdog local maxclient local client_number = 0 local CMD = setmetatable({}, { __gc = function() netpack.clear(queue) end }) +local nodelay = false local connection = {} -- fd -> connection : { fd , client, agent , ip, mode } local forwarding = {} -- agent -> connection @@ -22,6 +23,13 @@ function CMD.open( source , conf ) socketdriver.start(socket) end +function CMD.nodelay(source, v) + if v ~= false then + v = true + end + nodelay = v +end + function CMD.close() assert(socket) socketdriver.close(socket) @@ -106,6 +114,9 @@ function MSG.open(fd, msg) } connection[fd] = c client_number = client_number + 1 + if nodelay then + socketdriver.nodelay(fd) + end skynet.send(watchdog, "lua", "socket", "open", fd, msg) end diff --git a/service/service_mgr.lua b/service/service_mgr.lua index f49a79f7b..db9499d43 100644 --- a/service/service_mgr.lua +++ b/service/service_mgr.lua @@ -176,7 +176,7 @@ skynet.start(function() end end) local handle = skynet.localname ".service" - if handle ~= 0 then + if handle then skynet.error(".service is already register by ", skynet.address(handle)) skynet.exit() else diff --git a/skynet-src/skynet_server.c b/skynet-src/skynet_server.c index 017d72069..3e902cc96 100644 --- a/skynet-src/skynet_server.c +++ b/skynet-src/skynet_server.c @@ -245,7 +245,7 @@ skynet_context_dispatchall(struct skynet_context * ctx) { } struct message_queue * -skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q) { +skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) { if (q == NULL) { q = skynet_globalmq_pop(); if (q==NULL) @@ -261,18 +261,27 @@ skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue return skynet_globalmq_pop(); } + int i,n=1; struct skynet_message msg; - if (skynet_mq_pop(q,&msg)) { - skynet_context_release(ctx); - return skynet_globalmq_pop(); - } - skynet_monitor_trigger(sm, msg.source , handle); + for (i=0;i= 0) { + n = skynet_mq_length(q); + n >>= weight; + } - if (ctx->cb == NULL) { - skynet_free(msg.data); - } else { - _dispatch_message(ctx, &msg); + skynet_monitor_trigger(sm, msg.source , handle); + + if (ctx->cb == NULL) { + skynet_free(msg.data); + } else { + _dispatch_message(ctx, &msg); + } + + skynet_monitor_trigger(sm, 0,0); } assert(q == ctx->queue); @@ -285,8 +294,6 @@ skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue } skynet_context_release(ctx); - skynet_monitor_trigger(sm, 0,0); - return q; } @@ -361,8 +368,10 @@ static const char * cmd_query(struct skynet_context * context, const char * param) { if (param[0] == '.') { uint32_t handle = skynet_handle_findname(param+1); - sprintf(context->result, ":%x", handle); - return context->result; + if (handle) { + sprintf(context->result, ":%x", handle); + return context->result; + } } return NULL; } diff --git a/skynet-src/skynet_server.h b/skynet-src/skynet_server.h index be4e301ff..5d07ba704 100644 --- a/skynet-src/skynet_server.h +++ b/skynet-src/skynet_server.h @@ -16,7 +16,7 @@ void skynet_context_init(struct skynet_context *, uint32_t handle); int skynet_context_push(uint32_t handle, struct skynet_message *message); void skynet_context_send(struct skynet_context * context, void * msg, size_t sz, uint32_t source, int type, int session); int skynet_context_newsession(struct skynet_context *); -struct message_queue * skynet_context_message_dispatch(struct skynet_monitor *, struct message_queue *); // return next queue +struct message_queue * skynet_context_message_dispatch(struct skynet_monitor *, struct message_queue *, int weight); // return next queue int skynet_context_total(); void skynet_context_dispatchall(struct skynet_context * context); // for skynet_error output before exit diff --git a/skynet-src/skynet_socket.c b/skynet-src/skynet_socket.c index 610bf2fc9..989f88e08 100644 --- a/skynet-src/skynet_socket.c +++ b/skynet-src/skynet_socket.c @@ -150,3 +150,8 @@ skynet_socket_start(struct skynet_context *ctx, int id) { uint32_t source = skynet_context_handle(ctx); socket_server_start(SOCKET_SERVER, source, id); } + +void +skynet_socket_nodelay(struct skynet_context *ctx, int id) { + socket_server_nodelay(SOCKET_SERVER, id); +} diff --git a/skynet-src/skynet_socket.h b/skynet-src/skynet_socket.h index 2c0eb93b0..7f0b46437 100644 --- a/skynet-src/skynet_socket.h +++ b/skynet-src/skynet_socket.h @@ -28,5 +28,6 @@ int skynet_socket_connect(struct skynet_context *ctx, const char *host, int port int skynet_socket_bind(struct skynet_context *ctx, int fd); void skynet_socket_close(struct skynet_context *ctx, int id); void skynet_socket_start(struct skynet_context *ctx, int id); +void skynet_socket_nodelay(struct skynet_context *ctx, int id); #endif diff --git a/skynet-src/skynet_start.c b/skynet-src/skynet_start.c index c101da305..bef06b4ca 100644 --- a/skynet-src/skynet_start.c +++ b/skynet-src/skynet_start.c @@ -27,6 +27,7 @@ struct monitor { struct worker_parm { struct monitor *m; int id; + int weight; }; #define CHECK_ABORT if (skynet_context_total()==0) break; @@ -118,12 +119,13 @@ static void * _worker(void *p) { struct worker_parm *wp = p; int id = wp->id; + int weight = wp->weight; struct monitor *m = wp->m; struct skynet_monitor *sm = m->m[id]; skynet_initthread(THREAD_WORKER); struct message_queue * q = NULL; for (;;) { - q = skynet_context_message_dispatch(sm, q); + q = skynet_context_message_dispatch(sm, q, weight); if (q == NULL) { CHECK_ABORT if (pthread_mutex_lock(&m->mutex) == 0) { @@ -169,10 +171,20 @@ _start(int thread) { create_thread(&pid[1], _timer, m); create_thread(&pid[2], _socket, m); + static int weight[] = { + -1, -1, -1, -1, 0, 0, 0, 0, + 1, 1, 1, 1, 1, 1, 1, 1, + 2, 2, 2, 2, 2, 2, 2, 2, + 3, 3, 3, 3, 3, 3, 3, 3, }; struct worker_parm wp[thread]; for (i=0;i #include +#include #include #include #include @@ -34,6 +35,8 @@ #define PRIORITY_HIGH 0 #define PRIORITY_LOW 1 +#define HASH_ID(id) (((unsigned)id) % MAX_SOCKET) + struct write_buffer { struct write_buffer * next; char *ptr; @@ -107,6 +110,12 @@ struct request_start { uintptr_t opaque; }; +struct request_setopt { + int id; + int what; + int value; +}; + struct request_package { uint8_t header[8]; // 6 bytes dummy union { @@ -117,6 +126,7 @@ struct request_package { struct request_listen listen; struct request_bind bind; struct request_start start; + struct request_setopt setopt; } u; uint8_t dummy[256]; }; @@ -144,7 +154,7 @@ reserve_id(struct socket_server *ss) { if (id < 0) { id = __sync_and_and_fetch(&(ss->alloc_id), 0x7fffffff); } - struct socket *s = &ss->slot[id % MAX_SOCKET]; + struct socket *s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID) { if (__sync_bool_compare_and_swap(&s->type, SOCKET_TYPE_INVALID, SOCKET_TYPE_RESERVE)) { s->id = id; @@ -267,7 +277,7 @@ check_wb_list(struct wb_list *s) { static struct socket * new_fd(struct socket_server *ss, int id, int fd, uintptr_t opaque, bool add) { - struct socket * s = &ss->slot[id % MAX_SOCKET]; + struct socket * s = &ss->slot[HASH_ID(id)]; assert(s->type == SOCKET_TYPE_RESERVE); if (add) { @@ -357,7 +367,7 @@ open_socket(struct socket_server *ss, struct request_open * request, struct sock return -1; _failed: freeaddrinfo( ai_list ); - ss->slot[id % MAX_SOCKET].type = SOCKET_TYPE_INVALID; + ss->slot[HASH_ID(id)].type = SOCKET_TYPE_INVALID; return SOCKET_ERROR; } @@ -502,7 +512,7 @@ send_buffer_empty(struct socket *s) { static int send_socket(struct socket_server *ss, struct request_send * request, struct socket_message *result, int priority) { int id = request->id; - struct socket * s = &ss->slot[id % MAX_SOCKET]; + struct socket * s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID || s->id != id || s->type == SOCKET_TYPE_HALFCLOSE || s->type == SOCKET_TYPE_PACCEPT) { @@ -556,7 +566,7 @@ listen_socket(struct socket_server *ss, struct request_listen * request, struct result->id = id; result->ud = 0; result->data = NULL; - ss->slot[id % MAX_SOCKET].type = SOCKET_TYPE_INVALID; + ss->slot[HASH_ID(id)].type = SOCKET_TYPE_INVALID; return SOCKET_ERROR; } @@ -564,7 +574,7 @@ listen_socket(struct socket_server *ss, struct request_listen * request, struct static int close_socket(struct socket_server *ss, struct request_close *request, struct socket_message *result) { int id = request->id; - struct socket * s = &ss->slot[id % MAX_SOCKET]; + struct socket * s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID || s->id != id) { result->id = id; result->opaque = request->opaque; @@ -612,7 +622,7 @@ start_socket(struct socket_server *ss, struct request_start *request, struct soc result->opaque = request->opaque; result->ud = 0; result->data = NULL; - struct socket *s = &ss->slot[id % MAX_SOCKET]; + struct socket *s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID || s->id !=id) { return SOCKET_ERROR; } @@ -633,6 +643,17 @@ start_socket(struct socket_server *ss, struct request_start *request, struct soc return -1; } +static void +setopt_socket(struct socket_server *ss, struct request_setopt *request) { + int id = request->id; + struct socket *s = &ss->slot[HASH_ID(id)]; + if (s->type == SOCKET_TYPE_INVALID || s->id !=id) { + return; + } + int v = request->value; + setsockopt(s->fd, IPPROTO_TCP, request->what, &v, sizeof(v)); +} + static void block_readpipe(int pipefd, void *buffer, int sz) { for (;;) { @@ -696,6 +717,9 @@ ctrl_cmd(struct socket_server *ss, struct socket_message *result) { return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_HIGH); case 'P': return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_LOW); + case 'T': + setopt_socket(ss, (struct request_setopt *)buffer); + return -1; default: fprintf(stderr, "socket-server: Unknown ctrl %c.\n",type); return -1; @@ -942,7 +966,7 @@ socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * a // return -1 when error int64_t socket_server_send(struct socket_server *ss, int id, const void * buffer, int sz) { - struct socket * s = &ss->slot[id % MAX_SOCKET]; + struct socket * s = &ss->slot[HASH_ID(id)]; if (s->id != id || s->type == SOCKET_TYPE_INVALID) { return -1; } @@ -958,7 +982,7 @@ socket_server_send(struct socket_server *ss, int id, const void * buffer, int sz void socket_server_send_lowpriority(struct socket_server *ss, int id, const void * buffer, int sz) { - struct socket * s = &ss->slot[id % MAX_SOCKET]; + struct socket * s = &ss->slot[HASH_ID(id)]; if (s->id != id || s->type == SOCKET_TYPE_INVALID) { return; } @@ -1053,4 +1077,11 @@ socket_server_start(struct socket_server *ss, uintptr_t opaque, int id) { send_request(ss, &request, 'S', sizeof(request.u.start)); } - +void +socket_server_nodelay(struct socket_server *ss, int id) { + struct request_package request; + request.u.setopt.id = id; + request.u.setopt.what = TCP_NODELAY; + request.u.setopt.value = 1; + send_request(ss, &request, 'T', sizeof(request.u.setopt)); +} diff --git a/skynet-src/socket_server.h b/skynet-src/socket_server.h index ea15c0e12..66648719a 100644 --- a/skynet-src/socket_server.h +++ b/skynet-src/socket_server.h @@ -36,4 +36,6 @@ int socket_server_listen(struct socket_server *, uintptr_t opaque, const char * int socket_server_connect(struct socket_server *, uintptr_t opaque, const char * addr, int port); int socket_server_bind(struct socket_server *, uintptr_t opaque, int fd); +void socket_server_nodelay(struct socket_server *, int id); + #endif diff --git a/test/pingserver.lua b/test/pingserver.lua index 635cfeabf..2ddefd7fb 100644 --- a/test/pingserver.lua +++ b/test/pingserver.lua @@ -1,4 +1,5 @@ local skynet = require "skynet" +local queue = require "skynet.queue" local i = 0 local hello = "hello" @@ -8,9 +9,27 @@ function response.ping(hello) return hello end +-- response.sleep and accept.hello share one lock +local lock + +function accept.sleep(queue, n) + if queue then + lock( + function() + print("queue=",queue, n) + skynet.sleep(n) + end) + else + print("queue=",queue, n) + skynet.sleep(n) + end +end + function accept.hello() + lock(function() i = i + 1 print (i, hello) + end) end function response.error() @@ -19,6 +38,8 @@ end function init( ... ) print ("ping server start:", ...) + -- init queue + lock = queue() -- You can return "queue" for queue service mode -- return "queue" diff --git a/test/testdatacenter.lua b/test/testdatacenter.lua new file mode 100644 index 000000000..194d2219d --- /dev/null +++ b/test/testdatacenter.lua @@ -0,0 +1,23 @@ +local skynet = require "skynet" +local datacenter = require "datacenter" + +local function f1() + print("====1==== wait hello") + print("\t1>",datacenter.wait ("hello")) + print("====1==== wait key.foobar") + print("\t1>", pcall(datacenter.wait,"key")) -- will failed, because "key" is a branch + print("\t1>",datacenter.wait ("key", "foobar")) +end + +local function f2() + skynet.sleep(10) + print("====2==== set key.foobar") + datacenter.set("key", "foobar", "bingo") +end + +skynet.start(function() + datacenter.set("hello", "world") + print(datacenter.get "hello") + skynet.fork(f1) + skynet.fork(f2) +end) diff --git a/test/testdeadcall.lua b/test/testdeadcall.lua index c5d67b8c0..9017d380a 100644 --- a/test/testdeadcall.lua +++ b/test/testdeadcall.lua @@ -17,9 +17,7 @@ else local test = skynet.newservice(SERVICE_NAME, "test") -- launch self in test mode print(pcall(function() - skynet.send(test,"lua", "hello world") - skynet.send(test,"lua", "never get there") - skynet.call(test,"lua", "fake call") + skynet.call(test,"lua", "dead call") end)) skynet.exit() diff --git a/test/testqueue.lua b/test/testqueue.lua new file mode 100644 index 000000000..2076531e2 --- /dev/null +++ b/test/testqueue.lua @@ -0,0 +1,18 @@ +local skynet = require "skynet" +local snax = require "snax" + +skynet.start(function() + local ps = snax.uniqueservice ("pingserver", "test queue") + for i=1, 10 do + ps.post.sleep(true,i*10) + ps.post.hello() + end + for i=1, 10 do + ps.post.sleep(false,i*10) + ps.post.hello() + end + + skynet.exit() +end) + + diff --git a/test/testsocket.lua b/test/testsocket.lua index c465079c0..05313a115 100644 --- a/test/testsocket.lua +++ b/test/testsocket.lua @@ -21,6 +21,9 @@ if mode == "agent" then id = tonumber(id) skynet.start(function() + -- A small limit, if socket buffer overflow, close the client + socket.limit(64) + skynet.fork(function() echo(id) skynet.exit() @@ -30,7 +33,7 @@ else local function accept(id) socket.start(id) socket.write(id, "Hello Skynet\n") - skynet.newservice("testsocket", "agent", id) + skynet.newservice(SERVICE_NAME, "agent", id) -- notice: Some data on this connection(id) may lost before new service start. -- So, be careful when you want to use start / abandon / start . socket.abandon(id)