diff --git a/ds-wms-service/DS.Module.Core/DS.Module.Core.csproj b/ds-wms-service/DS.Module.Core/DS.Module.Core.csproj index 8516d2c1..b56094fd 100644 --- a/ds-wms-service/DS.Module.Core/DS.Module.Core.csproj +++ b/ds-wms-service/DS.Module.Core/DS.Module.Core.csproj @@ -22,6 +22,7 @@ + diff --git a/ds-wms-service/DS.Module.Core/Extensions/SharpZipLib.cs b/ds-wms-service/DS.Module.Core/Extensions/SharpZipLib.cs new file mode 100644 index 00000000..40977ff1 --- /dev/null +++ b/ds-wms-service/DS.Module.Core/Extensions/SharpZipLib.cs @@ -0,0 +1,56 @@ +using ICSharpCode.SharpZipLib.BZip2; +using Org.BouncyCastle.Utilities.Bzip2; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DS.Module.Core.Extensions +{ + public class SharpZipLib + { + + /// + /// 数据压缩 + /// + /// + /// + public static string Compress(string input) + { + string result = string.Empty; + byte[] buffer = Encoding.UTF8.GetBytes(input); + using (MemoryStream outputStream = new MemoryStream()) + { + using (BZip2OutputStream zipStream = new BZip2OutputStream(outputStream)) + { + zipStream.Write(buffer, 0, buffer.Length); + zipStream.Close(); + } + return Convert.ToBase64String(outputStream.ToArray()); + } + } + /// + /// 数据解压缩 + /// + /// + /// + public static string Decompress(string input) + { + string result = string.Empty; + byte[] buffer = Convert.FromBase64String(input); + using (Stream inputStream = new MemoryStream(buffer)) + { + BZip2InputStream zipStream = new BZip2InputStream(inputStream); + + using (StreamReader reader = new StreamReader(zipStream, Encoding.UTF8)) + { + //输出 + result = reader.ReadToEnd(); + } + } + + return result; + } + } +} diff --git a/ds-wms-service/DS.Module.MQ/IRabbitMQService.cs b/ds-wms-service/DS.Module.MQ/IRabbitMQService.cs index 808c7d80..9e9d98e7 100644 --- a/ds-wms-service/DS.Module.MQ/IRabbitMQService.cs +++ b/ds-wms-service/DS.Module.MQ/IRabbitMQService.cs @@ -1,12 +1,24 @@ -using System; +using RabbitMQ.Client; +using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; +using DS.Module.Core; + namespace DS.Module.MQ { + /// + /// + /// public interface IRabbitMQService { + /// + /// 发送MQ报文 + /// + /// 请求参数 + /// true-发送成功 false-发送失败 + Task PublishMessage(MQPublishMessageReqDto request); } } diff --git a/ds-wms-service/DS.Module.MQ/MQPublishMessageReqDto.cs b/ds-wms-service/DS.Module.MQ/MQPublishMessageReqDto.cs new file mode 100644 index 00000000..6565e480 --- /dev/null +++ b/ds-wms-service/DS.Module.MQ/MQPublishMessageReqDto.cs @@ -0,0 +1,44 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DS.Module.MQ +{ + /// + /// + /// + public class MQPublishMessageReqDto + { + /// + /// MQ URI + /// + public string mqUri { get; set; } + + /// + /// Exchange Name + /// + public string mqExchangeName { get; set; } + + /// + /// Queue Name + /// + public string mqQueueName { get; set; } + + /// + /// 业务主键 + /// + public string BusinessId { get; set; } + + /// + /// 请求MQ报文(JSON串行化) + /// + public string json { get; set; } + + /// + /// 是否默认压缩 + /// + public bool IsZip { get; set; } = false; + } +} diff --git a/ds-wms-service/DS.Module.MQ/RabbitMQService.cs b/ds-wms-service/DS.Module.MQ/RabbitMQService.cs index 506743ec..f37bf8dd 100644 --- a/ds-wms-service/DS.Module.MQ/RabbitMQService.cs +++ b/ds-wms-service/DS.Module.MQ/RabbitMQService.cs @@ -1,12 +1,87 @@ -using System; +using NLog; +using RabbitMQ.Client; +using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; +using DS.Module.Core.Extensions; +using static AnyDiff.DifferenceLines; +using DS.Module.Core; +using Microsoft.VisualBasic; namespace DS.Module.MQ { public class RabbitMQService: IRabbitMQService { + private static readonly Logger _logger = LogManager.GetCurrentClassLogger(); + + /// + /// 发送MQ报文 + /// + /// 请求参数 + /// true-发送成功 false-发送失败 + public async Task PublishMessage(MQPublishMessageReqDto request) + { + bool isException = false; + string msg = string.Empty; + + try + { + ConnectionFactory factory = new ConnectionFactory(); + factory.Uri = new Uri(request.mqUri); + + using (IConnection conn = factory.CreateConnection()) + { + + try + { + IModel mqModel = conn.CreateModel(); + mqModel.ExchangeDeclare(request.mqExchangeName, ExchangeType.Direct); + + //var queueName = $"{MqActionQueueName}.{(item.SubTenantId.HasValue && item.SubTenantId > 0 ? item.SubTenantId.Value : item.TenantId)}"; + mqModel.QueueDeclare(request.mqQueueName, false, false, false, null); + mqModel.QueueBind(request.mqQueueName, request.mqExchangeName, request.mqQueueName, null); + + + byte[] messageBodyBytes = Encoding.UTF8.GetBytes(request.IsZip ? SharpZipLib.Compress(request.json) : request.json); + + + IBasicProperties props = mqModel.CreateBasicProperties(); + props.DeliveryMode = 2; + mqModel.BasicPublish(request.mqExchangeName, request.mqQueueName, props, messageBodyBytes); + + + _logger.Info($"Id:{request.BusinessId},订舱数据回推,已发送数据到消息队列【{request.mqUri}】,队列名称:【{request.mqQueueName}】"); + + } + catch (Exception inne) + { + _logger.Error($"Id:{request.BusinessId},推送MQ时发生异常,原因:{inne.Message}"); + + msg = $"Id:{request.BusinessId},推送MQ时发生异常,原因:{inne.Message}"; + + isException = true; + } + finally { + conn.Close(); + } + } + } + catch (Exception ex) + { + _logger.Error($"Id:{request.BusinessId},启动推送MQ时发生异常,原因:{ex.Message}"); + + msg = $"Id:{request.BusinessId},启动推送MQ时发生异常,原因:{ex.Message}"; + + isException = true; + } + + if(isException) + return DataResult.Failed(msg); + + return DataResult.Successed("发送成功"); + } + } } diff --git a/ds-wms-service/DS.Module.Nuget/DS.Module.Nuget.csproj b/ds-wms-service/DS.Module.Nuget/DS.Module.Nuget.csproj index 867f7ab1..74af21a1 100644 --- a/ds-wms-service/DS.Module.Nuget/DS.Module.Nuget.csproj +++ b/ds-wms-service/DS.Module.Nuget/DS.Module.Nuget.csproj @@ -47,6 +47,7 @@ +