RabbitMQ手动应答
2023-12-17 22:57:55
1.SleepUtil线程睡眠工具类
package com.hong.utils;
/**
* @Description: 线程睡眠工具类
* @Author: hong
* @Date: 2023-12-16 23:10
* @Version: 1.0
**/
public class SleepUtil {
public static void sleep(int second) {
try {
Thread.sleep(1000*second);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
2.消息生产者
package com.hong.rabbitmq3;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* @Description: 消息手动应答时不丢失,放回队列重新消费
* @Author: hong
* @Date: 2023-12-16 22:33
* @Version: 1.0
**/
public class Task3 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
System.out.println("请输入:");
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("消息发送完成------" + message);
}
}
}
3.两个消费者
模拟一个处理速度快(Worker3),另一个处理速度慢(Worker4)
3.1.处理时间短
package com.hong.rabbitmq3;
import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* @Description: 消息手动应答时不丢失,放回队列重新消费
* @Author: hong
* @Date: 2023-12-16 23:05
* @Version: 1.0
**/
public class Worker3 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
System.out.println("worker3等待接收消息,处理速度快");
DeliverCallback deliverCallback = (comsumerTag, message) -> {
SleepUtil.sleep(1);
System.out.println("接收到的消息:"+ new String(message.getBody(),"UTF-8"));
//手动应答
/**
* 第一个参数:消息标识
* 第二个参数是否批量:true批量
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");
channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
3.2.处理时间长
package com.hong.rabbitmq3;
import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* @Description: 消息手动应答时不丢失, 放回队列重新消费
* @Author: hong
* @Date: 2023-12-16 23:05
* @Version: 1.0
**/
public class Worker4 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
System.out.println("worker4等待接收消息,处理速度慢");
DeliverCallback deliverCallback = (comsumerTag, message) -> {
SleepUtil.sleep(20);
System.out.println("接收到的消息:"+ new String(message.getBody(),"UTF-8"));
//手动应答
/**
* 第一个参数:消息标识
* 第二个参数是否批量:true批量
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");
channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
4.结果
启动生产者后启动2个消费者,等消息bb接收到后,发送cc和dd
等Worker4接收到消息bb后将其关闭,发现原本该Worker4消费的消息dd并未丢失,重回队列被Worker3消费
文章来源:https://blog.csdn.net/qq_41596346/article/details/135050974
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!