首页 > 极客资料 博客日记
【Java RPC】使用netty手写一个RPC框架 结合新特性 虚拟线程
2025-01-09 17:00:06极客资料围观1次
【手写RPC框架】如何使用netty手写一个RPC框架 结合新特性 虚拟线程
什么是RPC框架
RPC(Remote Procedure Call)远程过程调用,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC框架是一种远程调用的框架,它可以让你像调用本地方法一样调用远程方法。
避免了开发人员自己去封装网络请求、连接管理、序列化、反序列化等操作,提高了开发效率。
Netty是什么?为什么使用Netty
Netty是一个基于NIO的客户、服务器端编程框架,使用Netty可以快速开发网络应用,例如服务器和客户端。Netty是一个高性能、异步事件驱动的网络应用框架,它简化了网络编程,提供了一种新的方式来处理网络通信。
大白话粗略理解:因为Java的NIO的API使用起来比较复杂,Netty是对NIO的封装,使用起来更加简单。
所以这也是为什么我们使用Netty来实现RPC框架的原因,netty也被很多框架证明了它的稳定性和性能。
Java虚拟线程
Java虚拟线程是一个轻量级的线程,它不需要操作系统的线程支持,可以在一个线程中运行多个虚拟线程。Java虚拟线程是一个用户态的线程,它不需要操作系统的线程支持,可以在一个线程中运行多个虚拟线程。
虚拟线程实际上是通过传统的线程来管理多个虚拟线程,在Java的平台上去调度这些虚拟线程,从而实现了轻量级的线程称为虚拟线程,想要了解更加细节的可以去看下我的另一篇文章:【虚拟线程】Java虚拟线程 VirtualThread 是什么黑科技
虚拟线程的优势:
- 轻量级:虚拟线程是轻量级的线程,可以在一个线程中运行多个虚拟线程。
- 高效:虚拟线程是用户态的线程,不需要操作系统的线程支持,可以在一个线程中运行多个虚拟线程,线程的切换不涉及内核态和用户态的切换,效率更高。
适合的场景:
- 高并发:虚拟线程适合高并发的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。
- IO密集型:虚拟线程适合IO密集型的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。
- 任务短暂:虚拟线程适合任务短暂的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。
写一个RPC框架需要哪些步骤
既然我们要写一个RPC框架,那么我们需要明确一下我们需要做哪些事情。
我们是从A服务调用B服务,那么就代表我们的服务A是客户端,服务B是服务端。但是我们的系统正常来说要调用别的服务,也会被别的服务调用,
所以我们的服务A也是服务端,服务B也是客户端。所以我们的系统要同时具备客户端和服务端的功能。
- 客户端的功能:发现服务、请求(负载均衡、发起连接、发送请求)、接收响应、关闭连接。
- 服务端的功能:注册服务、接收请求(接收连接、接收请求)、发送响应、关闭连接。
其实根据上面可以发现,服务端和客户端所做的事情是对应的,是一个镜像的关系。所以我们就是对应放在一起讲。
注意注意注意⚠️:
- 示例中的代码为了方便理解,我只摘取了主要逻辑,且做了简略,具体的实现可以看我放在最后的项目源码。
- 这里我们只是简单的实现一个RPC框架,所以我们只是实现了最基本的功能,实际的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现扩展。
1 发现服务、注册服务
注册服务:服务端想告诉别人我提供了哪些服务(接口的方法),我的地址是什么。
发现服务:客户端需要知道我调用的一些服务(接口的方法)有哪些地址(ip + 端口)可以调用。
服务发现和注册的方式有很多种,比如:zookeeper、nacos、consul、etcd等等。本次我们以zookeeper为例。
注册服务代码示例:
private static CuratorFramework client;
// 这里使用Curator框架来操作zookeeper
public ZookeeperRegistryCenter() {
final var zookeeper = PROPERTIES_THREAD_LOCAL.get().getRegistry().getZookeeper();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
final var builder = CuratorFrameworkFactory.builder()
.connectString(zookeeper.getAddress())
.namespace(zookeeper.getRootPath());
client = builder.build();
}
// 创建一个zk客户端
private static void create(String path, CreateMode mode) throws Exception {
client.create()
.creatingParentsIfNeeded()
.withMode(mode)
.forPath(path);
}
发现服务代码示例:
// 发现服务,只要监听注册中心的变化
public void watch() {
// 观察者模式,监听注册中心的变化
registryCenter.watch((change, providerInfo) -> {
switch (change) {
case Change.ADD -> addServiceAddress(providerInfo);
case Change.UPDATE -> updateServiceAddress(providerInfo);
case Change.REMOVE -> deleteServiceAddress(providerInfo);
}
});
}
private void addOrUpdateServiceAddress(String methodStr, Pair<String, Integer> address) {
// 这里使用SERVICE_ADDRESS_MAP(ConcurrentHashMap)本地缓存服务地址,key是接口名+方法名,value是服务地址
SERVICE_ADDRESS_MAP.computeIfAbsent(methodStr, _ -> new CopyOnWriteArraySet<>())
.add(address);
}
2 请求、接收
请求代码示例:
// 请求
public Object send(RpcRequestMessage msg, Method method, Set<Pair<String, Integer>> addressSet) throws LRPCTimeOutException {
// 负载均衡选择服务地址
final var address = clazzToAddress(method, addressSet);
// 获取连接池
final var channelPool = getChannelPool(address);
// 在连接池中执行请求
return channelManager.executeWithChannelPool(channelPool, channelExeFunction, msg);
}
2.1 负载均衡
负载均衡:客户端在发现了服务的地址之后,可能有多个服务的地址,这时候需要做负载均衡,选择一个服务的地址来调用。
// 选择服务地址,负载均衡
private Pair<String, Integer> clazzToAddress(Method method, Set<Pair<String, Integer>> addressSet) {
if (addressSet != null && !addressSet.isEmpty()) {
// 若指定了服务地址,则在指定的服务地址中选择
return loadBalancer.selectServiceAddress(method, addressSet);
}
addressSet = serviceManager.getServiceAddress(method);
// 若未指定服务地址,则在注册中心的服务地址中选择
return loadBalancer.selectServiceAddress(method, addressSet);
}
2.2 发起连接、接收连接
因为我们的rpc的调用会比较频繁,所以我们需要保持长连接,避免频繁的创建连接和断开,这里我们使用连接池来管理连接。
发起连接:客户端在知道了服务的地址之后,需要和服务端建立连接,建立连接后,再发送请求。
接收连接:服务端需要接收客户端的连接,接收到连接后,再接收请求。
private FixedChannelPool getChannelPool(Pair<String, Integer> address) {
final var host = address.left;
final var port = address.right;
return serviceManager.getChannelPool(address,
// 创建连接池
_ -> LrpcChannelPoolFactory.createFixedChannelPool(host, port, lrpcProperties.getClient().getAddressMaxConnection()));
}
public FixedChannelPool getChannelPool(Pair<String, Integer> address, Function<String, FixedChannelPool> mappingFunction) {
final var host = address.left;
final var port = address.right;
return ADDRESS_POOL_MAP.computeIfAbsent(host + ":" + port, mappingFunction);
}
接收连接其实就是bossGroup的处理逻辑,这里就不贴代码了,可以看最后我贴的项目源码。
2.3 发送请求、接收请求
发送请求:客户端在建立连接后,在调用服务的方法时,需要发送报文体,发送本地需要保存请求ID和Promise(用于接收调用结果,netty包装一层的future)的映射关系,用来接收响应时,根据请求ID找到对应的请求。
接收请求:服务端在接收到客户端的连接后,需要接收到客户端的请求,解析请求,调用对应的方法。
我们本次使用自定义协议,所以需要约定好报文体的格式
报文体:16字节协议约定内容 + 请求体;
16字节协议约定内容:
(1):4个字节的长度来表示协议的魔数:就是一个固定的值,用来标识这是我们自定义的协议,这里使用'L'、'R'、'P'、'C'。
(2):1个字节的版本号:标识这个协议的版本号,这里因为是第一个版本,所以使用1。
(3):1个字节的序列化算法:标识这个协议使用的序列化算法,对应了序列化算法在枚举中的数组下标,这里使用的是0,表示使用JSON序列化。
(4):4个字节的请求ID:标识这个请求的ID,用来标识这个请求的唯一性,这里使用UUID生成,可以在客户端和服务端都保存一个Map,用来保存请求ID和请求的映射关系。
(5):1个字节的消息类型:标识这个消息的类型,是请求还是响应,这里使用1表示请求消息,2表示响应消息。
(6):4个字节的请求体的长度:使用Integer类型,表示请求体的长度,在接收请求时,根据这个长度来解析请求体。
(7):1个字节的补充位;无实际意义,只是为了对齐16字节。
请求体:序列化后转成字节数组,内容有:接口名 + 方法名 + 返回参数类型 + 请求参数类型数组 + 请求参数值数组。
按刚刚上面约定好的协议格式解析,然后将请求体的内容反序列化,得到消息类型,使用LengthFieldBasedFrameDecoder解码器,解决粘包和拆包问题,得到请求体的字节数组,然后反序列化,
得到消息后,获取到接口名、方法名、返回参数类型、请求参数类型数组、请求参数值数组,使用动态代理调用对应的方法,得到返回值。
public <T> T getProxy(Class<T> clazz, Set<Pair<String, Integer>> serviceAddress) {
// 使用代理的方式,调用方法
final var proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (proxy, method, args) -> {
RpcRequestMessage msg = buildRpcRequestMessage(clazz, method, args);
return consumerManager.send(msg, method, serviceAddress);
});
return clazz.cast(proxyInstance);
}
public Object executeWithChannelPool(ChannelPool channelPool,
BiFunction<Channel, RpcRequestMessage, Promise<Object>> function,
RpcRequestMessage msg) throws LRPCTimeOutException {
// 1. 从连接池中获取连接,等待超市时间,未获取连接则抛出异常
final Future<Channel> future = channelPool.acquire();
Channel channel = future.get();
final var promise = function.apply(channel, msg);
try {
return getResult(promise, msg.getMessageId());
} finally {
// 这里的释放需要放在拿到结果之后,否则会导臃连接被释放
channelPool.release(channel);
}
}
private static BiFunction<Channel, RpcRequestMessage, Promise<Object>> channelExeFunction() {
// 发送请求,且处理写失败
return (channel, msg) -> {
final var promise = new DefaultPromise<>(channel.eventLoop());
RpcRespHandler.addPromise(msg.getMessageId(), promise);
// 发送请求,且处理写失败
final var channelFuture = channel.writeAndFlush(msg);
channelFuture.addListener(processAftermath(promise, msg));
return promise;
};
}
接收处理请求
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) {
log.info("接收到消息 {}", JSON.toJSON(msg));
final var interfaceName = msg.getInterfaceName();
final var methodName = msg.getMethodName();
// 根据接口名获取服务的本地实例
final var service = serviceManager.getService(interfaceName);
final var response = new RpcResponseMessage();
response.setMessageId(msg.getMessageId());
try {
// 使用反射调用方法
final Class<?> aClass = service.getClass();
final var method = aClass.getMethod(methodName, msg.getParameterTypes());
final var result = method.invoke(service, msg.getParameterValues());
response.setReturnValue(result);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
log.error("e : ", e);
response.setExceptionValue(new Error(e.getCause().getMessage()));
}
// 以下属于发送响应的逻辑
ctx.writeAndFlush(response).addListener(future -> {
if (future.isSuccess()) {
log.info("消息响应成功 {}", JSON.toJSON(msg));
return;
}
log.error("发送消息时有错误发生: ", future.cause());
});
}
4 发送响应、接收响应
得到第6步的返回值后,需要将返回值封装成响应报文体,发送给客户端。
这里发送响应的方式其实是和发送请求的方式是一样的,只是消息类型不一样,这里是响应消息。
客户端接收到响应后,根据请求ID找到对应的请求,将响应的内容返回给调用方。
发送响应
// 在刚刚接收请求处理的channelRead0函数中,处理发送响应的逻辑
ctx.writeAndFlush(response).addListener(future -> {
if (future.isSuccess()) {
log.info("消息响应成功 {}", JSON.toJSON(msg));
return;
}
log.error("发送消息时有错误发生: ", future.cause());
});
接收响应
private static Object getResult(Promise<Object> promise, Integer messageId) throws LRPCTimeOutException {
try {
// 超时等待
if (promise.await(5, TimeUnit.SECONDS)) {
if (promise.isSuccess()) {
return promise.getNow();
} else {
throw new RuntimeException(promise.cause());
}
} else {
throw new LRPCTimeOutException("请求超时");
}
} catch (InterruptedException e) {
throw new RuntimeException("操作被中断", e);
} finally {
// 确保 promise 被移除
RpcRespHandler.removePromise(messageId);
}
}
5 关闭连接
关闭连接:客户端和服务端在完成请求和响应后,会把连接放回连接池,等待下一次的调用,等连接池关闭时,会关闭连接,服务端感应到连接关闭,会关闭连接。
怎么将虚拟线程和Netty结合起来
分析
前面我们说过,虚拟线程适合高并发、IO密集型的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。
看一下netty的服务端网络通信的架构简图:
在netty中,一个NioEventLoop中有一个Selector,一个Selector可以注册多个Channel,一个Channel对应一个连接,一个线程可以处理多个连接,这就是netty的高性能的原因。
在每次循环中,Selector就会阻塞监听Channel的事件,当有事件发生时,就会处理这个事件。
所以在这过程中,线程的数量,影响着Selector的数量,影响着Channel的数量,但是在传统的线程中,线程的数量是有限的,所以这就限制了Selector的数量,影响着Channel的数量,影响着性能,
所以我们可以使用虚拟线程来解决这个问题,虚拟线程可以在一个线程中运行多个虚拟线程,且虚拟线程会在其中一个虚拟线程阻塞时,会切换到其他虚拟线程,且没有系统级别的上下文切换,所以可以带来更高的性能。
所以我们这里主要是改变workerGroup的线程模型,使用虚拟线程来代替workerGroup里的传统的线程。
实现
根据netty的NioEventGroup的源码,线程来自三个地方:
- 构造函数的入参的线程工厂;
- 构造参数的入参的executor;
- 父类io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory()方法返回的线程工厂;
这里我们以重写父类的newDefaultThreadFactory()方法为例,来实现虚拟线程。
private NioEventLoopGroup getWorker() {
final var workerMax = lrpcProperties.getServer().getWorkerMax();
// 创建workerGroup
return new NioEventLoopGroup(workerMax) {
// 直接在创建的时候重写newDefaultThreadFactory()方法
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new VirtualThreadFactory(NioEventLoopGroup.class, Thread.MAX_PRIORITY);
}
};
}
// 这里是重写的ThreadFactory
public class VirtualThreadFactory extends DefaultThreadFactory {
public VirtualThreadFactory(Class<?> poolType, int priority) {
super(poolType, priority);
}
@Override
protected Thread newThread(Runnable r, String name) {
// 这里使用FastThreadLocalThread,是因为FastThreadLocalThread是netty提供的一个线程,里面的方法有些功能,所以我们这里直接继承它,然后重写start()方法
return new FastThreadLocalThread(threadGroup, r, name){
// 这里的Thread.ofVirtual().unstarted(this)是创建一个虚拟线程
@Override
public void start() {
final var unstarted = Thread.ofVirtual().unstarted(this);
unstarted.setName(this.getName());
unstarted.start();
}
};
}
}
总结
本次我们实现了一个简单的RPC框架,使用了netty作为底层通信框架,使用了zookeeper作为服务发现和注册中心,使用了虚拟线程代替服务端的workerGroup的线程模型,扩展了可管控的Selector的数量,且在线程的切换上,没有系统级别的上下文切换,提高了性能。
这里只是一个简单的实现,实际的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现,而且在实际的实现过程中,还会遇到很多问题,比如:序列化和反序列化扩展、线程安全问题等等,都值得我们去深入研究。
这里分享一下我的实现的代码,麻烦老哥们帮忙点个star 😭 ,谢谢!有问题可以留言,我会在第一有空闲的时间回复。
项目地址:JGZHAN/lrpc 戳这里去点star
标签:
上一篇:Kubernetes GPU 虚拟化方案
下一篇:linux 手动释放内存