首页 > 基础资料 博客日记

【Java RPC】使用netty手写一个RPC框架 结合新特性 虚拟线程

2025-01-09 17:00:06基础资料围观51

本篇文章分享【Java RPC】使用netty手写一个RPC框架 结合新特性 虚拟线程,对你有帮助的话记得收藏一下,看Java资料网收获更多编程知识

【手写RPC框架】如何使用netty手写一个RPC框架 结合新特性 虚拟线程

什么是RPC框架

RPC(Remote Procedure Call)远程过程调用,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC框架是一种远程调用的框架,它可以让你像调用本地方法一样调用远程方法。

避免了开发人员自己去封装网络请求、连接管理、序列化、反序列化等操作,提高了开发效率。

Netty是什么?为什么使用Netty

Netty是一个基于NIO的客户、服务器端编程框架,使用Netty可以快速开发网络应用,例如服务器和客户端。Netty是一个高性能、异步事件驱动的网络应用框架,它简化了网络编程,提供了一种新的方式来处理网络通信。

大白话粗略理解:因为Java的NIO的API使用起来比较复杂,Netty是对NIO的封装,使用起来更加简单。

所以这也是为什么我们使用Netty来实现RPC框架的原因,netty也被很多框架证明了它的稳定性和性能。

Java虚拟线程

Java虚拟线程是一个轻量级的线程,它不需要操作系统的线程支持,可以在一个线程中运行多个虚拟线程。Java虚拟线程是一个用户态的线程,它不需要操作系统的线程支持,可以在一个线程中运行多个虚拟线程。

虚拟线程实际上是通过传统的线程来管理多个虚拟线程,在Java的平台上去调度这些虚拟线程,从而实现了轻量级的线程称为虚拟线程,想要了解更加细节的可以去看下我的另一篇文章:【虚拟线程】Java虚拟线程 VirtualThread 是什么黑科技

虚拟线程的优势:

  1. 轻量级:虚拟线程是轻量级的线程,可以在一个线程中运行多个虚拟线程。
  2. 高效:虚拟线程是用户态的线程,不需要操作系统的线程支持,可以在一个线程中运行多个虚拟线程,线程的切换不涉及内核态和用户态的切换,效率更高。

适合的场景:

  1. 高并发:虚拟线程适合高并发的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。
  2. IO密集型:虚拟线程适合IO密集型的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。
  3. 任务短暂:虚拟线程适合任务短暂的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。

写一个RPC框架需要哪些步骤

既然我们要写一个RPC框架,那么我们需要明确一下我们需要做哪些事情。
我们是从A服务调用B服务,那么就代表我们的服务A是客户端,服务B是服务端。但是我们的系统正常来说要调用别的服务,也会被别的服务调用,
所以我们的服务A也是服务端,服务B也是客户端。所以我们的系统要同时具备客户端和服务端的功能。

  • 客户端的功能:发现服务、请求(负载均衡、发起连接、发送请求)、接收响应、关闭连接。
  • 服务端的功能:注册服务、接收请求(接收连接、接收请求)、发送响应、关闭连接。

其实根据上面可以发现,服务端和客户端所做的事情是对应的,是一个镜像的关系。所以我们就是对应放在一起讲。

注意注意注意⚠️:

  1. 示例中的代码为了方便理解,我只摘取了主要逻辑,且做了简略,具体的实现可以看我放在最后的项目源码。
  2. 这里我们只是简单的实现一个RPC框架,所以我们只是实现了最基本的功能,实际的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现扩展。

1 发现服务、注册服务

注册服务:服务端想告诉别人我提供了哪些服务(接口的方法),我的地址是什么。
发现服务:客户端需要知道我调用的一些服务(接口的方法)有哪些地址(ip + 端口)可以调用。

服务发现和注册的方式有很多种,比如:zookeeper、nacos、consul、etcd等等。本次我们以zookeeper为例。

注册服务代码示例:

    private static CuratorFramework client;
    
    // 这里使用Curator框架来操作zookeeper
    public ZookeeperRegistryCenter() {
       final var zookeeper = PROPERTIES_THREAD_LOCAL.get().getRegistry().getZookeeper();
    
       RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
       final var builder = CuratorFrameworkFactory.builder()
               .connectString(zookeeper.getAddress())
               .namespace(zookeeper.getRootPath());
       client = builder.build();
    }
    // 创建一个zk客户端
    private static void create(String path, CreateMode mode) throws Exception {
        client.create()
                .creatingParentsIfNeeded()
                .withMode(mode)
                .forPath(path);
    }

发现服务代码示例:

    // 发现服务,只要监听注册中心的变化
    public void watch() {
    
        // 观察者模式,监听注册中心的变化
        registryCenter.watch((change, providerInfo) -> {
            switch (change) {
                case Change.ADD -> addServiceAddress(providerInfo);
                case Change.UPDATE -> updateServiceAddress(providerInfo);
                case Change.REMOVE -> deleteServiceAddress(providerInfo);
            }
        });
    }

    private void addOrUpdateServiceAddress(String methodStr, Pair<String, Integer> address) {
        // 这里使用SERVICE_ADDRESS_MAP(ConcurrentHashMap)本地缓存服务地址,key是接口名+方法名,value是服务地址
        SERVICE_ADDRESS_MAP.computeIfAbsent(methodStr, _ -> new CopyOnWriteArraySet<>())
                .add(address);
    }

2 请求、接收

请求代码示例:

    // 请求
    public Object send(RpcRequestMessage msg, Method method, Set<Pair<String, Integer>> addressSet) throws LRPCTimeOutException {
        // 负载均衡选择服务地址
        final var address = clazzToAddress(method, addressSet);
        // 获取连接池
        final var channelPool = getChannelPool(address);
        // 在连接池中执行请求
        return channelManager.executeWithChannelPool(channelPool, channelExeFunction, msg);
    }

2.1 负载均衡

负载均衡:客户端在发现了服务的地址之后,可能有多个服务的地址,这时候需要做负载均衡,选择一个服务的地址来调用。

    // 选择服务地址,负载均衡
    private Pair<String, Integer> clazzToAddress(Method method, Set<Pair<String, Integer>> addressSet) {
        if (addressSet != null && !addressSet.isEmpty()) {
            // 若指定了服务地址,则在指定的服务地址中选择
            return loadBalancer.selectServiceAddress(method, addressSet);
        }
        addressSet = serviceManager.getServiceAddress(method);
        // 若未指定服务地址,则在注册中心的服务地址中选择
        return loadBalancer.selectServiceAddress(method, addressSet);
    }

2.2 发起连接、接收连接

因为我们的rpc的调用会比较频繁,所以我们需要保持长连接,避免频繁的创建连接和断开,这里我们使用连接池来管理连接。

发起连接:客户端在知道了服务的地址之后,需要和服务端建立连接,建立连接后,再发送请求。

接收连接:服务端需要接收客户端的连接,接收到连接后,再接收请求。

    private FixedChannelPool getChannelPool(Pair<String, Integer> address) {
        final var host = address.left;
        final var port = address.right;
        return serviceManager.getChannelPool(address,
                // 创建连接池
                _ -> LrpcChannelPoolFactory.createFixedChannelPool(host, port, lrpcProperties.getClient().getAddressMaxConnection()));
    }
    public FixedChannelPool getChannelPool(Pair<String, Integer> address, Function<String, FixedChannelPool> mappingFunction) {
        final var host = address.left;
        final var port = address.right;
        return ADDRESS_POOL_MAP.computeIfAbsent(host + ":" + port, mappingFunction);
    }

接收连接其实就是bossGroup的处理逻辑,这里就不贴代码了,可以看最后我贴的项目源码。

2.3 发送请求、接收请求

发送请求:客户端在建立连接后,在调用服务的方法时,需要发送报文体,发送本地需要保存请求ID和Promise(用于接收调用结果,netty包装一层的future)的映射关系,用来接收响应时,根据请求ID找到对应的请求。

接收请求:服务端在接收到客户端的连接后,需要接收到客户端的请求,解析请求,调用对应的方法。

我们本次使用自定义协议,所以需要约定好报文体的格式

报文体:16字节协议约定内容 + 请求体;

16字节协议约定内容:

  (1):4个字节的长度来表示协议的魔数:就是一个固定的值,用来标识这是我们自定义的协议,这里使用'L'、'R'、'P'、'C'。

  (2):1个字节的版本号:标识这个协议的版本号,这里因为是第一个版本,所以使用1。

  (3):1个字节的序列化算法:标识这个协议使用的序列化算法,对应了序列化算法在枚举中的数组下标,这里使用的是0,表示使用JSON序列化。

  (4):4个字节的请求ID:标识这个请求的ID,用来标识这个请求的唯一性,这里使用UUID生成,可以在客户端和服务端都保存一个Map,用来保存请求ID和请求的映射关系。

  (5):1个字节的消息类型:标识这个消息的类型,是请求还是响应,这里使用1表示请求消息,2表示响应消息。

  (6):4个字节的请求体的长度:使用Integer类型,表示请求体的长度,在接收请求时,根据这个长度来解析请求体。

  (7):1个字节的补充位;无实际意义,只是为了对齐16字节。

请求体:序列化后转成字节数组,内容有:接口名 + 方法名 + 返回参数类型 + 请求参数类型数组 + 请求参数值数组。

按刚刚上面约定好的协议格式解析,然后将请求体的内容反序列化,得到消息类型,使用LengthFieldBasedFrameDecoder解码器,解决粘包和拆包问题,得到请求体的字节数组,然后反序列化,

得到消息后,获取到接口名、方法名、返回参数类型、请求参数类型数组、请求参数值数组,使用动态代理调用对应的方法,得到返回值。


    public <T> T getProxy(Class<T> clazz, Set<Pair<String, Integer>> serviceAddress) {
        // 使用代理的方式,调用方法
        final var proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (proxy, method, args) -> {
            RpcRequestMessage msg = buildRpcRequestMessage(clazz, method, args);
            return consumerManager.send(msg, method, serviceAddress);
        });
        return clazz.cast(proxyInstance);
    }
    
    public Object executeWithChannelPool(ChannelPool channelPool,
                                                  BiFunction<Channel, RpcRequestMessage, Promise<Object>> function,
                                                  RpcRequestMessage msg) throws LRPCTimeOutException {
        // 1. 从连接池中获取连接,等待超市时间,未获取连接则抛出异常
        final Future<Channel> future = channelPool.acquire();
        Channel channel = future.get();
        final var promise = function.apply(channel, msg);
        try {
            return getResult(promise, msg.getMessageId());
        } finally {
            // 这里的释放需要放在拿到结果之后,否则会导臃连接被释放
            channelPool.release(channel);
        }
    }
    
    private static BiFunction<Channel, RpcRequestMessage, Promise<Object>> channelExeFunction() {
        // 发送请求,且处理写失败
        return (channel, msg) -> {
            final var promise = new DefaultPromise<>(channel.eventLoop());
            RpcRespHandler.addPromise(msg.getMessageId(), promise);
            // 发送请求,且处理写失败
            final var channelFuture = channel.writeAndFlush(msg);
            channelFuture.addListener(processAftermath(promise, msg));
            return promise;
        };
    }

接收处理请求


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) {

        log.info("接收到消息 {}", JSON.toJSON(msg));
        final var interfaceName = msg.getInterfaceName();
        final var methodName = msg.getMethodName();
        
        // 根据接口名获取服务的本地实例
        final var service = serviceManager.getService(interfaceName);

        final var response = new RpcResponseMessage();
        response.setMessageId(msg.getMessageId());
        try {
            // 使用反射调用方法
            final Class<?> aClass = service.getClass();
            final var method = aClass.getMethod(methodName, msg.getParameterTypes());
            final var result = method.invoke(service, msg.getParameterValues());
            response.setReturnValue(result);
        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
            log.error("e : ", e);
            response.setExceptionValue(new Error(e.getCause().getMessage()));
        }
        
        // 以下属于发送响应的逻辑
        ctx.writeAndFlush(response).addListener(future -> {
            if (future.isSuccess()) {
                log.info("消息响应成功 {}", JSON.toJSON(msg));
                return;
            }
            log.error("发送消息时有错误发生: ", future.cause());
        });
    }


4 发送响应、接收响应

得到第6步的返回值后,需要将返回值封装成响应报文体,发送给客户端。

这里发送响应的方式其实是和发送请求的方式是一样的,只是消息类型不一样,这里是响应消息。
客户端接收到响应后,根据请求ID找到对应的请求,将响应的内容返回给调用方。

发送响应


        // 在刚刚接收请求处理的channelRead0函数中,处理发送响应的逻辑
        ctx.writeAndFlush(response).addListener(future -> {
            if (future.isSuccess()) {
                log.info("消息响应成功 {}", JSON.toJSON(msg));
                return;
            }
            log.error("发送消息时有错误发生: ", future.cause());
        });

接收响应

    private static Object getResult(Promise<Object> promise, Integer messageId) throws LRPCTimeOutException {
        try {
            // 超时等待
            if (promise.await(5, TimeUnit.SECONDS)) {
                if (promise.isSuccess()) {
                    return promise.getNow();
                } else {
                    throw new RuntimeException(promise.cause());
                }
            } else {
                throw new LRPCTimeOutException("请求超时");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException("操作被中断", e);
        } finally {
            // 确保 promise 被移除
            RpcRespHandler.removePromise(messageId);
        }
    }

5 关闭连接

关闭连接:客户端和服务端在完成请求和响应后,会把连接放回连接池,等待下一次的调用,等连接池关闭时,会关闭连接,服务端感应到连接关闭,会关闭连接。

怎么将虚拟线程和Netty结合起来

分析

前面我们说过,虚拟线程适合高并发、IO密集型的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。

看一下netty的服务端网络通信的架构简图:

在netty中,一个NioEventLoop中有一个Selector,一个Selector可以注册多个Channel,一个Channel对应一个连接,一个线程可以处理多个连接,这就是netty的高性能的原因。
在每次循环中,Selector就会阻塞监听Channel的事件,当有事件发生时,就会处理这个事件。
所以在这过程中,线程的数量,影响着Selector的数量,影响着Channel的数量,但是在传统的线程中,线程的数量是有限的,所以这就限制了Selector的数量,影响着Channel的数量,影响着性能,
所以我们可以使用虚拟线程来解决这个问题,虚拟线程可以在一个线程中运行多个虚拟线程,且虚拟线程会在其中一个虚拟线程阻塞时,会切换到其他虚拟线程,且没有系统级别的上下文切换,所以可以带来更高的性能。
所以我们这里主要是改变workerGroup的线程模型,使用虚拟线程来代替workerGroup里的传统的线程。

实现

根据netty的NioEventGroup的源码,线程来自三个地方:

  1. 构造函数的入参的线程工厂;
  2. 构造参数的入参的executor;
  3. 父类io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory()方法返回的线程工厂;
    这里我们以重写父类的newDefaultThreadFactory()方法为例,来实现虚拟线程。
    private NioEventLoopGroup getWorker() {
        final var workerMax = lrpcProperties.getServer().getWorkerMax();
        // 创建workerGroup
        return new NioEventLoopGroup(workerMax) {
            // 直接在创建的时候重写newDefaultThreadFactory()方法
            @Override
            protected ThreadFactory newDefaultThreadFactory() {
                return new VirtualThreadFactory(NioEventLoopGroup.class, Thread.MAX_PRIORITY);
            }
        };
    }

// 这里是重写的ThreadFactory
public class VirtualThreadFactory extends DefaultThreadFactory {
   public VirtualThreadFactory(Class<?> poolType, int priority) {
      super(poolType, priority);
   }

   @Override
   protected Thread newThread(Runnable r, String name) {
      // 这里使用FastThreadLocalThread,是因为FastThreadLocalThread是netty提供的一个线程,里面的方法有些功能,所以我们这里直接继承它,然后重写start()方法
      return new FastThreadLocalThread(threadGroup, r, name){
         // 这里的Thread.ofVirtual().unstarted(this)是创建一个虚拟线程
         @Override
         public void start() {
            final var unstarted = Thread.ofVirtual().unstarted(this);
            unstarted.setName(this.getName());
            unstarted.start();
         }
      };
   }
}

总结

本次我们实现了一个简单的RPC框架,使用了netty作为底层通信框架,使用了zookeeper作为服务发现和注册中心,使用了虚拟线程代替服务端的workerGroup的线程模型,扩展了可管控的Selector的数量,且在线程的切换上,没有系统级别的上下文切换,提高了性能。

这里只是一个简单的实现,实际的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现,而且在实际的实现过程中,还会遇到很多问题,比如:序列化和反序列化扩展、线程安全问题等等,都值得我们去深入研究。
这里分享一下我的实现的代码,麻烦老哥们帮忙点个star 😭 ,谢谢!有问题可以留言,我会在第一有空闲的时间回复。
项目地址:JGZHAN/lrpc 戳这里去点star


文章来源:https://www.cnblogs.com/seazhan/p/18662414
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云