You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

88 lines
3.1 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using NLog;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using DS.Module.Core.Extensions;
using static AnyDiff.DifferenceLines;
using DS.Module.Core;
using Microsoft.VisualBasic;
namespace DS.Module.MQ
{
/// <summary>
/// Publishes messages to RabbitMQ, opening a fresh connection and channel for every call.
/// </summary>
public class RabbitMQService: IRabbitMQService
{
    private static readonly Logger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// Sends one MQ message.
    /// </summary>
    /// <param name="request">
    /// Request parameters: broker URI (<c>mqUri</c>), exchange name, queue name (also used as the
    /// routing key), the JSON payload, the <c>IsZip</c> compression flag and the business id used in logs.
    /// </param>
    /// <returns>
    /// A successful <see cref="DataResult"/> when the message was published; otherwise a failed
    /// result whose message describes the error. Exceptions raised while connecting or publishing
    /// are logged and converted into a failed result.
    /// </returns>
    public async Task<DataResult> PublishMessage(MQPublishMessageReqDto request)
    {
        // NOTE: nothing in this body is awaited, so the work completes synchronously; the
        // Task-returning signature is kept so existing callers continue to compile.
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.Uri = new Uri(request.mqUri);
            using (IConnection conn = factory.CreateConnection())
            {
                try
                {
                    // The channel is IDisposable; release it deterministically instead of
                    // relying on the connection teardown alone.
                    using (IModel mqModel = conn.CreateModel())
                    {
                        mqModel.ExchangeDeclare(request.mqExchangeName, ExchangeType.Direct);
                        // Arguments: durable = false, exclusive = false, autoDelete = false.
                        // NOTE(review): messages are marked persistent below but the queue is
                        // declared non-durable - confirm this is intended.
                        mqModel.QueueDeclare(request.mqQueueName, false, false, false, null);
                        // The queue name doubles as the routing key on the direct exchange.
                        mqModel.QueueBind(request.mqQueueName, request.mqExchangeName, request.mqQueueName, null);

                        string payload = request.IsZip ? SharpZipLib.Compress(request.json) : request.json;
                        byte[] messageBodyBytes = Encoding.UTF8.GetBytes(payload);

                        IBasicProperties props = mqModel.CreateBasicProperties();
                        props.DeliveryMode = 2; // 2 = persistent delivery mode
                        mqModel.BasicPublish(request.mqExchangeName, request.mqQueueName, props, messageBodyBytes);
                        _logger.Info($"Id:{request.BusinessId},订舱数据回推,已发送数据到消息队列【{request.mqUri}】,队列名称:【{request.mqQueueName}】");
                    }
                }
                catch (Exception inne)
                {
                    // Build the text once; pass the exception itself so the log keeps the stack trace.
                    string publishError = $"Id:{request.BusinessId}推送MQ时发生异常原因:{inne.Message}";
                    _logger.Error(inne, publishError);
                    return DataResult.Failed(publishError);
                }
                finally
                {
                    // Explicit close of the connection; if this throws, the outer handler reports it.
                    conn.Close();
                }
            }
        }
        catch (Exception ex)
        {
            // Reached when the connection cannot be created (bad URI, broker unreachable) or
            // when closing the connection fails.
            string startupError = $"Id:{request.BusinessId},启动推送MQ时发生异常原因:{ex.Message}";
            _logger.Error(ex, startupError);
            return DataResult.Failed(startupError);
        }

        return DataResult.Successed("发送成功");
    }
}
}