diff --git a/BUILD.md b/BUILD.md index 389785058..b9e58c6df 100644 --- a/BUILD.md +++ b/BUILD.md @@ -90,6 +90,12 @@ make libhv ## options +### compile without c++ +``` +./configure --without-evpp +make clean && make +``` + ### compile WITH_OPENSSL Enable SSL in libhv is so easy, just only two apis: ``` @@ -126,3 +132,9 @@ make clean && make bin/httpd -s restart -d bin/curl -v http://localhost:8080 --http2 ``` + +### compile WITH_KCP +``` +./configure --with-kcp +make clean && make +``` diff --git a/CMakeLists.txt b/CMakeLists.txt index cd9d1c3fb..3a8657aa8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,8 @@ option(WITH_OPENSSL "with openssl library" OFF) option(WITH_GNUTLS "with gnutls library" OFF) option(WITH_MBEDTLS "with mbedtls library" OFF) +option(WITH_KCP "with kcp" OFF) + set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake;${CMAKE_MODULE_PATH}") include(utils) include(vars) @@ -148,8 +150,12 @@ if(APPLE) endif() # see Makefile -set(ALL_SRCDIRS . base ssl event util cpputil evpp protocol http http/client http/server) -set(LIBHV_SRCDIRS . base ssl event util) +set(ALL_SRCDIRS . base ssl event event/kcp util cpputil evpp protocol http http/client http/server) +set(CORE_SRCDIRS . base ssl event) +if(WITH_KCP) + set(CORE_SRCDIRS ${CORE_SRCDIRS} event/kcp) +endif() +set(LIBHV_SRCDIRS ${CORE_SRCDIRS} util) set(LIBHV_HEADERS hv.h hconfig.h hexport.h) set(LIBHV_HEADERS ${LIBHV_HEADERS} ${BASE_HEADERS} ${SSL_HEADERS} ${EVENT_HEADERS} ${UTIL_HEADERS}) diff --git a/Makefile b/Makefile index 55c4542f7..cd6cec401 100644 --- a/Makefile +++ b/Makefile @@ -2,9 +2,13 @@ include config.mk include Makefile.vars MAKEF=$(MAKE) -f Makefile.in -ALL_SRCDIRS=. base ssl event util cpputil evpp protocol http http/client http/server +ALL_SRCDIRS=. base ssl event event/kcp util cpputil evpp protocol http http/client http/server +CORE_SRCDIRS=. base ssl event +ifeq ($(WITH_KCP), yes) +CORE_SRCDIRS += event/kcp +endif -LIBHV_SRCDIRS = . base ssl event util +LIBHV_SRCDIRS = $(CORE_SRCDIRS) util LIBHV_HEADERS = hv.h hconfig.h hexport.h LIBHV_HEADERS += $(BASE_HEADERS) $(SSL_HEADERS) $(EVENT_HEADERS) $(UTIL_HEADERS) @@ -75,78 +79,78 @@ hmain_test: prepare $(MAKEF) TARGET=$@ SRCDIRS=". base cpputil" SRCS="examples/hmain_test.cpp" htimer_test: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/htimer_test.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/htimer_test.c" hloop_test: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/hloop_test.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/hloop_test.c" tcp_echo_server: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/tcp_echo_server.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/tcp_echo_server.c" tcp_chat_server: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/tcp_chat_server.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/tcp_chat_server.c" tcp_proxy_server: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/tcp_proxy_server.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/tcp_proxy_server.c" udp_echo_server: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/udp_echo_server.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/udp_echo_server.c" udp_proxy_server: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/udp_proxy_server.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/udp_proxy_server.c" multi-acceptor-processes: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/multi-thread/multi-acceptor-processes.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/multi-thread/multi-acceptor-processes.c" multi-acceptor-threads: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/multi-thread/multi-acceptor-threads.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/multi-thread/multi-acceptor-threads.c" one-acceptor-multi-workers: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/multi-thread/one-acceptor-multi-workers.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/multi-thread/one-acceptor-multi-workers.c" nc: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/nc.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/nc.c" nmap: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event cpputil examples/nmap" DEFINES="PRINT_DEBUG" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) cpputil examples/nmap" DEFINES="PRINT_DEBUG" wrk: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http" SRCS="examples/wrk.cpp" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http" SRCS="examples/wrk.cpp" httpd: prepare $(RM) examples/httpd/*.o - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client http/server examples/httpd" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client http/server examples/httpd" consul: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client examples/consul" DEFINES="PRINT_DEBUG" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client examples/consul" DEFINES="PRINT_DEBUG" curl: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/curl.cpp" - # $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/curl.cpp" WITH_CURL=yes + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/curl.cpp" + # $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/curl.cpp" WITH_CURL=yes wget: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/wget.cpp" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/wget.cpp" http_server_test: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/server" SRCS="examples/http_server_test.cpp" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/server" SRCS="examples/http_server_test.cpp" http_client_test: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/http_client_test.cpp" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/http_client_test.cpp" websocket_server_test: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/server" SRCS="examples/websocket_server_test.cpp" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/server" SRCS="examples/websocket_server_test.cpp" websocket_client_test: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/websocket_client_test.cpp" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/websocket_client_test.cpp" jsonrpc: jsonrpc_client jsonrpc_server jsonrpc_client: prepare - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/jsonrpc/jsonrpc_client.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/jsonrpc/jsonrpc_client.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c" jsonrpc_server: prepare $(RM) examples/jsonrpc/*.o - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/jsonrpc/jsonrpc_server.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c" + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/jsonrpc/jsonrpc_server.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c" protorpc: protorpc_client protorpc_server @@ -154,17 +158,18 @@ protorpc_protoc: bash examples/protorpc/proto/protoc.sh protorpc_client: prepare protorpc_protoc - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event cpputil evpp examples/protorpc/generated" \ + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) cpputil evpp examples/protorpc/generated" \ SRCS="examples/protorpc/protorpc_client.cpp examples/protorpc/protorpc.c" \ LIBS="protobuf" protorpc_server: prepare protorpc_protoc $(RM) examples/protorpc/*.o - $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event cpputil evpp examples/protorpc/generated" \ + $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) cpputil evpp examples/protorpc/generated" \ SRCS="examples/protorpc/protorpc_server.cpp examples/protorpc/protorpc.c" \ LIBS="protobuf" unittest: prepare + $(CC) -g -Wall -O0 -std=c99 -I. -Ibase -o bin/rbtree_test unittest/rbtree_test.c base/rbtree.c $(CC) -g -Wall -O0 -std=c99 -I. -Ibase -o bin/mkdir_p unittest/mkdir_test.c base/hbase.c $(CC) -g -Wall -O0 -std=c99 -I. -Ibase -o bin/rmdir_p unittest/rmdir_test.c base/hbase.c $(CC) -g -Wall -O0 -std=c99 -I. -Ibase -o bin/date unittest/date_test.c base/htime.c diff --git a/README-CN.md b/README-CN.md index 02bbd789e..fe2f1dd70 100644 --- a/README-CN.md +++ b/README-CN.md @@ -25,6 +25,7 @@ - 高性能事件循环(网络IO事件、定时器事件、空闲事件、自定义事件) - TCP/UDP服务端/客户端/代理 - TCP支持心跳、转发、拆包、多线程安全write和close等特性 +- 可靠UDP支持: WITH_KCP - SSL/TLS加密通信(可选WITH_OPENSSL、WITH_GNUTLS、WITH_MBEDTLS) - HTTP服务端/客户端(支持https http1/x http2 grpc) - HTTP支持静态文件服务、目录服务、同步/异步API处理函数 diff --git a/README.md b/README.md index 26a99c417..a42bf2f55 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,10 @@ but simpler api and richer protocols. ## ✨ Features - Cross-platform (Linux, Windows, MacOS, Solaris) -- EventLoop (IO, timer, idle, custom) +- High-performance EventLoop (IO, timer, idle, custom) - TCP/UDP client/server/proxy - TCP supports heartbeat, upstream, unpack, MultiThread-safe write and close, etc. +- RUDP support: WITH_KCP - SSL/TLS support: (via WITH_OPENSSL or WITH_GNUTLS or WITH_MBEDTLS) - HTTP client/server (support https http1/x http2 grpc) - HTTP static file service, indexof service, sync/async API handler diff --git a/config.ini b/config.ini index e5e76ee64..fc8f64ec2 100644 --- a/config.ini +++ b/config.ini @@ -33,3 +33,6 @@ WITH_NGHTTP2=no WITH_OPENSSL=no WITH_GNUTLS=no WITH_MBEDTLS=no + +# rudp +WITH_KCP=no diff --git a/config.mk b/config.mk index e1b6d243c..3eb61ddd6 100644 --- a/config.mk +++ b/config.mk @@ -16,4 +16,5 @@ WITH_NGHTTP2=no WITH_OPENSSL=no WITH_GNUTLS=no WITH_MBEDTLS=no -CONFIG_DATE=20210817 \ No newline at end of file +WITH_KCP=no +CONFIG_DATE=20211124 diff --git a/configure b/configure index eb7baaa31..997507a1b 100755 --- a/configure +++ b/configure @@ -36,6 +36,9 @@ dependencies: --with-gnutls compile with gnutls? (DEFAULT: $WITH_GNUTLS) --with-mbedtls compile with mbedtls? (DEFAULT: $WITH_MBEDTLS) +rudp: + --with-kcp compile with kcp? (DEFAULT: $WITH_KCP) + END } @@ -250,6 +253,7 @@ option=WITH_GNUTLS && check_option option=WITH_MBEDTLS && check_option option=ENABLE_UDS && check_option option=USE_MULTIMAP && check_option +option=WITH_KCP && check_option # end confile cat << END >> $confile diff --git a/docs/PLAN.md b/docs/PLAN.md index b578cda72..b6c5701ea 100644 --- a/docs/PLAN.md +++ b/docs/PLAN.md @@ -1,6 +1,8 @@ ## Done +- base: cross platfrom infrastructure - event: select/poll/epoll/kqueue/port +- ssl: openssl/guntls/mbedtls - evpp: c++ EventLoop interface similar to muduo and evpp - http client/server: include https http1/x http2 - websocket client/server @@ -18,5 +20,8 @@ - lua binding - js binding - hrpc = libhv + protobuf -- reliable udp: FEC, ARQ, KCP, UDT, QUIC +- rudp: FEC, ARQ, KCP, UDT, QUIC - have a taste of io_uring +- coroutine +- IM-libhv +- GameServer-libhv diff --git a/event/README.md b/event/README.md index 5106923f6..d92834284 100644 --- a/event/README.md +++ b/event/README.md @@ -5,6 +5,8 @@ ├── hloop.h 事件循环模块对外头文件 ├── hevent.h 事件结构体定义 ├── nlog.h 网络日志 +├── unpack.h 拆包 +├── rudp.h 可靠UDP ├── iowatcher.h IO多路复用统一抽象接口 ├── select.c EVENT_SELECT实现 ├── poll.c EVENT_POLL实现 diff --git a/event/hevent.c b/event/hevent.c index 333e8dd15..336950966 100644 --- a/event/hevent.c +++ b/event/hevent.c @@ -40,8 +40,12 @@ static void fill_io_type(hio_t* io) { } static void hio_socket_init(hio_t* io) { - // nonblocking - nonblocking(io->fd); + if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) { + // NOTE: sendto multiple peeraddr cannot use io->write_queue + blocking(io->fd); + } else { + nonblocking(io->fd); + } // fill io->localaddr io->peeraddr if (io->localaddr == NULL) { HV_ALLOC(io->localaddr, sizeof(sockaddr_u)); @@ -52,12 +56,8 @@ static void hio_socket_init(hio_t* io) { socklen_t addrlen = sizeof(sockaddr_u); int ret = getsockname(io->fd, io->localaddr, &addrlen); printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno()); - // NOTE: - // tcp_server peeraddr set by accept - // udp_server peeraddr set by recvfrom - // tcp_client/udp_client peeraddr set by hio_setpeeraddr + // NOTE: udp peeraddr set by recvfrom/sendto if (io->io_type & HIO_TYPE_SOCK_STREAM) { - // tcp acceptfd addrlen = sizeof(sockaddr_u); ret = getpeername(io->fd, io->peeraddr, &addrlen); printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno()); @@ -142,6 +142,12 @@ void hio_ready(hio_t* io) { if (io->io_type & HIO_TYPE_SOCKET) { hio_socket_init(io); } + +#if WITH_RUDP + if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) { + rudp_init(&io->rudp); + } +#endif } void hio_done(hio_t* io) { @@ -163,6 +169,12 @@ void hio_done(hio_t* io) { } write_queue_cleanup(&io->write_queue); hrecursive_mutex_unlock(&io->write_mutex); + +#if WITH_RUDP + if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) { + rudp_cleanup(&io->rudp); + } +#endif } void hio_free(hio_t* io) { @@ -610,3 +622,122 @@ hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) { hio_read_upstream(io); return upstream_io; } + +#if WITH_RUDP +rudp_entry_t* hio_get_rudp(hio_t* io) { + rudp_entry_t* rudp = rudp_get(&io->rudp, io->peeraddr); + rudp->io = io; + return rudp; +} + +static void hio_close_rudp_event_cb(hevent_t* ev) { + rudp_entry_t* entry = (rudp_entry_t*)ev->userdata; + rudp_del(&entry->io->rudp, (struct sockaddr*)&entry->addr); + // rudp_entry_free(entry); +} + +int hio_close_rudp(hio_t* io, struct sockaddr* peeraddr) { + if (peeraddr == NULL) peeraddr = io->peeraddr; + // NOTE: do rudp_del for thread-safe + rudp_entry_t* entry = rudp_get(&io->rudp, peeraddr); + // NOTE: just rudp_remove first, do rudp_entry_free async for safe. + // rudp_entry_t* entry = rudp_remove(&io->rudp, peeraddr); + if (entry) { + hevent_t ev; + memset(&ev, 0, sizeof(ev)); + ev.cb = hio_close_rudp_event_cb; + ev.userdata = entry; + ev.priority = HEVENT_HIGH_PRIORITY; + hloop_post_event(io->loop, &ev); + } + return 0; +} +#endif + +#if WITH_KCP +static kcp_setting_t s_kcp_setting; +static int __kcp_output(const char* buf, int len, ikcpcb* ikcp, void* userdata) { + // printf("ikcp_output len=%d\n", len); + rudp_entry_t* rudp = (rudp_entry_t*)userdata; + assert(rudp != NULL && rudp->io != NULL); + int nsend = sendto(rudp->io->fd, buf, len, 0, &rudp->addr.sa, SOCKADDR_LEN(&rudp->addr)); + // printf("sendto nsend=%d\n", nsend); + return nsend; +} + +static void __kcp_update_timer_cb(htimer_t* timer) { + rudp_entry_t* rudp = (rudp_entry_t*)timer->privdata; + assert(rudp != NULL && rudp->io != NULL && rudp->kcp.ikcp != NULL); + ikcp_update(rudp->kcp.ikcp, (IUINT32)(rudp->io->loop->cur_hrtime / 1000)); +} + +int hio_set_kcp(hio_t* io, kcp_setting_t* setting) { + io->io_type = HIO_TYPE_KCP; + io->kcp_setting = setting; + return 0; +} + +kcp_t* hio_get_kcp(hio_t* io) { + rudp_entry_t* rudp = hio_get_rudp(io); + assert(rudp != NULL); + kcp_t* kcp = &rudp->kcp; + if (kcp->ikcp != NULL) return kcp; + if (io->kcp_setting == NULL) { + io->kcp_setting = &s_kcp_setting; + } + kcp_setting_t* setting = io->kcp_setting; + assert(io->kcp_setting != NULL); + kcp->ikcp = ikcp_create(setting->conv, rudp); + // printf("ikcp_create ikcp=%p\n", kcp->ikcp); + kcp->ikcp->output = __kcp_output; + if (setting->interval > 0) { + ikcp_nodelay(kcp->ikcp, setting->nodelay, setting->interval, setting->fastresend, setting->nocwnd); + } + if (setting->sndwnd > 0 && setting->rcvwnd > 0) { + ikcp_wndsize(kcp->ikcp, setting->sndwnd, setting->rcvwnd); + } + if (setting->mtu > 0) { + ikcp_setmtu(kcp->ikcp, setting->mtu); + } + if (kcp->update_timer == NULL) { + int update_interval = setting->update_interval; + if (update_interval == 0) { + update_interval = DEFAULT_KCP_UPDATE_INTERVAL; + } + kcp->update_timer = htimer_add(io->loop, __kcp_update_timer_cb, update_interval, INFINITE); + kcp->update_timer->privdata = rudp; + } + // NOTE: alloc kcp->readbuf when hio_read_kcp + return kcp; +} + +int hio_write_kcp(hio_t* io, const void* buf, size_t len) { + kcp_t* kcp = hio_get_kcp(io); + int nsend = ikcp_send(kcp->ikcp, (const char*)buf, len); + // printf("ikcp_send len=%d nsend=%d\n", (int)len, nsend); + if (nsend < 0) { + hio_close(io); + } + ikcp_update(kcp->ikcp, (IUINT32)io->loop->cur_hrtime / 1000); + return nsend; +} + +int hio_read_kcp (hio_t* io, void* buf, int readbytes) { + kcp_t* kcp = hio_get_kcp(io); + // printf("ikcp_input len=%d\n", readbytes); + ikcp_input(kcp->ikcp, (const char*)buf, readbytes); + if (kcp->readbuf.base == NULL || kcp->readbuf.len == 0) { + kcp->readbuf.len = DEFAULT_KCP_READ_BUFSIZE; + HV_ALLOC(kcp->readbuf.base, kcp->readbuf.len); + } + int ret = 0; + while (1) { + int nrecv = ikcp_recv(kcp->ikcp, kcp->readbuf.base, kcp->readbuf.len); + // printf("ikcp_recv nrecv=%d\n", nrecv); + if (nrecv < 0) break; + hio_read_cb(io, kcp->readbuf.base, nrecv); + ret += nrecv; + } + return ret; +} +#endif diff --git a/event/hevent.h b/event/hevent.h index e6bce634f..93a52c7d1 100644 --- a/event/hevent.h +++ b/event/hevent.h @@ -3,6 +3,7 @@ #include "hloop.h" #include "iowatcher.h" +#include "rudp.h" #include "hbuf.h" #include "hmutex.h" @@ -148,9 +149,17 @@ struct hio_s { #if defined(EVENT_POLL) || defined(EVENT_KQUEUE) int event_index[2]; // for poll,kqueue #endif + #ifdef EVENT_IOCP void* hovlp; // for iocp/overlapio #endif + +#if WITH_RUDP + rudp_t rudp; +#if WITH_KCP + kcp_setting_t* kcp_setting; +#endif +#endif }; /* * hio lifeline: @@ -189,6 +198,15 @@ static inline bool hio_is_alloced_readbuf(hio_t* io) { void hio_alloc_readbuf(hio_t* io, int len); void hio_free_readbuf(hio_t* io); +#if WITH_RUDP +rudp_entry_t* hio_get_rudp(hio_t* io); +#if WITH_KCP +kcp_t* hio_get_kcp(hio_t* io); +int hio_write_kcp(hio_t* io, const void* buf, size_t len); +int hio_read_kcp (hio_t* io, void* buf, int readbytes); +#endif +#endif + #define EVENT_ENTRY(p) container_of(p, hevent_t, pending_node) #define IDLE_ENTRY(p) container_of(p, hidle_t, node) #define TIMER_ENTRY(p) container_of(p, htimer_t, node) diff --git a/event/hloop.h b/event/hloop.h index 1989ea0bc..e768ca2f7 100644 --- a/event/hloop.h +++ b/event/hloop.h @@ -95,6 +95,7 @@ typedef enum { HIO_TYPE_SOCK_RAW = 0x00000F00, HIO_TYPE_UDP = 0x00001000, + HIO_TYPE_KCP = 0x00002000, HIO_TYPE_DTLS = 0x00010000, HIO_TYPE_SOCK_DGRAM = 0x000FF000, @@ -507,6 +508,58 @@ unpack_setting_t grpc_unpack_setting = { }; */ +//-----------------rudp--------------------------------------------- +#if WITH_KCP +#define WITH_RUDP 1 +#endif + +#if WITH_RUDP +// NOTE: hio_close_rudp is thread-safe. +HV_EXPORT int hio_close_rudp(hio_t* io, struct sockaddr* peeraddr DEFAULT(NULL)); +#endif + +#if WITH_KCP +typedef struct kcp_setting_s { + // ikcp_create(conv, ...) + int conv; + // ikcp_nodelay(kcp, nodelay, interval, fastresend, nocwnd) + int nodelay; + int interval; + int fastresend; + int nocwnd; + // ikcp_wndsize(kcp, sndwnd, rcvwnd) + int sndwnd; + int rcvwnd; + // ikcp_setmtu(kcp, mtu) + int mtu; + // ikcp_update + int update_interval; + +#ifdef __cplusplus + kcp_setting_s() { + conv = 0x11223344; + // normal mode + nodelay = 0; + interval = 40; + fastresend = 0; + nocwnd = 0; + // fast mode + // nodelay = 1; + // interval = 10; + // fastresend = 2; + // nocwnd = 1; + + sndwnd = 0; + rcvwnd = 0; + mtu = 1400; + update_interval = 10; // ms + } +#endif +} kcp_setting_t; + +HV_EXPORT int hio_set_kcp(hio_t* io, kcp_setting_t* setting DEFAULT(NULL)); +#endif + END_EXTERN_C #endif // HV_LOOP_H_ diff --git a/event/kcp/ikcp.c b/event/kcp/ikcp.c new file mode 100644 index 000000000..fd4cf7d87 --- /dev/null +++ b/event/kcp/ikcp.c @@ -0,0 +1,1299 @@ +//===================================================================== +// +// KCP - A Better ARQ Protocol Implementation +// skywind3000 (at) gmail.com, 2010-2011 +// +// Features: +// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp. +// + Maximum RTT reduce three times vs tcp. +// + Lightweight, distributed as a single source file. +// +//===================================================================== +#include "ikcp.h" + +#include +#include +#include +#include +#include + + + +//===================================================================== +// KCP BASIC +//===================================================================== +const IUINT32 IKCP_RTO_NDL = 30; // no delay min rto +const IUINT32 IKCP_RTO_MIN = 100; // normal min rto +const IUINT32 IKCP_RTO_DEF = 200; +const IUINT32 IKCP_RTO_MAX = 60000; +const IUINT32 IKCP_CMD_PUSH = 81; // cmd: push data +const IUINT32 IKCP_CMD_ACK = 82; // cmd: ack +const IUINT32 IKCP_CMD_WASK = 83; // cmd: window probe (ask) +const IUINT32 IKCP_CMD_WINS = 84; // cmd: window size (tell) +const IUINT32 IKCP_ASK_SEND = 1; // need to send IKCP_CMD_WASK +const IUINT32 IKCP_ASK_TELL = 2; // need to send IKCP_CMD_WINS +const IUINT32 IKCP_WND_SND = 32; +const IUINT32 IKCP_WND_RCV = 128; // must >= max fragment size +const IUINT32 IKCP_MTU_DEF = 1400; +const IUINT32 IKCP_ACK_FAST = 3; +const IUINT32 IKCP_INTERVAL = 100; +const IUINT32 IKCP_OVERHEAD = 24; +const IUINT32 IKCP_DEADLINK = 20; +const IUINT32 IKCP_THRESH_INIT = 2; +const IUINT32 IKCP_THRESH_MIN = 2; +const IUINT32 IKCP_PROBE_INIT = 7000; // 7 secs to probe window size +const IUINT32 IKCP_PROBE_LIMIT = 120000; // up to 120 secs to probe window +const IUINT32 IKCP_FASTACK_LIMIT = 5; // max times to trigger fastack + + +//--------------------------------------------------------------------- +// encode / decode +//--------------------------------------------------------------------- + +/* encode 8 bits unsigned int */ +static inline char *ikcp_encode8u(char *p, unsigned char c) +{ + *(unsigned char*)p++ = c; + return p; +} + +/* decode 8 bits unsigned int */ +static inline const char *ikcp_decode8u(const char *p, unsigned char *c) +{ + *c = *(unsigned char*)p++; + return p; +} + +/* encode 16 bits unsigned int (lsb) */ +static inline char *ikcp_encode16u(char *p, unsigned short w) +{ +#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN + *(unsigned char*)(p + 0) = (w & 255); + *(unsigned char*)(p + 1) = (w >> 8); +#else + memcpy(p, &w, 2); +#endif + p += 2; + return p; +} + +/* decode 16 bits unsigned int (lsb) */ +static inline const char *ikcp_decode16u(const char *p, unsigned short *w) +{ +#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN + *w = *(const unsigned char*)(p + 1); + *w = *(const unsigned char*)(p + 0) + (*w << 8); +#else + memcpy(w, p, 2); +#endif + p += 2; + return p; +} + +/* encode 32 bits unsigned int (lsb) */ +static inline char *ikcp_encode32u(char *p, IUINT32 l) +{ +#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN + *(unsigned char*)(p + 0) = (unsigned char)((l >> 0) & 0xff); + *(unsigned char*)(p + 1) = (unsigned char)((l >> 8) & 0xff); + *(unsigned char*)(p + 2) = (unsigned char)((l >> 16) & 0xff); + *(unsigned char*)(p + 3) = (unsigned char)((l >> 24) & 0xff); +#else + memcpy(p, &l, 4); +#endif + p += 4; + return p; +} + +/* decode 32 bits unsigned int (lsb) */ +static inline const char *ikcp_decode32u(const char *p, IUINT32 *l) +{ +#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN + *l = *(const unsigned char*)(p + 3); + *l = *(const unsigned char*)(p + 2) + (*l << 8); + *l = *(const unsigned char*)(p + 1) + (*l << 8); + *l = *(const unsigned char*)(p + 0) + (*l << 8); +#else + memcpy(l, p, 4); +#endif + p += 4; + return p; +} + +static inline IUINT32 _imin_(IUINT32 a, IUINT32 b) { + return a <= b ? a : b; +} + +static inline IUINT32 _imax_(IUINT32 a, IUINT32 b) { + return a >= b ? a : b; +} + +static inline IUINT32 _ibound_(IUINT32 lower, IUINT32 middle, IUINT32 upper) +{ + return _imin_(_imax_(lower, middle), upper); +} + +static inline long _itimediff(IUINT32 later, IUINT32 earlier) +{ + return ((IINT32)(later - earlier)); +} + +//--------------------------------------------------------------------- +// manage segment +//--------------------------------------------------------------------- +typedef struct IKCPSEG IKCPSEG; + +static void* (*ikcp_malloc_hook)(size_t) = NULL; +static void (*ikcp_free_hook)(void *) = NULL; + +// internal malloc +static void* ikcp_malloc(size_t size) { + if (ikcp_malloc_hook) + return ikcp_malloc_hook(size); + return malloc(size); +} + +// internal free +static void ikcp_free(void *ptr) { + if (ikcp_free_hook) { + ikcp_free_hook(ptr); + } else { + free(ptr); + } +} + +// redefine allocator +void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*)) +{ + ikcp_malloc_hook = new_malloc; + ikcp_free_hook = new_free; +} + +// allocate a new kcp segment +static IKCPSEG* ikcp_segment_new(ikcpcb *kcp, int size) +{ + return (IKCPSEG*)ikcp_malloc(sizeof(IKCPSEG) + size); +} + +// delete a segment +static void ikcp_segment_delete(ikcpcb *kcp, IKCPSEG *seg) +{ + ikcp_free(seg); +} + +// write log +void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...) +{ + char buffer[1024]; + va_list argptr; + if ((mask & kcp->logmask) == 0 || kcp->writelog == 0) return; + va_start(argptr, fmt); + vsprintf(buffer, fmt, argptr); + va_end(argptr); + kcp->writelog(buffer, kcp, kcp->user); +} + +// check log mask +static int ikcp_canlog(const ikcpcb *kcp, int mask) +{ + if ((mask & kcp->logmask) == 0 || kcp->writelog == NULL) return 0; + return 1; +} + +// output segment +static int ikcp_output(ikcpcb *kcp, const void *data, int size) +{ + assert(kcp); + assert(kcp->output); + if (ikcp_canlog(kcp, IKCP_LOG_OUTPUT)) { + ikcp_log(kcp, IKCP_LOG_OUTPUT, "[RO] %ld bytes", (long)size); + } + if (size == 0) return 0; + return kcp->output((const char*)data, size, kcp, kcp->user); +} + +// output queue +void ikcp_qprint(const char *name, const struct IQUEUEHEAD *head) +{ +#if 0 + const struct IQUEUEHEAD *p; + printf("<%s>: [", name); + for (p = head->next; p != head; p = p->next) { + const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node); + printf("(%lu %d)", (unsigned long)seg->sn, (int)(seg->ts % 10000)); + if (p->next != head) printf(","); + } + printf("]\n"); +#endif +} + + +//--------------------------------------------------------------------- +// create a new kcpcb +//--------------------------------------------------------------------- +ikcpcb* ikcp_create(IUINT32 conv, void *user) +{ + ikcpcb *kcp = (ikcpcb*)ikcp_malloc(sizeof(struct IKCPCB)); + if (kcp == NULL) return NULL; + kcp->conv = conv; + kcp->user = user; + kcp->snd_una = 0; + kcp->snd_nxt = 0; + kcp->rcv_nxt = 0; + kcp->ts_recent = 0; + kcp->ts_lastack = 0; + kcp->ts_probe = 0; + kcp->probe_wait = 0; + kcp->snd_wnd = IKCP_WND_SND; + kcp->rcv_wnd = IKCP_WND_RCV; + kcp->rmt_wnd = IKCP_WND_RCV; + kcp->cwnd = 0; + kcp->incr = 0; + kcp->probe = 0; + kcp->mtu = IKCP_MTU_DEF; + kcp->mss = kcp->mtu - IKCP_OVERHEAD; + kcp->stream = 0; + + kcp->buffer = (char*)ikcp_malloc((kcp->mtu + IKCP_OVERHEAD) * 3); + if (kcp->buffer == NULL) { + ikcp_free(kcp); + return NULL; + } + + iqueue_init(&kcp->snd_queue); + iqueue_init(&kcp->rcv_queue); + iqueue_init(&kcp->snd_buf); + iqueue_init(&kcp->rcv_buf); + kcp->nrcv_buf = 0; + kcp->nsnd_buf = 0; + kcp->nrcv_que = 0; + kcp->nsnd_que = 0; + kcp->state = 0; + kcp->acklist = NULL; + kcp->ackblock = 0; + kcp->ackcount = 0; + kcp->rx_srtt = 0; + kcp->rx_rttval = 0; + kcp->rx_rto = IKCP_RTO_DEF; + kcp->rx_minrto = IKCP_RTO_MIN; + kcp->current = 0; + kcp->interval = IKCP_INTERVAL; + kcp->ts_flush = IKCP_INTERVAL; + kcp->nodelay = 0; + kcp->updated = 0; + kcp->logmask = 0; + kcp->ssthresh = IKCP_THRESH_INIT; + kcp->fastresend = 0; + kcp->fastlimit = IKCP_FASTACK_LIMIT; + kcp->nocwnd = 0; + kcp->xmit = 0; + kcp->dead_link = IKCP_DEADLINK; + kcp->output = NULL; + kcp->writelog = NULL; + + return kcp; +} + + +//--------------------------------------------------------------------- +// release a new kcpcb +//--------------------------------------------------------------------- +void ikcp_release(ikcpcb *kcp) +{ + assert(kcp); + if (kcp) { + IKCPSEG *seg; + while (!iqueue_is_empty(&kcp->snd_buf)) { + seg = iqueue_entry(kcp->snd_buf.next, IKCPSEG, node); + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + } + while (!iqueue_is_empty(&kcp->rcv_buf)) { + seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node); + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + } + while (!iqueue_is_empty(&kcp->snd_queue)) { + seg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node); + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + } + while (!iqueue_is_empty(&kcp->rcv_queue)) { + seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node); + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + } + if (kcp->buffer) { + ikcp_free(kcp->buffer); + } + if (kcp->acklist) { + ikcp_free(kcp->acklist); + } + + kcp->nrcv_buf = 0; + kcp->nsnd_buf = 0; + kcp->nrcv_que = 0; + kcp->nsnd_que = 0; + kcp->ackcount = 0; + kcp->buffer = NULL; + kcp->acklist = NULL; + ikcp_free(kcp); + } +} + + +//--------------------------------------------------------------------- +// set output callback, which will be invoked by kcp +//--------------------------------------------------------------------- +void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len, + ikcpcb *kcp, void *user)) +{ + kcp->output = output; +} + + +//--------------------------------------------------------------------- +// user/upper level recv: returns size, returns below zero for EAGAIN +//--------------------------------------------------------------------- +int ikcp_recv(ikcpcb *kcp, char *buffer, int len) +{ + struct IQUEUEHEAD *p; + int ispeek = (len < 0)? 1 : 0; + int peeksize; + int recover = 0; + IKCPSEG *seg; + assert(kcp); + + if (iqueue_is_empty(&kcp->rcv_queue)) + return -1; + + if (len < 0) len = -len; + + peeksize = ikcp_peeksize(kcp); + + if (peeksize < 0) + return -2; + + if (peeksize > len) + return -3; + + if (kcp->nrcv_que >= kcp->rcv_wnd) + recover = 1; + + // merge fragment + for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) { + int fragment; + seg = iqueue_entry(p, IKCPSEG, node); + p = p->next; + + if (buffer) { + memcpy(buffer, seg->data, seg->len); + buffer += seg->len; + } + + len += seg->len; + fragment = seg->frg; + + if (ikcp_canlog(kcp, IKCP_LOG_RECV)) { + ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn); + } + + if (ispeek == 0) { + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + kcp->nrcv_que--; + } + + if (fragment == 0) + break; + } + + assert(len == peeksize); + + // move available data from rcv_buf -> rcv_queue + while (! iqueue_is_empty(&kcp->rcv_buf)) { + seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node); + if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) { + iqueue_del(&seg->node); + kcp->nrcv_buf--; + iqueue_add_tail(&seg->node, &kcp->rcv_queue); + kcp->nrcv_que++; + kcp->rcv_nxt++; + } else { + break; + } + } + + // fast recover + if (kcp->nrcv_que < kcp->rcv_wnd && recover) { + // ready to send back IKCP_CMD_WINS in ikcp_flush + // tell remote my window size + kcp->probe |= IKCP_ASK_TELL; + } + + return len; +} + + +//--------------------------------------------------------------------- +// peek data size +//--------------------------------------------------------------------- +int ikcp_peeksize(const ikcpcb *kcp) +{ + struct IQUEUEHEAD *p; + IKCPSEG *seg; + int length = 0; + + assert(kcp); + + if (iqueue_is_empty(&kcp->rcv_queue)) return -1; + + seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node); + if (seg->frg == 0) return seg->len; + + if (kcp->nrcv_que < seg->frg + 1) return -1; + + for (p = kcp->rcv_queue.next; p != &kcp->rcv_queue; p = p->next) { + seg = iqueue_entry(p, IKCPSEG, node); + length += seg->len; + if (seg->frg == 0) break; + } + + return length; +} + + +//--------------------------------------------------------------------- +// user/upper level send, returns below zero for error +//--------------------------------------------------------------------- +int ikcp_send(ikcpcb *kcp, const char *buffer, int len) +{ + IKCPSEG *seg; + int count, i; + + assert(kcp->mss > 0); + if (len < 0) return -1; + + // append to previous segment in streaming mode (if possible) + if (kcp->stream != 0) { + if (!iqueue_is_empty(&kcp->snd_queue)) { + IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node); + if (old->len < kcp->mss) { + int capacity = kcp->mss - old->len; + int extend = (len < capacity)? len : capacity; + seg = ikcp_segment_new(kcp, old->len + extend); + assert(seg); + if (seg == NULL) { + return -2; + } + iqueue_add_tail(&seg->node, &kcp->snd_queue); + memcpy(seg->data, old->data, old->len); + if (buffer) { + memcpy(seg->data + old->len, buffer, extend); + buffer += extend; + } + seg->len = old->len + extend; + seg->frg = 0; + len -= extend; + iqueue_del_init(&old->node); + ikcp_segment_delete(kcp, old); + } + } + if (len <= 0) { + return 0; + } + } + + if (len <= (int)kcp->mss) count = 1; + else count = (len + kcp->mss - 1) / kcp->mss; + + if (count >= (int)IKCP_WND_RCV) return -2; + + if (count == 0) count = 1; + + // fragment + for (i = 0; i < count; i++) { + int size = len > (int)kcp->mss ? (int)kcp->mss : len; + seg = ikcp_segment_new(kcp, size); + assert(seg); + if (seg == NULL) { + return -2; + } + if (buffer && len > 0) { + memcpy(seg->data, buffer, size); + } + seg->len = size; + seg->frg = (kcp->stream == 0)? (count - i - 1) : 0; + iqueue_init(&seg->node); + iqueue_add_tail(&seg->node, &kcp->snd_queue); + kcp->nsnd_que++; + if (buffer) { + buffer += size; + } + len -= size; + } + + return 0; +} + + +//--------------------------------------------------------------------- +// parse ack +//--------------------------------------------------------------------- +static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt) +{ + IINT32 rto = 0; + if (kcp->rx_srtt == 0) { + kcp->rx_srtt = rtt; + kcp->rx_rttval = rtt / 2; + } else { + long delta = rtt - kcp->rx_srtt; + if (delta < 0) delta = -delta; + kcp->rx_rttval = (3 * kcp->rx_rttval + delta) / 4; + kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8; + if (kcp->rx_srtt < 1) kcp->rx_srtt = 1; + } + rto = kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval); + kcp->rx_rto = _ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX); +} + +static void ikcp_shrink_buf(ikcpcb *kcp) +{ + struct IQUEUEHEAD *p = kcp->snd_buf.next; + if (p != &kcp->snd_buf) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + kcp->snd_una = seg->sn; + } else { + kcp->snd_una = kcp->snd_nxt; + } +} + +static void ikcp_parse_ack(ikcpcb *kcp, IUINT32 sn) +{ + struct IQUEUEHEAD *p, *next; + + if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0) + return; + + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + next = p->next; + if (sn == seg->sn) { + iqueue_del(p); + ikcp_segment_delete(kcp, seg); + kcp->nsnd_buf--; + break; + } + if (_itimediff(sn, seg->sn) < 0) { + break; + } + } +} + +static void ikcp_parse_una(ikcpcb *kcp, IUINT32 una) +{ + struct IQUEUEHEAD *p, *next; + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + next = p->next; + if (_itimediff(una, seg->sn) > 0) { + iqueue_del(p); + ikcp_segment_delete(kcp, seg); + kcp->nsnd_buf--; + } else { + break; + } + } +} + +static void ikcp_parse_fastack(ikcpcb *kcp, IUINT32 sn, IUINT32 ts) +{ + struct IQUEUEHEAD *p, *next; + + if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0) + return; + + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + next = p->next; + if (_itimediff(sn, seg->sn) < 0) { + break; + } + else if (sn != seg->sn) { + #ifndef IKCP_FASTACK_CONSERVE + seg->fastack++; + #else + if (_itimediff(ts, seg->ts) >= 0) + seg->fastack++; + #endif + } + } +} + + +//--------------------------------------------------------------------- +// ack append +//--------------------------------------------------------------------- +static void ikcp_ack_push(ikcpcb *kcp, IUINT32 sn, IUINT32 ts) +{ + IUINT32 newsize = kcp->ackcount + 1; + IUINT32 *ptr; + + if (newsize > kcp->ackblock) { + IUINT32 *acklist; + IUINT32 newblock; + + for (newblock = 8; newblock < newsize; newblock <<= 1); + acklist = (IUINT32*)ikcp_malloc(newblock * sizeof(IUINT32) * 2); + + if (acklist == NULL) { + assert(acklist != NULL); + abort(); + } + + if (kcp->acklist != NULL) { + IUINT32 x; + for (x = 0; x < kcp->ackcount; x++) { + acklist[x * 2 + 0] = kcp->acklist[x * 2 + 0]; + acklist[x * 2 + 1] = kcp->acklist[x * 2 + 1]; + } + ikcp_free(kcp->acklist); + } + + kcp->acklist = acklist; + kcp->ackblock = newblock; + } + + ptr = &kcp->acklist[kcp->ackcount * 2]; + ptr[0] = sn; + ptr[1] = ts; + kcp->ackcount++; +} + +static void ikcp_ack_get(const ikcpcb *kcp, int p, IUINT32 *sn, IUINT32 *ts) +{ + if (sn) sn[0] = kcp->acklist[p * 2 + 0]; + if (ts) ts[0] = kcp->acklist[p * 2 + 1]; +} + + +//--------------------------------------------------------------------- +// parse data +//--------------------------------------------------------------------- +void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg) +{ + struct IQUEUEHEAD *p, *prev; + IUINT32 sn = newseg->sn; + int repeat = 0; + + if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 || + _itimediff(sn, kcp->rcv_nxt) < 0) { + ikcp_segment_delete(kcp, newseg); + return; + } + + for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + prev = p->prev; + if (seg->sn == sn) { + repeat = 1; + break; + } + if (_itimediff(sn, seg->sn) > 0) { + break; + } + } + + if (repeat == 0) { + iqueue_init(&newseg->node); + iqueue_add(&newseg->node, p); + kcp->nrcv_buf++; + } else { + ikcp_segment_delete(kcp, newseg); + } + +#if 0 + ikcp_qprint("rcvbuf", &kcp->rcv_buf); + printf("rcv_nxt=%lu\n", kcp->rcv_nxt); +#endif + + // move available data from rcv_buf -> rcv_queue + while (! iqueue_is_empty(&kcp->rcv_buf)) { + IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node); + if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) { + iqueue_del(&seg->node); + kcp->nrcv_buf--; + iqueue_add_tail(&seg->node, &kcp->rcv_queue); + kcp->nrcv_que++; + kcp->rcv_nxt++; + } else { + break; + } + } + +#if 0 + ikcp_qprint("queue", &kcp->rcv_queue); + printf("rcv_nxt=%lu\n", kcp->rcv_nxt); +#endif + +#if 1 +// printf("snd(buf=%d, queue=%d)\n", kcp->nsnd_buf, kcp->nsnd_que); +// printf("rcv(buf=%d, queue=%d)\n", kcp->nrcv_buf, kcp->nrcv_que); +#endif +} + + +//--------------------------------------------------------------------- +// input data +//--------------------------------------------------------------------- +int ikcp_input(ikcpcb *kcp, const char *data, long size) +{ + IUINT32 prev_una = kcp->snd_una; + IUINT32 maxack = 0, latest_ts = 0; + int flag = 0; + + if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) { + ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size); + } + + if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1; + + while (1) { + IUINT32 ts, sn, len, una, conv; + IUINT16 wnd; + IUINT8 cmd, frg; + IKCPSEG *seg; + + if (size < (int)IKCP_OVERHEAD) break; + + data = ikcp_decode32u(data, &conv); + if (conv != kcp->conv) return -1; + + data = ikcp_decode8u(data, &cmd); + data = ikcp_decode8u(data, &frg); + data = ikcp_decode16u(data, &wnd); + data = ikcp_decode32u(data, &ts); + data = ikcp_decode32u(data, &sn); + data = ikcp_decode32u(data, &una); + data = ikcp_decode32u(data, &len); + + size -= IKCP_OVERHEAD; + + if ((long)size < (long)len || (int)len < 0) return -2; + + if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && + cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) + return -3; + + kcp->rmt_wnd = wnd; + ikcp_parse_una(kcp, una); + ikcp_shrink_buf(kcp); + + if (cmd == IKCP_CMD_ACK) { + if (_itimediff(kcp->current, ts) >= 0) { + ikcp_update_ack(kcp, _itimediff(kcp->current, ts)); + } + ikcp_parse_ack(kcp, sn); + ikcp_shrink_buf(kcp); + if (flag == 0) { + flag = 1; + maxack = sn; + latest_ts = ts; + } else { + if (_itimediff(sn, maxack) > 0) { + #ifndef IKCP_FASTACK_CONSERVE + maxack = sn; + latest_ts = ts; + #else + if (_itimediff(ts, latest_ts) > 0) { + maxack = sn; + latest_ts = ts; + } + #endif + } + } + if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) { + ikcp_log(kcp, IKCP_LOG_IN_ACK, + "input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn, + (long)_itimediff(kcp->current, ts), + (long)kcp->rx_rto); + } + } + else if (cmd == IKCP_CMD_PUSH) { + if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) { + ikcp_log(kcp, IKCP_LOG_IN_DATA, + "input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts); + } + if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) { + ikcp_ack_push(kcp, sn, ts); + if (_itimediff(sn, kcp->rcv_nxt) >= 0) { + seg = ikcp_segment_new(kcp, len); + seg->conv = conv; + seg->cmd = cmd; + seg->frg = frg; + seg->wnd = wnd; + seg->ts = ts; + seg->sn = sn; + seg->una = una; + seg->len = len; + + if (len > 0) { + memcpy(seg->data, data, len); + } + + ikcp_parse_data(kcp, seg); + } + } + } + else if (cmd == IKCP_CMD_WASK) { + // ready to send back IKCP_CMD_WINS in ikcp_flush + // tell remote my window size + kcp->probe |= IKCP_ASK_TELL; + if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) { + ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe"); + } + } + else if (cmd == IKCP_CMD_WINS) { + // do nothing + if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) { + ikcp_log(kcp, IKCP_LOG_IN_WINS, + "input wins: %lu", (unsigned long)(wnd)); + } + } + else { + return -3; + } + + data += len; + size -= len; + } + + if (flag != 0) { + ikcp_parse_fastack(kcp, maxack, latest_ts); + } + + if (_itimediff(kcp->snd_una, prev_una) > 0) { + if (kcp->cwnd < kcp->rmt_wnd) { + IUINT32 mss = kcp->mss; + if (kcp->cwnd < kcp->ssthresh) { + kcp->cwnd++; + kcp->incr += mss; + } else { + if (kcp->incr < mss) kcp->incr = mss; + kcp->incr += (mss * mss) / kcp->incr + (mss / 16); + if ((kcp->cwnd + 1) * mss <= kcp->incr) { + #if 1 + kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1); + #else + kcp->cwnd++; + #endif + } + } + if (kcp->cwnd > kcp->rmt_wnd) { + kcp->cwnd = kcp->rmt_wnd; + kcp->incr = kcp->rmt_wnd * mss; + } + } + } + + return 0; +} + + +//--------------------------------------------------------------------- +// ikcp_encode_seg +//--------------------------------------------------------------------- +static char *ikcp_encode_seg(char *ptr, const IKCPSEG *seg) +{ + ptr = ikcp_encode32u(ptr, seg->conv); + ptr = ikcp_encode8u(ptr, (IUINT8)seg->cmd); + ptr = ikcp_encode8u(ptr, (IUINT8)seg->frg); + ptr = ikcp_encode16u(ptr, (IUINT16)seg->wnd); + ptr = ikcp_encode32u(ptr, seg->ts); + ptr = ikcp_encode32u(ptr, seg->sn); + ptr = ikcp_encode32u(ptr, seg->una); + ptr = ikcp_encode32u(ptr, seg->len); + return ptr; +} + +static int ikcp_wnd_unused(const ikcpcb *kcp) +{ + if (kcp->nrcv_que < kcp->rcv_wnd) { + return kcp->rcv_wnd - kcp->nrcv_que; + } + return 0; +} + + +//--------------------------------------------------------------------- +// ikcp_flush +//--------------------------------------------------------------------- +void ikcp_flush(ikcpcb *kcp) +{ + IUINT32 current = kcp->current; + char *buffer = kcp->buffer; + char *ptr = buffer; + int count, size, i; + IUINT32 resent, cwnd; + IUINT32 rtomin; + struct IQUEUEHEAD *p; + int change = 0; + int lost = 0; + IKCPSEG seg; + + // 'ikcp_update' haven't been called. + if (kcp->updated == 0) return; + + seg.conv = kcp->conv; + seg.cmd = IKCP_CMD_ACK; + seg.frg = 0; + seg.wnd = ikcp_wnd_unused(kcp); + seg.una = kcp->rcv_nxt; + seg.len = 0; + seg.sn = 0; + seg.ts = 0; + + // flush acknowledges + count = kcp->ackcount; + for (i = 0; i < count; i++) { + size = (int)(ptr - buffer); + if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { + ikcp_output(kcp, buffer, size); + ptr = buffer; + } + ikcp_ack_get(kcp, i, &seg.sn, &seg.ts); + ptr = ikcp_encode_seg(ptr, &seg); + } + + kcp->ackcount = 0; + + // probe window size (if remote window size equals zero) + if (kcp->rmt_wnd == 0) { + if (kcp->probe_wait == 0) { + kcp->probe_wait = IKCP_PROBE_INIT; + kcp->ts_probe = kcp->current + kcp->probe_wait; + } + else { + if (_itimediff(kcp->current, kcp->ts_probe) >= 0) { + if (kcp->probe_wait < IKCP_PROBE_INIT) + kcp->probe_wait = IKCP_PROBE_INIT; + kcp->probe_wait += kcp->probe_wait / 2; + if (kcp->probe_wait > IKCP_PROBE_LIMIT) + kcp->probe_wait = IKCP_PROBE_LIMIT; + kcp->ts_probe = kcp->current + kcp->probe_wait; + kcp->probe |= IKCP_ASK_SEND; + } + } + } else { + kcp->ts_probe = 0; + kcp->probe_wait = 0; + } + + // flush window probing commands + if (kcp->probe & IKCP_ASK_SEND) { + seg.cmd = IKCP_CMD_WASK; + size = (int)(ptr - buffer); + if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { + ikcp_output(kcp, buffer, size); + ptr = buffer; + } + ptr = ikcp_encode_seg(ptr, &seg); + } + + // flush window probing commands + if (kcp->probe & IKCP_ASK_TELL) { + seg.cmd = IKCP_CMD_WINS; + size = (int)(ptr - buffer); + if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { + ikcp_output(kcp, buffer, size); + ptr = buffer; + } + ptr = ikcp_encode_seg(ptr, &seg); + } + + kcp->probe = 0; + + // calculate window size + cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd); + if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd); + + // move data from snd_queue to snd_buf + while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) { + IKCPSEG *newseg; + if (iqueue_is_empty(&kcp->snd_queue)) break; + + newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node); + + iqueue_del(&newseg->node); + iqueue_add_tail(&newseg->node, &kcp->snd_buf); + kcp->nsnd_que--; + kcp->nsnd_buf++; + + newseg->conv = kcp->conv; + newseg->cmd = IKCP_CMD_PUSH; + newseg->wnd = seg.wnd; + newseg->ts = current; + newseg->sn = kcp->snd_nxt++; + newseg->una = kcp->rcv_nxt; + newseg->resendts = current; + newseg->rto = kcp->rx_rto; + newseg->fastack = 0; + newseg->xmit = 0; + } + + // calculate resent + resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff; + rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0; + + // flush data segments + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) { + IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node); + int needsend = 0; + if (segment->xmit == 0) { + needsend = 1; + segment->xmit++; + segment->rto = kcp->rx_rto; + segment->resendts = current + segment->rto + rtomin; + } + else if (_itimediff(current, segment->resendts) >= 0) { + needsend = 1; + segment->xmit++; + kcp->xmit++; + if (kcp->nodelay == 0) { + segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto); + } else { + IINT32 step = (kcp->nodelay < 2)? + ((IINT32)(segment->rto)) : kcp->rx_rto; + segment->rto += step / 2; + } + segment->resendts = current + segment->rto; + lost = 1; + } + else if (segment->fastack >= resent) { + if ((int)segment->xmit <= kcp->fastlimit || + kcp->fastlimit <= 0) { + needsend = 1; + segment->xmit++; + segment->fastack = 0; + segment->resendts = current + segment->rto; + change++; + } + } + + if (needsend) { + int need; + segment->ts = current; + segment->wnd = seg.wnd; + segment->una = kcp->rcv_nxt; + + size = (int)(ptr - buffer); + need = IKCP_OVERHEAD + segment->len; + + if (size + need > (int)kcp->mtu) { + ikcp_output(kcp, buffer, size); + ptr = buffer; + } + + ptr = ikcp_encode_seg(ptr, segment); + + if (segment->len > 0) { + memcpy(ptr, segment->data, segment->len); + ptr += segment->len; + } + + if (segment->xmit >= kcp->dead_link) { + kcp->state = (IUINT32)-1; + } + } + } + + // flash remain segments + size = (int)(ptr - buffer); + if (size > 0) { + ikcp_output(kcp, buffer, size); + } + + // update ssthresh + if (change) { + IUINT32 inflight = kcp->snd_nxt - kcp->snd_una; + kcp->ssthresh = inflight / 2; + if (kcp->ssthresh < IKCP_THRESH_MIN) + kcp->ssthresh = IKCP_THRESH_MIN; + kcp->cwnd = kcp->ssthresh + resent; + kcp->incr = kcp->cwnd * kcp->mss; + } + + if (lost) { + kcp->ssthresh = cwnd / 2; + if (kcp->ssthresh < IKCP_THRESH_MIN) + kcp->ssthresh = IKCP_THRESH_MIN; + kcp->cwnd = 1; + kcp->incr = kcp->mss; + } + + if (kcp->cwnd < 1) { + kcp->cwnd = 1; + kcp->incr = kcp->mss; + } +} + + +//--------------------------------------------------------------------- +// update state (call it repeatedly, every 10ms-100ms), or you can ask +// ikcp_check when to call it again (without ikcp_input/_send calling). +// 'current' - current timestamp in millisec. +//--------------------------------------------------------------------- +void ikcp_update(ikcpcb *kcp, IUINT32 current) +{ + IINT32 slap; + + kcp->current = current; + + if (kcp->updated == 0) { + kcp->updated = 1; + kcp->ts_flush = kcp->current; + } + + slap = _itimediff(kcp->current, kcp->ts_flush); + + if (slap >= 10000 || slap < -10000) { + kcp->ts_flush = kcp->current; + slap = 0; + } + + if (slap >= 0) { + kcp->ts_flush += kcp->interval; + if (_itimediff(kcp->current, kcp->ts_flush) >= 0) { + kcp->ts_flush = kcp->current + kcp->interval; + } + ikcp_flush(kcp); + } +} + + +//--------------------------------------------------------------------- +// Determine when should you invoke ikcp_update: +// returns when you should invoke ikcp_update in millisec, if there +// is no ikcp_input/_send calling. you can call ikcp_update in that +// time, instead of call update repeatly. +// Important to reduce unnacessary ikcp_update invoking. use it to +// schedule ikcp_update (eg. implementing an epoll-like mechanism, +// or optimize ikcp_update when handling massive kcp connections) +//--------------------------------------------------------------------- +IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current) +{ + IUINT32 ts_flush = kcp->ts_flush; + IINT32 tm_flush = 0x7fffffff; + IINT32 tm_packet = 0x7fffffff; + IUINT32 minimal = 0; + struct IQUEUEHEAD *p; + + if (kcp->updated == 0) { + return current; + } + + if (_itimediff(current, ts_flush) >= 10000 || + _itimediff(current, ts_flush) < -10000) { + ts_flush = current; + } + + if (_itimediff(current, ts_flush) >= 0) { + return current; + } + + tm_flush = _itimediff(ts_flush, current); + + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) { + const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node); + IINT32 diff = _itimediff(seg->resendts, current); + if (diff <= 0) { + return current; + } + if (diff < tm_packet) tm_packet = diff; + } + + minimal = (IUINT32)(tm_packet < tm_flush ? tm_packet : tm_flush); + if (minimal >= kcp->interval) minimal = kcp->interval; + + return current + minimal; +} + + + +int ikcp_setmtu(ikcpcb *kcp, int mtu) +{ + char *buffer; + if (mtu < 50 || mtu < (int)IKCP_OVERHEAD) + return -1; + buffer = (char*)ikcp_malloc((mtu + IKCP_OVERHEAD) * 3); + if (buffer == NULL) + return -2; + kcp->mtu = mtu; + kcp->mss = kcp->mtu - IKCP_OVERHEAD; + ikcp_free(kcp->buffer); + kcp->buffer = buffer; + return 0; +} + +int ikcp_interval(ikcpcb *kcp, int interval) +{ + if (interval > 5000) interval = 5000; + else if (interval < 10) interval = 10; + kcp->interval = interval; + return 0; +} + +int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc) +{ + if (nodelay >= 0) { + kcp->nodelay = nodelay; + if (nodelay) { + kcp->rx_minrto = IKCP_RTO_NDL; + } + else { + kcp->rx_minrto = IKCP_RTO_MIN; + } + } + if (interval >= 0) { + if (interval > 5000) interval = 5000; + else if (interval < 10) interval = 10; + kcp->interval = interval; + } + if (resend >= 0) { + kcp->fastresend = resend; + } + if (nc >= 0) { + kcp->nocwnd = nc; + } + return 0; +} + + +int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd) +{ + if (kcp) { + if (sndwnd > 0) { + kcp->snd_wnd = sndwnd; + } + if (rcvwnd > 0) { // must >= max fragment size + kcp->rcv_wnd = _imax_(rcvwnd, IKCP_WND_RCV); + } + } + return 0; +} + +int ikcp_waitsnd(const ikcpcb *kcp) +{ + return kcp->nsnd_buf + kcp->nsnd_que; +} + + +// read conv +IUINT32 ikcp_getconv(const void *ptr) +{ + IUINT32 conv; + ikcp_decode32u((const char*)ptr, &conv); + return conv; +} + + diff --git a/event/kcp/ikcp.h b/event/kcp/ikcp.h new file mode 100644 index 000000000..e525105c8 --- /dev/null +++ b/event/kcp/ikcp.h @@ -0,0 +1,416 @@ +//===================================================================== +// +// KCP - A Better ARQ Protocol Implementation +// skywind3000 (at) gmail.com, 2010-2011 +// +// Features: +// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp. +// + Maximum RTT reduce three times vs tcp. +// + Lightweight, distributed as a single source file. +// +//===================================================================== +#ifndef __IKCP_H__ +#define __IKCP_H__ + +#include +#include +#include + + +//===================================================================== +// 32BIT INTEGER DEFINITION +//===================================================================== +#ifndef __INTEGER_32_BITS__ +#define __INTEGER_32_BITS__ +#if defined(_WIN64) || defined(WIN64) || defined(__amd64__) || \ + defined(__x86_64) || defined(__x86_64__) || defined(_M_IA64) || \ + defined(_M_AMD64) + typedef unsigned int ISTDUINT32; + typedef int ISTDINT32; +#elif defined(_WIN32) || defined(WIN32) || defined(__i386__) || \ + defined(__i386) || defined(_M_X86) + typedef unsigned long ISTDUINT32; + typedef long ISTDINT32; +#elif defined(__MACOS__) + typedef UInt32 ISTDUINT32; + typedef SInt32 ISTDINT32; +#elif defined(__APPLE__) && defined(__MACH__) + #include + typedef u_int32_t ISTDUINT32; + typedef int32_t ISTDINT32; +#elif defined(__BEOS__) + #include + typedef u_int32_t ISTDUINT32; + typedef int32_t ISTDINT32; +#elif (defined(_MSC_VER) || defined(__BORLANDC__)) && (!defined(__MSDOS__)) + typedef unsigned __int32 ISTDUINT32; + typedef __int32 ISTDINT32; +#elif defined(__GNUC__) + #include + typedef uint32_t ISTDUINT32; + typedef int32_t ISTDINT32; +#else + typedef unsigned long ISTDUINT32; + typedef long ISTDINT32; +#endif +#endif + + +//===================================================================== +// Integer Definition +//===================================================================== +#ifndef __IINT8_DEFINED +#define __IINT8_DEFINED +typedef char IINT8; +#endif + +#ifndef __IUINT8_DEFINED +#define __IUINT8_DEFINED +typedef unsigned char IUINT8; +#endif + +#ifndef __IUINT16_DEFINED +#define __IUINT16_DEFINED +typedef unsigned short IUINT16; +#endif + +#ifndef __IINT16_DEFINED +#define __IINT16_DEFINED +typedef short IINT16; +#endif + +#ifndef __IINT32_DEFINED +#define __IINT32_DEFINED +typedef ISTDINT32 IINT32; +#endif + +#ifndef __IUINT32_DEFINED +#define __IUINT32_DEFINED +typedef ISTDUINT32 IUINT32; +#endif + +#ifndef __IINT64_DEFINED +#define __IINT64_DEFINED +#if defined(_MSC_VER) || defined(__BORLANDC__) +typedef __int64 IINT64; +#else +typedef long long IINT64; +#endif +#endif + +#ifndef __IUINT64_DEFINED +#define __IUINT64_DEFINED +#if defined(_MSC_VER) || defined(__BORLANDC__) +typedef unsigned __int64 IUINT64; +#else +typedef unsigned long long IUINT64; +#endif +#endif + +#ifndef INLINE +#if defined(__GNUC__) + +#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1)) +#define INLINE __inline__ __attribute__((always_inline)) +#else +#define INLINE __inline__ +#endif + +#elif (defined(_MSC_VER) || defined(__BORLANDC__) || defined(__WATCOMC__)) +#define INLINE __inline +#else +#define INLINE +#endif +#endif + +#if (!defined(__cplusplus)) && (!defined(inline)) +#define inline INLINE +#endif + + +//===================================================================== +// QUEUE DEFINITION +//===================================================================== +#ifndef __IQUEUE_DEF__ +#define __IQUEUE_DEF__ + +struct IQUEUEHEAD { + struct IQUEUEHEAD *next, *prev; +}; + +typedef struct IQUEUEHEAD iqueue_head; + + +//--------------------------------------------------------------------- +// queue init +//--------------------------------------------------------------------- +#define IQUEUE_HEAD_INIT(name) { &(name), &(name) } +#define IQUEUE_HEAD(name) \ + struct IQUEUEHEAD name = IQUEUE_HEAD_INIT(name) + +#define IQUEUE_INIT(ptr) ( \ + (ptr)->next = (ptr), (ptr)->prev = (ptr)) + +#define IOFFSETOF(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) + +#define ICONTAINEROF(ptr, type, member) ( \ + (type*)( ((char*)((type*)ptr)) - IOFFSETOF(type, member)) ) + +#define IQUEUE_ENTRY(ptr, type, member) ICONTAINEROF(ptr, type, member) + + +//--------------------------------------------------------------------- +// queue operation +//--------------------------------------------------------------------- +#define IQUEUE_ADD(node, head) ( \ + (node)->prev = (head), (node)->next = (head)->next, \ + (head)->next->prev = (node), (head)->next = (node)) + +#define IQUEUE_ADD_TAIL(node, head) ( \ + (node)->prev = (head)->prev, (node)->next = (head), \ + (head)->prev->next = (node), (head)->prev = (node)) + +#define IQUEUE_DEL_BETWEEN(p, n) ((n)->prev = (p), (p)->next = (n)) + +#define IQUEUE_DEL(entry) (\ + (entry)->next->prev = (entry)->prev, \ + (entry)->prev->next = (entry)->next, \ + (entry)->next = 0, (entry)->prev = 0) + +#define IQUEUE_DEL_INIT(entry) do { \ + IQUEUE_DEL(entry); IQUEUE_INIT(entry); } while (0) + +#define IQUEUE_IS_EMPTY(entry) ((entry) == (entry)->next) + +#define iqueue_init IQUEUE_INIT +#define iqueue_entry IQUEUE_ENTRY +#define iqueue_add IQUEUE_ADD +#define iqueue_add_tail IQUEUE_ADD_TAIL +#define iqueue_del IQUEUE_DEL +#define iqueue_del_init IQUEUE_DEL_INIT +#define iqueue_is_empty IQUEUE_IS_EMPTY + +#define IQUEUE_FOREACH(iterator, head, TYPE, MEMBER) \ + for ((iterator) = iqueue_entry((head)->next, TYPE, MEMBER); \ + &((iterator)->MEMBER) != (head); \ + (iterator) = iqueue_entry((iterator)->MEMBER.next, TYPE, MEMBER)) + +#define iqueue_foreach(iterator, head, TYPE, MEMBER) \ + IQUEUE_FOREACH(iterator, head, TYPE, MEMBER) + +#define iqueue_foreach_entry(pos, head) \ + for( (pos) = (head)->next; (pos) != (head) ; (pos) = (pos)->next ) + + +#define __iqueue_splice(list, head) do { \ + iqueue_head *first = (list)->next, *last = (list)->prev; \ + iqueue_head *at = (head)->next; \ + (first)->prev = (head), (head)->next = (first); \ + (last)->next = (at), (at)->prev = (last); } while (0) + +#define iqueue_splice(list, head) do { \ + if (!iqueue_is_empty(list)) __iqueue_splice(list, head); } while (0) + +#define iqueue_splice_init(list, head) do { \ + iqueue_splice(list, head); iqueue_init(list); } while (0) + + +#ifdef _MSC_VER +#pragma warning(disable:4311) +#pragma warning(disable:4312) +#pragma warning(disable:4996) +#endif + +#endif + + +//--------------------------------------------------------------------- +// BYTE ORDER & ALIGNMENT +//--------------------------------------------------------------------- +#ifndef IWORDS_BIG_ENDIAN + #ifdef _BIG_ENDIAN_ + #if _BIG_ENDIAN_ + #define IWORDS_BIG_ENDIAN 1 + #endif + #endif + #ifndef IWORDS_BIG_ENDIAN + #if defined(__hppa__) || \ + defined(__m68k__) || defined(mc68000) || defined(_M_M68K) || \ + (defined(__MIPS__) && defined(__MIPSEB__)) || \ + defined(__ppc__) || defined(__POWERPC__) || defined(_M_PPC) || \ + defined(__sparc__) || defined(__powerpc__) || \ + defined(__mc68000__) || defined(__s390x__) || defined(__s390__) + #define IWORDS_BIG_ENDIAN 1 + #endif + #endif + #ifndef IWORDS_BIG_ENDIAN + #define IWORDS_BIG_ENDIAN 0 + #endif +#endif + +#ifndef IWORDS_MUST_ALIGN + #if defined(__i386__) || defined(__i386) || defined(_i386_) + #define IWORDS_MUST_ALIGN 0 + #elif defined(_M_IX86) || defined(_X86_) || defined(__x86_64__) + #define IWORDS_MUST_ALIGN 0 + #elif defined(__amd64) || defined(__amd64__) + #define IWORDS_MUST_ALIGN 0 + #else + #define IWORDS_MUST_ALIGN 1 + #endif +#endif + + +//===================================================================== +// SEGMENT +//===================================================================== +struct IKCPSEG +{ + struct IQUEUEHEAD node; + IUINT32 conv; + IUINT32 cmd; + IUINT32 frg; + IUINT32 wnd; + IUINT32 ts; + IUINT32 sn; + IUINT32 una; + IUINT32 len; + IUINT32 resendts; + IUINT32 rto; + IUINT32 fastack; + IUINT32 xmit; + char data[1]; +}; + + +//--------------------------------------------------------------------- +// IKCPCB +//--------------------------------------------------------------------- +struct IKCPCB +{ + IUINT32 conv, mtu, mss, state; + IUINT32 snd_una, snd_nxt, rcv_nxt; + IUINT32 ts_recent, ts_lastack, ssthresh; + IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto; + IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe; + IUINT32 current, interval, ts_flush, xmit; + IUINT32 nrcv_buf, nsnd_buf; + IUINT32 nrcv_que, nsnd_que; + IUINT32 nodelay, updated; + IUINT32 ts_probe, probe_wait; + IUINT32 dead_link, incr; + struct IQUEUEHEAD snd_queue; + struct IQUEUEHEAD rcv_queue; + struct IQUEUEHEAD snd_buf; + struct IQUEUEHEAD rcv_buf; + IUINT32 *acklist; + IUINT32 ackcount; + IUINT32 ackblock; + void *user; + char *buffer; + int fastresend; + int fastlimit; + int nocwnd, stream; + int logmask; + int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user); + void (*writelog)(const char *log, struct IKCPCB *kcp, void *user); +}; + + +typedef struct IKCPCB ikcpcb; + +#define IKCP_LOG_OUTPUT 1 +#define IKCP_LOG_INPUT 2 +#define IKCP_LOG_SEND 4 +#define IKCP_LOG_RECV 8 +#define IKCP_LOG_IN_DATA 16 +#define IKCP_LOG_IN_ACK 32 +#define IKCP_LOG_IN_PROBE 64 +#define IKCP_LOG_IN_WINS 128 +#define IKCP_LOG_OUT_DATA 256 +#define IKCP_LOG_OUT_ACK 512 +#define IKCP_LOG_OUT_PROBE 1024 +#define IKCP_LOG_OUT_WINS 2048 + +#ifdef __cplusplus +extern "C" { +#endif + +//--------------------------------------------------------------------- +// interface +//--------------------------------------------------------------------- + +// create a new kcp control object, 'conv' must equal in two endpoint +// from the same connection. 'user' will be passed to the output callback +// output callback can be setup like this: 'kcp->output = my_udp_output' +ikcpcb* ikcp_create(IUINT32 conv, void *user); + +// release kcp control object +void ikcp_release(ikcpcb *kcp); + +// set output callback, which will be invoked by kcp +void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len, + ikcpcb *kcp, void *user)); + +// user/upper level recv: returns size, returns below zero for EAGAIN +int ikcp_recv(ikcpcb *kcp, char *buffer, int len); + +// user/upper level send, returns below zero for error +int ikcp_send(ikcpcb *kcp, const char *buffer, int len); + +// update state (call it repeatedly, every 10ms-100ms), or you can ask +// ikcp_check when to call it again (without ikcp_input/_send calling). +// 'current' - current timestamp in millisec. +void ikcp_update(ikcpcb *kcp, IUINT32 current); + +// Determine when should you invoke ikcp_update: +// returns when you should invoke ikcp_update in millisec, if there +// is no ikcp_input/_send calling. you can call ikcp_update in that +// time, instead of call update repeatly. +// Important to reduce unnacessary ikcp_update invoking. use it to +// schedule ikcp_update (eg. implementing an epoll-like mechanism, +// or optimize ikcp_update when handling massive kcp connections) +IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current); + +// when you received a low level packet (eg. UDP packet), call it +int ikcp_input(ikcpcb *kcp, const char *data, long size); + +// flush pending data +void ikcp_flush(ikcpcb *kcp); + +// check the size of next message in the recv queue +int ikcp_peeksize(const ikcpcb *kcp); + +// change MTU size, default is 1400 +int ikcp_setmtu(ikcpcb *kcp, int mtu); + +// set maximum window size: sndwnd=32, rcvwnd=32 by default +int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd); + +// get how many packet is waiting to be sent +int ikcp_waitsnd(const ikcpcb *kcp); + +// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1) +// nodelay: 0:disable(default), 1:enable +// interval: internal update timer interval in millisec, default is 100ms +// resend: 0:disable fast resend(default), 1:enable fast resend +// nc: 0:normal congestion control(default), 1:disable congestion control +int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc); + + +void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...); + +// setup allocator +void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*)); + +// read conv +IUINT32 ikcp_getconv(const void *ptr); + + +#ifdef __cplusplus +} +#endif + +#endif + + diff --git a/event/nio.c b/event/nio.c index acef74360..3ae08ba1b 100644 --- a/event/nio.c +++ b/event/nio.c @@ -55,6 +55,12 @@ static void __read_cb(hio_t* io, void* buf, int readbytes) { hio_read_stop(io); } +#if WITH_KCP + if (io->io_type == HIO_TYPE_KCP) { + hio_read_kcp(io, buf, readbytes); + return; + } +#endif hio_read_cb(io, buf, readbytes); } @@ -236,6 +242,7 @@ static int __nio_read(hio_t* io, void* buf, int len) { #endif break; case HIO_TYPE_UDP: + case HIO_TYPE_KCP: case HIO_TYPE_IP: { socklen_t addrlen = sizeof(sockaddr_u); @@ -264,6 +271,7 @@ static int __nio_write(hio_t* io, const void* buf, int len) { #endif break; case HIO_TYPE_UDP: + case HIO_TYPE_KCP: case HIO_TYPE_IP: nwrite = sendto(io->fd, buf, len, 0, io->peeraddr, SOCKADDR_LEN(io->peeraddr)); break; @@ -459,6 +467,11 @@ int hio_write (hio_t* io, const void* buf, size_t len) { hloge("hio_write called but fd[%d] already closed!", io->fd); return -1; } +#if WITH_KCP + if (io->io_type == HIO_TYPE_KCP) { + return hio_write_kcp(io, buf, len); + } +#endif int nwrite = 0, err = 0; hrecursive_mutex_lock(&io->write_mutex); if (write_queue_empty(&io->write_queue)) { diff --git a/event/rudp.c b/event/rudp.c new file mode 100644 index 000000000..fa6ea77dd --- /dev/null +++ b/event/rudp.c @@ -0,0 +1,151 @@ +#include "rudp.h" + +#if WITH_RUDP + +#if WITH_KCP +void kcp_release(kcp_t* kcp) { + if (kcp->ikcp == NULL) return; + if (kcp->update_timer) { + htimer_del(kcp->update_timer); + kcp->update_timer = NULL; + } + HV_FREE(kcp->readbuf.base); + kcp->readbuf.len = 0; + // printf("ikcp_release ikcp=%p\n", kcp->ikcp); + ikcp_release(kcp->ikcp); + kcp->ikcp = NULL; +} +#endif + +void rudp_entry_free(rudp_entry_t* entry) { +#if WITH_KCP + kcp_release(&entry->kcp); +#endif + HV_FREE(entry); +} + +void rudp_init(rudp_t* rudp) { + // printf("rudp init\n"); + rudp->rb_root.rb_node = NULL; + hmutex_init(&rudp->mutex); +} + +void rudp_cleanup(rudp_t* rudp) { + // printf("rudp cleaup\n"); + struct rb_node* n = NULL; + rudp_entry_t* e = NULL; + while ((n = rudp->rb_root.rb_node)) { + e = rb_entry(n, rudp_entry_t, rb_node); + rb_erase(n, &rudp->rb_root); + rudp_entry_free(e); + } + hmutex_destroy(&rudp->mutex); +} + +bool rudp_insert(rudp_t* rudp, rudp_entry_t* entry) { + struct rb_node** n = &rudp->rb_root.rb_node; + struct rb_node* parent = NULL; + rudp_entry_t* e = NULL; + int cmp = 0; + bool exists = false; + while (*n) { + parent = *n; + e = rb_entry(*n, rudp_entry_t, rb_node); + cmp = memcmp(&entry->addr, &e->addr, sizeof(sockaddr_u)); + if (cmp < 0) { + n = &(*n)->rb_left; + } else if (cmp > 0) { + n = &(*n)->rb_right; + } else { + exists = true; + break; + } + } + + if (!exists) { + rb_link_node(&entry->rb_node, parent, n); + rb_insert_color(&entry->rb_node, &rudp->rb_root); + } + return !exists; +} + +rudp_entry_t* rudp_search(rudp_t* rudp, struct sockaddr* addr) { + struct rb_node* n = rudp->rb_root.rb_node; + rudp_entry_t* e = NULL; + int cmp = 0; + bool exists = false; + while (n) { + e = rb_entry(n, rudp_entry_t, rb_node); + cmp = memcmp(addr, &e->addr, sizeof(sockaddr_u)); + if (cmp < 0) { + n = n->rb_left; + } else if (cmp > 0) { + n = n->rb_right; + } else { + exists = true; + break; + } + } + return exists ? e : NULL; +} + +rudp_entry_t* rudp_remove(rudp_t* rudp, struct sockaddr* addr) { + hmutex_lock(&rudp->mutex); + rudp_entry_t* e = rudp_search(rudp, addr); + if (e) { + // printf("rudp_remove "); + // SOCKADDR_PRINT(addr); + rb_erase(&e->rb_node, &rudp->rb_root); + } + hmutex_unlock(&rudp->mutex); + return e; +} + +rudp_entry_t* rudp_get(rudp_t* rudp, struct sockaddr* addr) { + hmutex_lock(&rudp->mutex); + struct rb_node** n = &rudp->rb_root.rb_node; + struct rb_node* parent = NULL; + rudp_entry_t* e = NULL; + int cmp = 0; + bool exists = false; + // search + while (*n) { + parent = *n; + e = rb_entry(*n, rudp_entry_t, rb_node); + cmp = memcmp(addr, &e->addr, sizeof(sockaddr_u)); + if (cmp < 0) { + n = &(*n)->rb_left; + } else if (cmp > 0) { + n = &(*n)->rb_right; + } else { + exists = true; + break; + } + } + + if (!exists) { + // insert + // printf("rudp_insert "); + // SOCKADDR_PRINT(addr); + HV_ALLOC_SIZEOF(e); + memcpy(&e->addr, addr, SOCKADDR_LEN(addr)); + rb_link_node(&e->rb_node, parent, n); + rb_insert_color(&e->rb_node, &rudp->rb_root); + } + hmutex_unlock(&rudp->mutex); + return e; +} + +void rudp_del(rudp_t* rudp, struct sockaddr* addr) { + hmutex_lock(&rudp->mutex); + rudp_entry_t* e = rudp_search(rudp, addr); + if (e) { + // printf("rudp_remove "); + // SOCKADDR_PRINT(addr); + rb_erase(&e->rb_node, &rudp->rb_root); + rudp_entry_free(e); + } + hmutex_unlock(&rudp->mutex); +} + +#endif diff --git a/event/rudp.h b/event/rudp.h new file mode 100644 index 000000000..b2702f63e --- /dev/null +++ b/event/rudp.h @@ -0,0 +1,61 @@ +#ifndef HV_RUDP_H_ +#define HV_RUDP_H_ + +#include "hloop.h" + +#if WITH_RUDP + +#include "rbtree.h" +#include "hsocket.h" +#include "hmutex.h" + +#if WITH_KCP +#include "kcp/ikcp.h" +#include "hbuf.h" +#define DEFAULT_KCP_UPDATE_INTERVAL 10 // ms +#define DEFAULT_KCP_READ_BUFSIZE 1400 + +typedef struct kcp_s { + ikcpcb* ikcp; + htimer_t* update_timer; + hbuf_t readbuf; +} kcp_t; + +// NOTE: kcp_create in hio_get_kcp +void kcp_release(kcp_t* kcp); +#endif + +typedef struct rudp_s { + struct rb_root rb_root; + hmutex_t mutex; +} rudp_t; + +typedef struct rudp_entry_s { + struct rb_node rb_node; + sockaddr_u addr; // key + // val + hio_t* io; +#if WITH_KCP + kcp_t kcp; +#endif +} rudp_entry_t; +// NOTE: rudp_entry_t alloc when rudp_get +void rudp_entry_free(rudp_entry_t* entry); + +void rudp_init(rudp_t* rudp); +void rudp_cleanup(rudp_t* rudp); + +bool rudp_insert(rudp_t* rudp, rudp_entry_t* entry); +// NOTE: just rb_erase, not free +rudp_entry_t* rudp_remove(rudp_t* rudp, struct sockaddr* addr); +rudp_entry_t* rudp_search(rudp_t* rudp, struct sockaddr* addr); +#define rudp_has(rudp, addr) (rudp_search(rudp, addr) != NULL) + +// rudp_search + malloc + rudp_insert +rudp_entry_t* rudp_get(rudp_t* rudp, struct sockaddr* addr); +// rudp_remove + free +void rudp_del(rudp_t* rudp, struct sockaddr* addr); + +#endif // WITH_RUDP + +#endif // HV_RUDP_H_ diff --git a/evpp/UdpClient.h b/evpp/UdpClient.h index 3a57d77ed..b464941ec 100644 --- a/evpp/UdpClient.h +++ b/evpp/UdpClient.h @@ -12,6 +12,9 @@ namespace hv { class UdpClient { public: UdpClient() { +#if WITH_KCP + enable_kcp = false; +#endif } virtual ~UdpClient() { @@ -47,6 +50,11 @@ class UdpClient { onWriteComplete(channel, buf); } }; +#if WITH_KCP + if (enable_kcp) { + hio_set_kcp(channel->io(), &kcp_setting); + } +#endif return channel->startRead(); } @@ -59,6 +67,7 @@ class UdpClient { int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) { if (channel == NULL) return -1; + std::lock_guard locker(sendto_mutex); if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr)); return channel->write(data, size); } @@ -69,13 +78,29 @@ class UdpClient { return sendto(str.data(), str.size(), peeraddr); } +#if WITH_KCP + void setKcp(kcp_setting_t* setting) { + if (setting) { + enable_kcp = true; + kcp_setting = *setting; + } else { + enable_kcp = false; + } + } +#endif + public: SocketChannelPtr channel; +#if WITH_KCP + bool enable_kcp; + kcp_setting_t kcp_setting; +#endif // Callback MessageCallback onMessage; WriteCompleteCallback onWriteComplete; private: + std::mutex sendto_mutex; EventLoopThread loop_thread; }; diff --git a/evpp/UdpServer.h b/evpp/UdpServer.h index d57dea8a5..ed07e43c4 100644 --- a/evpp/UdpServer.h +++ b/evpp/UdpServer.h @@ -12,6 +12,9 @@ namespace hv { class UdpServer { public: UdpServer() { +#if WITH_KCP + enable_kcp = false; +#endif } virtual ~UdpServer() { @@ -47,6 +50,11 @@ class UdpServer { onWriteComplete(channel, buf); } }; +#if WITH_KCP + if (enable_kcp) { + hio_set_kcp(channel->io(), &kcp_setting); + } +#endif return channel->startRead(); } @@ -59,6 +67,7 @@ class UdpServer { int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) { if (channel == NULL) return -1; + std::lock_guard locker(sendto_mutex); if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr)); return channel->write(data, size); } @@ -71,11 +80,16 @@ class UdpServer { public: SocketChannelPtr channel; +#if WITH_KCP + bool enable_kcp; + kcp_setting_t kcp_setting; +#endif // Callback MessageCallback onMessage; WriteCompleteCallback onWriteComplete; private: + std::mutex sendto_mutex; EventLoopThread loop_thread; }; diff --git a/examples/nc.c b/examples/nc.c index 917d606f9..64f99de9f 100644 --- a/examples/nc.c +++ b/examples/nc.c @@ -26,6 +26,17 @@ */ #define TEST_SSL 0 +/* + * @test kcp_client + * #define TEST_KCP 1 + * + * @build ./configure --with-kcp && make clean && make + * @server bin/udp_echo_server 1234 + * @client bin/nc -u 127.0.0.1 1234 + * + */ +#define TEST_KCP 0 + #define RECV_BUFSIZE 8192 static char recvbuf[RECV_BUFSIZE]; @@ -113,6 +124,13 @@ static void on_stdin(hio_t* io, void* buf, int readbytes) { } hio_write(sockio, buf, readbytes); + +#if TEST_KCP + if (strncmp(str, "CLOSE", 5) == 0) { + printf("call hio_close\n"); + hio_close(sockio); + } +#endif } static void on_close(hio_t* io) { @@ -189,6 +207,9 @@ Examples: nc 127.0.0.1 80\n\ else if (protocol == 2) { // udp sockio = hloop_create_udp_client(loop, host, port); +#if TEST_KCP + hio_set_kcp(sockio, NULL); +#endif hio_read(sockio); } if (sockio == NULL) { diff --git a/examples/udp_echo_server.c b/examples/udp_echo_server.c index 5ea5ca1b8..d3e9647ad 100644 --- a/examples/udp_echo_server.c +++ b/examples/udp_echo_server.c @@ -11,6 +11,17 @@ #include "hloop.h" #include "hsocket.h" +/* + * @test kcp_server + * #define TEST_KCP 1 + * + * @build ./configure --with-kcp && make clean && make + * @server bin/udp_echo_server 1234 + * @client bin/nc -u 127.0.0.1 1234 + * + */ +#define TEST_KCP 0 + static void on_recvfrom(hio_t* io, void* buf, int readbytes) { printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), readbytes); char localaddrstr[SOCKADDR_STRLEN] = {0}; @@ -18,10 +29,18 @@ static void on_recvfrom(hio_t* io, void* buf, int readbytes) { printf("[%s] <=> [%s]\n", SOCKADDR_STR(hio_localaddr(io), localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); - printf("< %.*s", readbytes, (char*)buf); + + char* str = (char*)buf; + printf("< %.*s", readbytes, str); // echo - printf("> %.*s", readbytes, (char*)buf); + printf("> %.*s", readbytes, str); hio_write(io, buf, readbytes); + +#if TEST_KCP + if (strncmp(str, "CLOSE", 5) == 0) { + hio_close_rudp(io, hio_peeraddr(io)); + } +#endif } int main(int argc, char** argv) { @@ -42,6 +61,9 @@ int main(int argc, char** argv) { if (io == NULL) { return -20; } +#if TEST_KCP + hio_set_kcp(io, NULL); +#endif hio_setcb_read(io, on_recvfrom); hio_read(io); hloop_run(loop); diff --git a/hconfig.h b/hconfig.h index b91dc28d2..61d33fd2d 100644 --- a/hconfig.h +++ b/hconfig.h @@ -70,5 +70,6 @@ /* #undef WITH_MBEDTLS */ /* #undef ENABLE_UDS */ /* #undef USE_MULTIMAP */ +/* #undef WITH_KCP */ #endif // HV_CONFIG_H_ diff --git a/hconfig.h.in b/hconfig.h.in index 6b06bca83..29bb557a7 100644 --- a/hconfig.h.in +++ b/hconfig.h.in @@ -72,4 +72,6 @@ #cmakedefine ENABLE_UDS 1 #cmakedefine USE_MULTIMAP 1 +#cmakedefine WITH_KCP 1 + #endif // HV_CONFIG_H_ diff --git a/scripts/unittest.sh b/scripts/unittest.sh index 8959eafdf..f737715c6 100755 --- a/scripts/unittest.sh +++ b/scripts/unittest.sh @@ -4,6 +4,8 @@ SCRIPT_DIR=$(cd `dirname $0`; pwd) ROOT_DIR=${SCRIPT_DIR}/.. cd ${ROOT_DIR} +bin/rbtree_test + bin/date bin/ifconfig bin/mkdir_p 123/456