springboot实现netty的websocket服务端与客户端

2023-12-20 14:42:38

前言

强烈建议websocket使用netty实现,与tomcat的websocket性能差距明显

Coding

下文示例连接地址:直连 netty 服务端时为 ws://ip:5977/ws(服务端本身未配置 TLS);经文末 Nginx 代理后为 wss://域名/ws

pom.xml

<!-- netty-websocket: provides the io.netty.handler.codec.http and ...http.websocketx classes imported by the server and client code in this article -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-codec-http</artifactId>
    <version>4.1.96.Final</version>
</dependency>

服务端

application.properties

# nettyWebsocket
# Local port the Netty server bootstrap binds to
netty.websocket.port=5977
# Max frame size passed to WebSocketServerProtocolHandler (655360 = 640 * 1024; presumably bytes)
netty.websocket.max-message-frame-size=655360
# WebSocket path passed to WebSocketServerProtocolHandler
netty.websocket.connection-path=/ws

NettyWebsocketServerHandler

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

/**
 * @author 954L
 * @create 2022/10/11 21:05
 */
@Slf4j
@Component
public class NettyWebsocketServerHandler implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {

    /* nettyWebsocket服务端口 */
    @Value("${netty.websocket.port}")
    private int port;

    /* 单次消息大小上限 */
    @Value("${netty.websocket.max-message-frame-size}")
    private Integer maxMessageFrameSize;

    /* 连接ws路径 */
    @Value("${netty.websocket.connection-path}")
    private String connectionPath;

    private ApplicationContext applicationContext;
    private Channel serverChannel;
    
	/** 启动nettyWebsocket服务 */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            sb.group(group, bossGroup).channel(NioServerSocketChannel.class)
            .localAddress(this.port).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new HttpServerCodec());
                    pipeline.addLast(new ChunkedWriteHandler());
                    pipeline.addLast(new HttpObjectAggregator(8192));
                    // 读写超时检测;60s ping超时;180s ping pong超时
                    pipeline.addLast(new IdleStateHandler(
                        0,60,180, TimeUnit.SECONDS));
                    // 触发读写超时,发送心跳内容handler
                    pipeline.addLast(applicationContext.getBean(WsHeartBeatHandler.class));
                    // 自定义逻辑处理
                    pipeline.addLast(applicationContext.getBean(MyWebSocketHandler.class));
                    pipeline.addLast(new WebSocketServerProtocolHandler(this.connectionPath, null,
                    true, this.maxMessageFrameSize, false, true));
                }
            });
            Channel channel = sb.bind().sync().channel();
            this.serverChannel = channel; log.info("Websocket start success!");
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
            bossGroup.shutdownGracefully().sync();
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        if (this.serverChannel != null) this.serverChannel.close();
        log.info("Websocket stop!");
    }
}

WsHeartBeatHandler

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
 * Heartbeat handling: reacts to {@link IdleStateEvent}s delivered as user events and
 * passes every other user event on unchanged.
 *
 * @author 954L
 * @create 2023/06/04 17:40
 */
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor(onConstructor_ = {@Lazy} )
public class WsHeartBeatHandler extends ChannelInboundHandlerAdapter {

    private final MyWebSocketHandler myWebSocketHandler;

    /**
     * Handles idle timeouts.
     *
     * <ul>
     *   <li>writer idle: sends a {@code "pong"} text frame as heartbeat</li>
     *   <li>all idle: closes the connection through {@code MyWebSocketHandler}</li>
     *   <li>reader idle: ignored</li>
     * </ul>
     * Events that are not {@link IdleStateEvent}s are forwarded to the next handler;
     * dropping them would hide events that later handlers may rely on.
     *
     * @param ctx the handler context of the idle channel
     * @param evt the user event
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        IdleState state = ((IdleStateEvent) evt).state();
        switch (state) {
            case WRITER_IDLE:
                ctx.writeAndFlush(new TextWebSocketFrame("pong"));
                break;
            case ALL_IDLE:
                myWebSocketHandler.close(ctx);
                break;
            default:
                // READER_IDLE: intentionally ignored.
                break;
        }
    }

}

MyWebSocketHandler

此处校验的 token 取自握手请求的 Sec-WebSocket-Protocol(子协议)请求头;若客户端支持传递自定义 header,可调整下述代码中 token 的获取方式

package com.w954l.lover.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author 954L
 * @create 2022/10/11 21:09
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class MyWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    public static final String TOKEN_HEAD_KEY = "sec-websocket-protocol";
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
        final Channel channel = ctx.channel();
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
            String uri = fullHttpRequest.uri(), tokenContent = null,
                token = fullHttpRequest.headers().get(TOKEN_HEAD_KEY);
            try { tokenContent = JwtTokenUtils.verifyToken(token); } catch (Exception e) {
                // token验证失败
                ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                    HttpResponseStatus.NOT_FOUND)).addListener(ChannelFutureListener.CLOSE);
                log.info("拒绝ws连接,token验证失败;path:{},address:{}",
                    uri, channel.remoteAddress()); return; }

            // 手动连接握手,解决协议token无法二次握手问题
            WebSocketServerHandshakerFactory wsFactory =
                new WebSocketServerHandshakerFactory(uri, token, false);
            WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(fullHttpRequest);
            if (handshaker == null) WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            else handshaker.handshake(ctx.channel(), fullHttpRequest);

        	log.info("新连接创建成功: {}", ctx.channel().remoteAddress());
        	// TODO 保存客户端的channel连接通道,用于后续下发消息等操作
        	
        } else super.channelRead(ctx, msg);
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("连接断开: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("建立连接, {}", ctx.channel().remoteAddress());
    }

}

客户端

NettyWebsocketClientHandler

package com.w954l.bot.handlers;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.net.URI;

/**
 * Opens a Netty WebSocket client connection once the Spring Boot application has
 * started and closes it when the application context closes.
 *
 * @author 954L
 * @create 2022/10/11 21:05
 */
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = { @Lazy})
public class NettyWebsocketClientHandler implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {

    /* Written on the startup thread, read on the thread that publishes ContextClosedEvent. */
    private volatile Channel serverChannel;
    private final FrameHandler frameHandler;

    /**
     * Connects to the WebSocket endpoint and returns once the handshake has completed.
     * The event loop group is shut down automatically when the connection closes.
     *
     * @param args application arguments (unused)
     * @throws Exception if the URL is invalid, or connecting or the handshake fails
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final URI websocketURI = new URI(frameHandler.wsUrl());
        final String host = websocketURI.getHost();
        if (host == null) {
            throw new IllegalArgumentException("WebSocket URL has no host: " + websocketURI);
        }
        final boolean secure = "wss".equalsIgnoreCase(websocketURI.getScheme());
        final int port = websocketURI.getPort() != -1 ? websocketURI.getPort() : (secure ? 443 : 80);

        final EventLoopGroup group = new NioEventLoopGroup();
        boolean ready = false;
        try {
            Bootstrap boot = new Bootstrap();
            // SO_BACKLOG is a server-socket option and is not set here. Only one handler()
            // is registered, because a second call replaces the first.
            boot.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline p = socketChannel.pipeline();
                        if (secure) {
                            // Passing the peer host enables SNI; the HTTPS endpoint
                            // identification makes the engine verify that the certificate
                            // matches the host, which JSSE does not do by default.
                            SSLEngine sslEngine = SSLContext.getDefault().createSSLEngine(host, port);
                            sslEngine.setUseClientMode(true);
                            javax.net.ssl.SSLParameters sslParameters = sslEngine.getSSLParameters();
                            sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                            sslEngine.setSSLParameters(sslParameters);
                            p.addLast("ssl", new SslHandler(sslEngine));
                        }
                        p.addLast(new HttpClientCodec(), new HttpObjectAggregator(1024 * 1024 * 10));
                        p.addLast("websocketHandler", new MyWebsocketClientHandler());
                    }
                });

            // Perform the handshake.
            WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                websocketURI, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
            Channel channel = boot.connect(host, port).sync().channel();
            this.serverChannel = channel;
            MyWebsocketClientHandler handler =
                (MyWebsocketClientHandler) channel.pipeline().get("websocketHandler");
            handler.setHandshaker(handshaker);
            handshaker.handshake(channel);
            handler.handshakeFuture().sync();

            // Release the event loop threads as soon as the connection goes away,
            // without blocking the runner thread until then.
            channel.closeFuture().addListener((ChannelFutureListener) future -> group.shutdownGracefully());
            ready = true;
        } finally {
            if (!ready) {
                Channel channel = this.serverChannel;
                if (channel != null) {
                    channel.close();
                }
                group.shutdownGracefully();
            }
        }
    }

    /**
     * Closes the client connection; the close listener registered in {@link #run}
     * then shuts the event loop group down.
     *
     * @param event the context-closed event (unused)
     */
    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        Channel channel = this.serverChannel;
        if (channel != null) {
            channel.close();
        }
        log.info("Websocket stop!");
    }

}

MyWebsocketClientHandler

package com.w954l.bot.handlers;

import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
 * @author 954L
 * @create 2022/12/27 22:26
 */
@Data
@Slf4j
public class MyWebsocketClientHandler extends SimpleChannelInboundHandler<Object> {

    WebSocketClientHandshaker handshaker;
    ChannelPromise handshakeFuture;

    public void handlerAdded(ChannelHandlerContext ctx) {
        this.handshakeFuture = ctx.newPromise();
    }

    public ChannelFuture handshakeFuture() {
        return this.handshakeFuture;
    }

    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
        log.info("channelRead0  " + this.handshaker.isHandshakeComplete());
        Channel ch = ctx.channel();
        FullHttpResponse response;
        if (!this.handshaker.isHandshakeComplete()) {
            try {
                response = (FullHttpResponse) msg;
                //握手协议返回,设置结束握手
                this.handshaker.finishHandshake(ch, response);
                //设置成功
                this.handshakeFuture.setSuccess();
                System.out.println("WebSocket Client connected! response headers[sec-websocket-extensions]:{}" + response.headers());
            } catch (WebSocketHandshakeException var7) {
                FullHttpResponse res = (FullHttpResponse) msg;
                String errorMsg = String.format("WebSocket Client failed to connect,status:%s,reason:%s",
                        res.status(), res.content().toString(CharsetUtil.UTF_8));
                this.handshakeFuture.setFailure(new Exception(errorMsg));
            }
        } else if (msg instanceof FullHttpResponse) {
            response = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" +
                response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        } else {
            WebSocketFrame frame = (WebSocketFrame) msg;
            if (frame instanceof TextWebSocketFrame) {
                TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
                log.info("收到消息:{}", textFrame.text());
            } else if (frame instanceof BinaryWebSocketFrame) {
                BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame) frame;
                System.out.println("BinaryWebSocketFrame");
            } else if (frame instanceof PongWebSocketFrame) {
                System.out.println("WebSocket Client received pong");
            } else if (frame instanceof CloseWebSocketFrame) {
                System.out.println("receive close frame");
                ch.close();
            }
        }
    }
}

Nginx配置

敏感信息已用“x”代替,注意辨别

# Plain-HTTP listener: permanently redirects every request to HTTPS.
server {
    listen 80;
    server_name xxx.xxx.xxx;
    # $host rather than $http_host: $http_host is the raw client-supplied Host header
    # (possibly empty, possibly carrying ":80"), which must not end up in the redirect target.
    return 301 https://$host$request_uri;
}

# HTTPS listener: terminates TLS, proxies /ws to the Netty WebSocket server
# and everything else to the application backend.
server {
    listen 443 ssl;
    server_name xxx.xxx.xxx;
    ssl_certificate conf.d/ssl_key/xxx.xxx.xxx_bundle.crt;
    ssl_certificate_key conf.d/ssl_key/xxx.xxx.xxx.key;
    ssl_session_timeout 5m;
    ssl_session_cache shared:SSL:50m;
    # SSLv2/SSLv3 are broken and TLSv1/TLSv1.1 are deprecated; allow modern TLS only.
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE;
    ssl_prefer_server_ciphers on;

    location /ws {
       proxy_http_version 1.1;
       proxy_set_header Host $host;
       proxy_set_header X-Real-IP $remote_addr;
       proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
       proxy_pass http://ip:5977;
       # Forward the WebSocket upgrade handshake to the backend.
       proxy_set_header Upgrade $http_upgrade;
       proxy_set_header Connection "upgrade";
       # Close the connection when the backend sends nothing for this long. Kept above the
       # backend's 60s writer-idle heartbeat so the heartbeat arrives before the timeout fires.
       proxy_read_timeout 90s;
    }

    location / {
       client_max_body_size 32m;
       proxy_set_header Host $host;
       proxy_set_header X-Real-IP $remote_addr;
       proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
       proxy_pass http://ip:port;
    }

    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        root   /usr/share/nginx/html;
    }
}

文章来源:https://blog.csdn.net/wkh___/article/details/135105677
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。