using Furion.EventBus;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Myshipping.Core.Entity;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Myshipping.Core.MQ
{
///
/// 接收公司、员工变动消息推送
///
public class RecCompanyUserChangeService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly IServiceScope _serviceScope;
private readonly ILogger _logger;
private readonly IEventPublisher _publisher;
private IConnection mqConn;
private IModel model;
public RecCompanyUserChangeService(IServiceScopeFactory scopeFactory)
{
_scopeFactory = scopeFactory;
//通过这个注入DBContext
_serviceScope = _scopeFactory.CreateScope();
_logger = _serviceScope.ServiceProvider.GetService>();
_publisher = _serviceScope.ServiceProvider.GetService();
}
public override void Dispose()
{
base.Dispose();
_serviceScope.Dispose();
if (mqConn != null && mqConn.IsOpen)
mqConn.Close();
_logger.LogInformation("RecCompanyUserChangeService Dispose");
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("RecCompanyUserChangeService ExecuteAsync");
return Task.Run(() =>
{
_logger.LogInformation("RecCompanyUserChangeService ExecuteAsync RunTask");
string ExchangeName = "djy.comp&user.change";
string QueueName = $"djy.comp&user.change.{DateTime.Now.Ticks}";
ConnectionFactory factory = new ConnectionFactory();
var repoSysCfg = _serviceScope.ServiceProvider.GetService>();
var mqUrl = repoSysCfg.FirstOrDefault(x => x.Code == "CompanyUserChangeMqUrl").Value;
_logger.LogInformation($"准备连接公司用户同步消息队列:{mqUrl}");
factory.Uri = new Uri(mqUrl);
mqConn = factory.CreateConnection("客户订舱平台客户端");
model = mqConn.CreateModel();
model.ExchangeDeclare(ExchangeName, ExchangeType.Fanout);
model.QueueDeclare(QueueName, false, false, true, null);
model.QueueBind(QueueName, ExchangeName, "", null);
var consumer = new EventingBasicConsumer(model);
consumer.Received += Consumer_Received;
model.BasicConsume(QueueName, true, consumer);
});
}
private async void Consumer_Received(object ch, BasicDeliverEventArgs ea)
{
var body = ea.Body;
var strBody = Encoding.UTF8.GetString(body.ToArray());
var jobj = JObject.Parse(strBody);
var type = jobj.GetStringValue("type");
if (type == "UserLeave") //用户离职
{
await _publisher.PublishAsync(new ChannelEventSource($"CompanyUserSync:UserLeave", strBody));
}
else if (type == "CompanyUserSync") //公司及员工数据同步
{
await _publisher.PublishAsync(new ChannelEventSource($"CompanyUserSync:CompanyUser", strBody));
}
_logger.LogInformation($"收到消息:{strBody}");
}
}
}