using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Myshipping.Core.Entity;
using Myshipping.Core.Service;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Yitter.IdGenerator;

namespace Myshipping.Core.MQ
{
    /// <summary>
    /// Background service that receives pushed messages about company and
    /// employee changes from a RabbitMQ fanout exchange.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file were missing from the
    /// text under review and have been reconstructed. <c>SqlSugarRepository&lt;SysUser&gt;</c>
    /// and <c>ISysCacheService</c> are inferred from naming only - confirm against
    /// the project before merging.
    /// </remarks>
    public class RecCompanyUserChangeService : BackgroundService
    {
        /// <summary>Name of the fanout exchange the change notifications are published to.</summary>
        private const string ExchangeName = "djy.comp&user.change";

        /// <summary>Code of the system-config entry that holds the RabbitMQ connection URI.</summary>
        private const string MqUrlConfigCode = "CompanyUserChangeMqUrl";

        private readonly IServiceScopeFactory _scopeFactory;
        private readonly IServiceScope _serviceScope;
        private readonly ILogger<RecCompanyUserChangeService> _logger;
        private readonly ISysTenantService _tenantService;
        private readonly ISysUserService _userService;
        private readonly SqlSugarRepository<SysUser> _sysUserRep;
        private readonly SqlSugarRepository<SysTenant> _sysTenantRep;

        // Both stay null until the task started by ExecuteAsync has connected.
        private IConnection mqConn;
        private IModel model;

        /// <summary>
        /// Creates the service and resolves its dependencies from a dedicated scope
        /// that lives until <see cref="Dispose"/> is called.
        /// </summary>
        /// <param name="scopeFactory">Factory used to create the service scope.</param>
        public RecCompanyUserChangeService(IServiceScopeFactory scopeFactory)
        {
            _scopeFactory = scopeFactory;

            // Scoped services (including the DB context) are resolved through this scope.
            _serviceScope = _scopeFactory.CreateScope();
            var provider = _serviceScope.ServiceProvider;

            _logger = provider.GetService<ILogger<RecCompanyUserChangeService>>();
            _tenantService = provider.GetService<ISysTenantService>();
            _userService = provider.GetService<ISysUserService>();
            _sysUserRep = provider.GetService<SqlSugarRepository<SysUser>>();
            _sysTenantRep = provider.GetService<SqlSugarRepository<SysTenant>>();
        }

        /// <summary>
        /// Closes the RabbitMQ channel and connection (when they were opened) and
        /// releases the service scope. Failures while closing are logged, not thrown.
        /// </summary>
        public override void Dispose()
        {
            base.Dispose();

            // Either field may still be null when start-up failed or never ran.
            CloseQuietly(() => model?.Close(), "channel");
            CloseQuietly(() => mqConn?.Close(), "connection");

            _logger.LogInformation("RecCompanyUserChangeService Dispose");
            _serviceScope.Dispose();
        }

        /// <summary>
        /// Reads the MQ URI from system configuration, connects, declares the fanout
        /// exchange plus a per-instance queue, and starts consuming.
        /// </summary>
        /// <param name="stoppingToken">Host shutdown token (not observed by this method).</param>
        /// <returns>A task that completes once the consumer has been registered.</returns>
        /// <exception cref="InvalidOperationException">
        /// Surfaced through the returned task when the MQ URI configuration entry is missing or blank.
        /// </exception>
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("RecCompanyUserChangeService ExecuteAsync");

            // The set-up runs on a pool thread because CreateConnection is a blocking call.
            return Task.Run(async () =>
            {
                _logger.LogInformation("RecCompanyUserChangeService ExecuteAsync RunTask");

                try
                {
                    // The tick suffix gives every started instance its own queue on the fanout exchange.
                    var queueName = $"{ExchangeName}.{DateTime.Now.Ticks}";

                    var cache = _serviceScope.ServiceProvider.GetService<ISysCacheService>();
                    var sysConfig = await cache.GetAllSysConfig();
                    var mqUrl = sysConfig.FirstOrDefault(x => x.Code == MqUrlConfigCode)?.Value;
                    if (string.IsNullOrWhiteSpace(mqUrl))
                    {
                        throw new InvalidOperationException(
                            $"System config '{MqUrlConfigCode}' is missing or empty.");
                    }

                    var factory = new ConnectionFactory();
                    factory.Uri = new Uri(mqUrl);

                    mqConn = factory.CreateConnection("客户订舱平台");
                    model = mqConn.CreateModel();
                    model.ExchangeDeclare(ExchangeName, ExchangeType.Fanout);

                    // Arguments: durable = false, exclusive = false, autoDelete = true.
                    model.QueueDeclare(queueName, false, false, true, null);
                    model.QueueBind(queueName, ExchangeName, "", null);

                    var consumer = new EventingBasicConsumer(model);
                    consumer.Received += Consumer_Received;

                    // autoAck = true: a delivery is acknowledged as soon as it is handed over.
                    model.BasicConsume(queueName, true, consumer);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "RecCompanyUserChangeService failed to start consuming");
                    throw;
                }
            });
        }

        /// <summary>
        /// Handles one delivery: decodes the UTF-8 JSON body and dispatches on its
        /// <c>type</c> field. Processing errors are logged rather than propagated,
        /// because the queue is consumed with auto-ack and the message would be lost anyway.
        /// </summary>
        /// <param name="ch">Event sender.</param>
        /// <param name="ea">Delivery data, including the message body.</param>
        private void Consumer_Received(object ch, BasicDeliverEventArgs ea)
        {
            var strBody = Encoding.UTF8.GetString(ea.Body.ToArray());

            // Logged on receipt so the payload is recorded even when processing fails.
            _logger.LogInformation("收到消息:{Body}", strBody);

            try
            {
                var jobj = JObject.Parse(strBody);

                switch (jobj.GetStringValue("type"))
                {
                    case "CompanyAudit": // a new company passed review
                        HandleCompanyAudit(jobj);
                        break;
                    case "UserJoin": // a user joined a company (not implemented yet)
                        break;
                    case "UserLeave": // a user left a company (not implemented yet)
                        break;
                    case "CompanyUserSync": // company/employee data sync (not implemented yet)
                        break;
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "RecCompanyUserChangeService failed to process message: {Body}", strBody);
            }
        }

        /// <summary>
        /// Creates a tenant for a newly approved company unless a tenant with the same
        /// linked company id or the same name already exists.
        /// </summary>
        /// <param name="message">Parsed message whose <c>company</c> object describes the company.</param>
        private void HandleCompanyAudit(JObject message)
        {
            var comp = message.GetJObjectValue("company");
            if (comp == null)
            {
                _logger.LogWarning("RecCompanyUserChangeService: CompanyAudit message has no company object");
                return;
            }

            var compid = comp.GetStringValue("CompId");
            var compname = comp.GetStringValue("CompName");
            var adminShowName = comp.GetStringValue("AdminShowName");

            // Look up by the linked company id first, then fall back to the full company name.
            var findTenant = _sysTenantRep.FirstOrDefault(x => x.CompId == compid)
                             ?? _sysTenantRep.FirstOrDefault(x => x.Name == compname);
            if (findTenant != null)
            {
                return;
            }

            // Not found: create a new tenant.
            // NOTE(review): CompId is not stored on the new tenant, so later messages can
            // only match it by name - confirm whether that is intended.
            var tenant = new SysTenant
            {
                Id = YitIdHelper.NextId(),
                Name = compname,
                AdminName = adminShowName,
                TenantType = TenantTypeEnum.COMMON,
            };
            tenant.Email = $"{tenant.Id}";

            _sysTenantRep.InsertReturnEntity(tenant);

            // NOTE(review): if InitNewTenant returns a Task, it is not awaited here - confirm.
            _tenantService.InitNewTenant(tenant);
        }

        /// <summary>Runs a close action and logs, instead of throwing, when it fails.</summary>
        private void CloseQuietly(Action close, string resourceName)
        {
            try
            {
                close();
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "RecCompanyUserChangeService failed to close MQ {Resource}", resourceName);
            }
        }
    }
}