.NET6 RabbitMQ自动重连
2023-12-13 05:45:51
序言
? ? ? ? 在技术领域,不断学习和探索是保持竞争力的关键。最近,我重新开始了对RabbitMQ的研究,这个过程让我又收获了许多新的知识和技能,我觉得有必要记录下来,以便将来回顾和分享。
问题引出
? ? ? ? RabbitMQ是一款开源的消息队列中间件,它提供了高效、可靠、灵活的通信机制,广泛应用于分布式系统中。然而,有时候在使用RabbitMQ时会遇到连接断开的问题,这可能会导致消息传递中断和应用程序的不可用性。
问题分析
? ? ? ? 当使用RabbitMQ时,可能会遇到以下几种情况导致连接断开的问题:
1.网络问题:网络中断、防火墙设置等问题可能导致RabbitMQ连接断开。
2.长时间空闲:如果连接在一段时间内没有进行任何通信,RabbitMQ可能会自动关闭连接。
3.RabbitMQ服务器问题:RabbitMQ服务器可能会因为负载过高或其他原因主动关闭连接。
解决方案
? ? ? ? 虽然RabbitMQ.Client库,有心跳机制,有断线重连机制,但是在网络断开的时候,并不能重连。
? ? ? ? 下面这段代码经过本人验证有效,可以解决上面的问题。
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;
using RabbitMQDemo.Shared;
using System.Collections.Concurrent;
using RabbitMQ.Client.Exceptions;
namespace RabbitMQConsumerDemo
{
public class RabbitMQRpcClientHandler
{
/// <summary>
/// 定义一个静态变量来保存类的实列
/// </summary>
private static RabbitMQRpcClientHandler? _uniqueInstance;
/// <summary>
/// 定义一个标识确保线程同步
/// </summary>
private static readonly object _locker = new();
/// <summary>
/// Main entry point to the RabbitMQ .NET AMQP client API. Constructs RabbitMQ.Client.IConnection instances.
/// </summary>
private static IConnectionFactory? _factory;
/// <summary>
/// Main interface to an AMQP connection.
/// </summary>
private IConnection? _connection;
/// <summary>
/// 发送通道(发送通道及接收通道分开,避免互相影响,导致整个服务不可用)
/// </summary>
private IModel? _sendChannel;
/// <summary>
/// 接收通道(发送通道及接收通道分开,避免互相影响,导致整个服务不可用)
/// </summary>
private IModel? _listenChannel;
/// <summary>
/// 监听消费者
/// </summary>
private EventingBasicConsumer? _listenConsumer;
/// <summary>
/// 响应消费者
/// </summary>
private EventingBasicConsumer? _replyConsumer;
/// <summary>
/// RabbitMQ 主机域名
/// </summary>
private readonly string _defaultRabbitMQHostName = "127.0.0.1";
/// <summary>
/// RabbitMQ 服务器端口, 默认 5672, 网页监控页面是 15672
/// </summary>
private readonly int _defaultRabbitMQPort = 5672;
/// <summary>
/// RabbitMQ 用户名, 默认 guest
/// </summary>
private readonly string _defaultRabbitMQUserName = "guest";
/// <summary>
/// RabbitMQ 密码, 默认 guest
/// </summary>
private readonly string _defaultRabbitMQPassword = "guest!";
/// <summary>
/// 虚拟主机
/// </summary>
private readonly string _defaultRabbitMQVirtualHost = "/";
/// <summary>
/// 交换机
/// </summary>
private readonly string _exchangeName = "";
/// <summary>
/// 数据监控队列
/// </summary>
private readonly string _listenQueueName = "queue.listen.test";
/// <summary>
/// 指令响应队列
/// </summary>
private string _replyQueueName = string.Empty;
/// <summary>
/// 注册-路由键
/// </summary>
private readonly string _routingKeyRegister = "queue.register";
/// <summary>
/// 心跳-路由键
/// </summary>
private readonly string _routingKeyHeart = "queue.heart";
/// <summary>
/// 取消信号
/// </summary>
private readonly CancellationTokenSource _cts = new();
/// <summary>
/// 回调函数映射器
/// </summary>
private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper = new();
private bool _connectionState;
private bool _sendChannelState;
private bool _listenChannelState;
/// <summary>
/// 连接状态
/// </summary>
public bool ConnectionState
{
get { return _connectionState; }
set
{
if (_connectionState == value)
{
return;
}
_connectionState = value;
if (_connectionState)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 连接已打开");
}
else
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 连接已关闭");
}
}
}
/// <summary>
/// 发送通道状态
/// </summary>
public bool SendChannelState
{
get { return _sendChannelState; }
set
{
if (_sendChannelState == value)
{
return;
}
_sendChannelState = value;
if (_sendChannelState)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 发送通道已打开");
}
else
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 发送通道已关闭");
}
}
}
/// <summary>
/// 接收通道状态
/// </summary>
public bool ListenChannelState
{
get { return _listenChannelState; }
set
{
if (_listenChannelState == value)
{
return;
}
_listenChannelState = value;
if (_listenChannelState)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 接收通道已打开");
}
else
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 接收通道已关闭");
}
}
}
/// <summary>
/// 定义私有构造函数,使外界不能创建该类实例
/// </summary>
private RabbitMQRpcClientHandler()
{
// 创建连接工厂
_factory = new ConnectionFactory()
{
HostName = _defaultRabbitMQHostName,//MQ服务器地址
Port = _defaultRabbitMQPort,//MQ服务端口号
UserName = _defaultRabbitMQUserName,//账号
Password = _defaultRabbitMQPassword,//密码
VirtualHost = _defaultRabbitMQVirtualHost,
RequestedHeartbeat = TimeSpan.FromSeconds(2),
AutomaticRecoveryEnabled = true,//自动重连
TopologyRecoveryEnabled = true,//拓扑重连
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};
}
/// <summary>
/// 定义公有方法提供一个全局访问点,同时你也可以定义公有属性来提供全局访问点
/// </summary>
/// <returns></returns>
public static RabbitMQRpcClientHandler GetInstance()
{
/* *********************
* 当第一个线程运行到这里时,此时会对_locker对象“加锁”
* 当第二个线程运行该方法时,首先检测到_locker对象为“加锁”状态,该线程就会挂起等待第一个线程解锁
* lock语句运行完之后(即线程运行完之后)会对该对象“解锁”
* 双重锁定只需要一句判断就可以了
* *********************/
if (_uniqueInstance == null)
{
lock (_locker)
{
// 如果类的实例不存在则创建,否则直接返回
_uniqueInstance ??= new RabbitMQRpcClientHandler();
}
}
return _uniqueInstance;
}
/// <summary>
/// 异步调用
/// </summary>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public Task<string> CallAsync(string message, EnumMsgType msgType = EnumMsgType.Register, CancellationToken cancellationToken = default)
{
if (_connection?.IsOpen != true)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 连接为空或已经关闭(生产者)");
TaskCompletionSource<string> taskCompletionSource = new();
taskCompletionSource.TrySetResult(Newtonsoft.Json.JsonConvert.SerializeObject(new ReplyDataEntity()
{
Successed = false,
Message = "连接为空或已经关闭"
}, Newtonsoft.Json.Formatting.None));
return taskCompletionSource.Task;
}
if (_sendChannel?.IsOpen != true)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 发送通道为空或已经关闭(生产者)");
TaskCompletionSource<string> taskCompletionSource = new();
taskCompletionSource.TrySetResult(Newtonsoft.Json.JsonConvert.SerializeObject(new ReplyDataEntity()
{
Successed = false,
Message = "发送通道为空或已经关闭"
}, Newtonsoft.Json.Formatting.None));
return taskCompletionSource.Task;
}
// 设置消息ID、类型、非持久性等
IBasicProperties props = _sendChannel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = _replyQueueName;
props.ContentType = "application/json";
props.DeliveryMode = 1;//非持久性
var messageBytes = Encoding.UTF8.GetBytes(message);
var tcs = new TaskCompletionSource<string>();
_callbackMapper.TryAdd(correlationId, tcs);
switch (msgType)
{
case EnumMsgType.Register:
{
/* *********************
* 作用:向默认交换机的指定队列中发送注册消息
* 说明:生产者
* 参数:
* 1、exchange:交换机名称。如果不指定将使用RabbitMQ的默认交换机(设置为"")
* 2、routingKey:路由键。交换机根据路由键来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、basicProperties:消息的属性。
* 4、body:发送消息的内容
* *********************/
_sendChannel.BasicPublish(exchange: string.Empty, routingKey: _routingKeyRegister, basicProperties: props, body: messageBytes);
}
break;
default:
{
/* *********************
* 作用:向默认交换机的指定队列中发送心跳消息
* 说明:生产者
* 参数:
* 1、exchange:交换机名称。如果不指定将使用RabbitMQ的默认交换机(设置为"")
* 2、routingKey:路由键。交换机根据路由键来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、basicProperties:消息的属性。
* 4、body:发送消息的内容
* *********************/
_sendChannel.BasicPublish(exchange: string.Empty, routingKey: _routingKeyHeart, basicProperties: props, body: messageBytes);
}
break;
}
// 通知任务已经取消,处理取消后的回调操作
cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out _));
// 请求超时检测
if (tcs.Task.Wait(TimeSpan.FromSeconds(10)) == false)
{
_callbackMapper.TryRemove(correlationId, out _);
tcs.TrySetResult(Newtonsoft.Json.JsonConvert.SerializeObject(new ReplyDataEntity()
{
Successed = false,
Message = $"{(msgType == EnumMsgType.Register ? "注册" : "心跳")}请求超时"
}, Newtonsoft.Json.Formatting.None));
}
return tcs.Task;
}
/// <summary>
/// 开始
/// </summary>
public void Start()
{
new Thread(new ThreadStart(Checking))
{
IsBackground = true
}.Start();
Reconnect();
}
/// <summary>
/// 停止
/// </summary>
public void Stop()
{
Cleanup();
}
/// <summary>
/// 取消状态监测
/// </summary>
public void CancelChecking()
{
_cts.Cancel();
}
/// <summary>
/// 状态监测
/// </summary>
private void Checking()
{
while (_cts.IsCancellationRequested == false)
{
Thread.Sleep(1000);
ConnectionState = _connection?.IsOpen == true;
if (_connection?.IsOpen != true)
{
SendChannelState = false;
ListenChannelState = false;
continue;
}
SendChannelState = _sendChannel?.IsOpen == true;
ListenChannelState = _listenChannel?.IsOpen == true;
}
}
/// <summary>
/// 连接
/// </summary>
private void Connect()
{
if (_factory == null)
{
return;
}
// 创建连接
_connection = _factory.CreateConnection();
_connection.ConnectionShutdown += Connection_ConnectionShutdown;
// 创建发送通道
_sendChannel = _connection.CreateModel();
// 创建接收通道
_listenChannel = _connection.CreateModel();
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 尝试连接至RabbitMQ服务器[{_defaultRabbitMQHostName}]");
// 监控消息
ListenMessageConsume();
// 响应消息
ReplyMessageConsume();
}
/// <summary>
/// 重连
/// </summary>
private void Reconnect()
{
Cleanup();
// state is initially false
var mres = new ManualResetEventSlim(false);
// loop until state is true, checking every 3s
while (!mres.Wait(3 * 1000))
{
try
{
// 连接RabbitMQ服务器
Connect();
// state set to true - breaks out of loop
mres.Set();
}
catch (Exception ex)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 尝试连接RabbitMQ服务器出现错误【{ex.Message}】");
}
}
}
/// <summary>
/// 清理
/// </summary>
private void Cleanup()
{
if (_replyConsumer != null)
{
if (_replyConsumer.IsRunning)
{
try
{
_replyConsumer.OnCancel();
}
catch (Exception ex)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的响应消费者,不再执行任何操作,但遇到错误【{ex.Message}】");
}
}
_replyConsumer.Received -= ReplyConsumer_Received;
_replyConsumer.Registered -= ReplyConsumer_Registered;
_replyConsumer.Shutdown -= ReplyConsumer_Shutdown;
_replyConsumer.Unregistered -= ReplyConsumer_Unregistered;
_replyConsumer = null;
}
if (_listenConsumer != null)
{
if (_listenConsumer.IsRunning)
{
try
{
_listenConsumer.OnCancel();
}
catch (Exception ex)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的监听消费者,不再执行任何操作,但遇到错误【{ex.Message}】");
}
}
_listenConsumer.Received -= ListenConsumer_Received;
_listenConsumer.Registered -= ListenConsumer_Registered;
_listenConsumer.Shutdown -= ListenConsumer_Shutdown;
_listenConsumer.Unregistered -= ListenConsumer_Unregistered;
_listenConsumer = null;
}
if (_sendChannel != null)
{
if (_sendChannel.IsOpen)
{
try
{
_sendChannel.Close();
}
catch (Exception ex)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的发送通道,但遇到错误【{ex.Message}】");
}
}
_sendChannel.Dispose();
_sendChannel = null;
}
if (_listenChannel != null)
{
if (_listenChannel.IsOpen)
{
try
{
_listenChannel.Close();
}
catch (Exception ex)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的接收通道,但遇到错误【{ex.Message}】");
}
}
_listenChannel.Dispose();
_listenChannel = null;
}
if (_connection != null)
{
_connection.ConnectionShutdown -= Connection_ConnectionShutdown;
if (_connection.IsOpen)
{
try
{
_connection.Close();
}
catch (Exception ex)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的连接,但遇到错误【{ex.Message}】");
}
}
_connection.Dispose();
_connection = null;
}
}
/// <summary>
/// 响应消息消费
/// </summary>
private void ReplyMessageConsume()
{
try
{
if (_connection?.IsOpen != true)
{
throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 连接为空或已经关闭(响应消费者)");
}
if (_sendChannel?.IsOpen != true)
{
throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 发送通道为空或已经关闭(响应消费者)");
}
// 声明一个服务器命名的队列
_replyQueueName = _sendChannel.QueueDeclare().QueueName;
/* *********************
* 作用:定义消息分发机制
* 说明:Qos可以设置消费者一次接收消息的最大条数,能够解决消息拥堵时造成的消费者内存爆满问题。
* Qos也比较适用于耗时任务队列,当任务队列中的任务很多时,使用Qos后我们可以随时添加新的消费者来提高任务的处理效率。
* prefetchCount=1,RabbitMQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的。
* 参数:
* 1、prefetchSize:是可接收消息的大小。但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况
* 2、prefetchCount:处理消息最大的数量。即在下一次发送应答消息前,客户端可以收到的消息最大数量
* 3、global:则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的
* *********************/
_sendChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
/* *********************
* 作用:创建基于该队列的消费者
* 优点:
* 1、基于长连接
* 2、消费方式为发布订阅模式
* 3、节省资源且实时性好
* *********************/
_replyConsumer = new EventingBasicConsumer(_sendChannel);
_replyConsumer.Received += ReplyConsumer_Received;
_replyConsumer.Registered += ReplyConsumer_Registered;
_replyConsumer.Shutdown += ReplyConsumer_Shutdown;
_replyConsumer.Unregistered += ReplyConsumer_Unregistered;
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 消费者准备完毕(响应消费者)");
/* *********************
* 作用:监听队列(绑定消费者)
* 说明:消费者
* 参数:
* 1、queue:队列名称
* 2、autoAck:自动回复,当消费者接收到消息后要告诉RabbitMQ消息已接收,如果将此参数设置为true表示会自动回复RabbitMQ,如果设置为false要通过编程实现回复
* 3、consumer:消费方法,当消费者接收到消息要执行的方法
* *********************/
_sendChannel.BasicConsume(queue: _replyQueueName, autoAck: false, consumer: _replyConsumer);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 开始监控RabbitMQ服务器,队列{_replyQueueName}(响应消费者)");
}
catch (AggregateException aex)
{
// 错误信息去重
var errorList = (from error in aex.InnerExceptions select error.Message).Distinct().ToList();
// 打印所有错误信息
foreach (var error in errorList)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 总异常之【{error}】(响应消费者)");
}
}
catch (Exception ex)
{
throw ex;
}
}
/// <summary>
/// 监听消息消费
/// </summary>
private void ListenMessageConsume()
{
try
{
if (_connection?.IsOpen != true)
{
throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 连接为空或已经关闭(监听消费者)");
}
if (_listenChannel?.IsOpen != true)
{
throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 接收通道为空或已经关闭(监听消费者)");
}
/* *********************
* 作用:声明(创建)队列--RabbitMQ持久化机制(队列持久化)
* 说明:生产者、消费者都有
* 参数:
* 1、queue:队列名称。
* 2、durable:是否持久化。true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息
* 3、exclusive:是否独占队列。队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete:是否自动删除。队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments:其他参数。可以设置一个队列的扩展参数,比如:可设置存活时间
* *********************/
_listenChannel.QueueDeclare(queue: _listenQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
/* *********************
* 作用:定义消息分发机制
* 说明:Qos可以设置消费者一次接收消息的最大条数,能够解决消息拥堵时造成的消费者内存爆满问题。
* Qos也比较适用于耗时任务队列,当任务队列中的任务很多时,使用Qos后我们可以随时添加新的消费者来提高任务的处理效率。
* prefetchCount=1,RabbitMQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的。
* 参数:
* 1、prefetchSize:是可接收消息的大小。但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况
* 2、prefetchCount:处理消息最大的数量。即在下一次发送应答消息前,客户端可以收到的消息最大数量
* 3、global:则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的
* *********************/
_listenChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
/* *********************
* 作用:创建基于该队列的消费者
* 优点:
* 1、基于长连接
* 2、消费方式为发布订阅模式
* 3、节省资源且实时性好
* *********************/
_listenConsumer = new EventingBasicConsumer(_listenChannel);
_listenConsumer.Received += ListenConsumer_Received;
_listenConsumer.Registered += ListenConsumer_Registered;
_listenConsumer.Shutdown += ListenConsumer_Shutdown;
_listenConsumer.Unregistered += ListenConsumer_Unregistered;
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 消费者准备完毕(监听消费者)");
/* *********************
* 作用:监听队列(绑定消费者)
* 说明:消费者
* 参数:
* 1、queue:队列名称
* 2、autoAck:自动回复,当消费者接收到消息后要告诉RabbitMQ消息已接收,如果将此参数设置为true表示会自动回复RabbitMQ,如果设置为false要通过编程实现回复
* 3、consumer:消费方法,当消费者接收到消息要执行的方法
* *********************/
_listenChannel.BasicConsume(queue: _listenQueueName, autoAck: false, consumer: _listenConsumer);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 开始监控RabbitMQ服务器,队列{_listenQueueName}(监听消费者)");
}
catch (AggregateException aex)
{
// 错误信息去重
var errorList = (from error in aex.InnerExceptions select error.Message).Distinct().ToList();
// 打印所有错误信息
foreach (var error in errorList)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 总异常之【{error}】(监听消费者)");
}
}
catch (Exception ex)
{
throw ex;
}
}
/// <summary>
/// 绑定事件:消息监控(响应消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ReplyConsumer_Received(object? sender, BasicDeliverEventArgs e)
{
if (_sendChannel == null)
{
return;
}
/* *********************
* 作用:手动签收消息
* 说明:消费者
* 参数:
* 1、deliveryTag:消息投递标签
* 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息
* *********************/
_sendChannel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
if (!_callbackMapper.TryRemove(e.BasicProperties.CorrelationId, out var tcs))
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 响应消费者收到新消息【{Encoding.UTF8.GetString(e.Body.ToArray())}】");
return;
}
var body = e.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
}
/// <summary>
/// 绑定事件:订阅成功(响应消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ReplyConsumer_Registered(object? sender, ConsumerEventArgs e)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经注册上队列(响应消费者)");
}
/// <summary>
/// 绑定事件:通道被关闭(响应消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ReplyConsumer_Shutdown(object? sender, ShutdownEventArgs e)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 队列已经关闭(响应消费者)");
}
/// <summary>
/// 绑定事件:取消订阅(响应消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ReplyConsumer_Unregistered(object? sender, ConsumerEventArgs e)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经取消注册队列(响应消费者)");
}
/// <summary>
/// 绑定事件:消息监控(监听消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ListenConsumer_Received(object? sender, BasicDeliverEventArgs e)
{
if (_listenChannel == null)
{
return;
}
try
{
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var props = e.BasicProperties;
if (props != null && string.IsNullOrWhiteSpace(props.CorrelationId) == false && string.IsNullOrWhiteSpace(props.ReplyTo) == false)
{
var replyProps = _listenChannel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
replyProps.Persistent = true;
var responseBytes = Encoding.UTF8.GetBytes(message);
/* *********************
* 作用:向指定的队列中发送消息--RabbitMQ持久化机制(消息持久化)
* 说明:生产者
* 参数:
* 1、exchange:交换机名称。如果不指定将使用RabbitMQ的默认交换机(设置为"")
* 2、routingKey:路由键。交换机根据路由键来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、basicProperties:消息的属性。
* 4、body:发送消息的内容
* *********************/
_listenChannel.BasicPublish(exchange: _exchangeName, routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 签收响应【{message}】(监听消费者)");
}
// TODO: 正常处理回复BasicAck,未正常处理回复BasicReject
/* *********************
* 作用:手动签收消息
* 说明:消费者
* 参数:
* 1、deliveryTag:消息投递标签
* 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息
* *********************/
_listenChannel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
// 未正常处理的消息,重新放回队列
//_listenChannel.BasicReject(e.DeliveryTag, true);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 手动签收【{message}】(监听消费者)");
}
catch (OperationInterruptedException oiex)
{
/* *********************
* 作用:手动拒绝签收,返回消息到Broke
* 说明:消费者,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
* 参数:
* 1、deliveryTag:当前消息的投递标签
* 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息
* 3、requeue:是否重回队列。设置为true,重回队列,设置为false,不重回
* *********************/
_listenChannel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 手动拒绝签收【{oiex.Message}】,返回消息到Broke(监听消费者)");
}
catch (Exception ex)
{
/* *********************
* 作用:手动拒绝签收,返回消息到Broke
* 说明:消费者,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
* 参数:
* 1、deliveryTag:当前消息的投递标签
* 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息
* 3、requeue:是否重回队列。设置为true,重回队列,设置为false,不重回
* *********************/
_listenChannel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 手动拒绝签收【{ex.Message}】,返回消息到Broke(监听消费者)");
}
}
/// <summary>
/// 绑定事件:订阅成功(监听消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ListenConsumer_Registered(object? sender, ConsumerEventArgs e)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经注册上队列(监听消费者)");
}
/// <summary>
/// 绑定事件:通道被关闭(监听消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ListenConsumer_Shutdown(object? sender, ShutdownEventArgs e)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 队列已经关闭(监听消费者)");
}
/// <summary>
/// 绑定事件:取消订阅(监听消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ListenConsumer_Unregistered(object? sender, ConsumerEventArgs e)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经取消注册队列(监听消费者)");
}
/// <summary>
/// 绑定事件:断开连接时,调用方法自动重连
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void Connection_ConnectionShutdown(object? sender, ShutdownEventArgs e)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经断开连接,正在尝试重新连接至RabbitMQ服务器");
Reconnect();
}
}
}
? ? ? ? 方法调用如下:
using RabbitMQConsumerDemo;
RabbitMQRpcClientHandler.GetInstance().Start();
while (true)
{
var readLine = Console.ReadLine();
if (string.IsNullOrWhiteSpace(readLine))
{
}
else if (readLine.Equals("exit", StringComparison.OrdinalIgnoreCase))
{
break;
}
else if (readLine.StartsWith("register=", StringComparison.OrdinalIgnoreCase))
{
var response = await RabbitMQRpcClientHandler.GetInstance().CallAsync(readLine, RabbitMQDemo.Shared.EnumMsgType.Register);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 注册指令响应【{response}】");
}
else if (readLine.StartsWith("heart=", StringComparison.OrdinalIgnoreCase))
{
var response = await RabbitMQRpcClientHandler.GetInstance().CallAsync(readLine, RabbitMQDemo.Shared.EnumMsgType.Heart);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 心跳指令响应【{response}】");
}
else
{
}
}
RabbitMQRpcClientHandler.GetInstance().Stop();
Thread.Sleep(2 * 1000);
// 退出程序
int count = 8;
while (true)
{
if (count == 0)
{
Console.WriteLine($"Exit in {count} seconds");
Thread.Sleep(1000);
break;
}
else if (count == 5)
{
RabbitMQRpcClientHandler.GetInstance().CancelChecking();
//RabbitMQClientHandler.GetInstance().CancelChecking();
}
Console.WriteLine($"Exit in {count} seconds");
Thread.Sleep(1000);
count--;
}
文章来源:https://blog.csdn.net/Small_bottle_cap/article/details/134850812
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!