news 2026/4/16 13:12:30

so eazy!使用Netty和动态代理一键实现一个简单的RPC

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
so eazy!使用Netty和动态代理一键实现一个简单的RPC

RPC(remote procedure call)远程过程调用

RPC是为了在分布式应用中,两台主机的Java进程进行通信,当A主机调用B主机的方法时,过程简洁,就像是调用自己进程里的方法一样。
RPC框架的职责就是,封装好底层调用的细节,客户端只要调用方法,就能够获取服务提供者的响应,方便开发者编写代码。
RPC底层使用的是TCP协议,服务端和客户端和点对点通信。

作用

在RPC的应用场景中,客户端调用服务端的代码

客户端需要有相应的api接口,将方法名、方法参数类型、具体参数等等都发送给服务端

服务端需要有方法的具体实现,在接收到客户端的请求后,根据信息调用对应的方法,并返回响应给客户端

流程图演示

代码实现

首先客户端要知道服务端的接口,然后封装一个请求对象,发送给服务端

要调用一个方法需要有:方法名、方法参数类型、具体参数、执行方法的类名

View Code

由服务端返回给客户端的响应(方法调用结果)也使用一个对象进行封装

View Code

  • 如果是在多线程调用中,需要具体把每个响应返回给对应的请求,可以加一个ID进行标识

将对象通过网络传输,需要先进行序列化操作,这里使用的是jackson工具

<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.11.4</version> </dependency>

View Code

  • 在反序列化过程中,需要指定要转化的类型,而服务端接收request,客户端接收response,二者类型是不一样的,所以在后续传输时指定类型

有了需要传输的数据后,使用Netty开启网络服务进行传输

服务端

绑定端口号,开启连接

public class ServerNetty { public static void connect(int port) throws InterruptedException { EventLoopGroup workGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.channel(NioServerSocketChannel.class) .group(bossGroup,workGroup) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { /** * 加入自定义协议的数据处理器,指定接收到的数据类型 * 加入服务端处理器 */ ch.pipeline().addLast(new NettyProtocolHandler(RpcRequest.class)); ch.pipeline().addLast(new ServerHandler()); } }); bootstrap.bind(port).sync(); } }

Netty中绑定了两个数据处理器

一个是数据处理器,服务端接收到请求->调用方法->返回响应,这些过程都在数据处理器中执行

public class ServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { RpcRequest rpcRequest = (RpcRequest)msg; // 获取使用反射需要的各个参数 String methodName = rpcRequest.getMethodName(); Class[] paramTypes = rpcRequest.getParamType(); Object[] args = rpcRequest.getArgs(); String className = rpcRequest.getClassName(); //从注册中心容器中获取对象 Object object = Server.hashMap.get(className); Method method = object.getClass().getMethod(methodName,paramTypes); //反射调用方法 String result = (String) method.invoke(object,args); // 将响应结果封装好后发送回去 RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setCode(200); rpcResponse.setResult(result); ctx.writeAndFlush(rpcResponse); } }
  • 这里从hash表中获取对象,有一个预先进行的操作:将有可能被远程调用的对象放入容器中,等待使用

一个是自定义的TCP协议处理器,为了解决TCP的常见问题:因为客户端发送的数据包和服务端接收数据缓冲区之间,大小不匹配导致的粘包、拆包问题。

/** * 网络传输的自定义TCP协议 * 发送时:为传输的字节流添加两个魔数作为头部,再计算数据的长度,将数据长度也添加到头部,最后才是数据 * 接收时:识别出两个魔数后,下一个就是首部,最后使用长度对应的字节数组接收数据 */ public class NettyProtocolHandler extends ChannelDuplexHandler { private static final byte[] MAGIC = new byte[]{0x15,0x66}; private Class decodeType; public NettyProtocolHandler() { } public NettyProtocolHandler(Class decodeType){ this.decodeType = decodeType; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; //接收响应对象 Object dstObject; byte[] header = new byte[2]; in.readBytes(header); byte[] lenByte = new byte[4]; in.readBytes(lenByte); int len = ByteUtils.Bytes2Int_BE(lenByte); byte[] object = new byte[len]; in.readBytes(object); dstObject = JsonSerialization.deserialize(object, decodeType); //交给下一个数据处理器 ctx.fireChannelRead(dstObject); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf byteBuf = Unpooled.buffer(); //写入魔数 byteBuf.writeBytes(MAGIC); byte[] object = JsonSerialization.serialize(msg); //数据长度转化为字节数组并写入 int len = object.length; byte[] bodyLen = ByteUtils.int2bytes(len); byteBuf.writeBytes(bodyLen); //写入对象 byteBuf.writeBytes(object); ctx.writeAndFlush(byteBuf); } }
  • 这个数据处理器是服务端和客户端都要使用的,就相当于是一个双方定好传输数据要遵守的协议
  • 在这里进行了对象的序列化和反序列化,所以反序列化类型在这个处理器中指定
  • 这里面要将数据的长度发送,需一个将整数类型转化为字节类型的工具

转化数据工具类

View Code

客户端

将Netty的操作封装了起来,最后返回一个Channle类型,由它进行发送数据的操作

public class ClientNetty { public static Channel connect(String host,int port) throws InterruptedException { InetSocketAddress address = new InetSocketAddress(host,port); EventLoopGroup workGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class) .group(workGroup) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //自定义协议handler(客户端接收的是response) ch.pipeline().addLast(new NettyProtocolHandler(RpcResponse.class)); //处理数据handler ch.pipeline().addLast(new ClientHandler()); } }); Channel channel = bootstrap.connect(address).sync().channel(); return channel; } }

数据处理器负责接收response,并将响应结果放入在future中,future的使用在后续的动态代理中

public class ClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { RpcResponse rpcResponse = (RpcResponse) msg; //服务端正常情况返回码为200 if(rpcResponse.getCode() != 200){ throw new Exception(); } //将结果放到future里 RPCInvocationHandler.future.complete(rpcResponse.getResult()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }

要让客户端在调用远程方法时像调用本地方法一样,就需要一个代理对象,供客户端调用,让代理对象去调用服务端的实现。

代理对象构造

public class ProxyFactory { public static Object getProxy(Class<?>[] interfaces){ return Proxy.newProxyInstance(ProxyFactory.class.getClassLoader(), interfaces, new RPCInvocationHandler()); } }

客户端代理对象的方法执行

将request发送给服务端后,一直阻塞,等到future里面有了结果为止。

public class RPCInvocationHandler implements InvocationHandler { static public CompletableFuture future; static Channel channel; static { future = new CompletableFuture(); //开启netty网络服务 try { channel = ClientNetty.connect("127.0.0.1",8989); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setArgs(args); rpcRequest.setMethodName(method.getName()); rpcRequest.setParamType(method.getParameterTypes()); rpcRequest.setClassName(method.getDeclaringClass().getSimpleName()); channel.writeAndFlush(rpcRequest); //一个阻塞操作,等待网络传输的结果 String result = (String) future.get(); return result; } }
  • 这里用static修饰future和channle,没有考虑到客户端去连接多个服务端和多次远程调用
  • 可以使用一个hash表,存储与不同服务端对应的channle,每次调用时从hash表中获取即可
  • 用hash表存储与不同request对应的future,每个响应的结果与之对应

客户端

要进行远程调用需要拥有的接口

public interface OrderService { public String buy(); }

预先的操作和测试代码

public class Client { static OrderService orderService; public static void main(String[] args) throws InterruptedException { //创建一个代理对象给进行远程调用的类 orderService = (OrderService) ProxyFactory.getProxy(new Class[]{OrderService.class}); String result = orderService.buy(); System.out.println(result); } }

服务端

要接受远程调用需要拥有的具体实现类

public class OrderImpl implements OrderService { public OrderImpl() { } @Override public String buy() { System.out.println("调用buy方法"); return "调用buy方法成功"; } }

预先操作和测试代码

public class Server { public static HashMap<String ,Object> hashMap = new HashMap<>(); public static void main(String[] args) throws InterruptedException { //开启netty网络服务 ServerNetty.connect(8989); //提前将需要开放的服务注册到hash表中 hashMap.put("OrderService",new OrderImpl()); } }

执行结果

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/15 3:33:04

Vision Studio C#中属性set和get访问器、构造函数、析构函数的应用和介绍

属性set和get访问器属性的命名:public int Age { get; set; }, 特点: 它一般不存储数据&#xff0c;可以公开接口 也可以在外部进行访问 字段:private私有的 int age 字段可以存储数据&#xff0c;一般定义成私有的 目的是保证数据的安全性set和get访问器的区别如果对类里面的字…

作者头像 李华
网站建设 2026/4/15 22:48:07

计算机毕业设计springboot面向煤矿井下人员的不安全行为管理系统 基于 Spring Boot 的煤矿井下人员安全行为监管系统设计与实现 Spring Boot 框架下煤矿井下人员不安全行为监测

计算机毕业设计springboot面向煤矿井下人员的不安全行为管理系统4046y9&#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。随着煤矿行业的不断发展&#xff0c;井下作业环境的复杂性…

作者头像 李华
网站建设 2026/4/15 13:26:38

Bookingo – Course Booking System for WordPress: Professional Review

Bookingo – Course Booking System for WordPress: Professional Review In today’s digital landscape, offering online courses and workshops directly from your WordPress website has become increasingly vital. But managing course schedules, student enrollment…

作者头像 李华
网站建设 2026/4/14 1:19:17

数据大屏:python汽车销售数据分析可视化系统 爬虫 可视化大屏+Flask框架 deepseek 大数据毕业设计(源码)

博主介绍&#xff1a;✌全网粉丝50W&#xff0c;前互联网大厂软件研发、集结硕博英豪成立软件开发工作室&#xff0c;专注于计算机相关专业项目实战6年之久&#xff0c;累计开发项目作品上万套。凭借丰富的经验与专业实力&#xff0c;已帮助成千上万的学生顺利毕业&#xff0c;…

作者头像 李华
网站建设 2026/4/10 23:02:35

大模型教我成为大模型算法工程师之day15: 图像分割 (Image Segmentation)

Day 15: 图像分割 (Image Segmentation)摘要&#xff1a;如果说目标检测是给物体画框&#xff0c;那么图像分割就是把物体从背景中“抠”出来。它是计算机视觉中像素级别的分类任务。本文将带你从语义分割的开山之作 FCN 出发&#xff0c;深入 U-Net 和 DeepLab 细节&#xff0…

作者头像 李华