消费端并发和限流设置
2024-01-08 19:33:51
package com.java1234.consumer.service.impl;
import com.java1234.consumer.service.RabbitMqService;
import com.java1234.producer.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import java.io.IOException;
import org.springframework.context.annotation.Bean;
@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
CachingConnectionFactory cachingConnectionFactory;
@Bean(name="limitContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setPrefetchCount(3);
return factory;
}
@Override
@RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE},containerFactory = "limitContainerFactory")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try{
System.out.println("接受到的mq消息:"+message);
//处理业务
System.out.println("处理业务");
System.out.println("deliveryTag="+deliveryTag);
if(deliveryTag==5){
channel.basicAck(deliveryTag,true);
}
}catch (Exception e){
e.printStackTrace();
try {
channel.basicNack(deliveryTag,false,true);
// channel.basicReject(deliveryTag,true);
// Thread.sleep(1000);
}catch (Exception e1){
e1.printStackTrace();
}
}
}
@Override
// @RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE})
public void receiveMessage2(String message) {
// System.out.println("消费者1:接收到的mq消息:"+message);
System.out.println("队列1接收日志消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE2})
public void receiveMessage3(String message) {
// System.out.println("消费者2:接收到的mq消息:"+message);
System.out.println("队列2接收日志消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE1})
public void receiveSubMessage1(String message) {
System.out.println("订阅者1:接收到的mq消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE2})
public void receiveSubMessage2(String message) {
System.out.println("订阅者2:接收到的mq消息:"+message);
}
}
文章来源:https://blog.csdn.net/m0_68935893/article/details/135464195
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!