Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读

2023-12-21 08:20:43


在这里插入图片描述


概述

在这里插入图片描述

Netty是一个高性能、异步的网络应用程序框架,它提供了对TCP、UDP和文件传输的支持。在Netty中,数据的发送和接收都是以字节流的形式进行的,因此需要将对象转换为字节流(编码)以及将字节流转换回对象(解码)。

ObjectEncoder

ObjectEncoder 是 Netty 中用于将对象编码为字节流的一种组件。在 Netty 的 pipeline 中,当你需要将某个对象发送到网络时,你可以使用 ObjectEncoder 来实现。它会将对象序列化为字节流,以便可以在网络中传输。
例如,当你使用 Netty 的 Bootstrap 类来配置你的客户端时,你可以为你的 channel pipeline 添加一个 ObjectEncoder

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
       .channel(NioSocketChannel.class)
       .handler(new ChannelInitializer<SocketChannel>() {
           @Override
           public void initChannel(SocketChannel ch) throws Exception {
               ch.pipeline().addLast(new ObjectEncoder());
               // 添加其他 handlers...
           }
       });

在这个例子中,ObjectEncoder 被添加到了 channel 的 pipeline 中,这样在数据传输过程中,发送的对象就会被自动编码为字节流。


ObjectDecoder

ObjectEncoder 相对应,ObjectDecoder 是用于将接收到的字节流解码为对象的组件。当你在 Netty 的 pipeline 中接收到字节流时,你可以使用 ObjectDecoder 来自动将字节流反序列化为对象。

继续上面的例子,如果你想在 pipeline 中添加 ObjectDecoder,你可以这样做:

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
       .channel(NioSocketChannel.class)
       .handler(new ChannelInitializer<SocketChannel>() {
           @Override
           public void initChannel(SocketChannel ch) throws Exception {
               ch.pipeline().addLast(new ObjectDecoder());
               // 添加其他 handlers...
           }
       });

在这个例子中,ObjectDecoder 被添加到了 channel 的 pipeline 中,这样在数据接收过程中,接收到的字节流就会被自动解码为对象。
总的来说,ObjectEncoderObjectDecoder 是 Netty 中用于对象序列化和反序列化的工具,它们让开发者可以更方便地在网络中传输对象。


Code

在这里插入图片描述


这段代码是一个简单的Netty服务器启动类

package com.artisan.codec.objectencoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyServer {
    public static void main(String[] args) throws Exception {
        // 创建事件循环组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 创建ServerBootstrap
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 配置ServerBootstrap
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 初始化通道
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加ObjectDecoder
                            pipeline.addLast(new ObjectDecoder(10240, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                            // 添加自定义的处理器
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            // 打印日志
            System.out.println("netty server start。。");
            // 绑定端口并启动服务器
            ChannelFuture channelFuture = serverBootstrap.bind(4567).sync();
            // 等待服务器通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭事件循环组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

在上述代码中,NettyServer类通过ServerBootstrap配置并启动了一个Netty服务器。服务器使用了两个事件循环组:一个用于处理连接(bossGroup),另一个用于处理已连接的通道(workerGroup)。
initChannel方法中,初始化了SocketChannel的通道 pipeline,并添加了ObjectDecoder和自定义的处理器NettyServerHandlerObjectDecoder用于反序列化接收到的字节流为Java对象,NettyServerHandler用于处理业务逻辑。
服务器启动后,会绑定到指定端口(本例中为4567),并等待服务器通道关闭。在关闭服务器之前,通过调用shutdownGracefully方法优雅地关闭事件循环组。
请注意,此代码片段仅作为Netty服务器启动的示例,实际应用中需要根据具体业务需求调整NettyServerHandler以实现相应的功能。


package com.artisan.codec.objectencoder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 当接收到客户端发送的消息时,执行该方法
        System.out.println("从客户端读取到Object:" + ((ArtisanSimple) msg).toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 当发生异常时,执行该方法
        cause.printStackTrace();
        ctx.close();
    }
}


package com.artisan.codec.objectencoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyClient {
    public static void main(String[] args) throws Exception {
        // 创建事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            // 配置Bootstrap
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 初始化通道
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加ObjectEncoder
                            pipeline.addLast(new ObjectEncoder());
                            // 添加自定义的处理器
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            // 打印日志
            System.out.println("netty client start。。");
            // 连接到服务器
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 4567).sync();
            // 等待客户端通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭事件循环组
            group.shutdownGracefully();
        }
    }
}

在上述代码中,NettyClient类通过Bootstrap配置并启动了一个Netty客户端。客户端使用了一个事件循环组(group)来处理通道的连接和接收到的消息。

initChannel方法中,初始化了SocketChannel的通道 pipeline,并添加了ObjectEncoder和自定义的处理器NettyClientHandlerObjectEncoder用于将Java对象序列化为字节流,NettyClientHandler用于处理业务逻辑。

客户端启动后,会连接到指定IP地址(本例中为127.0.0.1)和端口(本例中为4567)的服务器,并等待客户端通道关闭。在关闭客户端之前,通过调用shutdownGracefully方法优雅地关闭事件循环组。

请注意,此代码片段仅作为Netty客户端启动的示例,实际应用中需要根据具体业务需求调整NettyClientHandler以实现相应的功能。


这段代码是一个自定义的Netty处理器,名为NettyClientHandler。它继承自ChannelInboundHandlerAdapter,用于处理客户端接收到的消息和通道激活事件。

package com.artisan.codec.objectencoder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 当接收到服务器发送的消息时,执行该方法
        System.out.println("收到服务器消息:" + msg);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 当通道激活时,执行该方法
        System.out.println("NettyClientHandler发送数据");
        // 测试对象编解码
        ArtisanSimple artisanSimple = new ArtisanSimple(1, "xxxx");
        ctx.writeAndFlush(artisanSimple);
    }
}

在上述代码中,NettyClientHandler类重写了channelReadchannelActive方法。
channelRead方法用于处理客户端接收到的服务器消息。在这个例子中,它将打印出接收到的消息。在实际应用中,你可以根据业务需求修改此方法以处理不同的消息类型和逻辑。

channelActive方法用于处理通道激活事件。在这个例子中,它将打印一条日志,并测试对象编解码功能。具体来说,它创建了一个ArtisanSimple对象,并通过ctx.writeAndFlush()方法将其发送到服务器。

在实际应用中,你可以根据需求修改此方法以实现不同的业务逻辑。

NettyClientHandler处理器需要与ObjectEncoderObjectDecoder配合使用,以确保发送和接收到的字节流能够正确地反序列化为Java对象。在客户端启动类NettyClient中,NettyClientHandler已经添加到了通道的pipeline中,因此可以处理发送和接收到的消息。


package com.artisan.codec.objectencoder;

import java.io.Serializable;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanSimple implements Serializable {

    private int id;
    private String name;

    public ArtisanSimple() {
    }

    public ArtisanSimple(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }


    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "ArtisanSimple{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }



}

package com.artisan.codec.objectencoder;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class AddressSimple {

    private String location;


    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }


    public AddressSimple() {
    }

    public AddressSimple(String location) {
        this.location = location;
    }


    @Override
    public String toString() {
        return "AddressSimple{" +
                "location='" + location + '\'' +
                '}';
    }
}
    

【测试】

在这里插入图片描述


源码分析

在这里插入图片描述

ObjectEncoder

这段代码定义了一个名为ObjectEncoder的类,它属于Netty网络通信框架的一部分,用于将Java对象序列化为字节流。

在这里插入图片描述

下面是对代码的详细分析以及增加的中文注释:

package io.netty.handler.codec.serialization; 

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; 
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
 * An encoder which serializes a Java object into a {@link ByteBuf}.
 * <p>
 * 请注意,此编码器产生的序列化形式与标准的{@link ObjectInputStream}不兼容。
 * 请使用{@link ObjectDecoder}或{@link ObjectDecoderInputStream}以确保与该编码器的互操作性。
 */
@Sharable
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
    // 定义一个占位符,用于标记ByteBuf中对象序列化数据长度的位置
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    // 覆写encode方法,实现序列化逻辑
    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
        int startIdx = out.writerIndex(); // 记录开始编码的位置
        // 创建一个ByteBufOutputStream包装器,用于向ByteBuf中写入数据
        ByteBufOutputStream bout = new ByteBufOutputStream(out);
        ObjectOutputStream oout = null;
        try {
            bout.write(LENGTH_PLACEHOLDER); // 先写入长度占位符
            // 创建一个紧凑型ObjectOutputStream,用于序列化对象
            oout = new CompactObjectOutputStream(bout);
            oout.writeObject(msg); // 将要序列化的对象写入流中
            oout.flush(); // 刷新输出流,确保所有数据都被写出
        } finally {
            // 关闭ObjectOutputStream和ByteBufOutputStream
            if (oout != null) {
                oout.close();
            } else {
                bout.close();
            }
        }
        int endIdx = out.writerIndex(); // 记录编码结束的位置
        // 设置占位符的长度,即实际序列化数据长度
        out.setInt(startIdx, endIdx - startIdx - 4);
    }
}

在上述代码中,ObjectEncoder类继承自MessageToByteEncoder,这意味着它是一个用于将某种类型消息编码成字节流的编码器。encode方法被重写以实现序列化过程。在这个方法中,首先通过ByteBufOutputStreamByteBuf写入了一个长度占位符,然后通过CompactObjectOutputStream将传入的Serializable对象序列化成字节流,并写入到ByteBuf中。最后,修改了长度占位符,将其设置为实际序列化数据的长度。

此代码片段使用@Sharable注解标记,表明这个ChannelHandler是可以共享给多个ChannelPipeline的。
序列化完成后,通过ObjectOutputStreamflush方法刷新流,确保所有数据都被写出。最后,在finally块中关闭输出流,确保资源被正确释放。


ObjectDecoder

这段代码定义了一个名为ObjectDecoder的类,它也是Netty网络通信框架的一部分,用于将接收到的字节流反序列化为Java对象。

在这里插入图片描述

下面是对代码的详细分析以及增加的中文注释:

package io.netty.handler.codec.serialization;
// 引入Netty相关类库
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
// 引入Java序列化相关类库
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
/**
 * A decoder which deserializes the received {@link ByteBuf}s into Java
 * objects.
 * <p>
 * 请注意,此解码器期望的序列化形式与标准的{@link ObjectOutputStream}不兼容。
 * 请使用{@link ObjectEncoder}或{@link ObjectEncoderOutputStream}以确保与该解码器的互操作性。
 */
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
    // ClassResolver用于加载序列化对象的类
    private final ClassResolver classResolver;
    /**
     * 创建一个新的解码器,其最大对象大小为1048576字节。
     * 如果接收到的对象大小大于1048576字节,将抛出StreamCorruptedException异常。
     *
     * @param classResolver  用于此解码器的ClassResolver
     */
    public ObjectDecoder(ClassResolver classResolver) {
        this(1048576, classResolver);
    }
    /**
     * 创建一个新的解码器,其最大对象大小为指定的值。
     *
     * @param maxObjectSize  序列化对象的最大字节长度。
     *                     如果接收到的对象的长度大于此值,将抛出StreamCorruptedException异常。
     * @param classResolver  用于加载序列化对象类的ClassResolver
     */
    public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
        super(maxObjectSize, 0, 4, 0, 4);
        this.classResolver = classResolver;
    }
    // 覆写decode方法,实现反序列化逻辑
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }
        // 创建一个紧凑型ObjectInputStream,用于反序列化对象
        ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
        try {
            return ois.readObject(); // 读取并返回反序列化的对象
        } finally {
            ois.close(); // 关闭输入流
        }
    }
}

在上述代码中,ObjectDecoder类继承自LengthFieldBasedFrameDecoder,这意味着它是一个用于解码具有长度字段帧的解码器。decode方法被重写以实现反序列化过程。在这个方法中,首先通过LengthFieldBasedFrameDecoder的解码方法获取到包含序列化数据的ByteBuf帧,然后通过CompactObjectInputStream将字节流反序列化为Java对象。

此代码片段使用了一个ClassResolver,它负责加载序列化对象的类,从而允许在反序列化过程中创建对象。反序列化完成后,通过ObjectInputStreamclose方法关闭输入流,确保资源被正确释放。


小结

ObjectEncoder和ObjectDecoder是Netty框架中的两个重要组件,它们分别负责将Java对象编码为字节流以及将字节流解码为Java对象。

ObjectEncoder是一个ChannelOutboundHandler,它主要负责将Java对象转换为字节流。当发送一个对象时,ObjectEncoder会根据对象的类型将其序列化为字节流,以便在网络上进行传输。ObjectEncoder通常与ObjectDecoder配合使用,以确保编码和解码过程能够正确地进行。

ObjectDecoder是一个ChannelInboundHandler,它主要负责将接收到的字节流解码为Java对象。当接收到字节流时,ObjectDecoder会根据字节流的类型进行反序列化,将字节流转换回原始的Java对象。ObjectDecoder通常与ObjectEncoder配合使用,以确保编码和解码过程能够正确地进行。

在实际应用中,ObjectEncoderObjectDecoder需要根据业务需求进行定制,以便正确地处理各种不同类型的对象。通过使用这两个组件,Netty框架可以在发送和接收消息时自动进行对象的编码和解码,简化了网络编程的复杂度。

在这里插入图片描述

文章来源:https://blog.csdn.net/yangshangwei/article/details/135118293
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。