定时任务

dev
嵇文龙 2 weeks ago
parent 4646e3b539
commit d9421a7f55

@ -97,6 +97,22 @@ namespace DS.Module.QuartzModuleInstall
jobCount++;
}
var jobKey2 = new JobKey(nameof(BackgroundTaskJob));
services.AddQuartz(q =>
{
// 配置 Quartz
q.UseMicrosoftDependencyInjectionJobFactory();
q.AddJob<BackgroundTaskJob>(opts => opts.WithIdentity(jobKey2));
q.AddTrigger(opts => opts
.ForJob(jobKey2)
.WithIdentity(nameof(BackgroundTaskJob) + "-trigger")
.WithSimpleSchedule(x => x
.WithIntervalInMinutes(5)
.RepeatForever()));
});
jobCount++;
//var jobKey2 = new JobKey(nameof(WSLWeeklyReportJob));
//services.AddQuartz(q =>
//{
@ -108,20 +124,6 @@ namespace DS.Module.QuartzModuleInstall
// .WithCronSchedule(configuration["JobConfig:" + jobKey2.Name])
// );
//});
//var jobKey2 = new JobKey(nameof(BackgroundTaskJob));
//services.AddQuartz(q =>
//{
// // 配置 Quartz
// q.UseMicrosoftDependencyInjectionJobFactory();
// q.AddJob<BackgroundTaskJob>(opts => opts.WithIdentity(jobKey2));
// q.AddTrigger(opts => opts
// .ForJob(jobKey2)
// .WithIdentity(nameof(BackgroundTaskJob) + "-trigger")
// .WithSimpleSchedule(x => x
// .WithIntervalInMinutes(2)
// .RepeatForever()));
//});
if (jobCount > 0)
services.AddQuartzServer(q => q.WaitForJobsToComplete = true);

@ -733,13 +733,11 @@ public class ClientInfoService : ServiceBase, IClientInfoService
updateable = updateable.IgnoreColumns(x => x.ShortName);
await updateable.ExecuteCommandAsync();
await TenantDb.Deleteable<InfoClientAttribute>().Where(x => x.ClientId == req.Id).ExecuteCommandAsync();
if (data.Attributes?.Count > 0)
{
await TenantDb.Deleteable<InfoClientAttribute>().Where(x => x.ClientId == req.Id).ExecuteCommandAsync();
await TenantDb.Insertable(data.Attributes).ExecuteCommandAsync();
}
}
await TenantDb.Ado.CommitTranAsync();
return DataResult.Successed("提交成功", req.Id, MultiLanguageConst.DataUpdateSuccess);

@ -1,4 +1,5 @@
using DS.Module.Core;
using System.Collections.Concurrent;
using DS.Module.Core;
using DS.WMS.Core.TaskInteraction.Dtos;
using DS.WMS.Core.TaskInteraction.Entity;
using DS.WMS.Core.TaskInteraction.Interface;
@ -16,7 +17,7 @@ namespace DS.WMS.Core.QuarztJobs
public sealed class BackgroundTaskJob : IJob
{
IServiceProvider serviceProvider;
ISqlSugarClient? db;
ISqlSugarClient db;
ILogger<BackgroundTaskJob> logger;
/// <summary>
@ -37,11 +38,13 @@ namespace DS.WMS.Core.QuarztJobs
/// <returns></returns>
public async Task Execute(IJobExecutionContext context)
{
var dbLinks = await db.Queryable<Module.SqlSugar.SysTenantLink>().ToListAsync();
var exceptions = new ConcurrentBag<Exception>();
var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };
var list = await db.Queryable<Module.SqlSugar.SysTenantLink>().ToListAsync();
await Parallel.ForEachAsync(list, options, async (dbLink, ct) =>
{
SqlSugarClient? tenantDb = null;
try
{
foreach (var dbLink in dbLinks)
{
tenantDb = new SqlSugarClient(new ConnectionConfig
{
@ -53,7 +56,7 @@ namespace DS.WMS.Core.QuarztJobs
tenantDb.QueryFilter.Clear();
var jobs = await tenantDb.Queryable<TaskJob>()
.InnerJoin<BusinessTask>((x, y) => x.TaskId == y.Id)
.Where((x, y) => y.TaskStatus == TaskStatusEnum.Create && !x.IsExecuted && SqlFunc.GetDate() >= x.NextTriggerTime)
.Where((x, y) => y.TaskStatus == TaskStatusEnum.Create && x.Status == TaskJobStatus.Ready)
.Select((x, y) => new TaskJob
{
Id = x.Id,
@ -62,50 +65,53 @@ namespace DS.WMS.Core.QuarztJobs
TypeName = x.TypeName,
BusinessId = y.BusinessId,
BusinessType = y.BusinessType,
}).ToListAsync();
}).ToListAsync(ct);
if (jobs.Count == 0)
return;
var jobContext = new TaskJobContext
{
AdditionalData = context.MergedJobDataMap!
AdditionalData = context.MergedJobDataMap
};
foreach (var item in jobs)
{
var type = Type.GetType(item.TypeName, false);
if (type == null)
continue;
try
{
if (ConstructorExtensions.CreateInstance(type, serviceProvider) is ITaskJob job)
{
jobContext.JobInfo = item;
jobContext.Result = DataResult.Success;
await job.Execute(jobContext);
if (jobContext.Result.Succeeded)
try
{
item.IsExecuted = true;
item.NextTriggerTime = null;
}
}
await job.ExecuteAsync(jobContext);
}
catch (Exception ex)
{
item.LastException = $"执行作业({item.Id})时发生错误:" + ex.Message + Environment.NewLine + ex.StackTrace;
jobContext.Result = DataResult.Failed(item.LastException);
exceptions.Add(new ApplicationException(item.LastException, ex));
continue;
}
if (jobContext.Result.Succeeded)
{
item.Status = TaskJobStatus.Completed;
item.NextTriggerTime = null;
}
}
}
await tenantDb.Updateable(jobs).UpdateColumns(x => new
{
x.IsExecuted,
x.Status,
x.NextTriggerTime,
x.LastException
}).ExecuteCommandAsync();
}).ExecuteCommandAsync(ct);
var finishedJobs = jobs.FindAll(x => x.IsExecuted);
var finishedJobs = jobs.FindAll(x => x.Status == TaskJobStatus.Completed);
if (finishedJobs.Count > 0) //尝试更新关联任务
{
List<BusinessTask> tasks = [];
@ -115,13 +121,13 @@ namespace DS.WMS.Core.QuarztJobs
{
x.Id,
x.TaskId,
x.IsExecuted
}).ToListAsync();
x.Status
}).ToListAsync(ct);
var groups = list.GroupBy(x => x.TaskId);
DateTime dtNow = DateTime.Now;
foreach (var group in groups)
{
if (group.Count() == group.Count(x => x.IsExecuted))
if (group.Count() == group.Count(x => x.Status == TaskJobStatus.Completed))
{
tasks.Add(new BusinessTask
{
@ -137,14 +143,21 @@ namespace DS.WMS.Core.QuarztJobs
x.TaskStatus,
x.UpdateBy,
x.UpdateTime
}).ExecuteCommandAsync();
}
}).ExecuteCommandAsync(ct);
}
}
catch (Exception ex)
{
logger.LogError(ex, "执行定时作业时发生异常");
exceptions.Add(ex);
}
finally
{
tenantDb?.Dispose();
}
});
if (!exceptions.IsEmpty)
throw new AggregateException(exceptions);
}
}
}

@ -16,7 +16,7 @@ namespace DS.WMS.Core.TaskInteraction.Dtos
/// <summary>
/// 任务参数
/// </summary>
public IDictionary<string, object?>? AdditionalData { get; set; }
public IDictionary<string, object>? AdditionalData { get; set; }
/// <summary>
/// 执行结果

@ -1,4 +1,5 @@
using DS.WMS.Core.Op.Entity;
using System.ComponentModel;
using DS.WMS.Core.Op.Entity;
using SqlSugar;
namespace DS.WMS.Core.TaskInteraction.Entity
@ -34,10 +35,10 @@ namespace DS.WMS.Core.TaskInteraction.Entity
public DateTime? NextTriggerTime { get; set; }
/// <summary>
/// 是否已执行
/// 状态
/// </summary>
[SugarColumn(ColumnDescription = "是否已执行")]
public bool IsExecuted { get; set; }
[SugarColumn(ColumnDescription = "状态")]
public TaskJobStatus Status { get; set; }
/// <summary>
/// 上次执行异常信息
@ -69,4 +70,34 @@ namespace DS.WMS.Core.TaskInteraction.Entity
[SugarColumn(IsIgnore = true)]
public BusinessType? BusinessType { get; set; }
}
/// <summary>
/// 定时作业状态
/// </summary>
public enum TaskJobStatus
{
/// <summary>
/// 就绪
/// </summary>
[Description("就绪")]
Ready = 0,
/// <summary>
/// 挂起
/// </summary>
[Description("挂起")]
Pending = 1,
/// <summary>
/// 完成
/// </summary>
[Description("完成")]
Completed = 2,
/// <summary>
/// 取消
/// </summary>
[Description("取消")]
Cancelled = -1,
}
}

@ -12,6 +12,6 @@ namespace DS.WMS.Core.TaskInteraction.Interface
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
Task Execute(TaskJobContext context);
Task ExecuteAsync(TaskJobContext context);
}
}

@ -1,13 +0,0 @@
using DS.WMS.Core.TaskInteraction.Dtos;
using DS.WMS.Core.TaskInteraction.Interface;
namespace DS.WMS.Core.TaskInteraction.Method
{
public class FeeBillTaskJob : ITaskJob
{
public async Task Execute(TaskJobContext context)
{
}
}
}

@ -6,6 +6,7 @@ using DS.WMS.Core.Op.Entity;
using DS.WMS.Core.TaskInteraction.Dtos;
using DS.WMS.Core.TaskInteraction.Entity;
using DS.WMS.Core.TaskInteraction.Interface;
using DS.WMS.Core.TaskInteraction.Method.Jobs;
using Masuit.Tools.Systems;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@ -34,6 +35,27 @@ namespace DS.WMS.Core.TaskInteraction.Method
UserId = long.Parse(User.UserId);
}
/// <summary>
/// 创建任务
/// </summary>
/// <param name="request"></param>
/// <param name="useTransaction"></param>
/// <returns></returns>
public override Task<DataResult> CreateTaskAsync(TaskCreationRequest request, bool useTransaction = true)
{
if (request.TaskType == TaskBaseTypeEnum.BILL_SENDING)
{
request.Jobs ??= [];
request.Jobs.Add(new TaskJob
{
TypeName = typeof(BillSendingTaskJob).FullName!,
NextTriggerTime = DateTime.Now.AddMinutes(60)
});
}
return base.CreateTaskAsync(request, useTransaction);
}
/// <summary>
/// 批量创建
/// </summary>
@ -196,14 +218,8 @@ namespace DS.WMS.Core.TaskInteraction.Method
BusinessId = biz.BusinessId,
BusinessType = biz.BusinessType,
TaskTypeName = TaskBaseTypeEnum.BILL_SENDING.ToString(),
TaskTitle = $"【{TaskBaseTypeEnum.BILL_SENDING.GetDescription()}】{biz.CustomerNo}",
Jobs = []
TaskTitle = $"【{TaskBaseTypeEnum.BILL_SENDING.GetDescription()}】{biz.CustomerNo}"
};
req.Jobs.Add(new TaskJob
{
TypeName = typeof(FeeBillTaskJob).FullName!,
NextTriggerTime = dtNow.AddMinutes(60)
});
//生成账单任务
result = await CreateTaskAsync(req);
if (!result.Succeeded)
@ -311,19 +327,6 @@ namespace DS.WMS.Core.TaskInteraction.Method
}
}
/// <summary>
/// 设置任务状态
/// </summary>
/// <param name="request"></param>
/// <param name="useTransaction"></param>
/// <returns></returns>
public override Task<DataResult> SetTaskStatusAsync(TaskUpdateRequest request, bool useTransaction = true)
{
return base.SetTaskStatusAsync(request, useTransaction);
}
//更新当前任务状态
private async Task UpdateTaskStatus(FlowCallback callback, BusinessFeeStatus business)
{

@ -0,0 +1,16 @@
using DS.WMS.Core.TaskInteraction.Dtos;
using DS.WMS.Core.TaskInteraction.Interface;
namespace DS.WMS.Core.TaskInteraction.Method.Jobs
{
/// <summary>
/// 账单发送定时任务
/// </summary>
public class BillSendingTaskJob : ITaskJob
{
public async Task ExecuteAsync(TaskJobContext context)
{
}
}
}

@ -35,7 +35,8 @@ namespace DS.WMS.Core.TaskInteraction.Method
/// </summary>
public class TaskService : ServiceBase, ITaskService, IAuditTaskService
{
const long PERMISSION_ID = 1815294400855674880;
internal const long PERMISSION_ID = 1815294400855674880;
internal const string BillSendingInterval = "BillSendingInterval";
/// <summary>
/// 获取支持审核的任务类型
@ -291,7 +292,7 @@ namespace DS.WMS.Core.TaskInteraction.Method
await TenantDb.Insertable(task).ExecuteCommandAsync();
if (request.Steps?.Count > 0)
if (request.Steps?.Count > 0) //保存任务步骤
{
var steps = request.Steps.Select(x => new TaskStep
{
@ -306,12 +307,13 @@ namespace DS.WMS.Core.TaskInteraction.Method
await TenantDb.Insertable(steps).ExecuteCommandAsync();
}
if (request.Jobs?.Count > 0)
if (request.Jobs?.Count > 0) //保存定时任务
{
foreach (var item in request.Jobs)
{
item.TaskId = task.Id;
//item.NextTriggerTime ??= dtNow.AddMinutes(30);
item.NextTriggerTime ??= dtNow.AddMinutes(60);
}
await TenantDb.Insertable(request.Jobs).ExecuteCommandAsync();
}
@ -430,10 +432,7 @@ namespace DS.WMS.Core.TaskInteraction.Method
/// </summary>
/// <param name="task"></param>
/// <returns></returns>
protected virtual Task<DataResult> OnTaskCreated(BusinessTask task)
{
return Task.FromResult(DataResult.Success);
}
protected virtual Task<DataResult> OnTaskCreated(BusinessTask task) => Task.FromResult(DataResult.Success);
/// <summary>
/// 设置任务状态
@ -467,6 +466,7 @@ namespace DS.WMS.Core.TaskInteraction.Method
MultiLanguageConst.GetDescription(nameof(MultiLanguageConst.UnfinishedItems)) + "" + string.Join("|", steps.Select(x => x.Type == StepType.NotSpecified ? x.Name : x.Type.GetDescription())),
nameof(MultiLanguageConst.UnfinishedItems));
request.ExtraData = task;
//触发任务状态变更通知
if (task.TaskStatus != request.TaskStatus)
await OnTaskStatusChanged(request);
@ -658,19 +658,59 @@ namespace DS.WMS.Core.TaskInteraction.Method
}
}
//public async Task<DataResult> SetJobsAsync(List<TaskJob> jobs)
//{
//}
/// <summary>
/// 当任务状态发生变化时调用
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
protected virtual Task OnTaskStatusChanged(TaskUpdateRequest request)
protected virtual async Task OnTaskStatusChanged(TaskUpdateRequest request)
{
return Task.CompletedTask;
var task = request.ExtraData as BusinessTask;
if (task == null)
return;
//待执行的定时作业状态随主任务变化
TaskJobStatus jobStatus = default;
switch (request.TaskStatus)
{
case TaskStatusEnum.Complete:
jobStatus = TaskJobStatus.Completed;
break;
case TaskStatusEnum.Cancel:
jobStatus = TaskJobStatus.Cancelled;
break;
case TaskStatusEnum.Pending:
jobStatus = TaskJobStatus.Pending;
break;
case TaskStatusEnum.Create:
jobStatus = TaskJobStatus.Ready;
break;
}
var value = await Db.Queryable<SysConfig>().Where(x => x.Code == BillSendingInterval).Select(x => x.Value).FirstAsync();
if (!int.TryParse(value, out int interval))
interval = 60;
var list = await TenantDb.Queryable<TaskJob>().Where(x => x.TaskId == task.Id && x.Status != TaskJobStatus.Completed)
.Select(x => new TaskJob
{
Id = x.Id,
Status = jobStatus,
NextTriggerTime = x.NextTriggerTime
}).ToListAsync();
if (jobStatus == TaskJobStatus.Ready) //重新设置下次触发时间
foreach (var item in list)
{
item.NextTriggerTime = item.NextTriggerTime.HasValue ?
item.NextTriggerTime.Value.AddMinutes(interval) : DateTime.Now.AddMinutes(interval);
}
await TenantDb.Updateable(list).UpdateColumns(x => new
{
x.Status,
x.NextTriggerTime
}).ExecuteCommandAsync();
}
/// <summary>

Loading…
Cancel
Save