From d9421a7f5561492fa3f13a2eb57c8e63cc57ed83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B5=87=E6=96=87=E9=BE=99?= Date: Fri, 15 Nov 2024 17:16:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DS.Module.Quartz/QuartzModuleInstall.cs | 30 ++++--- .../Info/Method/ClientInfoService.cs | 4 +- .../QuarztJobs/BackgroundTaskJob.cs | 85 +++++++++++-------- .../TaskInteraction/Dtos/TaskJobContext.cs | 2 +- .../TaskInteraction/Entity/TaskJob.cs | 39 ++++++++- .../TaskInteraction/Interface/ITaskJob.cs | 2 +- .../TaskInteraction/Method/FeeBillTaskJob.cs | 13 --- .../Method/FeeBillTaskService.cs | 43 +++++----- .../Method/Jobs/BillSendingTaskJob.cs | 16 ++++ .../TaskInteraction/Method/TaskService.cs | 70 +++++++++++---- 10 files changed, 197 insertions(+), 107 deletions(-) delete mode 100644 ds-wms-service/DS.WMS.Core/TaskInteraction/Method/FeeBillTaskJob.cs create mode 100644 ds-wms-service/DS.WMS.Core/TaskInteraction/Method/Jobs/BillSendingTaskJob.cs diff --git a/ds-wms-service/DS.Module.Quartz/QuartzModuleInstall.cs b/ds-wms-service/DS.Module.Quartz/QuartzModuleInstall.cs index 6f5fc26e..8dfbe7ad 100644 --- a/ds-wms-service/DS.Module.Quartz/QuartzModuleInstall.cs +++ b/ds-wms-service/DS.Module.Quartz/QuartzModuleInstall.cs @@ -97,6 +97,22 @@ namespace DS.Module.QuartzModuleInstall jobCount++; } + var jobKey2 = new JobKey(nameof(BackgroundTaskJob)); + services.AddQuartz(q => + { + // 配置 Quartz + q.UseMicrosoftDependencyInjectionJobFactory(); + q.AddJob(opts => opts.WithIdentity(jobKey2)); + q.AddTrigger(opts => opts + .ForJob(jobKey2) + .WithIdentity(nameof(BackgroundTaskJob) + "-trigger") + .WithSimpleSchedule(x => x + .WithIntervalInMinutes(5) + .RepeatForever())); + }); + jobCount++; + + //var jobKey2 = new JobKey(nameof(WSLWeeklyReportJob)); //services.AddQuartz(q => //{ @@ -108,20 +124,6 @@ namespace DS.Module.QuartzModuleInstall // .WithCronSchedule(configuration["JobConfig:" + jobKey2.Name]) // ); //}); - //var jobKey2 = new JobKey(nameof(BackgroundTaskJob)); - //services.AddQuartz(q => - //{ - // // 配置 Quartz - // q.UseMicrosoftDependencyInjectionJobFactory(); - - // q.AddJob(opts => opts.WithIdentity(jobKey2)); - // q.AddTrigger(opts => opts - // .ForJob(jobKey2) - // .WithIdentity(nameof(BackgroundTaskJob) + "-trigger") - // .WithSimpleSchedule(x => x - // .WithIntervalInMinutes(2) - // .RepeatForever())); - //}); if (jobCount > 0) services.AddQuartzServer(q => q.WaitForJobsToComplete = true); diff --git a/ds-wms-service/DS.WMS.Core/Info/Method/ClientInfoService.cs b/ds-wms-service/DS.WMS.Core/Info/Method/ClientInfoService.cs index 6206b314..4e938377 100644 --- a/ds-wms-service/DS.WMS.Core/Info/Method/ClientInfoService.cs +++ b/ds-wms-service/DS.WMS.Core/Info/Method/ClientInfoService.cs @@ -733,12 +733,10 @@ public class ClientInfoService : ServiceBase, IClientInfoService updateable = updateable.IgnoreColumns(x => x.ShortName); await updateable.ExecuteCommandAsync(); + await TenantDb.Deleteable().Where(x => x.ClientId == req.Id).ExecuteCommandAsync(); if (data.Attributes?.Count > 0) - { - await TenantDb.Deleteable().Where(x => x.ClientId == req.Id).ExecuteCommandAsync(); await TenantDb.Insertable(data.Attributes).ExecuteCommandAsync(); - } } await TenantDb.Ado.CommitTranAsync(); diff --git a/ds-wms-service/DS.WMS.Core/QuarztJobs/BackgroundTaskJob.cs b/ds-wms-service/DS.WMS.Core/QuarztJobs/BackgroundTaskJob.cs index f040071c..1fdffe07 100644 --- a/ds-wms-service/DS.WMS.Core/QuarztJobs/BackgroundTaskJob.cs +++ b/ds-wms-service/DS.WMS.Core/QuarztJobs/BackgroundTaskJob.cs @@ -1,4 +1,5 @@ -using DS.Module.Core; +using System.Collections.Concurrent; +using DS.Module.Core; using DS.WMS.Core.TaskInteraction.Dtos; using DS.WMS.Core.TaskInteraction.Entity; using DS.WMS.Core.TaskInteraction.Interface; @@ -16,7 +17,7 @@ namespace DS.WMS.Core.QuarztJobs public sealed class BackgroundTaskJob : IJob { IServiceProvider serviceProvider; - ISqlSugarClient? db; + ISqlSugarClient db; ILogger logger; /// @@ -37,11 +38,13 @@ namespace DS.WMS.Core.QuarztJobs /// public async Task Execute(IJobExecutionContext context) { - var dbLinks = await db.Queryable().ToListAsync(); - SqlSugarClient? tenantDb = null; - try + var exceptions = new ConcurrentBag(); + var options = new ParallelOptions { MaxDegreeOfParallelism = 10 }; + var list = await db.Queryable().ToListAsync(); + await Parallel.ForEachAsync(list, options, async (dbLink, ct) => { - foreach (var dbLink in dbLinks) + SqlSugarClient? tenantDb = null; + try { tenantDb = new SqlSugarClient(new ConnectionConfig { @@ -53,7 +56,7 @@ namespace DS.WMS.Core.QuarztJobs tenantDb.QueryFilter.Clear(); var jobs = await tenantDb.Queryable() .InnerJoin((x, y) => x.TaskId == y.Id) - .Where((x, y) => y.TaskStatus == TaskStatusEnum.Create && !x.IsExecuted && SqlFunc.GetDate() >= x.NextTriggerTime) + .Where((x, y) => y.TaskStatus == TaskStatusEnum.Create && x.Status == TaskJobStatus.Ready) .Select((x, y) => new TaskJob { Id = x.Id, @@ -62,50 +65,53 @@ namespace DS.WMS.Core.QuarztJobs TypeName = x.TypeName, BusinessId = y.BusinessId, BusinessType = y.BusinessType, - }).ToListAsync(); + }).ToListAsync(ct); if (jobs.Count == 0) return; var jobContext = new TaskJobContext { - AdditionalData = context.MergedJobDataMap! + AdditionalData = context.MergedJobDataMap }; - foreach (var item in jobs) { var type = Type.GetType(item.TypeName, false); if (type == null) continue; - try + if (ConstructorExtensions.CreateInstance(type, serviceProvider) is ITaskJob job) { - if (ConstructorExtensions.CreateInstance(type, serviceProvider) is ITaskJob job) + jobContext.JobInfo = item; + jobContext.Result = DataResult.Success; + + try + { + await job.ExecuteAsync(jobContext); + } + catch (Exception ex) { - jobContext.JobInfo = item; - jobContext.Result = DataResult.Success; - await job.Execute(jobContext); + item.LastException = $"执行作业({item.Id})时发生错误:" + ex.Message + Environment.NewLine + ex.StackTrace; + jobContext.Result = DataResult.Failed(item.LastException); + exceptions.Add(new ApplicationException(item.LastException, ex)); + continue; + } - if (jobContext.Result.Succeeded) - { - item.IsExecuted = true; - item.NextTriggerTime = null; - } + if (jobContext.Result.Succeeded) + { + item.Status = TaskJobStatus.Completed; + item.NextTriggerTime = null; } } - catch (Exception ex) - { - item.LastException = $"执行作业({item.Id})时发生错误:" + ex.Message + Environment.NewLine + ex.StackTrace; - } } await tenantDb.Updateable(jobs).UpdateColumns(x => new { - x.IsExecuted, + x.Status, x.NextTriggerTime, x.LastException - }).ExecuteCommandAsync(); + }).ExecuteCommandAsync(ct); - var finishedJobs = jobs.FindAll(x => x.IsExecuted); + var finishedJobs = jobs.FindAll(x => x.Status == TaskJobStatus.Completed); if (finishedJobs.Count > 0) //尝试更新关联任务 { List tasks = []; @@ -115,13 +121,13 @@ namespace DS.WMS.Core.QuarztJobs { x.Id, x.TaskId, - x.IsExecuted - }).ToListAsync(); + x.Status + }).ToListAsync(ct); var groups = list.GroupBy(x => x.TaskId); DateTime dtNow = DateTime.Now; foreach (var group in groups) { - if (group.Count() == group.Count(x => x.IsExecuted)) + if (group.Count() == group.Count(x => x.Status == TaskJobStatus.Completed)) { tasks.Add(new BusinessTask { @@ -137,14 +143,21 @@ namespace DS.WMS.Core.QuarztJobs x.TaskStatus, x.UpdateBy, x.UpdateTime - }).ExecuteCommandAsync(); + }).ExecuteCommandAsync(ct); } } - } - catch (Exception ex) - { - logger.LogError(ex, "执行定时作业时发生异常"); - } + catch (Exception ex) + { + exceptions.Add(ex); + } + finally + { + tenantDb?.Dispose(); + } + }); + + if (!exceptions.IsEmpty) + throw new AggregateException(exceptions); } } } diff --git a/ds-wms-service/DS.WMS.Core/TaskInteraction/Dtos/TaskJobContext.cs b/ds-wms-service/DS.WMS.Core/TaskInteraction/Dtos/TaskJobContext.cs index fa1cf399..4fbd1864 100644 --- a/ds-wms-service/DS.WMS.Core/TaskInteraction/Dtos/TaskJobContext.cs +++ b/ds-wms-service/DS.WMS.Core/TaskInteraction/Dtos/TaskJobContext.cs @@ -16,7 +16,7 @@ namespace DS.WMS.Core.TaskInteraction.Dtos /// /// 任务参数 /// - public IDictionary? AdditionalData { get; set; } + public IDictionary? AdditionalData { get; set; } /// /// 执行结果 diff --git a/ds-wms-service/DS.WMS.Core/TaskInteraction/Entity/TaskJob.cs b/ds-wms-service/DS.WMS.Core/TaskInteraction/Entity/TaskJob.cs index b0ac722a..d4157a9e 100644 --- a/ds-wms-service/DS.WMS.Core/TaskInteraction/Entity/TaskJob.cs +++ b/ds-wms-service/DS.WMS.Core/TaskInteraction/Entity/TaskJob.cs @@ -1,4 +1,5 @@ -using DS.WMS.Core.Op.Entity; +using System.ComponentModel; +using DS.WMS.Core.Op.Entity; using SqlSugar; namespace DS.WMS.Core.TaskInteraction.Entity @@ -34,10 +35,10 @@ namespace DS.WMS.Core.TaskInteraction.Entity public DateTime? NextTriggerTime { get; set; } /// - /// 是否已执行 + /// 状态 /// - [SugarColumn(ColumnDescription = "是否已执行")] - public bool IsExecuted { get; set; } + [SugarColumn(ColumnDescription = "状态")] + public TaskJobStatus Status { get; set; } /// /// 上次执行异常信息 @@ -69,4 +70,34 @@ namespace DS.WMS.Core.TaskInteraction.Entity [SugarColumn(IsIgnore = true)] public BusinessType? BusinessType { get; set; } } + + /// + /// 定时作业状态 + /// + public enum TaskJobStatus + { + /// + /// 就绪 + /// + [Description("就绪")] + Ready = 0, + + /// + /// 挂起 + /// + [Description("挂起")] + Pending = 1, + + /// + /// 完成 + /// + [Description("完成")] + Completed = 2, + + /// + /// 取消 + /// + [Description("取消")] + Cancelled = -1, + } } diff --git a/ds-wms-service/DS.WMS.Core/TaskInteraction/Interface/ITaskJob.cs b/ds-wms-service/DS.WMS.Core/TaskInteraction/Interface/ITaskJob.cs index 0dd614ec..1caa988d 100644 --- a/ds-wms-service/DS.WMS.Core/TaskInteraction/Interface/ITaskJob.cs +++ b/ds-wms-service/DS.WMS.Core/TaskInteraction/Interface/ITaskJob.cs @@ -12,6 +12,6 @@ namespace DS.WMS.Core.TaskInteraction.Interface /// /// /// - Task Execute(TaskJobContext context); + Task ExecuteAsync(TaskJobContext context); } } diff --git a/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/FeeBillTaskJob.cs b/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/FeeBillTaskJob.cs deleted file mode 100644 index dcf86876..00000000 --- a/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/FeeBillTaskJob.cs +++ /dev/null @@ -1,13 +0,0 @@ -using DS.WMS.Core.TaskInteraction.Dtos; -using DS.WMS.Core.TaskInteraction.Interface; - -namespace DS.WMS.Core.TaskInteraction.Method -{ - public class FeeBillTaskJob : ITaskJob - { - public async Task Execute(TaskJobContext context) - { - - } - } -} diff --git a/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/FeeBillTaskService.cs b/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/FeeBillTaskService.cs index d61a8954..06de87cf 100644 --- a/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/FeeBillTaskService.cs +++ b/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/FeeBillTaskService.cs @@ -6,6 +6,7 @@ using DS.WMS.Core.Op.Entity; using DS.WMS.Core.TaskInteraction.Dtos; using DS.WMS.Core.TaskInteraction.Entity; using DS.WMS.Core.TaskInteraction.Interface; +using DS.WMS.Core.TaskInteraction.Method.Jobs; using Masuit.Tools.Systems; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -34,6 +35,27 @@ namespace DS.WMS.Core.TaskInteraction.Method UserId = long.Parse(User.UserId); } + /// + /// 创建任务 + /// + /// + /// + /// + public override Task CreateTaskAsync(TaskCreationRequest request, bool useTransaction = true) + { + if (request.TaskType == TaskBaseTypeEnum.BILL_SENDING) + { + request.Jobs ??= []; + request.Jobs.Add(new TaskJob + { + TypeName = typeof(BillSendingTaskJob).FullName!, + NextTriggerTime = DateTime.Now.AddMinutes(60) + }); + } + + return base.CreateTaskAsync(request, useTransaction); + } + /// /// 批量创建 /// @@ -196,14 +218,8 @@ namespace DS.WMS.Core.TaskInteraction.Method BusinessId = biz.BusinessId, BusinessType = biz.BusinessType, TaskTypeName = TaskBaseTypeEnum.BILL_SENDING.ToString(), - TaskTitle = $"【{TaskBaseTypeEnum.BILL_SENDING.GetDescription()}】{biz.CustomerNo}", - Jobs = [] + TaskTitle = $"【{TaskBaseTypeEnum.BILL_SENDING.GetDescription()}】{biz.CustomerNo}" }; - req.Jobs.Add(new TaskJob - { - TypeName = typeof(FeeBillTaskJob).FullName!, - NextTriggerTime = dtNow.AddMinutes(60) - }); //生成账单任务 result = await CreateTaskAsync(req); if (!result.Succeeded) @@ -311,19 +327,6 @@ namespace DS.WMS.Core.TaskInteraction.Method } } - /// - /// 设置任务状态 - /// - /// - /// - /// - public override Task SetTaskStatusAsync(TaskUpdateRequest request, bool useTransaction = true) - { - - - return base.SetTaskStatusAsync(request, useTransaction); - } - //更新当前任务状态 private async Task UpdateTaskStatus(FlowCallback callback, BusinessFeeStatus business) { diff --git a/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/Jobs/BillSendingTaskJob.cs b/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/Jobs/BillSendingTaskJob.cs new file mode 100644 index 00000000..56e0658a --- /dev/null +++ b/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/Jobs/BillSendingTaskJob.cs @@ -0,0 +1,16 @@ +using DS.WMS.Core.TaskInteraction.Dtos; +using DS.WMS.Core.TaskInteraction.Interface; + +namespace DS.WMS.Core.TaskInteraction.Method.Jobs +{ + /// + /// 账单发送定时任务 + /// + public class BillSendingTaskJob : ITaskJob + { + public async Task ExecuteAsync(TaskJobContext context) + { + + } + } +} diff --git a/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/TaskService.cs b/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/TaskService.cs index 4584104e..284a0cde 100644 --- a/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/TaskService.cs +++ b/ds-wms-service/DS.WMS.Core/TaskInteraction/Method/TaskService.cs @@ -35,7 +35,8 @@ namespace DS.WMS.Core.TaskInteraction.Method /// public class TaskService : ServiceBase, ITaskService, IAuditTaskService { - const long PERMISSION_ID = 1815294400855674880; + internal const long PERMISSION_ID = 1815294400855674880; + internal const string BillSendingInterval = "BillSendingInterval"; /// /// 获取支持审核的任务类型 @@ -291,7 +292,7 @@ namespace DS.WMS.Core.TaskInteraction.Method await TenantDb.Insertable(task).ExecuteCommandAsync(); - if (request.Steps?.Count > 0) + if (request.Steps?.Count > 0) //保存任务步骤 { var steps = request.Steps.Select(x => new TaskStep { @@ -306,12 +307,13 @@ namespace DS.WMS.Core.TaskInteraction.Method await TenantDb.Insertable(steps).ExecuteCommandAsync(); } - if (request.Jobs?.Count > 0) + + if (request.Jobs?.Count > 0) //保存定时任务 { foreach (var item in request.Jobs) { item.TaskId = task.Id; - //item.NextTriggerTime ??= dtNow.AddMinutes(30); + item.NextTriggerTime ??= dtNow.AddMinutes(60); } await TenantDb.Insertable(request.Jobs).ExecuteCommandAsync(); } @@ -430,10 +432,7 @@ namespace DS.WMS.Core.TaskInteraction.Method /// /// /// - protected virtual Task OnTaskCreated(BusinessTask task) - { - return Task.FromResult(DataResult.Success); - } + protected virtual Task OnTaskCreated(BusinessTask task) => Task.FromResult(DataResult.Success); /// /// 设置任务状态 @@ -467,6 +466,7 @@ namespace DS.WMS.Core.TaskInteraction.Method MultiLanguageConst.GetDescription(nameof(MultiLanguageConst.UnfinishedItems)) + ":" + string.Join("|", steps.Select(x => x.Type == StepType.NotSpecified ? x.Name : x.Type.GetDescription())), nameof(MultiLanguageConst.UnfinishedItems)); + request.ExtraData = task; //触发任务状态变更通知 if (task.TaskStatus != request.TaskStatus) await OnTaskStatusChanged(request); @@ -658,19 +658,59 @@ namespace DS.WMS.Core.TaskInteraction.Method } } - //public async Task SetJobsAsync(List jobs) - //{ - - //} - /// /// 当任务状态发生变化时调用 /// /// /// - protected virtual Task OnTaskStatusChanged(TaskUpdateRequest request) + protected virtual async Task OnTaskStatusChanged(TaskUpdateRequest request) { - return Task.CompletedTask; + var task = request.ExtraData as BusinessTask; + if (task == null) + return; + + //待执行的定时作业状态随主任务变化 + TaskJobStatus jobStatus = default; + switch (request.TaskStatus) + { + case TaskStatusEnum.Complete: + jobStatus = TaskJobStatus.Completed; + break; + case TaskStatusEnum.Cancel: + jobStatus = TaskJobStatus.Cancelled; + break; + case TaskStatusEnum.Pending: + jobStatus = TaskJobStatus.Pending; + break; + case TaskStatusEnum.Create: + jobStatus = TaskJobStatus.Ready; + break; + } + + var value = await Db.Queryable().Where(x => x.Code == BillSendingInterval).Select(x => x.Value).FirstAsync(); + if (!int.TryParse(value, out int interval)) + interval = 60; + + var list = await TenantDb.Queryable().Where(x => x.TaskId == task.Id && x.Status != TaskJobStatus.Completed) + .Select(x => new TaskJob + { + Id = x.Id, + Status = jobStatus, + NextTriggerTime = x.NextTriggerTime + }).ToListAsync(); + + if (jobStatus == TaskJobStatus.Ready) //重新设置下次触发时间 + foreach (var item in list) + { + item.NextTriggerTime = item.NextTriggerTime.HasValue ? + item.NextTriggerTime.Value.AddMinutes(interval) : DateTime.Now.AddMinutes(interval); + } + + await TenantDb.Updateable(list).UpdateColumns(x => new + { + x.Status, + x.NextTriggerTime + }).ExecuteCommandAsync(); } ///