netty使用http和webSocket

2024-01-07 20:37:08

1:pom.xml配置

		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.73.Final</version>
		</dependency>

2:Netty作为HTTP服务器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

public class HttpServer {
    private final int port;

    public HttpServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();

                    // 添加 HTTP 编解码器和自定义的ChannelHandler
                    p.addLast(new HttpServerCodec());
                    p.addLast(new HttpObjectAggregator(1024 * 1024)); // 设置最大聚合大小为1MB
                    p.addLast(new LargeJsonHandler());
                }
             })
             .option(ChannelOption.SO_BACKLOG, 128)
             .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口,开始接受进来的连接
            ChannelFuture f = b.bind(port).sync();

            // 等待服务器 socket 关闭
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new HttpServer(8080).start();
    }
}

class LargeJsonHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (HttpUtil.is100ContinueExpected(request)) {
            send100Continue(ctx);
        }

        ByteBuf content = request.content();
        String jsonStr = content.toString(CharsetUtil.UTF_8);

        // 在这里对 JSON 数据进行处理
        System.out.println(jsonStr);

        // 发送响应
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        response.content().writeBytes("OK".getBytes(CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        ctx.writeAndFlush(response);
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }
	@Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // 写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
        //ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

注意:如果发送的JSO数据如果大于1M,是会分包发送的,每次发送都会执行channelReadComplete方法,所以不可以关闭通道,发送完数据才执行channelRead0方法

3:Netty作为webSocket服务器

package com.example.slave.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

/**
 * @Description:
 * @Author: xu
 * @Data: 2024-2024/1/4-11
 * @Version: V1.0
 */
public class CustomWebSocket {
    private final int port;

    public CustomWebSocket(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        //设置用于连接的boss组, 可在构造器中定义使用的线程数  监听端口接收客户端连接,一个端口一个线程,然后转给worker组
        //boss组用于监听客户端连接请求,有连接传入时就生成连接channel传给worker,等worker 接收请求 io多路复用,
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            //定义使用的通道 可以选择是NIO或者是OIO 代表了worker在处理socket channel时的不同情况。oio只能1对1, nio则没有1对1对关系
            //当netty要处理长连接时最好使用NIO,不然如果要保证效率 需要创建大量的线程,和io多路复用一致
                    .channel(NioServerSocketChannel.class)
                    //.channel(OioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // 添加 HTTP 编解码器和自定义的ChannelHandler
                            p.addLast(new HttpServerCodec());
                            p.addLast(new HttpObjectAggregator(1024 * 1024)); // 设置最大聚合大小为1MB
                            p.addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
                            p.addLast(new MyWebSocketHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口,开始接受进来的连接
            ChannelFuture f = b.bind(port).sync();

            // 等待服务器 socket 关闭
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    public static ChannelGroup channelGroup;
    static {
        channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }
    //客户端与服务器建立连接的时候触发,
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与客户端建立连接,通道开启!");
        //添加到channelGroup通道组
        channelGroup.add(ctx.channel());
    }
    //客户端与服务器关闭连接的时候触发,
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与客户端断开连接,通道关闭!");
        channelGroup.remove(ctx.channel());
    }
	@Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // 写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
?
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 关闭发生异常的连接
        ctx.close();
    }


    //服务器接受客户端的数据信息,
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg){
        System.out.println("服务器收到的数据:" + msg.text());
        //sendMessage(ctx);
        sendAllMessage();
    }
    //给固定的人发消息
    private void sendMessage(ChannelHandlerContext ctx) {
        String message = "你好,"+ctx.channel().localAddress()+" 给固定的人发消息";
        ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
    }
    //发送群消息,此时其他客户端也能收到群消息
    private void sendAllMessage(){
        String message = "我是服务器,这里发送的是群消息";
        channelGroup.writeAndFlush( new TextWebSocketFrame(message));
    }
}

文章来源:https://blog.csdn.net/qq_19891197/article/details/135381585
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。