using Furion;
using Furion.DatabaseAccessor;
using Furion.DependencyInjection;
using Furion.DistributedIDGenerator;
using Furion.FriendlyException;
using Furion.JsonSerialization;
using Furion.RemoteRequest.Extensions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ServiceProjectSyncWin;
using SqlSugar;
using System.Text;
using static System.Net.Mime.MediaTypeNames;

// Bootstrap the generic host and register the remote-request and SqlSugar services.
Serve.RunGeneric(additional: services =>
{
    services.AddRemoteRequest();
    services.AddSqlsugarSetup(App.Configuration);
}, true, true);

Console.WriteLine("开始准备同步历史服务状态数据");

// The service is resolved through its interface; SyncServiceProjectRecord is declared on ISyncHisRecord.
var service1 = App.GetService<ISyncHisRecord>();
service1.SyncServiceProjectRecord();

// Keep the console window open until a key is pressed.
Console.ReadKey();

/// <summary>
/// One-off synchronisation of historical service-status records.
/// </summary>
public interface ISyncHisRecord
{
    /// <summary>
    /// Copies finished service-status rows into the history table and then pushes
    /// the not-yet-synchronised rows, booking by booking, to the status endpoint.
    /// </summary>
    void SyncServiceProjectRecord();
}

/// <summary>
/// Default <see cref="ISyncHisRecord"/> implementation backed by SqlSugar and the
/// HTTP endpoint configured under <c>ServiceStatusPushUrl</c>.
/// </summary>
public class SyncHisRecord : ISyncHisRecord, ITransient
{
    /// <summary>
    /// Discontinued status codes that are never pushed
    /// (accept entrustment, container release order, bill sent, bill confirmed, bill returned).
    /// </summary>
    private static readonly string[] DeletedStatusCodes = { "JSWTUO", "FXZLING", "YFZD", "ZDQR", "ZDYHC" };

    private readonly ISqlSugarClient _db;

    // NOTE(review): the non-generic ILogger is what this view of the file shows; confirm the
    // container can resolve it, or switch to ILogger<SyncHisRecord>.
    private readonly ILogger _logger;

    public SyncHisRecord(ISqlSugarClient db, ILogger logger)
    {
        _db = db;
        _logger = logger;
    }

    /// <summary>
    /// Runs the two synchronisation stages for the hard-coded tenant:
    /// (1) read finished service-status rows in batches and insert them into the history table;
    /// (2) push the pending history rows per booking and record the outcome on each row.
    /// Any exception aborts the run and is logged; it is not rethrown.
    /// </summary>
    public void SyncServiceProjectRecord()
    {
        int totalSyncNum = 0;
        int succSyncNum = 0;
        long maxId = 0;

        // Tenant 349708986646597 (中集世联达领鲜物流科技(山东)有限公司).
        long tenantId = 349708986646597;
        string batchNo = Guid.NewGuid().ToString();

        _logger.LogInformation("批次={no} 触发同步货物状态 tenantId={tenantId}", batchNo, tenantId);

        try
        {
            // NOTE(review): the entity type argument of Queryable is absent in this view of the
            // file (tenant entity exposing Id/Name); restore it before building.
            var tenantInfo = _db.Queryable().First(a => a.Id == tenantId);
            if (tenantInfo == null)
            {
                // Without the tenant row the history records cannot be filled in (TENANT_NAME).
                _logger.LogWarning("批次={no} 未找到租户 tenantId={tenantId}", batchNo, tenantId);
                return;
            }

            /*
                1. Read service-status rows batch by batch (rows without a finish time and rows
                   whose booking is deleted are excluded).
                2. Insert them into the history table.
                3. Build one message per booking and push the statuses.
                4. Update the history rows with the synchronisation result.
            */

            // Resume after the highest source id that is already present in the history table.
            maxId = _db.Queryable<ServiceStatusBookingSyncHisInfo>().Max(a => a.ORG_STATUS_ID);
            _logger.LogInformation("批次={no} 获取最后同步服务状态的ID={maxId}", batchNo, maxId);

            const int takeNum = 1000;

            while (true)
            {
                // NOTE(review): the entity type arguments of Queryable/InnerJoin (status row, status
                // config, booking) are absent in this view of the file; restore them before building.
                // NOTE(review): rows finished by user 142307070910551 are excluded; the reason is not
                // visible here — confirm.
                var takeList = _db.Queryable()
                    .InnerJoin((gs, cfg) => gs.ConfigId == cfg.Id)
                    .InnerJoin((gs, cfg, bk) => gs.bookingId == bk.Id)
                    .Where((gs, cfg, bk) => gs.FinishTime.HasValue && gs.TenantId == tenantId && !bk.IsDeleted
                        && gs.FinishUserId != 142307070910551 && (maxId == 0 || gs.Id > maxId))
                    .OrderBy((gs, cfg, bk) => gs.Id)
                    .Select((gs, cfg, bk) => new { GS = gs, CFG = cfg, BK = bk })
                    .Take(takeNum).ToList();

                totalSyncNum += takeList.Count;

                Console.WriteLine($"批次={batchNo} 同步待处理任务 totalSyncNum={totalSyncNum}");
                _logger.LogInformation("批次={no} 同步待处理任务 totalSyncNum={totalSyncNum}", batchNo, totalSyncNum);

                // Nothing left to copy: leave the loop.
                if (takeList.Count == 0)
                {
                    Console.WriteLine($"没有记录跳出循环 total={succSyncNum}");
                    break;
                }

                maxId = takeList.Max(a => a.GS.Id);
                _logger.LogInformation("批次={no} 获取最后同步服务状态的 maxId={maxId}", batchNo, maxId);

                // Insert into the history table. A plain loop (instead of an async-void lambda)
                // keeps failures inside the surrounding try/catch.
                foreach (var record in takeList)
                {
                    var entity = new ServiceStatusBookingSyncHisInfo
                    {
                        PK_ID = Guid.NewGuid().ToString(),
                        ORG_STATUS_ID = record.GS.Id,
                        BOOKING_ID = record.BK.Id,
                        FINISH_TIME = record.GS.FinishTime.Value,
                        FINISH_USER_ID = record.GS.FinishUserId.Value,
                        FINISH_USER_NAME = record.GS.FinishUser,
                        MBL_NO = record.BK.MBLNO,
                        SORT_NO = record.CFG.Sort,
                        STATUS_SKU_CODE = record.CFG.SystemCode,
                        STATUS_SKU_NAME = record.CFG.StatusName,
                        STATUS_REMARK = record.GS.Remark,
                        STATUS_VAL = record.GS.ExtData,
                        TENANT_ID = record.GS.TenantId.Value,
                        TENANT_NAME = tenantInfo.Name,
                        VESSEL = record.BK.VESSEL,
                        VOYNO = record.BK.VOYNO,
                    };

                    _db.Insertable(entity).ExecuteCommand();

                    succSyncNum++;
                    Console.WriteLine($"ORG_STATUS_ID={record.GS.Id} 写入成功 total={succSyncNum}");

                    Thread.Sleep(300);
                }

                Console.WriteLine($"等待500毫秒");
                Thread.Sleep(500);
            }

            PushPendingStatuses(tenantId, succSyncNum);
        }
        catch (Exception ex)
        {
            // Log the exception itself (the original passed maxId where the exception was meant).
            _logger.LogError(ex, "批次={no} 同步异常ex={ex}", batchNo, ex.Message);
        }
    }

    /// <summary>
    /// Stage 2: repeatedly takes bookings that still have history rows with a null STATUS,
    /// loads all history rows of those bookings and processes them booking by booking.
    /// Stops when no pending rows remain.
    /// </summary>
    /// <param name="tenantId">Tenant whose history rows are processed.</param>
    /// <param name="succSyncNum">Number of rows inserted in stage 1; used for console output only.</param>
    private void PushPendingStatuses(long tenantId, int succSyncNum)
    {
        /*
            Push statuses per booking:
            1. Take rows whose STATUS is null, up to 100 per round, and collect their booking ids.
            2. Load every history row of those bookings.
            3. Build the message and push the statuses (discontinued codes are not pushed).
            4. Record the outcome on the rows.
        */
        Console.WriteLine($"开始推送记录 succ={succSyncNum}");

        while (true)
        {
            var bookingList = _db.Queryable<ServiceStatusBookingSyncHisInfo>()
                .Where(a => a.TENANT_ID == tenantId && a.STATUS == null)
                .Take(100).Select(a => a.BOOKING_ID).Distinct().ToList();

            // No pending rows: leave the loop.
            if (bookingList.Count == 0)
            {
                Console.WriteLine($"没有待推送记录跳出循环 succ={succSyncNum}");
                break;
            }

            var taskList = _db.Queryable<ServiceStatusBookingSyncHisInfo>()
                .Where(a => a.TENANT_ID == tenantId && bookingList.Contains(a.BOOKING_ID)).ToList();

            foreach (var bookingGroup in taskList.GroupBy(a => a.BOOKING_ID))
            {
                PushBookingStatuses(bookingGroup.ToList());
                Thread.Sleep(300);
            }

            Thread.Sleep(500);
        }
    }

    /// <summary>
    /// Pushes the latest row of every active status code of one booking and writes the outcome
    /// back. Rows that were not pushed (older duplicates and discontinued codes) are always marked
    /// as FAILURE so they are not selected again as pending.
    /// </summary>
    /// <param name="currStatusList">All history rows of a single booking.</param>
    private void PushBookingStatuses(List<ServiceStatusBookingSyncHisInfo> currStatusList)
    {
        DateTime currDate = DateTime.Now;

        // Per active status code keep only the row with the highest source id.
        var groupCheckList = currStatusList
            .Where(b => !DeletedStatusCodes.Contains(b.STATUS_SKU_CODE))
            .GroupBy(b => b.STATUS_SKU_CODE)
            .Select(g => g.OrderByDescending(c => c.ORG_STATUS_ID).First())
            .ToList();

        bool updated = false;

        if (groupCheckList.Count > 0)
        {
            var msgInfo = BuildTrackingMessage(groupCheckList);

            Console.WriteLine($"准备PUSH状态 msg={JSON.Serialize(msgInfo.Main)}");

            // The entry point is synchronous, so the push is awaited by blocking.
            var pushRlt = PushStatus(msgInfo).GetAwaiter().GetResult();

            Console.WriteLine($"PUSH返回结果 rlt={JSON.Serialize(pushRlt)}");

            // PushStatus returns null when no usable response was received; treat that as a failure.
            string status = pushRlt != null && pushRlt.succ ? "SUCC" : "FAILURE";
            string result = pushRlt != null ? pushRlt.msg : "推送无返回结果";

            foreach (var pushed in groupCheckList)
            {
                MarkSyncResult(pushed, status, result, currDate);
            }

            updated = true;
        }
        else
        {
            Console.WriteLine($"没有可用记录");
        }

        // Everything that was not pushed must still leave the pending state; otherwise a booking
        // that only has discontinued codes would be picked up again on every round.
        var pushedIds = groupCheckList.Select(x => x.ORG_STATUS_ID).ToHashSet();
        var undoList = currStatusList.Where(x => !pushedIds.Contains(x.ORG_STATUS_ID)).ToList();

        foreach (var skipped in undoList)
        {
            string reason = DeletedStatusCodes.Contains(skipped.STATUS_SKU_CODE)
                ? "状态已取消不再同步"
                : "状态已取最后触发记录";

            MarkSyncResult(skipped, "FAILURE", reason, currDate);
            updated = true;
        }

        if (updated)
        {
            Console.WriteLine($"更新表结束");
        }
    }

    /// <summary>
    /// Builds the status message for one booking from the rows that will be pushed.
    /// Header/main fields are taken from the first row; every row contributes one status entry.
    /// </summary>
    /// <param name="statuses">Non-empty list of rows to push.</param>
    private static TrackingMessageInfo BuildTrackingMessage(List<ServiceStatusBookingSyncHisInfo> statuses)
    {
        var firstInfo = statuses[0];

        return new TrackingMessageInfo
        {
            Head = new TrackingMessageHeadInfo
            {
                GID = IDGen.NextID().ToString(),
                MessageType = "PROJECT",
                ReceiverId = "ServiceProjectStatus",
                ReceiverName = "服务项目和状态",
                SenderId = "BookingOrder",
                SenderName = "海运订舱",
                RequestDate = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
                Version = "2.0",
                RequestAction = "AddOrModify",
            },
            Main = new TrackingMessageMainInfo
            {
                BusiId = firstInfo.BOOKING_ID.ToString(),
                BusiSystemCode = "BOOKING_ORDER",
                MBlNo = firstInfo.MBL_NO,
                VesselVoyno = $"{firstInfo.VESSEL}/{firstInfo.VOYNO}",
                OrderNo = "",
                PushType = TrackingPushTypeEnum.Status,
                OperTenantId = firstInfo.TENANT_ID,
                OperTenantName = firstInfo.TENANT_NAME,
                OpertType = TrackingOperTypeEnum.AUTO,
                OperUserId = firstInfo.FINISH_USER_ID.ToString(),
                OperUserName = firstInfo.FINISH_USER_NAME,
                SourceType = TrackingSourceTypeEnum.AUTO,
                StatusList = statuses.Select(s => new TrackingMessageMainStatusInfo
                {
                    StatusCode = s.STATUS_SKU_CODE,
                    StatusDate = s.FINISH_TIME,
                    StatusVal = s.STATUS_VAL,
                    Remark = s.STATUS_REMARK
                }).ToList()
            }
        };
    }

    /// <summary>
    /// Sets STATUS / SYNC_RESULT / SYNC_TIME on a history row and persists exactly those columns.
    /// The update is executed synchronously so it is complete before the next pending-row query.
    /// </summary>
    private void MarkSyncResult(ServiceStatusBookingSyncHisInfo record, string status, string result, DateTime syncTime)
    {
        record.STATUS = status;
        record.SYNC_RESULT = result;
        record.SYNC_TIME = syncTime;

        _db.Updateable(record)
            .UpdateColumns(it => new { it.STATUS, it.SYNC_TIME, it.SYNC_RESULT })
            .ExecuteCommand();
    }

    /// <summary>
    /// Posts the message as JSON to the URL configured under <c>ServiceStatusPushUrl</c>.
    /// </summary>
    /// <param name="info">Message to send.</param>
    /// <returns>
    /// The result payload when the endpoint answers 200 with a successful envelope; otherwise <c>null</c>.
    /// </returns>
    /// <remarks>
    /// An <see cref="HttpRequestException"/> is logged and surfaced as <c>Oops.Oh(10000002)</c>;
    /// any other exception is logged and results in <c>null</c>.
    /// </remarks>
    public async Task<TaskManageOrderResultDto> PushStatus(TrackingMessageInfo info)
    {
        TaskManageOrderResultDto model = null;

        /*
            1. Read the endpoint URL from configuration.
            2. Serialise the request object to JSON.
            3. POST it and log the response.
            4. Return the result payload.
        */
        var url = App.Configuration["ServiceStatusPushUrl"];

        try
        {
            var res = await url.SetHttpMethod(HttpMethod.Post)
                .SetBody(JSON.Serialize(info), "application/json")
                .SetContentEncoding(Encoding.UTF8)
                .PostAsync();

            _logger.LogInformation("批次={no} 对应请求报文完成 res={res}", info.Head.GID, JSON.Serialize(res));

            if (res.StatusCode == System.Net.HttpStatusCode.OK)
            {
                var userResult = await res.Content.ReadAsStringAsync();

                // NOTE(review): the type argument of Deserialize (response envelope exposing
                // success/data) is absent in this view of the file; restore it before building.
                var cmRlt = JSON.Deserialize(userResult);

                if (cmRlt.success)
                {
                    model = JSON.Deserialize<TaskManageOrderResultDto>(JSON.Serialize(cmRlt.data));
                }
            }
        }
        catch (HttpRequestException ex)
        {
            _logger.LogError(ex, "批次={no} 推送状态请求异常", info?.Head?.GID);
            throw Oops.Oh(10000002);
        }
        catch (Exception ex)
        {
            // Best effort: the caller treats a null result as a failed push.
            _logger.LogError(ex, "批次={no} 推送状态处理异常", info?.Head?.GID);
        }

        return model;
    }
}