You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

151 lines
5.9 KiB
C#

using DS.Module.Core;
using DS.WMS.Core.TaskInteraction.Dtos;
using DS.WMS.Core.TaskInteraction.Entity;
using DS.WMS.Core.TaskInteraction.Interface;
using Fasterflect;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Quartz;
using SqlSugar;
namespace DS.WMS.Core.QuarztJobs
{
/// <summary>
/// Quartz job that walks every tenant database, runs the <see cref="TaskJob"/> rows that are due,
/// and marks a <see cref="BusinessTask"/> as complete once all of its jobs have been executed.
/// </summary>
public sealed class BackgroundTaskJob : IJob
{
    private readonly IServiceProvider serviceProvider;
    private readonly ISqlSugarClient db;
    private readonly ILogger<BackgroundTaskJob> logger;

    /// <summary>
    /// Initializes the job and resolves the logger and the main database client from the container.
    /// </summary>
    /// <param name="serviceProvider">Service provider used to resolve dependencies and to construct job handlers.</param>
    public BackgroundTaskJob(IServiceProvider serviceProvider)
    {
        this.serviceProvider = serviceProvider;
        logger = serviceProvider.GetRequiredService<ILogger<BackgroundTaskJob>>();
        db = serviceProvider.GetRequiredService<ISqlSugarClient>();
    }

    /// <summary>
    /// Executes the due jobs of every tenant listed in the tenant link table.
    /// </summary>
    /// <param name="context">Quartz execution context; its merged job data map is handed to each job handler.</param>
    /// <returns>A task that completes when all tenants have been processed.</returns>
    public async Task Execute(IJobExecutionContext context)
    {
        var dbLinks = await db.Queryable<Module.SqlSugar.SysTenantLink>().ToListAsync();
        foreach (var dbLink in dbLinks)
        {
            // Isolate failures per tenant: an unreachable or broken tenant database
            // must not prevent the remaining tenants from being processed.
            try
            {
                await ProcessTenantAsync(dbLink, context);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "执行定时作业时发生异常");
            }
        }
    }

    /// <summary>
    /// Loads the due jobs of a single tenant, runs them, and persists the outcome.
    /// </summary>
    /// <param name="dbLink">Connection information of the tenant database.</param>
    /// <param name="context">Quartz execution context of the current run.</param>
    private async Task ProcessTenantAsync(Module.SqlSugar.SysTenantLink dbLink, IJobExecutionContext context)
    {
        // The client is created per tenant and disposed as soon as the tenant has been handled.
        using var tenantDb = new SqlSugarClient(new ConnectionConfig
        {
            ConfigId = dbLink.Id,
            ConnectionString = dbLink.Connection,
            DbType = dbLink.DbType,
            IsAutoCloseConnection = true
        });
        tenantDb.QueryFilter.Clear();

        // Jobs that are not yet executed, whose trigger time has passed and whose task is still in the Create status.
        var jobs = await tenantDb.Queryable<TaskJob>()
            .InnerJoin<BusinessTask>((x, y) => x.TaskId == y.Id)
            .Where((x, y) => y.TaskStatus == TaskStatusEnum.Create && !x.IsExecuted && SqlFunc.GetDate() >= x.NextTriggerTime)
            .Select((x, y) => new TaskJob
            {
                Id = x.Id,
                TaskId = x.TaskId,
                NextTriggerTime = x.NextTriggerTime,
                TypeName = x.TypeName,
                BusinessId = y.BusinessId,
                BusinessType = y.BusinessType,
            }).ToListAsync();

        // Nothing due for this tenant: only this tenant is skipped, the caller moves on to the next one.
        if (jobs.Count == 0)
            return;

        var jobContext = new TaskJobContext
        {
            AdditionalData = context.MergedJobDataMap!
        };

        foreach (var item in jobs)
        {
            await RunJobAsync(item, jobContext);
        }

        await tenantDb.Updateable(jobs).UpdateColumns(x => new
        {
            x.IsExecuted,
            x.NextTriggerTime,
            x.LastException
        }).ExecuteCommandAsync();

        var finishedJobs = jobs.FindAll(x => x.IsExecuted);
        if (finishedJobs.Count > 0) // try to update the related tasks
        {
            await CompleteFinishedTasksAsync(tenantDb, finishedJobs);
        }
    }

    /// <summary>
    /// Resolves the handler type named by the job, runs it, and records the outcome on <paramref name="item"/>.
    /// Any exception is stored in <c>LastException</c> instead of being propagated.
    /// </summary>
    /// <param name="item">The job row to execute; updated in place.</param>
    /// <param name="jobContext">Context shared by the jobs of the current tenant.</param>
    private async Task RunJobAsync(TaskJob item, TaskJobContext jobContext)
    {
        try
        {
            // Type resolution is inside the try block because Type.GetType can still throw
            // for malformed names even when throwOnError is false.
            var type = Type.GetType(item.TypeName, false);
            if (type == null)
                return;

            if (ConstructorExtensions.CreateInstance(type, serviceProvider) is ITaskJob job)
            {
                jobContext.JobInfo = item;
                jobContext.Result = DataResult.Success;
                await job.Execute(jobContext);

                if (jobContext.Result.Succeeded)
                {
                    item.IsExecuted = true;
                    item.NextTriggerTime = null;
                }
            }
        }
        catch (Exception ex)
        {
            item.LastException = $"执行作业({item.Id})时发生错误:" + ex.Message + Environment.NewLine + ex.StackTrace;
        }
    }

    /// <summary>
    /// Marks every task whose jobs have all been executed as complete.
    /// </summary>
    /// <param name="tenantDb">Client connected to the tenant database.</param>
    /// <param name="finishedJobs">Jobs that were executed successfully in the current run.</param>
    private static async Task CompleteFinishedTasksAsync(SqlSugarClient tenantDb, List<TaskJob> finishedJobs)
    {
        var ids = finishedJobs.Select(x => x.TaskId).Distinct().ToList();
        var list = await tenantDb.Queryable<TaskJob>().Where(x => ids.Contains(x.TaskId))
            .Select(x => new
            {
                x.Id,
                x.TaskId,
                x.IsExecuted
            }).ToListAsync();

        List<BusinessTask> tasks = [];
        DateTime dtNow = DateTime.Now;
        foreach (var group in list.GroupBy(x => x.TaskId))
        {
            // A task is complete only when every one of its jobs has been executed.
            if (group.All(x => x.IsExecuted))
            {
                tasks.Add(new BusinessTask
                {
                    Id = group.Key,
                    TaskStatus = TaskStatusEnum.Complete,
                    UpdateTime = dtNow
                });
            }
        }

        // Avoid issuing an update statement for an empty list.
        if (tasks.Count == 0)
            return;

        // NOTE(review): UpdateBy is part of the column list but is never assigned above,
        // so it is written with whatever value a new BusinessTask carries — confirm this is intended.
        await tenantDb.Updateable(tasks).UpdateColumns(x => new
        {
            x.TaskStatus,
            x.UpdateBy,
            x.UpdateTime
        }).ExecuteCommandAsync();
    }
}
}