-
Notifications
You must be signed in to change notification settings - Fork 1.3k
中文教程
libhv
是一个比libevent、libev、libuv
更易用的跨平台国产网络库,用来开发TCP/UDP/SSL/HTTP/WebSocket
客户端/服务端。
项目地址:https://github.com/ithewei/libhv.git
码云镜像:https://gitee.com/libhv/libhv.git
QQ技术交流群:739352073
libhv博客专栏:https://hewei.blog.csdn.net/category_9866493.html
libhv源码分析:https://blog.csdn.net/qu1993/category_10637982.html
- libhv教程01--介绍与体验
- libhv教程02--编译与安装
- libhv教程03--链库与使用
- libhv教程04--编写一个完整的命令行程序
- libhv教程05--事件循环以及定时器的简单使用
- libhv教程06--创建一个简单的TCP服务端
- libhv教程07--创建一个简单的TCP客户端
- libhv教程08--创建一个简单的UDP服务端
- libhv教程09--创建一个简单的UDP客户端
- libhv教程10--创建一个简单的HTTP服务端
- libhv教程11--创建一个简单的HTTP客户端
- libhv教程12--创建一个简单的WebSocket服务端
- libhv教程13--创建一个简单的WebSocket客户端
- libhv教程14--200行实现一个纯C版jsonrpc框架
- libhv教程15--200行实现一个C++版protorpc框架
- libhv教程16--多线程/多进程服务端编程
- libhv教程17--Qt中使用libhv
- libhv教程18--动手写一个tinyhttpd
- 事件循环: examples/hloop_test.c
- TCP回显服务: examples/tcp_echo_server.c
- TCP聊天服务: examples/tcp_chat_server.c
- TCP代理服务: examples/tcp_proxy_server.c
- UDP回显服务: examples/udp_echo_server.c
- UDP代理服务: examples/udp_proxy_server.c
- jsonRPC示例: examples/jsonrpc
- tinyhttpd示例: examples/tinyhttpd.c
- 事件循环: evpp/EventLoop_test.cpp
- 事件循环线程: evpp/EventLoopThread_test.cpp
- 事件循环线程池: evpp/EventLoopThreadPool_test.cpp
- TCP服务端: evpp/TcpServer_test.cpp
- TCP客户端: evpp/TcpClient_test.cpp
- UDP服务端: evpp/UdpServer_test.cpp
- UDP客户端: evpp/UdpClient_test.cpp
- HTTP服务端: examples/http_server_test.cpp
- HTTP客户端: examples/http_client_test.cpp
- WebSocket服务端: examples/websocket_server_test.cpp
- WebSocket客户端: examples/websocket_client_test.cpp
- protobufRPC示例: examples/protorpc
- 网络连接工具: examples/nc
- 网络扫描工具: examples/nmap
- HTTP服务程序: examples/httpd
- URL请求工具: examples/curl
- 文件下载工具: examples/wget
- 服务注册与发现: examples/consul
创作不易,如果你觉得不错,请在 github 上star
下吧。
libhv
是一个类似于libevent、libev、libuv
的跨平台网络库,提供了带非阻塞IO和定时器的事件循环。
libhv的名称也正是继承此派,寓意高性能的事件循环High-performance event loop library
。
- 编写跨平台c/c++程序;
- 基于TCP/UDP/SSL开发自定义协议网络程序;
- 编写HTTP客户端/服务端程序;
- 编写WebSocket客户端/服务端程序;
- 学习实践网络编程;
- libevent最为古老、有历史包袱,bufferevent虽为精妙,却也难以上手;
- libev可以说是libevent的简化版,代码极为精简,但宏定义用的过多,代码可读性不强,且在Windows上实现不佳;
- libuv是nodejs的c底层库,最先也是由libevent+对Windows IOCP支持,后来才改写自成一体,同时实现了管道、文件的异步读写,很强大,但结构体比较多,封装比较深;
- libhv本身是参考了libevent、libev、libuv的实现思路,它们的核心都是事件循环(即在一个事件循环中处理IO、定时器等事件),但提供的接口最为精简,API接近原生系统调用,最容易上手;
- 具体这几个库的写法比较见echo-servers,可见libhv是最简单的;
- 此外libhv支持心跳、转发、拆包、多线程安全write和close等特性,提供了HTTP、WebSocket等协议实现;
- 当然这几个库的性能是接近的,都将非阻塞IO多路复用用到了极致;
linux与mac下的用户可直接执行./getting_started.sh
脚本,即可体验使用libhv编写的http服务端httpd
与http客户端curl
的便利之处。
运行效果如下:
$ ./getting_started.sh
Welcome to libhv!
Press any key to run ...
bin/httpd -c etc/httpd.conf -s restart -d
httpd start/running
hw 83110 0.0 0.0 5100160 588 ?? S 4:14下午 0:00.00 httpd: worker process
hw 83109 0.0 0.0 4951680 580 ?? S 4:14下午 0:00.00 httpd: worker process
hw 83108 0.0 0.0 4820608 572 ?? S 4:14下午 0:00.00 httpd: worker process
hw 83107 0.0 0.0 4689536 600 ?? S 4:14下午 0:00.00 httpd: worker process
hw 83103 0.0 0.0 4950656 808 ?? Ss 4:14下午 0:00.00 httpd: master process
bin/curl -v localhost:8080
GET / HTTP/1.1
Accept: */*
Connection: keep-alive
Content-Length: 0
Host: localhost:8080
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 130
Content-Type: text/html
Date: Fri, 05 Feb 2021 08:14:14 GMT
Etag: "5fc1057e-82"
Last-Modified: Fri, 27 Nov 2020 13:56:14 GMT
Server: httpd/1.21.2.5
<!DOCTYPE html>
<html>
<head>
<title>httpd</title>
</head>
<body>
<center><h1>Welcome to httpd!</h1></center>
</body>
</html>
httpd与curl代码均可在examples目录下找到,是完整的命令行程序。
$ bin/httpd -h
Usage: httpd [hvc:ts:dp:]
Options:
-h|--help Print this information
-v|--version Print version
-c|--confile <confile> Set configure file, default etc/{program}.conf
-t|--test Test configure file and exit
-s|--signal <signal> Send <signal> to process,
<signal>=[start,stop,restart,status,reload]
-d|--daemon Daemonize
-p|--port <port> Set listen port
$ bin/curl -h
Usage: curl [hVvX:H:d:F:n:] url
Options:
-h|--help Print this message.
-V|--version Print version.
-v|--verbose Show verbose infomation.
-X|--method Set http method.
-H|--header Add http headers, -H "Content-Type:application/json Accept:*/*"
-d|--data Set http body.
-F|--form Set http form, -F "name1=content;name2=@filename"
-n|--count Send request count, used for test keep-alive
--http2 Use http2
--grpc Use grpc over http2
Examples:
curl -v localhost:8080
curl -v localhost:8080/hello
curl -v localhost:8080/query?page_no=1&page_size=10
curl -v localhost:8080/echo -d 'hello,world!'
curl -v localhost:8080/kv -H "Content-Type:application/x-www-form-urlencoded" -d 'user=admin&pswd=123456'
curl -v localhost:8080/json -H "Content-Type:application/json" -d '{"user":"admin","pswd":"123456"}'
curl -v localhost:8080/form -F 'file=@filename'
当然你也可以通过浏览器访问,地址栏输入:
- 127.0.0.1:8080/
- 127.0.0.1:8080/downloads/
- 127.0.0.1:8080/ping
libhv提供了原生Makefile
(这里仅指适用于类unix系统的Makefile)和cmake
两种构建方式。
CLI
即Command Line Interface
命令行界面。鄙人强烈推荐使用的一种,特别是对于服务端开发人员,必备技能。
对于类Unix系统平台来说,推荐使用Makefile三部曲
./configure
make
sudo make install
Windows平台编译libhv请使用cmake先生成VS工程,各平台具体编译步骤见BUILD.md
cmake -B build
cmake --build build
一般使用默认配置即可,如需勾选WITH_OPENSSL
,请先自行安装openssl
也可通过vcpkg
安装:
vcpkg install libhv
注:vcpkg上的版本可能更新较慢,如需体验最新版还是推荐下载源码编译。
类unix系统默认安装在/usr/local/include/hv
目录下
.
├── Buffer.h 缓存类
├── Callback.h 回调定义
├── Channel.h IO通道类
├── Event.h 事件类
├── EventLoop.h 事件循环类
├── EventLoopThread.h 事件循环线程类
├── EventLoopThreadPool.h 事件循环线程池类
├── HttpMessage.h HTTP消息类
├── HttpParser.h HTTP解析类
├── HttpServer.h HTTP服务类
├── HttpService.h HTTP业务类
├── Status.h 状态类
├── TcpClient.h TCP客户端类
├── TcpServer.h TCP服务端类
├── ThreadLocalStorage.h 线程本地存储类
├── UdpClient.h UDP客户端类
├── UdpServer.h UDP服务端类
├── WebSocketChannel.h WebSocket通道类
├── WebSocketClient.h WebSocket客户端类
├── WebSocketParser.h WebSocket解析类
├── WebSocketServer.h WebSocket服务端类
├── base64.h BASE64编解码
├── grpcdef.h grpc定义
├── hatomic.h 原子操作
├── hbase.h 基本函数
├── hbuf.h 缓存buffer
├── hconfig.h configure生成配置
├── hdef.h 常见宏定义
├── hdir.h 目录(ls实现)
├── hendian.h 大小端
├── herr.h 错误码定义
├── hexport.h DLL导出宏
├── hfile.h 文件类
├── hlog.h 日志
├── hloop.h 事件循环
├── hmain.h 命令行解析
├── hmath.h 数学函数
├── hmutex.h 互斥锁
├── hobjectpool.h 对象池
├── hplatform.h 平台相关宏
├── hproc.h 进程
├── hscope.h 作用域
├── hsocket.h 套接字
├── hssl.h SSL/TLS加密
├── hstring.h 字符串操作
├── hsysinfo.h 系统信息
├── hthread.h 线程操作
├── hthreadpool.h 线程池类
├── htime.h 日期时间
├── http2def.h http2定义
├── http_client.h HTTP客户端
├── http_content.h HTTP Content-Type
├── httpdef.h http定义
├── hurl.h URL操作
├── hv.h hv总头文件
├── hversion.h 版本
├── ifconfig.h ifconfig实现
├── iniparser.h INI解析类
├── json.hpp JSON解析
├── md5.h MD5数字摘要
├── nlog.h 网络日志
├── nmap.h 主机发现
├── requests.h 模拟python requests api
├── sha1.h SHA1安全散列算法
└── singleton.h 单例模式宏
- 静态库
libhv.a
或libhv_static.a
-
windows
动态库hv.dll
-
linux
动态库libhv.so
-
macosx
动态库libhv.dylib
├── hmain_test 命令行解析测试程序
├── hloop_test 事件循环测试程序
├── htimer_test 定时器测试程序
├── http_client_test HTTP客户端测试程序
├── http_server_test HTTP服务端测试程序
├── websocket_client_test WebSocket客户端测试程序
├── websocket_server_test WebSocket服务端测试程序
├── curl HTTP客户端
├── httpd HTTP服务端
├── nc 网络客户端
├── nmap 主机发现
├── tcp_chat_server TCP聊天服务
├── tcp_echo_server TCP回显服务
├── tcp_proxy_server TCP代理服务
└── udp_echo_server UDP回显服务
另外,仓库通过 Github Actions 确保master
分支在linux、windows、macosx
三个平台编译通过,大家再也不用担心编译不过了。
在上一篇中,我们已经生成了头文件与库文件,接下来我们写个测试程序链库验证下。
测试代码如下:
#include "hv/hv.h"
int main() {
char exe_filepath[MAX_PATH] = {0};
char run_dir[MAX_PATH] = {0};
// 获取hv编译版本
const char* version = hv_compile_version();
// 获取可执行文件路径
get_executable_path(exe_filepath, sizeof(exe_filepath));
// 获取运行目录
get_run_dir(run_dir, sizeof(run_dir));
printf("exe_filepath=%s\n", exe_filepath);
printf("run_dir=%s\n", run_dir);
// 写日志
LOGI("libhv version: %s", version);
return 0;
}
编译运行:
$ cc -std=c99 test.c -o test -lhv
$ ./test
exe_filepath=/home/hw/github/libhv/test
run_dir=/home/hw/github/libhv
$ cat libhv*.log
2021-02-06 00:16:40.989 INFO libhv version: 1.21.1.31 [test.c:19:main]
cmake生成vs工程,打开hv.sln
编译后会生成头文件include/hv
、静态库lib/hv_static.lib
和动态库lib/hv.dll
,所以有动态库和静态库两种链库方式。
工程 => 属性 => Linker => Input => Addtional Dependencies 加hv.lib
或代码里添加
#pragma comment(lib, "hv.lib")
工程 => 属性 => c/c++ => 预处理器 => 预处理器定义中添加HV_STATICLIB
预编译宏,以屏蔽hexport.h
头文件中动态库导入宏
#define HV_EXPORT __declspec(dllimport)
工程 => 属性 => Linker => Input => Addtional Dependencies 加 hv_static.lib
或代码里添加
#pragma comment(lib, "hv_static.lib")
首先,一个完整的命令行程序应该包含哪些功能?
- 命令行参数解析
- 配置文件解析
- 打印帮助信息和版本信息
- 信号处理
- 日志、pid文件
- 如果是服务端长时间运行后台程序,还需要看门狗(崩溃自动重启)
看看libhv是如何提供这些功能的,参考示例代码见examples/hmain_test.cpp
编译运行:
$ c++ -std=c++11 examples/hmain_test.cpp -o bin/hmain_test -I/usr/local/include/hv -lhv
$ bin/hmain_test -h
Usage: hmain_test [hvc:ts:dp:]
Options:
-h|--help Print this information
-v|--version Print version
-c|--confile <confile> Set configure file, default etc/{program}.conf
-t|--test Test configure file and exit
-s|--signal <signal> Send <signal> to process,
<signal>=[start,stop,restart,status,reload]
-d|--daemon Daemonize
-p|--port <port> Set listen port
$ bin/hmain_test -v
hmain_test version 1.21.1.31
$ bin/hmain_test -c etc/hmain_test.conf -t
Test confile [etc/hmain_test.conf] OK!
$ bin/hmain_test -d
$ bin/hmain_test -s restart -d
hmain_test stop/waiting
hmain_test start/running
$ bin/hmain_test -s status
hmain_test start/running, pid=27766
$ cat logs/hmain_test.pid
27776
$ cat logs/hmain_test*.log
2021-02-06 12:18:53.509 INFO hmain_test version: 1.21.1.31 [hmain_test.cpp:94:parse_confile]
2021-02-06 12:18:53.509 DEBUG worker_processes=ncpu=2 [hmain_test.cpp:103:parse_confile]
2021-02-06 12:18:53.509 INFO parse_confile('/home/hw/github/libhv/etc/hmain_test.conf') OK [hmain_test.cpp:129:parse_confile]
2021-02-06 12:18:53.509 INFO create_pidfile('/home/hw/github/libhv/logs/hmain_test.pid') pid=27766 [hmain.cpp:317:create_pidfile]
2021-02-06 12:18:53.509 INFO workers[0] start/running, pid=27767 [hmain.cpp:611:master_workers_run]
2021-02-06 12:18:53.509 INFO workers[1] start/running, pid=27768 [hmain.cpp:611:master_workers_run]
2021-02-06 12:18:53.509 INFO master start/running, pid=27766 [hmain.cpp:614:master_workers_run]
2021-02-06 12:18:53.509 INFO worker_thread pid=27767 tid=27767 [hmain.cpp:539:worker_thread]
2021-02-06 12:18:53.510 INFO worker_thread pid=27768 tid=27768 [hmain.cpp:539:worker_thread]
2021-02-06 12:18:53.510 INFO worker_thread pid=27767 tid=27769 [hmain.cpp:539:worker_thread]
2021-02-06 12:18:53.510 INFO worker_thread pid=27768 tid=27770 [hmain.cpp:539:worker_thread]
$ ps aux | grep hmain_test
hw 27776 0.0 0.0 18000 2084 ? Ss 12:20 0:00 hmain_test: master process
hw 27777 0.0 0.0 91732 240 ? Sl 12:20 0:00 hmain_test: worker process
hw 27778 0.0 0.0 91732 240 ? Sl 12:20 0:00 hmain_test: worker process
$ sudo kill -9 27778
$ ps aux | grep hmain_test
hw 27776 0.0 0.0 18000 2084 ? Ss 12:20 0:00 hmain_test: master process
hw 27777 0.0 0.0 91732 240 ? Sl 12:20 0:00 hmain_test: worker process
hw 27796 0.0 0.0 91732 244 ? Sl 12:27 0:00 hmain_test: worker process
可以看到,hmain_test
提供了打印帮助信息、打印版本信息、测试配置文件、后台运行、创建pid文件、查看进程状态、开始|停止|重启进程、master-workers
多进程模式、崩溃自动重启等功能。
流程图:
st=>start: main
e=>end: 结束
main_ctx_init=>operation: main_ctx_init
main入口初始化
parse_opt=>operation: parse_opt
解析命令行参数
parse_confile=>operation: parse_confile
解析配置文件
hlog_set_xxx=>operation: hlog_set_xxx
日志设置
signal_init=>operation: signal_init
信号初始化
signal_handle=>operation: signal_handle
信号处理
daemon=>operation: daemon
后台运行
create_pidfile=>operation: create_pidfile
创建pid文件
master_workers_run=>operation: master_workers_run
扩展多进程|多线程模式
run=>operation: worker_fn
长时间运行...
st->main_ctx_init->parse_opt->parse_confile->hlog_set_xxx->signal_init->signal_handle->daemon->create_pidfile->master_workers_run->run
很多同学不理解事件循环的概念,所以这里有必要前置说明一下。 对于大多数长时间运行程序来说,都会有主循环的存在。
如窗口界面程序,就是等待键盘、鼠标等外设的输入,界面做出相应的变化。
我们以windows窗口消息机制
举例说明:
// windows窗口消息循环
MSG msg;
while (GetMessage(&msg, NULL, 0, 0)) {
TranslateMessage(&msg);
DispatchMessage(&msg);
}
此循环所在的线程我们称之为GUI线程
(即窗口所在线程),MFC、WPF、Qt
等界面框架不过是将此过程给封装了。
理解了窗口消息循环的存在,其实就不难理解windows下老生常谈的问题:SendMessage
与PostMessage
的区别。
SendMessage
和PostMessage
都是windows提供的用来向窗口线程发送消息的API。
不同之处是SendMessage
是同步的,如果SendMessage
调用线程和窗口线程位于同一线程,则直接调用窗口过程处理此消息;如果不是同一线程,则会阻塞等待窗口线程处理完此消息再返回。
PostMessage
是异步的,将消息投递到窗口消息队列中就返回了,所以使用PostMessage
传递参数时需要注意不能使用栈上的局部变量。
GUI线程具有主循环,网络IO线程亦是如此。
我们都知道IO可分为阻塞BIO
与非阻塞NIO
。
libhv的头文件hsocket.h中提供了跨平台的设置阻塞与非阻塞的方法:
#ifdef OS_WIN
static inline int blocking(int sockfd) {
unsigned long nb = 0;
return ioctlsocket(sockfd, FIONBIO, &nb);
}
static inline int nonblocking(int sockfd) {
unsigned long nb = 1;
return ioctlsocket(sockfd, FIONBIO, &nb);
}
#else
#define blocking(s) fcntl(s, F_SETFL, fcntl(s, F_GETFL) & ~O_NONBLOCK)
#define nonblocking(s) fcntl(s, F_SETFL, fcntl(s, F_GETFL) | O_NONBLOCK)
#endif
对于BIO,伪代码如下:
while (1) {
readbytes = read(fd, buf, len);
if (readbytes <= 0) {
close(fd);
break;
}
...
writebytes = write(fd, buf, len);
if (writebytes <= 0) {
close(fd);
break;
}
}
因为读写都是阻塞的,所以一个IO线程只能处理一个fd,对于客户端尚可接受,对于服务端来说,每accept
一个连接,就创建一个IO线程去读写这个套接字,并发达到几千就需要创建几千个线程,线程上下文的切换开销都会把系统占满。
所以IO多路复用机制应运而生,如最早期的select
、后来的poll
,linux
的epoll
、windows
的iocp
、bsd
的kqueue
、solaris
的port
等,都属于IO多路复用机制。非阻塞NIO搭配IO多路复用机制就是高并发的钥匙
。
关于select、poll、epoll
的区别,可自行百度,这里就不展开说了。仅以select为例,写下伪代码:
while (1) {
int nselect = select(max_fd+1, &readfds, &writefds, &exceptfds, timeout);
if (nselect == 0) continue;
for (int fd = 0; fd <= max_fd; ++fd) {
// 可读
if (FD_ISSET(fd, &readfds)) {
...
read(fd, buf, len);
}
// 可写
if (FD_ISSET(fd, &writefds)) {
...
write(fd, buf, len);
}
}
}
通过IO多路复用机制,一个IO线程就可以同时监听多个fd了,以现代计算机的性能,一个IO线程即可处理几十万数量级别的IO读写。
libhv下的event模块正是封装了多种平台的IO多路复用机制,提供了统一的事件接口
,是libhv的核心模块。
libhv中的事件包括IO事件
、timer定时器事件
、idle空闲事件
、自定义事件
(见hloop_post_event
接口,作用类似于windows窗口消息机制的PostMessage
)。
源码分析推荐群友qu1993的博客
#include "hv/hloop.h"
// 定时器回调函数
static void on_timer(htimer_t* timer) {
printf("time=%lus\n", (unsigned long)time(NULL));
}
int main() {
// 新建一个事件循环结构体
hloop_t* loop = hloop_new(0);
// 添加一个定时器
htimer_add(loop, on_timer, 1000, INFINITE);
// 运行事件循环
hloop_run(loop);
// 释放事件循环结构体
hloop_free(&loop);
return 0;
}
事件循环测试代码examples/hloop_test.c 定时器测试代码见examples/htimer_test.c
#include "hv/EventLoop.h"
using namespace hv;
int main() {
// 新建一个事件循环对象
EventLoopPtr loop(new EventLoop);
// 设置一个定时器
loop->setInterval(1000, [](TimerID timerID){
printf("time=%lus\n", (unsigned long)time(NULL));
});
// 运行事件循环
loop->run();
return 0;
}
evpp模块被设计成只包含头文件,不参与编译。 hloop.h中的c接口被封装成了c++的类,参考了muduo和evpp。 类设计如下:
├── Buffer.h 缓存类
├── Channel.h 通道类,封装了hio_t
├── Event.h 事件类,封装了hevent_t、htimer_t
├── EventLoop.h 事件循环类,封装了hloop_t
├── EventLoopThread.h 事件循环线程类,组合了EventLoop和thread
├── EventLoopThreadPool.h 事件循环线程池类,组合了EventLoop和ThreadPool
├── TcpClient.h TCP客户端类
├── TcpServer.h TCP服务端类
├── UdpClient.h UDP客户端类
└── UdpServer.h UDP服务端类
示例代码位于evpp目录下
多说两句:
-
EventLoop
中实现了muduo
有的两个接口,runInLoop
和queueInLoop
,我觉得命名不错,也直接采用了。runInLoop
对应SendMessage
,queueInLoop
对应PostMessage
,这么解释大家是不是更理解文章开头的铺垫了; -
EventLoopThreadPool
的核心思想就是one loop per thread
;
下文以TCP echo server
为例,使用libhv创建TCP服务端。
#include "hv/hloop.h"
void on_close(hio_t* io) {
}
void on_recv(hio_t* io, void* buf, int readbytes) {
// 回显数据
hio_write(io, buf, readbytes);
}
void on_accept(hio_t* io) {
// 设置close回调
hio_setcb_close(io, on_close);
// 设置read回调
hio_setcb_read(io, on_recv);
// 开始读
hio_read(io);
}
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: cmd port\n");
return -10;
}
int port = atoi(argv[1]);
// 创建事件循环
hloop_t* loop = hloop_new(0);
// 创建TCP服务
hio_t* listenio = hloop_create_tcp_server(loop, "0.0.0.0", port, on_accept);
if (listenio == NULL) {
return -20;
}
// 运行事件循环
hloop_run(loop);
// 释放事件循环
hloop_free(&loop);
return 0;
}
编译运行:
$ cc examples/tcp_echo_server.c -o bin/tcp_echo_server -I/usr/local/include/hv -lhv
$ bin/tcp_echo_server 1234
类unix系统可使用nc作为客户端测试:
$ nc 127.0.0.1 1234
< hello
> hello
windows端可使用telnet作为客户端测试:
$ telent 127.0.0.1 1234
更多TCP服务端示例参考:
代码示例参考evpp/TcpServer_test.cpp
#include "hv/TcpServer.h"
using namespace hv;
int main(int argc, char* argv[]) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
TcpServer srv;
int listenfd = srv.createsocket(port);
if (listenfd < 0) {
return -20;
}
printf("server listen on port %d, listenfd=%d ...\n", port, listenfd);
srv.onConnection = [](const SocketChannelPtr& channel) {
std::string peeraddr = channel->peeraddr();
if (channel->isConnected()) {
printf("%s connected! connfd=%d\n", peeraddr.c_str(), channel->fd());
} else {
printf("%s disconnected! connfd=%d\n", peeraddr.c_str(), channel->fd());
}
};
srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
// echo
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
channel->write(buf);
};
srv.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
};
srv.setThreadNum(4);
srv.start();
while (1) sleep(1);
return 0;
}
编译运行:
$ c++ -std=c++11 evpp/TcpServer_test.cpp -o bin/TcpServer_test -I/usr/local/include/hv -lhv
$ bin/TcpServer_test 5678
TcpServer
更多实用接口
-
setThreadNum
:设置IO线程数 -
setMaxConnectionNum
:设置最大连接数 -
setUnpack
:设置拆包 -
withTLS
:SSL/TLS加密通信
#include "hv/hloop.h"
#include "hv/htime.h"
void on_timer(htimer_t* timer) {
char str[DATETIME_FMT_BUFLEN] = {0};
datetime_t dt = datetime_now();
datetime_fmt(&dt, str);
printf("> %s\n", str);
// 获取userdata
hio_t* io = (hio_t*)hevent_userdata(timer);
// 发送当前时间字符串
hio_write(io, str, strlen(str));
}
void on_close(hio_t* io) {
}
void on_recv(hio_t* io, void* buf, int readbytes) {
printf("< %.*s\n", readbytes, (char*)buf);
}
void on_connect(hio_t* io) {
// 设置close回调
hio_setcb_close(io, on_close);
// 设置read回调
hio_setcb_read(io, on_recv);
// 开始读
hio_read(io);
// 添加一个定时器
htimer_t* timer = htimer_add(hevent_loop(io), on_timer, 1000, INFINITE);
// 设置userdata
hevent_set_userdata(timer, io);
}
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: cmd port\n");
return -10;
}
int port = atoi(argv[1]);
// 创建事件循环
hloop_t* loop = hloop_new(0);
// 创建TCP客户端
hio_t* listenio = hloop_create_tcp_client(loop, "127.0.0.1", port, on_connect);
if (listenio == NULL) {
return -20;
}
// 运行事件循环
hloop_run(loop);
// 释放事件循环
hloop_free(&loop);
return 0;
}
完整TCP/UDP客户端程序可参考examples/nc.c
示例代码见:evpp/TcpClient_test.cpp
#include "hv/TcpClient.h"
#include "hv/htime.h"
using namespace hv;
int main(int argc, char* argv[]) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
TcpClient cli;
int connfd = cli.createsocket(port);
if (connfd < 0) {
return -20;
}
printf("client connect to port %d, connfd=%d ...\n", port, connfd);
cli.onConnection = [](const SocketChannelPtr& channel) {
std::string peeraddr = channel->peeraddr();
if (channel->isConnected()) {
printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
// send(time) every 3s
setInterval(3000, [channel](TimerID timerID){
if (channel->isConnected()) {
char str[DATETIME_FMT_BUFLEN] = {0};
datetime_t dt = datetime_now();
datetime_fmt(&dt, str);
channel->write(str);
} else {
killTimer(timerID);
}
});
} else {
printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
}
};
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
};
cli.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
};
// reconnect: 1,2,4,8,10,10,10...
ReconnectInfo reconn;
reconn.min_delay = 1000;
reconn.max_delay = 10000;
reconn.delay_policy = 2;
cli.setReconnect(&reconn);
cli.start();
while (1) sleep(1);
return 0;
}
TcpClient
更多实用接口
-
setConnectTimeout
:设置连接超时 -
setReconnect
:设置重连 -
setUnpack
: 设置拆包 -
withTLS
:SSL/TLS加密通信
下文以UDP echo server
为例,使用libhv创建UDP服务端。
代码示例参考examples/udp_echo_server.c
#include "hv/hloop.h"
#include "hv/hsocket.h"
static void on_recvfrom(hio_t* io, void* buf, int readbytes) {
printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), readbytes);
char localaddrstr[SOCKADDR_STRLEN] = {0};
char peeraddrstr[SOCKADDR_STRLEN] = {0};
printf("[%s] <=> [%s]\n",
SOCKADDR_STR(hio_localaddr(io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
printf("< %.*s", readbytes, (char*)buf);
// 回显数据
printf("> %.*s", readbytes, (char*)buf);
hio_write(io, buf, readbytes);
}
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
// 创建事件循环
hloop_t* loop = hloop_new(0);
// 创建UDP服务
hio_t* io = hloop_create_udp_server(loop, "0.0.0.0", port);
if (io == NULL) {
return -20;
}
// 设置read回调
hio_setcb_read(io, on_recvfrom);
// 开始读
hio_read(io);
// 运行事件循环
hloop_run(loop);
// 释放事件循环
hloop_free(&loop);
return 0;
}
编译运行:
$ cc examples/udp_echo_server.c -o bin/udp_echo_server -I/usr/local/include/hv -lhv
$ bin/udp_echo_server 1234
可使用nc作为客户端测试:
$ nc -u 127.0.0.1 1234
< hello
> hello
代码示例参考evpp/UdpServer_test.cpp
#include "hv/UdpServer.h"
using namespace hv;
int main(int argc, char* argv[]) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
UdpServer srv;
int bindfd = srv.createsocket(port);
if (bindfd < 0) {
return -20;
}
printf("server bind on port %d, bindfd=%d ...\n", port, bindfd);
srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
// echo
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
channel->write(buf);
};
srv.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
};
srv.start();
while (1) sleep(1);
return 0;
}
编译运行:
$ c++ -std=c++11 evpp/UdpServer_test.cpp -o bin/UdpServer_test -I/usr/local/include/hv -lhv
$ bin/UdpServer_test 5678
#include "hv/hloop.h"
#include "hv/htime.h"
void on_timer(htimer_t* timer) {
char str[DATETIME_FMT_BUFLEN] = {0};
datetime_t dt = datetime_now();
datetime_fmt(&dt, str);
printf("> %s\n", str);
// 获取userdata
hio_t* io = (hio_t*)hevent_userdata(timer);
// 发送时间字符串
hio_write(io, str, strlen(str));
}
void on_recvfrom(hio_t* io, void* buf, int readbytes) {
printf("< %.*s\n", readbytes, (char*)buf);
}
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: cmd port\n");
return -10;
}
int port = atoi(argv[1]);
// 创建事件循环
hloop_t* loop = hloop_new(0);
// 创建UDP客户端
hio_t* io = hloop_create_udp_client(loop, "127.0.0.1", port);
if (io == NULL) {
return -20;
}
// 设置read回调
hio_setcb_read(io, on_recvfrom);
// 开始读
hio_read(io);
// 添加一个定时器
htimer_t* timer = htimer_add(hevent_loop(io), on_timer, 1000, INFINITE);
// 设置userdata
hevent_set_userdata(timer, io);
// 运行事件循环
hloop_run(loop);
// 释放事件循环
hloop_free(&loop);
return 0;
}
完整TCP/UDP客户端程序可参考examples/nc.c
示例代码见:evpp/UdpClient_test.cpp
#include "hv/UdpClient.h"
#include "hv/htime.h"
using namespace hv;
int main(int argc, char* argv[]) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
UdpClient cli;
int sockfd = cli.createsocket(port);
if (sockfd < 0) {
return -20;
}
printf("client sendto port %d, sockfd=%d ...\n", port, sockfd);
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
};
cli.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
};
cli.start();
// sendto(time) every 3s
cli.loop()->setInterval(3000, [&cli](TimerID timerID) {
char str[DATETIME_FMT_BUFLEN] = {0};
datetime_t dt = datetime_now();
datetime_fmt(&dt, str);
cli.sendto(str);
});
while (1) sleep(1);
return 0;
}
HTTP协议作为本世纪最通用的应用层协议,本文就不加以介绍了,不熟悉的自行阅读awesome-http
示例代码参考examples/http_server_test.cpp
#include "hv/HttpServer.h"
int main() {
HttpService router;
router.GET("/ping", [](HttpRequest* req, HttpResponse* resp) {
return resp->String("pong");
});
router.GET("/data", [](HttpRequest* req, HttpResponse* resp) {
static char data[] = "0123456789";
return resp->Data(data, 10);
});
router.GET("/paths", [&router](HttpRequest* req, HttpResponse* resp) {
return resp->Json(router.Paths());
});
router.POST("/echo", [](const HttpContextPtr& ctx) {
return ctx->send(ctx->body(), ctx->type());
});
http_server_t server;
server.port = 8080;
server.service = &router;
http_server_run(&server);
return 0;
}
编译运行:
c++ -std=c++11 examples/http_server_test.cpp -o bin/http_server_test -lhv
bin/http_server_test
测试使用curl
或浏览器输入以下url
:
curl -v http://127.0.0.1:8080/ping
curl -v http://127.0.0.1:8080/data
curl -v http://127.0.0.1:8080/paths
curl -v http://127.0.0.1:8080/echo -d "hello,world"
完整的http服务端示例代码参考examples/httpd 测试步骤:
git clone https://github.com/ithewei/libhv.git
cd libhv
make httpd curl
bin/httpd -h
bin/httpd -d
#bin/httpd -c etc/httpd.conf -s restart -d
ps aux | grep httpd
# http web service
bin/curl -v localhost:8080
# http indexof service
bin/curl -v localhost:8080/downloads/
# http api service
bin/curl -v localhost:8080/ping
bin/curl -v localhost:8080/echo -d "hello,world!"
bin/curl -v localhost:8080/query?page_no=1\&page_size=10
bin/curl -v localhost:8080/kv -H "Content-Type:application/x-www-form-urlencoded" -d 'user=admin&pswd=123456'
bin/curl -v localhost:8080/json -H "Content-Type:application/json" -d '{"user":"admin","pswd":"123456"}'
bin/curl -v localhost:8080/form -F "user=admin pswd=123456"
bin/curl -v localhost:8080/upload -F "file=@LICENSE"
bin/curl -v localhost:8080/test -H "Content-Type:application/x-www-form-urlencoded" -d 'bool=1&int=123&float=3.14&string=hello'
bin/curl -v localhost:8080/test -H "Content-Type:application/json" -d '{"bool":true,"int":123,"float":3.14,"string":"hello"}'
bin/curl -v localhost:8080/test -F 'bool=1 int=123 float=3.14 string=hello'
# RESTful API: /group/:group_name/user/:user_id
bin/curl -v -X DELETE localhost:8080/group/test/user/123
使用apache
的ab
、或者wrk
都可以用来做压力测试,一般服务器单线程QPS
可轻松达到3w
# sudo apt install apache2-utils
ab -c 100 -n 100000 http://127.0.0.1:8080/
# sudo apt install wrk
wrk -c 100 -t 4 -d 10s http://127.0.0.1:8080/
更多HTTP压力测试工具参考awesome-http-benchmark
同步http客户端接口模拟实现了python
的requests
#include "requests.h"
int main() {
auto resp = requests::get("http://www.example.com");
if (resp == NULL) {
printf("request failed!\n");
} else {
printf("%d %s\r\n", resp->status_code, resp->status_message());
printf("%s\n", resp->body.c_str());
}
resp = requests::post("127.0.0.1:8080/echo", "hello,world!");
if (resp == NULL) {
printf("request failed!\n");
} else {
printf("%d %s\r\n", resp->status_code, resp->status_message());
printf("%s\n", resp->body.c_str());
}
return 0;
}
示例代码参考examples/http_client_test.cpp
#include "requests.h"
#include "hthread.h" // import hv_gettid
static void test_http_async_client(int* finished) {
printf("test_http_async_client request thread tid=%ld\n", hv_gettid());
HttpRequestPtr req(new HttpRequest);
req->method = HTTP_POST;
req->url = "127.0.0.1:8080/echo";
req->headers["Connection"] = "keep-alive";
req->body = "this is an async request.";
req->timeout = 10;
http_client_send_async(req, [finished](const HttpResponsePtr& resp) {
printf("test_http_async_client response thread tid=%ld\n", hv_gettid());
if (resp == NULL) {
printf("request failed!\n");
} else {
printf("%d %s\r\n", resp->status_code, resp->status_message());
printf("%s\n", resp->body.c_str());
}
*finished = 1;
});
}
int main() {
int finished = 0;
test_http_async_client(&finished);
// demo wait async ResponseCallback
while (!finished) {
hv_delay(100);
}
printf("finished!\n");
return 0;
}
完整的http客户端示例代码参考examples/curl.cpp,模拟实现了curl
命令行程序。
测试步骤:
git clone https://github.com/ithewei/libhv.git
cd libhv
make httpd curl
bin/httpd -h
bin/httpd -d
#bin/httpd -c etc/httpd.conf -s restart -d
ps aux | grep httpd
# http web service
bin/curl -v localhost:8080
# http indexof service
bin/curl -v localhost:8080/downloads/
# http api service
bin/curl -v localhost:8080/ping
bin/curl -v localhost:8080/echo -d "hello,world!"
bin/curl -v localhost:8080/query?page_no=1\&page_size=10
bin/curl -v localhost:8080/kv -H "Content-Type:application/x-www-form-urlencoded" -d 'user=admin&pswd=123456'
bin/curl -v localhost:8080/json -H "Content-Type:application/json" -d '{"user":"admin","pswd":"123456"}'
bin/curl -v localhost:8080/form -F "user=admin pswd=123456"
bin/curl -v localhost:8080/upload -F "file=@LICENSE"
bin/curl -v localhost:8080/test -H "Content-Type:application/x-www-form-urlencoded" -d 'bool=1&int=123&float=3.14&string=hello'
bin/curl -v localhost:8080/test -H "Content-Type:application/json" -d '{"bool":true,"int":123,"float":3.14,"string":"hello"}'
bin/curl -v localhost:8080/test -F 'bool=1 int=123 float=3.14 string=hello'
# RESTful API: /group/:group_name/user/:user_id
bin/curl -v -X DELETE localhost:8080/group/test/user/123
更多HTTP消息使用方式请阅读头文件HttpMessage.h
示例代码参考examples/websocket_server_test.cpp
#include "WebSocketServer.h"
#include "EventLoop.h"
#include "htime.h"
using namespace hv;
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
WebSocketServerCallbacks ws;
ws.onopen = [](const WebSocketChannelPtr& channel, const std::string& url) {
printf("onopen: GET %s\n", url.c_str());
// send(time) every 1s
setInterval(1000, [channel](TimerID id) {
if (channel->isConnected()) {
char str[DATETIME_FMT_BUFLEN] = {0};
datetime_t dt = datetime_now();
datetime_fmt(&dt, str);
channel->send(str);
} else {
killTimer(id);
}
});
};
ws.onmessage = [](const WebSocketChannelPtr& channel, const std::string& msg) {
printf("onmessage: %s\n", msg.c_str());
};
ws.onclose = [](const WebSocketChannelPtr& channel) {
printf("onclose\n");
};
websocket_server_t server;
server.port = port;
server.ws = &ws;
websocket_server_run(&server);
return 0;
}
编译运行:
c++ -std=c++11 examples/websocket_server_test.cpp -o bin/websocket_server_test -I/usr/local/include/hv -lhv
bin/websocket_server_test 8888
测试客户端可使用html/websocket_client.html 或者websocket在线测试
在 WebSocket 协议出现以前,创建一个和服务端进行双通道通信的 web 应用,需要依赖HTTP协议进行不停的轮询,这会导致一些问题:
- 服务端被迫维持来自每个客户端的大量不同的连接
- 大量的轮询请求会造成高开销,比如会带上多余的header,造成了无用的数据传输
所以,为了解决这些问题,WebSocket 协议应运而生。
WebSocket 是一种在单个TCP连接上进行全双工通信的协议。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。
在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接, 并进行双向数据传输。
客户端请求
GET / HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13
服务器回应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/
WebSocket 通信协议本文居于篇幅,就不展开说明,感兴趣的推荐阅读下面这篇博文:
function WebSocketTest(url) {
var ws = new WebSocket(url);
ws.onopen = function() {
alert("连接已建立");
ws.send("hello");
};
ws.onmessage = function(ev) {
var received_msg = ev.data;
console.log("received websocket message: " + received_msg);
};
ws.onclose = function() {
alert("连接已关闭");
};
}
libhv提供的WebSocketClient
类使用起来与JS的WebSocket
一样简单。
示例代码见 examples/websocket_client_test.cpp
#include "WebSocketClient.h"
using namespace hv;
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: %s url\n", argv[0]);
return -10;
}
const char* url = argv[1];
WebSocketClient ws;
ws.onopen = [&ws]() {
printf("onopen\n");
ws.send("hello");
};
ws.onclose = []() {
printf("onclose\n");
};
ws.onmessage = [](const std::string& msg) {
printf("onmessage: %s\n", msg.c_str());
};
// reconnect: 1,2,4,8,10,10,10...
ReconnectInfo reconn;
reconn.min_delay = 1000;
reconn.max_delay = 10000;
reconn.delay_policy = 2;
ws.setReconnect(&reconn);
ws.open(url);
while (1) hv_delay(1000);
return 0;
}
编译运行:
c++ -std=c++11 examples/websocket_client_test.cpp -o bin/websocket_client_test -I/usr/local/include/hv -lhv
bin/websocket_client_test ws://127.0.0.1:8888/
使用libhv可以在200行内实现一个完整的jsonrpc框架,这得益于libhv新提供的一个接口
hio_set_unpack
设置拆包规则,支持固定包长、分隔符、头部长度字段
三种常见的拆包方式,调用该接口设置拆包规则后,内部会根据拆包规则处理粘包与分包,保证回调上来的是完整的一包数据,大大节省了上层处理粘包与分包的成本,该接口具体定义如下:
typedef enum {
UNPACK_BY_FIXED_LENGTH = 1, // 根据固定长度拆包
UNPACK_BY_DELIMITER = 2, // 根据分隔符拆包,如常见的“\r\n”
UNPACK_BY_LENGTH_FIELD = 3, // 根据头部长度字段拆包
} unpack_mode_e;
#define DEFAULT_PACKAGE_MAX_LENGTH (1 << 21) // 2M
// UNPACK_BY_DELIMITER
#define PACKAGE_MAX_DELIMITER_BYTES 8
// UNPACK_BY_LENGTH_FIELD
typedef enum {
ENCODE_BY_VARINT = 1, // varint编码
ENCODE_BY_LITTEL_ENDIAN = LITTLE_ENDIAN, // 小端编码
ENCODE_BY_BIG_ENDIAN = BIG_ENDIAN, // 大端编码
} unpack_coding_e;
typedef struct unpack_setting_s {
unpack_mode_e mode; // 拆包模式
unsigned int package_max_length; // 最大包长度限制
// UNPACK_BY_FIXED_LENGTH
unsigned int fixed_length; // 固定包长度
// UNPACK_BY_DELIMITER
unsigned char delimiter[PACKAGE_MAX_DELIMITER_BYTES]; // 分隔符
unsigned short delimiter_bytes; // 分隔符长度
// UNPACK_BY_LENGTH_FIELD
unsigned short body_offset; // body偏移量(即头部长度)real_body_offset = body_offset + varint_bytes - length_field_bytes
unsigned short length_field_offset; // 头部长度字段偏移量
unsigned short length_field_bytes; // 头部长度字段所占字节数
unpack_coding_e length_field_coding; // 头部长度字段编码方式,支持varint、大小端三种编码方式,通常使用大端字节序(即网络字节序)
#ifdef __cplusplus
unpack_setting_s() {
// Recommended setting:
// head = flags:1byte + length:4bytes = 5bytes
mode = UNPACK_BY_LENGTH_FIELD;
package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
fixed_length = 0;
delimiter_bytes = 0;
body_offset = 5;
length_field_offset = 1;
length_field_bytes = 4;
length_field_coding = ENCODE_BY_BIG_ENDIAN;
}
#endif
} unpack_setting_t;
HV_EXPORT void hio_set_unpack(hio_t* io, unpack_setting_t* setting);
以ftp
为例(分隔符方式)可以这样设置:
unpack_setting_t ftp_unpack_setting;
memset(&ftp_unpack_setting, 0, sizeof(unpack_setting_t));
ftp_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
ftp_unpack_setting.mode = UNPACK_BY_DELIMITER;
ftp_unpack_setting.delimiter[0] = '\r';
ftp_unpack_setting.delimiter[1] = '\n';
ftp_unpack_setting.delimiter_bytes = 2;
以mqtt
为例(头部长度字段方式)可以这样设置:
unpack_setting_t mqtt_unpack_setting = {
.mode = UNPACK_BY_LENGTH_FIELD,
.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH,
.body_offset = 2,
.length_field_offset = 1,
.length_field_bytes = 1,
.length_field_coding = ENCODE_BY_VARINT,
};
具体实现代码在event/unpack.c中,在内部readbuf
的基础上直接原地拆包与组包,基本做到零拷贝,比抛给上层处理更高效,感兴趣的可以研究一下。
#include "hloop.h"
#include "hbase.h"
#include "hsocket.h"
#include "jsonrpc.h"
#include "cJSON.h"
#include "router.h"
#include "handler.h"
// hloop_create_tcp_server -> on_accept -> hio_read -> on_recv -> hio_write
static unpack_setting_t jsonrpc_unpack_setting;
jsonrpc_router router[] = {
{"add", do_add},
{"sub", do_sub},
{"mul", do_mul},
{"div", do_div},
};
#define JSONRPC_ROUTER_NUM (sizeof(router)/sizeof(router[0]))
static void on_close(hio_t* io) {
printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}
static void on_recv(hio_t* io, void* readbuf, int readbytes) {
// unpack -> cJSON_Parse -> router -> cJSON_Print -> pack -> hio_write
// unpack
jsonrpc_message msg;
memset(&msg, 0, sizeof(msg));
int packlen = jsonrpc_unpack(&msg, readbuf, readbytes);
if (packlen < 0) {
printf("jsonrpc_unpack failed!\n");
return;
}
assert(packlen == readbytes);
// cJSON_Parse
printf("> %.*s\n", msg.head.length, msg.body);
cJSON* jreq = cJSON_ParseWithLength(msg.body, msg.head.length);
cJSON* jres = cJSON_CreateObject();
cJSON* jmethod = cJSON_GetObjectItem(jreq, "method");
if (!jmethod || !cJSON_IsString(jmethod)) {
bad_request(jreq, jres);
} else {
// router
char* method = cJSON_GetStringValue(jmethod);
bool found = false;
for (int i = 0; i < JSONRPC_ROUTER_NUM; ++i) {
if (strcmp(method, router[i].method) == 0) {
found = true;
router[i].handler(jreq, jres);
break;
}
}
if (!found) {
not_found(jreq, jres);
}
}
// cJSON_Print
memset(&msg, 0, sizeof(msg));
msg.body = cJSON_PrintUnformatted(jres);
msg.head.length = strlen(msg.body);
printf("< %.*s\n", msg.head.length, msg.body);
// pack
packlen = jsonrpc_package_length(&msg.head);
unsigned char* writebuf = NULL;
HV_ALLOC(writebuf, packlen);
packlen = jsonrpc_pack(&msg, writebuf, packlen);
if (packlen > 0) {
hio_write(io, writebuf, packlen);
}
cJSON_Delete(jreq);
cJSON_Delete(jres);
cJSON_free((void*)msg.body);
HV_FREE(writebuf);
}
static void on_accept(hio_t* io) {
printf("on_accept connfd=%d\n", hio_fd(io));
hio_setcb_close(io, on_close);
hio_setcb_read(io, on_recv);
hio_set_unpack(io, &jsonrpc_unpack_setting);
hio_read(io);
}
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
// init jsonrpc_unpack_setting
memset(&jsonrpc_unpack_setting, 0, sizeof(unpack_setting_t));
jsonrpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
jsonrpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
jsonrpc_unpack_setting.body_offset = JSONRPC_HEAD_LENGTH;
jsonrpc_unpack_setting.length_field_offset = 1;
jsonrpc_unpack_setting.length_field_bytes = 4;
jsonrpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
hloop_t* loop = hloop_new(0);
hio_t* listenio = hloop_create_tcp_server(loop, "0.0.0.0", port, on_accept);
if (listenio == NULL) {
return -20;
}
printf("listenfd=%d\n", hio_fd(listenio));
hloop_run(loop);
hloop_free(&loop);
return 0;
}
- hloop_new:创建事件循环
- hloop_run: 运行事件循环
- hloop_create_tcp_server:创建TCP服务
- hio_set_unpack:设置拆包规则
- hio_read:开始接收数据
- hio_write: 发送数据
- jsonrpc_unpack:拆包
- jsonrpc_pack:组包
- cJSON_xxx:json编解码
git clone https://github.com/ithewei/libhv
cd libhv
make jsonrpc
# mkdir build && cd build && cmake .. && cmake --build . --target jsonrpc
bin/jsonrpc_server 1234
bin/jsonrpc_client 127.0.0.1 1234 add 1 2
bin/jsonrpc_client 127.0.0.1 1234 div 1 0
bin/jsonrpc_client 127.0.0.1 1234 xyz 1 2
结果如下: 服务端:
$ bin/jsonrpc_server 1234
listenfd=4
on_accept connfd=7
> {"id":1,"method":"add","params":[1,2]}
< {"id":1,"result":3}
on_close fd=7 error=0
客户端:
$ bin/jsonrpc_client 127.0.0.1 1234 add 1 2
on_connect fd=4
> {"id":1,"method":"add","params":[1,2]}
< {"id":1,"result":3}
on_close fd=4 error=0
$ bin/jsonrpc_client 127.0.0.1 1234 div 1 0
on_connect fd=4
> {"id":1,"method":"div","params":[1,0]}
< {"id":1,"error":{"code":400,"message":"Bad Request"}}
on_close fd=4 error=0
$ bin/jsonrpc_client 127.0.0.1 1234 xyz 1 2
on_connect fd=4
> {"id":1,"method":"xyz","params":[1,2]}
< {"id":1,"error":{"code":404,"message":"Not Found"}}
on_close fd=4 error=0
在上篇教程中,我们200行实现了一个纯C版的jsonrpc框架,使用的event模块+cJSON
实现,本篇中我们将介绍200行实现一个C++版的protorpc框架,使用evpp模块+protobuf
实现。
evpp模块是event模块的c++封装,具体介绍见evpp/README.md
protobuf是google出品的序列化/反序列化结构化数据存储格式,具体介绍可参考我的另一篇博客protobuf,也可参考protobuf官方文档
git clone https://github.com/protocolbuffers/protobuf
cd protobuf
./autogen.sh
./configure
make
sudo make install
sudo ldconfig
which protoc
protoc -h
#include "TcpServer.h"
using namespace hv;
#include "protorpc.h"
#include "router.h"
#include "handler/handler.h"
#include "handler/calc.h"
#include "handler/login.h"
protorpc_router router[] = {
{"add", calc_add},
{"sub", calc_sub},
{"mul", calc_mul},
{"div", calc_div},
{"login", login},
};
#define PROTORPC_ROUTER_NUM (sizeof(router)/sizeof(router[0]))
class ProtoRpcServer : public TcpServer {
public:
ProtoRpcServer() : TcpServer()
{
onConnection = [](const SocketChannelPtr& channel) {
std::string peeraddr = channel->peeraddr();
if (channel->isConnected()) {
printf("%s connected! connfd=%d\n", peeraddr.c_str(), channel->fd());
} else {
printf("%s disconnected! connfd=%d\n", peeraddr.c_str(), channel->fd());
}
};
onMessage = handleMessage;
// init protorpc_unpack_setting
unpack_setting_t protorpc_unpack_setting;
memset(&protorpc_unpack_setting, 0, sizeof(unpack_setting_t));
protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
protorpc_unpack_setting.length_field_offset = 1;
protorpc_unpack_setting.length_field_bytes = 4;
protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
setUnpack(&protorpc_unpack_setting);
}
int listen(int port) { return createsocket(port); }
private:
static void handleMessage(const SocketChannelPtr& channel, Buffer* buf) {
// unpack -> Request::ParseFromArray -> router -> Response::SerializeToArray -> pack -> Channel::write
// protorpc_unpack
protorpc_message msg;
memset(&msg, 0, sizeof(msg));
int packlen = protorpc_unpack(&msg, buf->data(), buf->size());
if (packlen < 0) {
printf("protorpc_unpack failed!\n");
return;
}
assert(packlen == buf->size());
// Request::ParseFromArray
protorpc::Request req;
protorpc::Response res;
if (req.ParseFromArray(msg.body, msg.head.length)) {
printf("> %s\n", req.DebugString().c_str());
res.set_id(req.id());
// router
const char* method = req.method().c_str();
bool found = false;
for (int i = 0; i < PROTORPC_ROUTER_NUM; ++i) {
if (strcmp(method, router[i].method) == 0) {
found = true;
router[i].handler(req, &res);
break;
}
}
if (!found) {
not_found(req, &res);
}
} else {
bad_request(req, &res);
}
// Response::SerializeToArray + protorpc_pack
memset(&msg, 0, sizeof(msg));
msg.head.length = res.ByteSizeLong();
packlen = protorpc_package_length(&msg.head);
unsigned char* writebuf = NULL;
HV_ALLOC(writebuf, packlen);
packlen = protorpc_pack(&msg, writebuf, packlen);
if (packlen > 0) {
printf("< %s\n", res.DebugString().c_str());
res.SerializeToArray(writebuf + PROTORPC_HEAD_LENGTH, msg.head.length);
channel->write(writebuf, packlen);
}
HV_FREE(writebuf);
}
};
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
ProtoRpcServer srv;
int listenfd = srv.listen(port);
if (listenfd < 0) {
return -20;
}
printf("protorpc_server listen on port %d, listenfd=%d ...\n", port, listenfd);
srv.setThreadNum(4);
srv.start();
while (1) hv_sleep(1);
return 0;
}
流程很清晰,启动一个TcpServer
,监听指定端口,通过setUnpack
接口设置拆包规则,onMessage
回调上来就是完整的一包数据,回调里调用protorpc_unpack
拆包、Request::ParseFromArray
反序列化得到结构化的请求,通过请求里的method
字段查找注册好的router路由表
,调用对应的handler
处理请求、填充响应,然后Response::SerializeToArray
序列化响应+protorpc_pack
加上头部封包后,最后调用Channel::write
发送出去。
base.proto定义如下:
syntax = "proto3";
package protorpc;
message Error {
int32 code = 1;
string message = 2;
}
message Request {
uint64 id = 1;
string method = 2;
repeated bytes params = 3;
}
message Response {
uint64 id = 1;
optional bytes result = 2;
optional Error error = 3;
}
执行该目录下的protoc.sh
会调用protoc
根据proto
定义文件自动生成对应代码。
git clone https://github.com/ithewei/libhv
cd libhv
make protorpc
bin/protorpc_server 1234
bin/protorpc_client 127.0.0.1 1234 add 1 2
bin/protorpc_client 127.0.0.1 1234 div 1 0
bin/protorpc_client 127.0.0.1 1234 xyz 1 2
结果如下: 服务端:
$ bin/protorpc_server 1234
protorpc_server listen on port 1234, listenfd=3 ...
客户端:
$ bin/protorpc_client 127.0.0.1 1234 add 1 2
connected to 127.0.0.1:1234! connfd=4
id: 1
method: "login"
params: "\n\005admin\022\006123456"
login success!
user_id: 123456
token: "admin:123456"
id: 2
method: "add"
params: "\010\001"
params: "\010\002"
calc success!
1 add 2 = 3
disconnected to 127.0.0.1:1234! connfd=4
$ bin/protorpc_client 127.0.0.1 1234 div 1 0
connected to 127.0.0.1:1234! connfd=4
id: 1
method: "login"
params: "\n\005admin\022\006123456"
login success!
user_id: 123456
token: "admin:123456"
id: 2
method: "div"
params: "\010\001"
params: ""
RPC error:
code: 400
message: "Bad Request"
calc failed!
disconnected to 127.0.0.1:1234! connfd=4
$ bin/protorpc_client 127.0.0.1 1234 xyz 1 2
connected to 127.0.0.1:1234! connfd=4
id: 1
method: "login"
params: "\n\005admin\022\006123456"
login success!
user_id: 123456
token: "admin:123456"
id: 2
method: "xyz"
params: "\010\001"
params: "\010\002"
RPC error:
code: 404
message: "Not Found"
calc failed!
disconnected to 127.0.0.1:1234! connfd=4
本篇介绍服务端编程的多线程/多进程模式
以及使用libhv
如何实现。
早期的apache
就是采用这种模式,用于学习服务端编程的Tinyhttpd也是这种模式。
伪代码如下:
void* worker_thread(void* userdata) {
int fd = (intptr_t)userdata;
while (1) {
readbytes = read(fd, buf, len);
if (readbytes <= 0) {
close(fd);
break;
}
...
writebytes = write(fd, buf, len);
if (writebytes <= 0) {
close(fd);
break;
}
}
}
void* accept_thread(void* userdata) {
int listenfd = (intptr_t)userdata;
while (1) {
int connfd = accept(listenfd, &sockaddr, &socklen);
// 创建一个线程为这个连接服务
pthread_create(worker_thread, (void *)(intptr_t)connfd);
}
}
这种模式的缺点显而易见,因为读写都是阻塞的,所以一个IO线程只能处理一个fd,对于客户端尚可接受,对于服务端来说,每accept一个连接,就创建一个IO线程去读写这个套接字,并发达到几千就需要创建几千个线程,线程上下文的切换开销都会把系统占满。
因为BIO
(阻塞IO)的局限性,所以IO多路复用机制应运而生,如最早期的select
、后来的poll
,linux的epoll
、windows的iocp
、bsd的kqueue
、solaris的port
等,都属于IO多路复用机制。非阻塞NIO搭配IO多路复用机制就是高并发的钥匙
。
每个线程里运行一个事件循环,每个事件循环里通过IO多路复用机制(即select、poll、epoll、kqueue
等)监听读写事件,这正是libevent、libev、libuv、libhv
这类事件循环库的核心思想。
以select
为例,伪代码如下:
void event_loop_run() {
while (1) {
int nselect = select(max_fd+1, &readfds, &writefds, &exceptfds, timeout);
if (nselect == 0) continue;
for (int fd = 0; fd <= max_fd; ++fd) {
// 可读
if (FD_ISSET(fd, &readfds)) {
...
read(fd, buf, len);
}
// 可写
if (FD_ISSET(fd, &writefds)) {
...
write(fd, buf, len);
}
}
}
}
通过IO多路复用机制,一个IO线程就可以同时监听多个fd了,以现代计算机的性能,一个IO线程即可处理几十万数量级别的IO读写。redis
就是单线程的,但可轻松达到几万QPS
。
为了充分利用现代计算机的多核处理器,掌握多线程服务端编程也就必不可少了。
libhv
的 examples/multi-thread 目录下给出了几种常见的多线程/多进程模式的具体写法。
/*
*
* @build make examples
* @server bin/multi-acceptor-processes 1234
* @client bin/nc 127.0.0.1 1234
* nc 127.0.0.1 1234
* telnet 127.0.0.1 1234
*/
#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"
#include "hproc.h"
static const char* host = "0.0.0.0";
static int port = 1234;
static int process_num = 4;
static int listenfd = INVALID_SOCKET;
static void on_close(hio_t* io) {
printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}
static void on_recv(hio_t* io, void* buf, int readbytes) {
// echo
hio_write(io, buf, readbytes);
}
static void on_accept(hio_t* io) {
char localaddrstr[SOCKADDR_STRLEN] = {0};
char peeraddrstr[SOCKADDR_STRLEN] = {0};
printf("pid=%ld connfd=%d [%s] <= [%s]\n",
(long)hv_getpid(),
(int)hio_fd(io),
SOCKADDR_STR(hio_localaddr(io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
hio_setcb_close(io, on_close);
hio_setcb_read(io, on_recv);
hio_read(io);
}
static void loop_proc(void* userdata) {
hloop_t* loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
haccept(loop, listenfd, on_accept);
hloop_run(loop);
}
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: cmd port\n");
return -10;
}
port = atoi(argv[1]);
listenfd = Listen(port, host);
if (listenfd < 0) {
exit(1);
}
proc_ctx_t ctx;
memset(&ctx, 0, sizeof(ctx));
ctx.proc = loop_proc;
for (int i = 0; i < process_num; ++i) {
hproc_spawn(&ctx);
}
while(1) hv_sleep(1);
return 0;
}
关键之处就是通过hproc_spawn
(linux下就是调用fork
)衍生子进程,每个进程里运行一个事件循环(hloop_run
),accept
请求,将连接上来的fd加入到IO多路复用中,监听读写事件。
多进程模式的好处就是父进程可以通过捕获SIGCHLD
信号,即可知道子进程退出了(通常是异常崩溃了),然后重新fork
一个子进程,也即是崩溃自动重启功能。而且因为进程空间的隔离,一个子进程的崩溃不会影响其它的子进程,导致所有服务进程都不可用,所以鲁棒性比较强,nginx
就是采用的多进程模式
。libhv中提供的httpd示例也是如此。
注:libhv中提供了一个接口master_workers_run
来实现这种带崩溃自动重启的master-workers
多进程模式,具体示例可参考 examples/hmain_test.cpp
/*
*
* @build make examples
* @server bin/multi-acceptor-threads 1234
* @client bin/nc 127.0.0.1 1234
* nc 127.0.0.1 1234
* telnet 127.0.0.1 1234
*/
#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"
static const char* host = "0.0.0.0";
static int port = 1234;
static int thread_num = 4;
static int listenfd = INVALID_SOCKET;
static void on_close(hio_t* io) {
printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}
static void on_recv(hio_t* io, void* buf, int readbytes) {
// echo
hio_write(io, buf, readbytes);
}
static void on_accept(hio_t* io) {
char localaddrstr[SOCKADDR_STRLEN] = {0};
char peeraddrstr[SOCKADDR_STRLEN] = {0};
printf("tid=%ld connfd=%d [%s] <= [%s]\n",
(long)hv_gettid(),
(int)hio_fd(io),
SOCKADDR_STR(hio_localaddr(io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
hio_setcb_close(io, on_close);
hio_setcb_read(io, on_recv);
hio_read(io);
}
static HTHREAD_RETTYPE loop_thread(void* userdata) {
hloop_t* loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
haccept(loop, listenfd, on_accept);
hloop_run(loop);
return 0;
}
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: cmd port\n");
return -10;
}
port = atoi(argv[1]);
listenfd = Listen(port, host);
if (listenfd < 0) {
exit(1);
}
for (int i = 0; i < thread_num; ++i) {
hthread_create(loop_thread, NULL);
}
while(1) hv_sleep(1);
return 0;
}
和上面多进程类似,只不过是调用了hthread_create
(linux下就是pthread_create
)创建线程而不是进程,然后每个线程里运行一个事件循环(hloop_run
)。
多线程模式相对于多进程模式的好处就是共享进程空间,也即是共享数据,而不是每个进程一份资源,当然这也带来了多线程同步的麻烦。
/*
*
* @build make examples
* @server bin/one-acceptor-multi-workers 1234
* @client bin/nc 127.0.0.1 1234
* nc 127.0.0.1 1234
* telnet 127.0.0.1 1234
*/
#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"
static const char* host = "0.0.0.0";
static int port = 1234;
static int thread_num = 4;
static hloop_t* accept_loop = NULL;
static hloop_t** worker_loops = NULL;
static hloop_t* get_next_loop() {
static int s_cur_index = 0;
if (s_cur_index == thread_num) {
s_cur_index = 0;
}
return worker_loops[s_cur_index++];
}
static void on_close(hio_t* io) {
printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}
static void on_recv(hio_t* io, void* buf, int readbytes) {
// echo
hio_write(io, buf, readbytes);
}
static void new_conn_event(hevent_t* ev) {
hloop_t* loop = ev->loop;
hio_t* io = (hio_t*)hevent_userdata(ev);
hio_attach(loop, io);
char localaddrstr[SOCKADDR_STRLEN] = {0};
char peeraddrstr[SOCKADDR_STRLEN] = {0};
printf("tid=%ld connfd=%d [%s] <= [%s]\n",
(long)hv_gettid(),
(int)hio_fd(io),
SOCKADDR_STR(hio_localaddr(io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
hio_setcb_close(io, on_close);
hio_setcb_read(io, on_recv);
hio_read(io);
}
static void on_accept(hio_t* io) {
hio_detach(io);
hloop_t* worker_loop = get_next_loop();
hevent_t ev;
memset(&ev, 0, sizeof(ev));
ev.loop = worker_loop;
ev.cb = new_conn_event;
ev.userdata = io;
hloop_post_event(worker_loop, &ev);
}
static HTHREAD_RETTYPE worker_thread(void* userdata) {
hloop_t* loop = (hloop_t*)userdata;
hloop_run(loop);
return 0;
}
static HTHREAD_RETTYPE accept_thread(void* userdata) {
hloop_t* loop = (hloop_t*)userdata;
hio_t* listenio = hloop_create_tcp_server(loop, host, port, on_accept);
if (listenio == NULL) {
exit(1);
}
hloop_run(loop);
return 0;
}
int main(int argc, char** argv) {
if (argc < 2) {
printf("Usage: cmd port\n");
return -10;
}
port = atoi(argv[1]);
worker_loops = (hloop_t**)malloc(sizeof(hloop_t*) * thread_num);
for (int i = 0; i < thread_num; ++i) {
worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
hthread_create(worker_thread, worker_loops[i]);
}
accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
accept_thread(accept_loop);
return 0;
}
这种模式相对于上面accept(listenfd, ...)
和read/write(connfd, ...)
都在一个线程里,多了一个步骤,当accept线程接收到一个连接时,需要挑选一个worker_loop(示例里就是简单的轮询
策略,实际应用里可能还有根据最少连接数
、IP hash
、URL hash
等负载均衡策略
),然后通知该worker_loop有新的连接到来,libhv里是通过hloop_post_event
这个接口来进行事件循环间通信
的,这个接口是多线程安全
的,内部实现是预先创建一对socketpair
,向一个fd写入,监听另外一个fd可读,感兴趣的可以读下 event/hloop.c 源码。
这种连接分发模式其实是当下比较流行的模式,因为比较方便控制负载均衡,不会出现一部分线程饱和,一部分线程饥饿的现象,memcached
即是使用libevent
实现的这种模式,但是因为libevent
并没有提供hloop_post_event
类似接口,所以需要用户自己实现,而且libevent
也没有集成openssl
、也没有提供拆包组包
等相关功能,所以memcached
里网络编程的代码并不清晰好读,如果使用libhv,我想事情会变得超级简单。
以上只是展示了网络IO相关的多线程模式,实际掺合业务所涉及的多线程编程比这个更加复杂,IO线程里是不允许做太多耗时操作的(一般只做接收/发送、轻量级的拆包/组包、序列化/反序列化操作)
,否则会影响线程里其他连接的读写,所以如果涉及CPU密集型计算,如音视频编解码、人脸识别、运动追踪等算法检测,则需要配合队列
(根据业务可能叫消息队列、请求队列、任务队列、帧缓存
等)+ 消费者线程/线程池
(如请求处理线程、任务执行线程、编解码线程
等)使用。好在libhv
里hio_write
、hio_close
都是多线程安全的,这可以让网络IO事件循环线程里接收数据、拆包组包、反序列化
后放入队列,消费者线程从队列里取出数据、处理后发送响应和关闭连接
,变得更加简单。