springboot实现netty的websocket服务端与客户端
2023-12-20 14:42:38
前言
强烈建议websocket使用netty实现,与tomcat的websocket性能差距明显
Coding
下文示例连接地址:直连netty服务为 ws://ip:5977/ws(netty服务端本身未配置SSL);经下文Nginx代理并启用SSL后为 wss://域名/ws
pom.xml
<!-- Netty HTTP codec; supplies the io.netty.handler.codec.http.websocketx classes used by the code below -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>4.1.96.Final</version>
</dependency>
服务端
application.properties
# Netty WebSocket server settings (injected into NettyWebsocketServerHandler via @Value)
# Port the Netty WebSocket server binds to
netty.websocket.port=5977
# Upper bound for the size of a single message frame
netty.websocket.max-message-frame-size=655360
# Path on which WebSocket connections are accepted
netty.websocket.connection-path=/ws
NettyWebsocketServerHandler
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
/**
* @author 954L
* @create 2022/10/11 21:05
*/
@Slf4j
@Component
public class NettyWebsocketServerHandler implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {
/* nettyWebsocket服务端口 */
@Value("${netty.websocket.port}")
private int port;
/* 单次消息大小上限 */
@Value("${netty.websocket.max-message-frame-size}")
private Integer maxMessageFrameSize;
/* 连接ws路径 */
@Value("${netty.websocket.connection-path}")
private String connectionPath;
private ApplicationContext applicationContext;
private Channel serverChannel;
/** 启动nettyWebsocket服务 */
@Override
public void run(ApplicationArguments args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.option(ChannelOption.SO_BACKLOG, 1024);
sb.group(group, bossGroup).channel(NioServerSocketChannel.class)
.localAddress(this.port).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(8192));
// 读写超时检测;60s ping超时;180s ping pong超时
pipeline.addLast(new IdleStateHandler(
0,60,180, TimeUnit.SECONDS));
// 触发读写超时,发送心跳内容handler
pipeline.addLast(applicationContext.getBean(WsHeartBeatHandler.class));
// 自定义逻辑处理
pipeline.addLast(applicationContext.getBean(MyWebSocketHandler.class));
pipeline.addLast(new WebSocketServerProtocolHandler(this.connectionPath, null,
true, this.maxMessageFrameSize, false, true));
}
});
Channel channel = sb.bind().sync().channel();
this.serverChannel = channel; log.info("Websocket start success!");
channel.closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
bossGroup.shutdownGracefully().sync();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
if (this.serverChannel != null) this.serverChannel.close();
log.info("Websocket stop!");
}
}
WsHeartBeatHandler
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
 * Heartbeat handling: reacts to the idle events fired by the idle-state handler
 * that precedes this handler in the pipeline.
 *
 * @author 954L
 * @create 2023/06/04 17:40
 */
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor(onConstructor_ = {@Lazy} )
public class WsHeartBeatHandler extends ChannelInboundHandlerAdapter {

    private final MyWebSocketHandler myWebSocketHandler;

    /**
     * Handles idle events; every other user event is passed on unchanged.
     *
     * <ul>
     *   <li>{@code WRITER_IDLE}: sends a {@code "pong"} text frame as heartbeat.</li>
     *   <li>{@code ALL_IDLE}: closes the connection via {@code MyWebSocketHandler}.</li>
     *   <li>{@code READER_IDLE}: ignored.</li>
     * </ul>
     *
     * @param ctx context of the channel the event was fired on
     * @param evt the user event
     * @throws Exception if propagating the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            // Not ours: later handlers may depend on this event, so it must not be dropped.
            super.userEventTriggered(ctx, evt);
            return;
        }
        IdleState state = ((IdleStateEvent) evt).state();
        if (state == IdleState.WRITER_IDLE) {
            ctx.writeAndFlush(new TextWebSocketFrame("pong"));
        } else if (state == IdleState.ALL_IDLE) {
            myWebSocketHandler.close(ctx);
        }
        // READER_IDLE is intentionally ignored.
    }
}
MyWebSocketHandler
此处校验的token取自WebSocket子协议请求头(Sec-WebSocket-Protocol)的值;若客户端支持传递自定义header,可调整下述代码中token的获取方式
package com.w954l.lover.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Application-level WebSocket handler: verifies the token sent in the
 * {@code Sec-WebSocket-Protocol} header, performs the WebSocket handshake itself
 * and then receives the WebSocket frames of the connection.
 *
 * @author 954L
 * @create 2022/10/11 21:09
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class MyWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    public static final String TOKEN_HEAD_KEY = "sec-websocket-protocol";

    /**
     * Intercepts the HTTP upgrade request before the frame-typed dispatch.
     *
     * <p>This handler is typed for {@link WebSocketFrame}, so a {@link FullHttpRequest}
     * would never reach {@link #channelRead0}; it has to be picked up here. Everything
     * else goes through the normal {@code SimpleChannelInboundHandler} dispatch.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            try {
                handshake(ctx, request);
            } finally {
                // The request is consumed here and not forwarded, so it is released here.
                request.release();
            }
            return;
        }
        super.channelRead(ctx, msg);
    }

    /**
     * Handles the frames of an established connection. The frame is released by
     * the superclass after this method returns.
     *
     * <p>NOTE(review): frames are consumed here instead of being forwarded, because this
     * handler performs the handshake itself; presumably the handlers further down the
     * pipeline are then not prepared to receive frames - confirm against the pipeline setup.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
        if (msg instanceof CloseWebSocketFrame) {
            // Echo the close frame, then drop the connection.
            ctx.writeAndFlush(msg.retain()).addListener(ChannelFutureListener.CLOSE);
        } else if (msg instanceof PingWebSocketFrame) {
            ctx.writeAndFlush(new PongWebSocketFrame(msg.content().retain()));
        } else if (msg instanceof TextWebSocketFrame) {
            // TODO application-specific handling of text messages
            log.debug("text frame received from {}", ctx.channel().remoteAddress());
        }
    }

    /**
     * Closes a connection: sends a normal-closure close frame and closes the channel
     * once the write has completed. Called by {@code WsHeartBeatHandler} for idle connections.
     *
     * @param ctx context of the connection to close
     */
    public void close(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(new CloseWebSocketFrame(WebSocketCloseStatus.NORMAL_CLOSURE))
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("连接断开: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("建立连接, {}", ctx.channel().remoteAddress());
    }

    /**
     * Verifies the token of the upgrade request and, if it is valid, performs the
     * WebSocket handshake with the token as the selected subprotocol.
     */
    private void handshake(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) {
        final Channel channel = ctx.channel();
        final String uri = fullHttpRequest.uri();
        final String token = fullHttpRequest.headers().get(TOKEN_HEAD_KEY);
        String tokenContent;
        try {
            tokenContent = JwtTokenUtils.verifyToken(token);
        } catch (Exception e) {
            // Token verification failed: reject and close.
            channel.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                    HttpResponseStatus.NOT_FOUND)).addListener(ChannelFutureListener.CLOSE);
            log.info("拒绝ws连接,token验证失败;path:{},address:{}",
                    uri, channel.remoteAddress());
            return;
        }
        // Handshake manually so the token can be returned as the negotiated subprotocol.
        WebSocketServerHandshakerFactory wsFactory =
                new WebSocketServerHandshakerFactory(uri, token, false);
        WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(fullHttpRequest);
        if (handshaker == null) {
            // Unsupported WebSocket version: no connection was established.
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel);
            return;
        }
        handshaker.handshake(channel, fullHttpRequest);
        log.info("新连接创建成功: {}", channel.remoteAddress());
        // TODO keep the client's channel (e.g. keyed by tokenContent) for pushing messages later
    }
}
客户端
NettyWebsocketClientHandler
package com.w954l.bot.handlers;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.net.URI;
/**
 * Connects to a WebSocket server once the Spring Boot application has started
 * and closes the connection when the application context is closed.
 *
 * @author 954L
 * @create 2022/10/11 21:05
 */
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = { @Lazy})
public class NettyWebsocketClientHandler implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {

    // Written by the startup thread, read by whichever thread closes the context.
    private volatile Channel serverChannel;
    private volatile EventLoopGroup eventLoopGroup;

    private final FrameHandler frameHandler;

    /**
     * Connects to the URL returned by {@code frameHandler.wsUrl()} and performs the
     * WebSocket handshake. Returns once the handshake has completed; it does not wait
     * for the connection to close, so the calling startup thread is not held.
     *
     * @param args application arguments (unused)
     * @throws Exception if the URL is invalid, or connecting or the handshake fails
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final URI websocketURI = new URI(frameHandler.wsUrl());
        final String scheme = websocketURI.getScheme();
        final String host = websocketURI.getHost();
        if (scheme == null || host == null) {
            throw new IllegalArgumentException("Invalid websocket url: " + websocketURI);
        }
        final boolean secure = "wss".equalsIgnoreCase(scheme);
        final int port = websocketURI.getPort() != -1 ? websocketURI.getPort() : (secure ? 443 : 80);

        final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                websocketURI, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        // Created up front so the handshaker is set before any data can arrive.
        final MyWebsocketClientHandler handler = new MyWebsocketClientHandler();
        handler.setHandshaker(handshaker);

        final EventLoopGroup group = new NioEventLoopGroup();
        this.eventLoopGroup = group;
        boolean connected = false;
        try {
            Bootstrap boot = new Bootstrap();
            boot.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            if (secure) {
                                // Passing host/port enables SNI; the endpoint identification
                                // algorithm makes the engine verify the certificate's host name.
                                SSLEngine sslEngine = SSLContext.getDefault().createSSLEngine(host, port);
                                sslEngine.setUseClientMode(true);
                                javax.net.ssl.SSLParameters sslParameters = sslEngine.getSSLParameters();
                                sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                                sslEngine.setSSLParameters(sslParameters);
                                p.addLast("ssl", new SslHandler(sslEngine));
                            }
                            p.addLast(new HttpClientCodec(), new HttpObjectAggregator(1024 * 1024 * 10));
                            p.addLast("websocketHandler", handler);
                        }
                    });
            Channel channel = boot.connect(host, port).sync().channel();
            this.serverChannel = channel;
            // Release the event loop threads as soon as the connection ends.
            channel.closeFuture().addListener((ChannelFutureListener) future -> group.shutdownGracefully());
            // Perform the handshake.
            handshaker.handshake(channel);
            handler.handshakeFuture().sync();
            connected = true;
        } finally {
            if (!connected) {
                Channel channel = this.serverChannel;
                if (channel != null) {
                    channel.close();
                }
                group.shutdownGracefully();
            }
        }
    }

    /** Closes the connection and releases the event loop group. */
    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        Channel channel = this.serverChannel;
        if (channel != null) {
            channel.close();
        }
        EventLoopGroup group = this.eventLoopGroup;
        if (group != null) {
            group.shutdownGracefully();
        }
        log.info("Websocket stop!");
    }
}
MyWebsocketClientHandler
package com.w954l.bot.handlers;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* @author 954L
* @create 2022/12/27 22:26
*/
@Data
@Slf4j
public class MyWebsocketClientHandler extends SimpleChannelInboundHandler<Object> {
WebSocketClientHandshaker handshaker;
ChannelPromise handshakeFuture;
public void handlerAdded(ChannelHandlerContext ctx) {
this.handshakeFuture = ctx.newPromise();
}
public ChannelFuture handshakeFuture() {
return this.handshakeFuture;
}
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
log.info("channelRead0 " + this.handshaker.isHandshakeComplete());
Channel ch = ctx.channel();
FullHttpResponse response;
if (!this.handshaker.isHandshakeComplete()) {
try {
response = (FullHttpResponse) msg;
//握手协议返回,设置结束握手
this.handshaker.finishHandshake(ch, response);
//设置成功
this.handshakeFuture.setSuccess();
System.out.println("WebSocket Client connected! response headers[sec-websocket-extensions]:{}" + response.headers());
} catch (WebSocketHandshakeException var7) {
FullHttpResponse res = (FullHttpResponse) msg;
String errorMsg = String.format("WebSocket Client failed to connect,status:%s,reason:%s",
res.status(), res.content().toString(CharsetUtil.UTF_8));
this.handshakeFuture.setFailure(new Exception(errorMsg));
}
} else if (msg instanceof FullHttpResponse) {
response = (FullHttpResponse) msg;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" +
response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
} else {
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
log.info("收到消息:{}", textFrame.text());
} else if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame) frame;
System.out.println("BinaryWebSocketFrame");
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("receive close frame");
ch.close();
}
}
}
}
Nginx配置
敏感信息已用“x”代替,注意辨别
# Redirect plain HTTP to HTTPS.
server {
    listen 80;
    server_name xxx.xxx.xxx;
    # $host instead of $http_host: it never comes out empty and is not taken
    # verbatim from the client's Host header.
    return 301 https://$host$request_uri;
}
server {
    listen 443 ssl;
    server_name xxx.xxx.xxx;
    ssl_certificate conf.d/ssl_key/xxx.xxx.xxx_bundle.crt;
    ssl_certificate_key conf.d/ssl_key/xxx.xxx.xxx.key;
    ssl_session_timeout 5m;
    ssl_session_cache shared:SSL:50m;
    # SSLv2, SSLv3, TLSv1 and TLSv1.1 are insecure and deprecated; allow only TLS 1.2 and newer.
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE;
    ssl_prefer_server_ciphers on;

    # WebSocket endpoint, proxied to the Netty server.
    location /ws {
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_pass http://ip:5977;
        # Headers required for the WebSocket protocol upgrade.
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        # Close the connection when nothing is received from the upstream for 60s.
        proxy_read_timeout 60s;
    }

    location / {
        client_max_body_size 32m;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_pass http://ip:port;
    }

    error_page 500 502 503 504 /50x.html;
    location = /50x.html {
        root /usr/share/nginx/html;
    }
}
文章来源:https://blog.csdn.net/wkh___/article/details/135105677
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!