第4章Netty第二节入门案例+channel,future,promise介绍
需求
开发一个简单的服务器端和客户端
- 客户端向服务器端发送 hello, world
- 服务器仅接收,不返回
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
服务端
new ServerBootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioServerSocketChannel.class) // 2
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 3
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder()); // 5
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8080); // 4
代码解读:
- 1处创建NIOEventLoopGroup,可以简单理解为线程池+selector
- 2处选择服务socket实现类,其中NIOServerSocketChannel表示基于NIO的服务端实现。其他还有
- 3处,为啥方法骄傲childHandler,是接下来处理器都是给socketChannel用的,而不是给ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端socketChannel建立连接后,执行initChannel以便添加更多的处理器。
- 4 处,ServerSocketChannel 绑定的监听端口
- 5处,socketChannel的处理器,解码ByteBuf=>String
- 6处,socketChannel的业务处理器,使用上一个处理器的处理结果
客户端
new Bootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioSocketChannel.class) // 2
.handler(new ChannelInitializer<Channel>() { // 3
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder()); // 8
}
})
.connect("127.0.0.1", 8080) // 4
.sync() // 5
.channel() // 6
.writeAndFlush(new Date() + ": hello world!"); // 7
代码解读:
1.1处,创建NIOEventLoopGroup,同server
2. 2处, 选择客户的socket实现类,NIOSocketChannel表示基于NIO的客户端实现类。其他实现还有
- 3处,添加socketChannel的处理器,ChannelInitializer处理器(仅执行一次),它的作用是待客户端socketChannel建立连接后,执行initChannel以便添加跟多的处理器。
- 4 处,指定要连接的服务器和端口
- 5处,Netty中很多方法都是异步的,如connect,这时需要使用sync方法等待connect建立连接完毕。
- 6处,获取channel对象,它即为通道抽象,可以进行数据读写。
- 7处,写入消息并清空缓冲区
- 8处,建立连接后,消息会经过通道handler处理,这里是将string=>ByteBuf发出。
- 数据经过网络传输,到达服务端,服务端5和6处的handlerx先后被触发走完一个流程。
流程梳理
注意:
- 把channel理解为数据的通道
- 把msg理解为流动的数据,最开始输入的是ByteBuf,但经过pipeLine的加工,会变成其他类型的对象,最后输出又变成ByteBuf
- 把handler理解为数据的处理工序
- 工序有多道,合在一起就是pipeline.pipeline负责发布事件(读,读取完成。。。)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler分Inbound和outbound两类
- 把eventloop理解为处理数据的工人
- 工人可以管理多个channel的io操作。并且一旦工人负责了某个channel就要负责到底(绑定)
- 工人既可以执行io操作,也可以进行任务处理。每位工人有任务队列,队列里可以堆放多个channel待处理的任务,把任务分为普通任务和定时任务。
- 工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每一道工序执行不同的工人。
组件-EventLoop
事件循环对象 EventLoop
本质上是一个单线程执行器同时维护了一个selector,里面有run方法处理channel上源源不断的io事件。
继承关系:
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor
- 提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
- 提供了parent方法来看看自己属于哪个EventLoopGroup
事件循环组 EventLoopGroup
是一组EventLoop,channel一般会调用EventLoopGroup的register方法来绑定一个EventLoop,后续这个channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)
继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
以一个简单的实现为例:
// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
输出
io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98
也可以使用 for 循环
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
for (EventExecutor eventLoop : group) {
System.out.println(eventLoop);
}
输出
io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
优雅关闭
优雅关闭 shutdownGracefully
方法。该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
演示NioEventLoop处理io事件
服务端两个Nio worker工人
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
if (byteBuf != null) {
byte[] buf = new byte[16];
ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
log.debug(new String(buf));
}
}
});
}
}).bind(8080).sync();
客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("init...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.channel(NioSocketChannel.class).connect("localhost", 8080)
.sync()
.channel();
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
Thread.sleep(2000);
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
最后输出
22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
再增加两个非 nio 工人
DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(normalWorkers,"myhandler",
new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
if (byteBuf != null) {
byte[] buf = new byte[16];
ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
log.debug(new String(buf));
}
}
});
}
}).bind(8080).sync();
客户端代码不变,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
输出
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] REGISTERED
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] ACTIVE
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7a 68 61 6e 67 73 61 6e |zhangsan |
+--------+-------------------------------------------------+----------------+
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE
22:19:48 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan
22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7a 68 61 6e 67 73 61 6e |zhangsan |
+--------+-------------------------------------------------+----------------+
22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE
22:19:50 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan
22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] REGISTERED
22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] ACTIVE
22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 69 73 69 |lisi |
+--------+-------------------------------------------------+----------------+
22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE
22:20:25 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi
22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 69 73 69 |lisi |
+--------+-------------------------------------------------+----------------+
22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE
22:20:27 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] REGISTERED
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] ACTIVE
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 61 6e 67 77 75 |wangwu |
+--------+-------------------------------------------------+----------------+
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE
22:20:38 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu
22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 61 6e 67 77 75 |wangwu |
+--------+-------------------------------------------------+----------------+
22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE
22:20:40 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu
可以看到,nio 工人和 非 nio 工人也分别绑定了 channel(LoggingHandler 由 nio 工人执行,而我们自己的 handler 由非 nio 工人执行)
handler 执行中如何换人?
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
EventExecutor executor = next.executor();
// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);// 用新的线程去执行
}
});
}
}
- 如果两个handler绑定的是同一个线程,那么在当前线程中直接调用
- 否则,提交给一个新的线程调用。
演示 NioEventLoop 处理普通任务
NioEventLoop 除了可以处理 io 事件,同样可以向它提交普通任务
// 1. 创建事件循环组
EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
// 3. 执行普通任务。submit和execute都可以。将这个任务加到事件循环组中的一个线程去执行不会造成阻塞。实现了异步
group.next().submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("ok");
});
log.debug("main");
输出
19:52:29 [DEBUG] [main] c.i.n.c.TestEventLoop - main
19:52:30 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
可以用来执行耗时较长的任务,不会被阻塞。
演示 NioEventLoop 处理定时任务
// 1. 创建事件循环组
EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
// 4. 执行定时任务
group.next().scheduleAtFixedRate(() -> {
log.debug("ok");
}, 0, 1, TimeUnit.SECONDS);
log.debug("main");
19:53:32 [DEBUG] [main] c.i.n.c.TestEventLoop - main
19:53:32 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
19:53:33 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
19:53:34 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
可以用来执行定时任务。
组件-Channel
channel的主要方法
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入到缓冲区中,但没有将数据发出。
- writeAndFlush() 方法将数据写入缓存去,并立刻将缓冲区数据刷出。会立即将数据发出。
ChannelFuture
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
// 异步非阻塞的。异步:main线程发起调用,真正执行连接是另一个线程-Nio线程。如果没有sync,是无阻塞的,会直接向下执行。连接都还没建立好,信息不知道发送到哪里去。
.connect("127.0.0.1", 8080); // 1
channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");
说明:
1处返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
注意:connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象。加了sync才能变成同步,一直阻塞直到建立连接。然后执行后面的才能正确发送数据。
实验如下:
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.sync(); // 2 会阻塞直到返回结果
System.out.println(channelFuture.channel()); // 3
- 执行到 1 时,连接未建立,打印
[id: 0x2e1884dd]
- 执行到 2 时,sync 方法是同步等待连接建立完成
- 执行到 3 时,连接肯定建立了,打印
[id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]
除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.addListener((ChannelFutureListener) future -> {
System.out.println(future.channel()); // 2 连接建立好了会调用该方法
});
// 或者用下面的方法
/*
// 使用 addListener(回调对象) 方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
// 在 nio 线程连接建立好之后,会调用 operationComplete
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
log.debug("{}", channel);
channel.writeAndFlush("hello, world");
}
});
*/
核心:channelFuture.addListener()。注意带有future,promise的类型都可以和addListener方法配套使用的。
说明:这种方法不同于上面的同步的方法,同步的方法是有main线程完成打印的操作。而这里main线程直接将任务交给其他线程以异步的方式执行,当连接建立完成后自动完成打印操作。
CloseFuture
用法基本同上面的channelFuture。closeFuture是用来处理关闭之后的操作的,一般用来关闭EventLoopGroup。
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close(); // close 异步操作 1s 之后
// log.debug("处理关闭之后的操作"); // 不能在这里善后
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
// 1.同步关闭的方法
/*log.debug("waiting close...");
closeFuture.sync();
log.debug("处理关闭之后的操作");*/
// 2.异步处理的方法
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("处理关闭之后的操作");
group.shutdownGracefully();
}
});
}
}
组件-Future&Promise
在异步处理的时候,经常用到这两个接口。
netty中的Future和JDK中的Future同名。Netty的Future是继承自JDK的Future。Promise是对Netty中的Future进行拓展(Extend)。
- JDK中的Future只能同步等待任务结束(或成功或失败)才能得到结果。
- netty Future可以同步等待任务结束得到结果,也可以异步(addListener)得到结果。但都要等任务结束
- netty Promise不仅有netty Future的功能,而且可以「主动创建」一个Promise,作为两个线程间传递结果的容器。
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
JDK Future
只能通过同步等待任务结束,future.get()会阻塞,直到拿到结果。
// 1. 线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 2. 提交任务
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 50;
}
});
// 3. 主线程通过 future 来获取结果
log.debug("等待结果");
log.debug("结果是 {}", future.get());
22:29:50 [DEBUG] [pool-1-thread-1] c.i.n.c.TestJdkFuture - 执行计算
22:29:50 [DEBUG] [main] c.i.n.c.TestJdkFuture - 等待结果
22:29:51 [DEBUG] [main] c.i.n.c.TestJdkFuture - 结果是 50
netty Future
可以通过同步(get方法会阻塞)也可以通过异步(addListener不会阻塞,交给另外的线程处理)获得结果
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 70;
}
});
// log.debug("等待结果");
// log.debug("结果是 {}", future.get()); // 1. 同步的方式
//2. 异步的方式
future.addListener(new GenericFutureListener<Future<? super Integer>>(){
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("接收结果:{}", future.getNow());
}
});
上面异步的方式输出结果:
22:34:24 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - 执行计算
22:34:25 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - 接收结果:70
netty Promise(常用的)
Promise 继承自(extend)netty的Future。不仅有Future的功能,而且能主动创建,作为线程 间存放数据的容器
// 1. 准备 EventLoop 对象
EventLoop eventLoop = new NioEventLoopGroup().next();
// 2. 可以主动创建 promise, 结果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
// 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
log.debug("开始计算...");
try {
int i = 1 / 0;
Thread.sleep(1000);
promise.setSuccess(80);
} catch (Exception e) {
e.printStackTrace();
promise.setFailure(e);
}
}).start();
// 4. 接收结果的线程
log.debug("等待结果...");
log.debug("结果是: {}", promise.get());
22:37:03 [DEBUG] [main] c.i.n.c.TestNettyPromise - 等待结果...
22:37:03 [DEBUG] [Thread-0] c.i.n.c.TestNettyPromise - 开始计算...
22:37:04 [DEBUG] [main] c.i.n.c.TestNettyPromise - 结果是: 80
后续netty一般都用Promise
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!