-
Notifications
You must be signed in to change notification settings - Fork 3.8k
2019 09 03 手写RPC框架第三章《RPC中间件》
作者:小傅哥
博客:https://bugstack.cn - 原创系列专题
沉淀、分享、成长,让自己和他人都能有所收获!
结合上面两章节,本章将实现rpc的基础功能;提供一给rpc中间件jar给生产端和服务端。 技术点;
- 注册中心,生产者在启动的时候需要将本地接口发布到注册中心,我们这里采用redis作为注册中心,随机取数模拟权重。
- 客户端在启动的时候,连接到注册中心,也就是我们的redis。连接成功后将配置的生产者方法发布到注册中心{接口+别名}。
- 服务端配置生产者的信息后,在加载xml时候由中间件生成动态代理类,当发生发放调用时实际则调用了我们代理类的方法,代理里会通过netty的futuer通信方式进行数据交互。
- jdk 1.8.0
- IntelliJ IDEA Community Edition 2018.3.1 x64
- windows redis
itstack-demo-rpc-03
└── src
└── main
│ ├── java
│ │ └── org.itstack.demo.rpc
│ │ ├── config
│ │ ├── domain
│ │ ├── network
│ │ │ ├── client
│ │ │ │ ├── ClientSocket.java
│ │ │ │ └── MyClientHandler.java
│ │ │ ├── codec
│ │ │ │ ├── RpcDecoder.java
│ │ │ │ └── RpcEncoder.java
│ │ │ ├── future
│ │ │ │ ├── SyncWrite.java
│ │ │ │ ├── SyncWriteFuture.java
│ │ │ │ ├── SyncWriteMap.java
│ │ │ │ └── WriteFuture.java
│ │ │ ├── msg
│ │ │ │ ├── Request.java
│ │ │ │ └── Response.java
│ │ │ ├── server
│ │ │ │ ├── MyServerHandler.java
│ │ │ │ └── ServerSocket.java
│ │ │ └── util
│ │ │ └── SerializationUtil.java
│ │ ├── reflect
│ │ │ ├── JDKInvocationHandler.java
│ │ │ └── JDKProxy.java
│ │ ├── registry
│ │ │ └── RedisRegistryCenter.java
│ │ └── util
│ └── resource
│ └── META-INF
│ ├── rpc.xsd
│ ├── spring.handlers
│ └── spring.schemas
└── test
├── java
│ └── org.itstack.demo.test
│ ├── service
│ │ ├── impl
│ │ │ └── HelloServiceImpl.java
│ │ └── HelloService.java
│ └── ApiTest.java
└── resource
├── itstack-rpc-center.xml
├── itstack-rpc-consumer.xml
├── itstack-rpc-provider.xml
└── log4j.xml
ConsumerBean.java
package org.itstack.demo.rpc.config.spring.bean;
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import org.itstack.demo.rpc.config.ConsumerConfig;
import org.itstack.demo.rpc.domain.RpcProviderConfig;
import org.itstack.demo.rpc.network.client.ClientSocket;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.reflect.JDKProxy;
import org.itstack.demo.rpc.registry.RedisRegistryCenter;
import org.itstack.demo.rpc.util.ClassLoaderUtils;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.Assert;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class ConsumerBean<T> extends ConsumerConfig<T> implements FactoryBean {
private ChannelFuture channelFuture;
private RpcProviderConfig rpcProviderConfig;
@Override
public Object getObject() throws Exception {
//从redis获取链接
if (null == rpcProviderConfig) {
String infoStr = RedisRegistryCenter.obtainProvider(nozzle, alias);
rpcProviderConfig = JSON.parseObject(infoStr, RpcProviderConfig.class);
}
Assert.isTrue(null != rpcProviderConfig);
//获取通信channel
if (null == channelFuture) {
ClientSocket clientSocket = new ClientSocket(rpcProviderConfig.getHost(), rpcProviderConfig.getPort());
new Thread(clientSocket).start();
for (int i = 0; i < 100; i++) {
if (null != channelFuture) break;
Thread.sleep(500);
channelFuture = clientSocket.getFuture();
}
}
Assert.isTrue(null != channelFuture);
Request request = new Request();
request.setChannel(channelFuture.channel());
request.setNozzle(nozzle);
request.setRef(rpcProviderConfig.getRef());
request.setAlias(alias);
return (T) JDKProxy.getProxy(ClassLoaderUtils.forName(nozzle), request);
}
@Override
public Class<?> getObjectType() {
try {
return ClassLoaderUtils.forName(nozzle);
} catch (ClassNotFoundException e) {
return null;
}
}
@Override
public boolean isSingleton() {
return true;
}
}
ProviderBean.java
package org.itstack.demo.rpc.config.spring.bean;
import com.alibaba.fastjson.JSON;
import org.itstack.demo.rpc.config.ProviderConfig;
import org.itstack.demo.rpc.domain.LocalServerInfo;
import org.itstack.demo.rpc.domain.RpcProviderConfig;
import org.itstack.demo.rpc.registry.RedisRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class ProviderBean extends ProviderConfig implements ApplicationContextAware {
private Logger logger = LoggerFactory.getLogger(ProviderBean.class);
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RpcProviderConfig rpcProviderConfig = new RpcProviderConfig();
rpcProviderConfig.setNozzle(nozzle);
rpcProviderConfig.setRef(ref);
rpcProviderConfig.setAlias(alias);
rpcProviderConfig.setHost(LocalServerInfo.LOCAL_HOST);
rpcProviderConfig.setPort(LocalServerInfo.LOCAL_PORT);
//注册生产者
long count = RedisRegistryCenter.registryProvider(nozzle, alias, JSON.toJSONString(rpcProviderConfig));
logger.info("注册生产者:{} {} {}", nozzle, alias, count);
}
}
ServerBean.java
package org.itstack.demo.rpc.config.spring.bean;
import org.itstack.demo.rpc.config.ServerConfig;
import org.itstack.demo.rpc.domain.LocalServerInfo;
import org.itstack.demo.rpc.network.server.ServerSocket;
import org.itstack.demo.rpc.registry.RedisRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class ServerBean extends ServerConfig implements ApplicationContextAware {
private Logger logger = LoggerFactory.getLogger(ServerBean.class);
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//启动注册中心
logger.info("启动注册中心 ...");
RedisRegistryCenter.init(host, port);
logger.info("启动注册中心完成 {} {}", host, port);
//初始化服务端
logger.info("初始化生产端服务 ...");
ServerSocket serverSocket = new ServerSocket(applicationContext);
Thread thread = new Thread(serverSocket);
thread.start();
while (!serverSocket.isActiveSocketServer()) {
try {
Thread.sleep(500);
} catch (InterruptedException ignore) {
}
}
logger.info("初始化生产端服务完成 {} {}", LocalServerInfo.LOCAL_HOST, LocalServerInfo.LOCAL_PORT);
}
}
MyClientHandler.java
package org.itstack.demo.rpc.network.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.itstack.demo.rpc.network.future.SyncWriteFuture;
import org.itstack.demo.rpc.network.future.SyncWriteMap;
import org.itstack.demo.rpc.network.msg.Response;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
Response msg = (Response) obj;
String requestId = msg.getRequestId();
SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
if (future != null) {
future.setResponse(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
MyServerHandler.java
package org.itstack.demo.rpc.network.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
import org.itstack.demo.rpc.util.ClassLoaderUtils;
import org.springframework.context.ApplicationContext;
import java.lang.reflect.Method;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
private ApplicationContext applicationContext;
MyServerHandler(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj) {
try {
Request msg = (Request) obj;
//调用
Class<?> classType = ClassLoaderUtils.forName(msg.getNozzle());
Method addMethod = classType.getMethod(msg.getMethodName(), msg.getParamTypes());
Object objectBean = applicationContext.getBean(msg.getRef());
Object result = addMethod.invoke(objectBean, msg.getArgs());
//反馈
Response request = new Response();
request.setRequestId(msg.getRequestId());
request.setResult(result);
ctx.writeAndFlush(request);
//释放
ReferenceCountUtil.release(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}
JDKInvocationHandler.java
package org.itstack.demo.rpc.reflect;
import org.itstack.demo.rpc.network.future.SyncWrite;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class JDKInvocationHandler implements InvocationHandler {
private Request request;
public JDKInvocationHandler(Request request) {
this.request = request;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class[] paramTypes = method.getParameterTypes();
if ("toString".equals(methodName) && paramTypes.length == 0) {
return request.toString();
} else if ("hashCode".equals(methodName) && paramTypes.length == 0) {
return request.hashCode();
} else if ("equals".equals(methodName) && paramTypes.length == 1) {
return request.equals(args[0]);
}
//设置参数
request.setMethodName(methodName);
request.setParamTypes(paramTypes);
request.setArgs(args);
request.setRef(request.getRef());
Response response = new SyncWrite().writeAndSync(request.getChannel(), request, 5000);
//异步调用
return response.getResult();
}
}
JDKProxy.java
package org.itstack.demo.rpc.reflect;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.util.ClassLoaderUtils;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
public class JDKProxy {
public static <T> T getProxy(Class<T> interfaceClass, Request request) throws Exception {
InvocationHandler handler = new JDKInvocationHandler(request);
ClassLoader classLoader = ClassLoaderUtils.getCurrentClassLoader();
T result = (T) Proxy.newProxyInstance(classLoader, new Class[]{interfaceClass}, handler);
return result;
}
}
RedisRegistryCenter.java
package org.itstack.demo.rpc.registry;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/7
* redis 模拟RPC注册中心
*/
public class RedisRegistryCenter {
private static Jedis jedis; //非切片额客户端连接
//初始化redis
public static void init(String host, int port) {
// 池基本配置
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(5);
config.setTestOnBorrow(false);
JedisPool jedisPool = new JedisPool(config, host, port);
jedis = jedisPool.getResource();
}
/**
* 注册生产者
*
* @param nozzle 接口
* @param alias 别名
* @param info 信息
* @return 注册结果
*/
public static Long registryProvider(String nozzle, String alias, String info) {
return jedis.sadd(nozzle + "_" + alias, info);
}
/**
* 获取生产者
* 模拟权重,随机获取
* @param nozzle 接口名称
*/
public static String obtainProvider(String nozzle, String alias) {
return jedis.srandmember(nozzle + "_" + alias);
}
public static Jedis jedis() {
return jedis;
}
}
ApiTest.java
public class ApiTest {
public static void main(String[] args) {
String[] configs = {"itstack-rpc-center.xml", "itstack-rpc-provider.xml", "itstack-rpc-consumer.xml"};
new ClassPathXmlApplicationContext(configs);
}
}
2019-....ClassPathXmlApplicationContext:prepareRefresh:510] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@299a06ac: startup date [Tue May 07 20:19:47 CST 2019]; root of context hierarchy
2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-center.xml]
2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-provider.xml]
2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-consumer.xml]
2019-...upport.DefaultListableBeanFactory:preInstantiateSingletons:577] - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@7e0b0338: defining beans [consumer_itstack,provider_helloService,consumer_helloService]; root of factory hierarchy
2019-...bean.ServerBean:setApplicationContext:25] - 启动注册中心 ...
2019-...bean.ServerBean:setApplicationContext:27] - 启动注册中心完成 127.0.0.1 6379
2019-...bean.ServerBean:setApplicationContext:30] - 初始化生产端服务 ...
2019-...bean.ServerBean:setApplicationContext:41] - 初始化生产端服务完成 10.13.81.104 22201
2019-...bean.ProviderBean:setApplicationContext:35] - 注册生产者:org.itstack.demo.test.service.HelloService itStackRpc 0
为了测试我们写两个测试工程;itstack-demo-rpc-provider、itstack-demo-rpc-consumer
itstack-demo-rpc-provider 提供生产者接口
itstack-demo-rpc-provider
├── itstack-demo-rpc-provider-export
│ └── src
│ └── main
│ └── java
│ └── org.itstack.demo.rpc.provider.export
│ ├── domain
│ │ └── Hi.java
│ └── HelloService.java
│
└── itstack-demo-rpc-provider-web
└── src
└── main
├── java
│ └── org.itstack.demo.rpc.provider.web
│ └── HelloServiceImpl.java
└── resources
└── spring
└── spring-itstack-rpc-provider.xml
HelloService.java
public interface HelloService {
String hi();
String say(String str);
String sayHi(Hi hi);
}
HelloServiceImpl.java
@Controller("helloService")
public class HelloServiceImpl implements HelloService {
@Override
public String hi() {
return "hi itstack rpc";
}
@Override
public String say(String str) {
return str;
}
@Override
public String sayHi(Hi hi) {
return hi.getUserName() + " say:" + hi.getSayMsg();
}
}
spring-itstack-rpc-provider.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">
<!-- 注册中心 -->
<rpc:server id="rpcServer" host="127.0.0.1" port="6379"/>
<rpc:provider id="helloServiceRpc" nozzle="org.itstack.demo.rpc.provider.export.HelloService"
ref="helloService" alias="itstackRpc"/>
</beans>
itstack-demo-rpc-consumer 提供消费者调用
itstack-demo-rpc-consumer
└── src
├── main
│ ├── java
│ └── resources
│ └── spring
│ └── spring-itstack-rpc-consumer.xml
│
└── test
└── java
└── org.itstack.demo.test
└── ConsumerTest.java
spring-itstack-rpc-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">
<!-- 注册中心 -->
<rpc:server id="consumer_itstack" host="127.0.0.1" port="6379"/>
<rpc:consumer id="helloService" nozzle="org.itstack.demo.rpc.provider.export.HelloService" alias="itstackRpc"/>
</beans>
ConsumerTest.java
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/spring-config.xml")
public class ConsumerTest {
@Resource(name = "helloService")
private HelloService helloService;
@Test
public void test() {
String hi = helloService.hi();
System.out.println("测试结果:" + hi);
String say = helloService.say("hello world");
System.out.println("测试结果:" + say);
Hi hiReq = new Hi();
hiReq.setUserName("付栈");
hiReq.setSayMsg("付可敌国,栈无不胜");
String hiRes = helloService.sayHi(hiReq);
System.out.println("测试结果:" + hiRes);
}
}
redis 127.0.0.1:6379> srandmember org.itstack.demo.rpc.provider.export.HelloService_itstackRpc
"{\"alias\":\"itstackRpc\",\"host\":\"10.13.81.104\",\"nozzle\":\"org.itstack.demo.rpc.provider.export.HelloService\",\"port\":22201,\"ref\":\"helloService\"}"
redis 127.0.0.1:6379>
log4j:WARN No appenders could be found for logger (org.springframework.test.context.junit4.SpringJUnit4ClassRunner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
测试结果:hi itstack rpc
测试结果:hello world
测试结果:付栈 say:付可敌国,栈无不胜
Process finished with exit code 0
下一篇:websocket与下位机通过netty方式通信传输行为信息
微信搜索「bugstack虫洞栈」公众号,关注后回复「rpc案例源码」获取本文源码&更多原创专题案例!
小傅哥(微信:fustack),公众号:bugstack虫洞栈
| bugstack.cn - 沉淀、分享、成长,让自己和他人都能有所收获!
🌏 知识星球:码农会锁
实战项目:「DDD+RPC分布式抽奖系统
」、专属小册、问题解答、简历指导、架构图稿、视频课程
🐲 头条
-
💥
🎁 Lottery 抽奖系统
- 基于领域驱动设计的四层架构的互联网分布式开发实践 -
小傅哥的《重学 Java 设计模式》
- 全书彩印、重绘类图、添加内容 -
⭐小傅哥的《Java 面经手册》
- 全书5章29节,417页11.5万字,完稿&发版 -
小傅哥的《手撸 Spring》
- 通过带着读者手写简化版 Spring 框架,了解 Spring 核心原理 -
🌈小傅哥的《SpringBoot 中间件设计和开发》
- 小册16个中间件开发30个代码库
⛳ 目录
💋 精选
🐾 友链
建立本开源项目的初衷是基于个人学习与工作中对 Java 相关技术栈的总结记录,在这里也希望能帮助一些在学习 Java 过程中遇到问题的小伙伴,如果您需要转载本仓库的一些文章到自己的博客,请按照以下格式注明出处,谢谢合作。
作者:小傅哥
链接:https://bugstack.cn
来源:bugstack虫洞栈
2021年10月24日,小傅哥
的文章全部开源到代码库 CodeGuide
中,与同好同行,一起进步,共同维护。
这里我提供 3 种方式:
-
提出
Issue
:在 Issue 中指出你觉得需要改进/完善的地方(能够独立解决的话,可以在提出 Issue 后再提交PR
)。 -
处理
Issue
: 帮忙处理一些待处理的Issue
。 -
提交
PR
: 对于错别字/笔误这类问题可以直接提交PR
,无需提交Issue
确认。
详细参考:CodeGuide 贡献指南 - 非常感谢你的支持,这里会留下你的足迹
- 加群交流 本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “小傅哥” 微信(fustack),备注:加群。
- 公众号(bugstack虫洞栈) - 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。
感谢以下人员对本仓库做出的贡献或者对小傅哥的赞赏,当然不仅仅只有这些贡献者,这里就不一一列举了。如果你希望被添加到这个名单中,并且提交过 Issue 或者 PR,请与我联系。