【Netty】Netty核心API及使用

2023-12-20 17:14:10

Netty核心API

ChannelHandler及其实现类

ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具体的业务逻辑。API 关系如下图所示

在这里插入图片描述

Netty开发中需要自定义一个 Handler 类去实现 ChannelHandle接口或其子接口或其实现类。

主要有两类:

  • ChannelInboundHandler:数据流向是从外面传进来的,Netty接收到数据后,经过若干 InboundHandler 处理后接收成功。例如channelRead,外面传数据来了,管道有数据了就会触发;channelActive,外面请求连接,管道被激活了就会触发。
  • ChannelOutboundHandler:数据流向是从内向外发的,Netty要输出数据,就需要经过若干个 OutboundHandler 处理完成后发送。例如bind,绑定端口;connect,连接;disconnect,断开连接;close关闭通道;write写数据。

可通过重写相应方法实现业务逻辑,一般都需要重写以下方法:

  • public void channelActive(ChannelHandlerContext ctx),通道就绪事件
  • public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
  • public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
  • public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件

ChannelHandlerContext 代表的是ChannelPipline链上的节点。

Netty也提供了一些自带的实现类,如:

  • SimpleChannelInboundHandler:继承ChannelInboundHandlerAdapter,Netty 提供的一个方便的入站事件处理器,它在处理数据读取事件时自动管理消息对象的引用计数,避免了很多潜在的内存泄漏问题。在大多数情况下,如果只需要处理数据读取事件,并且不需要手动管理消息对象的引用计数,推荐使用。
  • SimpleChannelOutboundHandler:继承自 ChannelOutboundHandlerAdapter,用于处理出站事件。并且在消息处理完毕后会自动释放消息对象的引用计数,避免了很多潜在的内存泄漏问题,避免了手动释放消息的步骤。

ChannelPipeline

ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的责任链。

在这里插入图片描述

如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler。

InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler是按照Pipeline的加载顺序,逆序执行

ChannelHandlerContext

这 是 事 件 处 理 器 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同时ChannelHandlerContext 中也绑定了对应的 ChannelPipeline和 Channel 的信息,方便对ChannelHandler 进行调用。常用方法如下所示:

  • ChannelFuture close(),关闭通道
  • ChannelOutboundInvoker flush(),刷新
  • ChannelFuture writeAndFlush(Object msg) , 将数据写到 ChannelPipeline 中当前ChannelHandler 的下一个 ChannelHandler,开始处理(出站)

ChannelOption

Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:

  • ChannelOption.SO_BACKLOG

    对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。

  • ChannelOption.SO_KEEPALIVE

    一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

ChannelFuture

表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。

常用方法如下所示:

  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步

EventLoopGroup和实现类NioEventLoopGroup

EventLoopGroup是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。

EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示:

在这里插入图片描述

BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。

一般情况下我们都是用实现类NioEventLoopGroup

常用方法如下所示:

  • public NioEventLoopGroup(),构造方法,创建线程组
  • public Future<?> shutdownGracefully(),断开连接,关闭线程

ServerBootstrap和Bootstrap

ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置。

Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。

常用方法如下所示:

  • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroupchildGroup), 该方法用于服务器端,用来设置两个 EventLoop
  • public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
  • public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现
  • public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
  • `public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
  • public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的 handler)
  • public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
  • public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连 接服务器端

Unpooled类

这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:

  • public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)

Netty入门案例

引入依赖

Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标

<dependency> 
  <groupId>io.netty</groupId> 
  <artifactId>netty-all</artifactId>
  <version>4.1.42.Final</version> 
</dependency>

Netty服务端编写

服务端实现步骤:

  1. 创建bossGroup线程组: 处理网络事件–连接事件
  2. 创建workerGroup线程组: 处理网络事件–读写事件
  3. 创建服务端启动助手
  4. 设置bossGroup线程组和workerGroup线程组
  5. 设置服务端通道实现为NIO
  6. 参数设置
  7. 创建一个通道初始化对象
  8. 向pipeline中添加自定义业务处理handler
  9. 启动服务端并绑定端口,同时将异步改为同步
  10. 关闭通道和关闭连接池

代码实现:

首先编写自定义的ChannelHandler,后面会加到服务端的handler里,代码如下:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class NettyServerHandler implements ChannelInboundHandler {
    /**
     * 通道读取事件
     *
     * @param channelHandlerContext
     * @param o
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        ByteBuf byteBuf = (ByteBuf) o;
        System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 通道读取完毕事件
     *
     * @param channelHandlerContext
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        // 消息出站
        channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好.我是Netty服务端",
                CharsetUtil.UTF_8));
    }

    /**
     * 通道异常事件
     *
     * @param channelHandlerContext
     * @param throwable
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
        throwable.printStackTrace();
        channelHandlerContext.close();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {

    }
}

然后编写服务端的代码:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建bossGroup线程组: 处理网络事件--连接事件。线程数默认为: 2 * 处理器线程数
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
        //3.创建服务端启动助手
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //4. 设置bossGroup线程组和workerGroup线程组
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO
                .option(ChannelOption.SO_BACKLOG, 128) //6. 参数设置:初始化服务器可连接队列大小
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) //6. 参数设置: 一直保持连接活动状态
                .childHandler(new ChannelInitializer<SocketChannel>() { //7. 创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //8. 向pipeline中添加自定义业务处理handler
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });
        //9. 启动服务端并绑定端口,同时将异步改为同步
        ChannelFuture future = serverBootstrap.bind(9999).sync();
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("端口绑定成功!");
                } else {
                    System.out.println("端口绑定失败!");
                }
            }
        });
        System.out.println("服务端启动成功.");
        //10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
        future.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

Netty客户端编写

客户端实现步骤:

  1. 创建线程组
  2. 创建客户端启动助手
  3. 设置线程组
  4. 设置客户端通道实现为NIO
  5. 创建一个通道初始化对象
  6. 向pipeline中添加自定义业务处理handler
  7. 启动客户端,等待连接服务端,同时将异步改为同步
  8. 关闭通道和关闭连接池

代码实现:

首先编写自定义的ChannelHandler,后面会加到服客户端的handler里,代码如下:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;


public class NettyClientHandler implements ChannelInboundHandler {
    /**
     * 通道就绪事件
     *
     * @param channelHandlerContext
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        ChannelFuture future = channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端",
                CharsetUtil.UTF_8));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("数据发送成功!");
                } else {
                    System.out.println("数据发送失败!");
                }
            }
        });
    }

    /**
     * 通道读就绪事件
     *
     * @param channelHandlerContext
     * @param o
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        ByteBuf byteBuf = (ByteBuf) o;
        System.out.println("服务端发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {

    }
}

客户端代码:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建线程组
        NioEventLoopGroup group = new NioEventLoopGroup();
        //2. 创建客户端启动助手
        Bootstrap bootstrap = new Bootstrap();
        //3. 设置线程组
        bootstrap.group(group)
                .channel(NioSocketChannel.class) //4. 设置客户端通道实现为NIO
                .handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //6. 向pipeline中添加自定义业务处理handler
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
        //7. 启动客户端,等待连接服务端,同时将异步改为同步
        ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
        //8. 关闭通道和关闭连接池
        future.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}

文章来源:https://blog.csdn.net/qq_43745578/article/details/135106580
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。