You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

511 lines
24 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using DS.Module.Core;
using DS.Module.Core.Condition;
using DS.Module.Core.Data;
using DS.WMS.Core.TaskPlat.Entity;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using SqlSugar;
using System.Diagnostics;
using System.Reflection;
namespace DS.WMS.Core.TaskPlat
{
/// <summary>
/// 任务编排执行器
/// </summary>
public class TaskFlowRuner
{
private readonly SqlSugarScopeProvider tenantDb = null!;
private readonly IServiceProvider serviceProvider = null!;
private readonly ILogger<TaskFlowRuner> logger = null!;
private TaskFlowRuner() { }
/// <summary>
///
/// </summary>
/// <param name="tenantDb"></param>
/// <param name="serviceProvider"></param>
public TaskFlowRuner(SqlSugarScopeProvider tenantDb, IServiceProvider serviceProvider)
{
this.tenantDb = tenantDb;
this.serviceProvider = serviceProvider;
this.logger = serviceProvider.GetRequiredService<ILogger<TaskFlowRuner>>();
}
/// <summary>
/// 执行
/// </summary>
/// <param name="taskBaseType">任务类型</param>
/// <param name="taskId">任务Id</param>
/// <param name="dataContext">起始入参数据上下文</param>
/// <param name="taskFlowLogId">日志主表主键</param>
/// <returns>(执行日志Id,模块是否全部执行完成,模块执行结果是否全部为success)</returns>
public async Task<(long? flowLogId, bool isAllComplete, bool isAllSuccess)> Run(TaskBaseTypeEnum taskBaseType, long taskId, TaskFlowDataContext dataContext, long? taskFlowLogId = null)
{
if (dataContext == null)
{
throw new ArgumentNullException(nameof(dataContext));
}
var allConfigList = await tenantDb.Queryable<TaskFlowConfig>()
.Where(t => t.IsMain && t.TaskType == taskBaseType.ToString())
.OrderBy(t => t.Id)
.ToListAsync();
if (allConfigList.Count == 0)
{
string msg = $"未找到任何用于【{taskBaseType}】业务的处理流程";
TaskFlowLog log = new()
{
Id = taskFlowLogId ?? SnowFlakeSingle.Instance.NextId(),
IsMatch = false,
TaskId = taskId,
TaskType = taskBaseType.ToString(),
IsComplete = false,
ExceptionMessage = msg,
IsSuccess = false,
};
await tenantDb.Insertable(log).ExecuteCommandAsync();
logger.LogInformation(msg);
return (null, false, false);
}
var allConfigIdList = allConfigList.Select(x => x.Id);
var allConditionList = await tenantDb.Queryable<TaskFlowCondition>()
.Where(x => allConfigIdList.Contains(x.ConfigId) && x.Type == 1)
.ToListAsync();
// 判断每项流程达成的条件数量
Dictionary<long, int> configMatchCount = new(allConditionList.Count);
foreach (var configItem in allConfigList)
{
var conditionItem = allConditionList.FirstOrDefault(x => x.ConfigId == configItem.Id);
if (conditionItem == null || string.IsNullOrEmpty(conditionItem.Content))
{
configMatchCount.Add(configItem.Id, 0);
continue;
}
else
{
var conditionContent = JsonConvert.DeserializeObject<ConditionContent>(conditionItem.Content)!;
if (ConditionHelper.IsPass(conditionContent, dataContext))
{
var conditionCount = ConditionHelper.GetConditionCount(conditionContent);
configMatchCount.Add(configItem.Id, conditionCount);
}
}
}
// 取出最匹配的流程
TaskFlowConfig? targetConfig = null;
if (configMatchCount.Count == 1)
{
targetConfig = allConfigList.First(x => x.Id == configMatchCount.First().Key);
}
else if (configMatchCount.Count > 1)
{
var maxMatchNum = configMatchCount.OrderByDescending(x => x.Value).First().Value; // 取匹配条件数最多的一条
var temp = configMatchCount.Where(x => x.Value == maxMatchNum).OrderBy(x => x.Key).First().Key;
targetConfig = allConfigList.First(x => x.Id == temp);
}
if (targetConfig == null)
{
throw new Exception("targetConfig目标流程意外为null"); // 不会走到这里,至少会有一条流程匹配到
}
// 待执行的流程(节点列表)
var configList = await tenantDb.Queryable<TaskFlowConfig>().Where(x => x.MainConfigId == targetConfig.Id).ToListAsync();
var configIdList = configList.Select(x => x.Id).ToList();
// 待注入的参数列表
var paramList = await tenantDb.Queryable<TaskFlowParam>().Where(x => configIdList.Contains(x.ConfigId)).ToListAsync();
// 待执行的模块列表
var moduleIdList = configList.Select(x => x.ExecuteModuleId).ToList();
var moduleList = await tenantDb.Queryable<TaskFlowModule>().Where(x => moduleIdList.Contains(x.Id) && x.ModuleType == 1).ToListAsync();
// 待判断的条件列表
var conditionList = await tenantDb.Queryable<TaskFlowCondition>().Where(x => configIdList.Contains(x.ConfigId) && x.Type == 2).ToListAsync();
TaskFlowLog flowLog = new()
{
Id = taskFlowLogId ?? SnowFlakeSingle.Instance.NextId(),
IsMatch = true,
TaskId = taskId,
TaskType = taskBaseType.ToString(),
MatchMainConfigId = targetConfig.Id,
ConfigList = JsonConvert.SerializeObject(configList),
ModuleList = JsonConvert.SerializeObject(moduleList),
IsComplete = true,
IsSuccess = true,
};
// 执行流程
List<TaskFlowConfig> waitMatchConfigList = new([targetConfig]);
for (int i = 0; i < configList.Count; i++)
{
TaskFlowLogDetail flowLogDetail = new()
{
PId = flowLog.Id,
IsSuccess = true,
};
TaskFlowConfig? executeConfig = null;
try
{
if (waitMatchConfigList == null || waitMatchConfigList.Count == 0)
{
break;
}
// 对节点列表里面的节点进行依次判断,取出要执行的节点
Dictionary<TaskFlowConfig, int> matchedConfigList = new();
foreach (var waitMatchConfigItem in waitMatchConfigList)
{
var condition = conditionList.FirstOrDefault(x => x.ConfigId == waitMatchConfigItem.Id);
if (condition == null || string.IsNullOrEmpty(condition.Content))
{
matchedConfigList.Add(waitMatchConfigItem, 0);
continue;
}
var conditionContent = JsonConvert.DeserializeObject<ConditionContent>(condition.Content)!;
if (ConditionHelper.IsPass(conditionContent, dataContext))
{
var conditionCount = ConditionHelper.GetConditionCount(conditionContent);
matchedConfigList.Add(waitMatchConfigItem, conditionCount);
continue;
}
}
if (matchedConfigList.Count == 1)
{
executeConfig = matchedConfigList.First().Key;
}
else if (matchedConfigList.Count > 1)
{
var maxMatchNum = matchedConfigList.OrderByDescending(x => x.Value).First().Value; // 取匹配条件数最多的一条
executeConfig = matchedConfigList.Where(x => x.Value == maxMatchNum).OrderBy(x => x.Key.Id).First().Key;
}
if (executeConfig == null)
{
executeConfig = waitMatchConfigList.FirstOrDefault(x => x.IsUnMatchDefault);
}
if (executeConfig == null)
{
//调整:没有找到待执行的节点,不认为整个流程失败或者未完成
//flowLog.IsComplete = false;
//flowLog.IsSuccess = false;
flowLog.Note += $"【执行时未找到符合执行条件的节点经判断条件的节点Id如下{string.Join(',', waitMatchConfigList.Select(x => x.Id))}】";
break;
}
flowLogDetail.ConfigId = executeConfig.Id;
// 如果当前节点要执行(或者配置了默认执行节点)取出下一批要进行条件判断的节点列表
waitMatchConfigList.Clear();
if (!string.IsNullOrEmpty(executeConfig.NextConfigId))
{
var ids = executeConfig.NextConfigId.Split(',').Where(x => !string.IsNullOrEmpty(x)).Select(x => Convert.ToInt64(x));
waitMatchConfigList = configList.Where(x => ids.Contains(x.Id)).ToList();
}
// 注入参数
var paramItemList = paramList.Where(x => x.ConfigId == executeConfig.Id).ToList();
foreach (var paramItem in paramItemList)
{
dataContext.Set(paramItem.FieldName!, paramItem.FieldValue!);
}
if (executeConfig.ExecuteModuleId == 0 || executeConfig.ExecuteModuleId == null)
{
flowLogDetail.ElapsedMillisecond = 0;
flowLogDetail.IsComplete = true;
flowLogDetail.ModuleId = 0;
flowLogDetail.ModuleName = "(终止模块)";
await tenantDb.CopyNew().Insertable(flowLogDetail).ExecuteCommandAsync();
continue;
}
var module = moduleList.FirstOrDefault(x => x.Id == executeConfig.ExecuteModuleId);
if (module == null)
{
throw new Exception($"未找到指定流程配置ExecuteModuleId{executeConfig.ExecuteModuleId}");
}
flowLogDetail.ModuleId = module.Id;
flowLogDetail.ModuleName = module.Name;
Assembly assembly;
if (module.AssemblyName!.StartsWith("DS.WMS.Core"))
{
assembly = Assembly.GetExecutingAssembly();
}
else
{
assembly = Assembly.Load(module.AssemblyName!);
}
var interfaceType = assembly.DefinedTypes.FirstOrDefault(x => x.Name == module.InterfaceName!);
if (interfaceType == null)
{
throw new Exception($"在程序集【{module.AssemblyName}】下未找到Interface【{module.InterfaceName}】");
}
object? imp = serviceProvider.GetService(interfaceType);
if (imp == null)
{
throw new Exception($"在依赖注入容器未找到Interface【{module.InterfaceName}】的实现类");
}
var impType = imp.GetType();
MethodInfo? method = impType.GetMethod(module.MethodName!, BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
if (method == null)
{
throw new Exception($"在Interface【{module.InterfaceName}】的实现类中未找到Method【{module.MethodName}】");
}
Stopwatch stopwatch = Stopwatch.StartNew();
var taskObj = method.Invoke(imp, new object[] { dataContext })!;
if (taskObj is Task task)
{
await task;
if (task.IsFaulted)
{
if (task.Exception != null)
{
throw task.Exception;
}
else
{
throw new Exception("模块执行过程中发生未知异常但是Exception为null请检查");
}
}
dynamic dynamicTask = task;
if (taskObj.GetType().GetProperty("Result")?.PropertyType?.Name == "VoidTaskResult")
{
flowLogDetail.ExecuteReturn = "执行返回结果为void";
}
else
{
if (dynamicTask.Result != null)
{
flowLogDetail.ExecuteReturn = JsonConvert.SerializeObject(dynamicTask.Result);
}
else
{
flowLogDetail.ExecuteReturn = "执行返回结果为null";
}
if (dynamicTask.Result is DataResult dataResult && dataResult != null)
{
var isSuccess = dynamicTask.Result.Succeeded;
flowLogDetail.IsSuccess = isSuccess;
if (isSuccess == false)
{
flowLog.IsSuccess = false;
if (!executeConfig.IsExceptionContinue)
{
throw new Exception("模块执行结果为失败并且IsExceptionContinue配置为false已跳出流程");
}
}
}
}
}
else
{
throw new Exception("模块执行结果不是Task类型请检查");
// 后面支持非异步
}
stopwatch.Stop();
flowLogDetail.ElapsedMillisecond = stopwatch.ElapsedMilliseconds;
flowLogDetail.IsComplete = true;
await tenantDb.CopyNew().Insertable(flowLogDetail).ExecuteCommandAsync();
}
catch (Exception ex)
{
string exMessage;
if (ex is TargetInvocationException ex2 && ex2.InnerException != null)
{
exMessage = WriteLog("模块内部执行过程中发生异常", ex2.InnerException);
}
else
{
exMessage = WriteLog("模块外部调用过程中发生异常", ex);
}
flowLog.IsComplete = false;
flowLog.IsSuccess = false;
flowLogDetail.ExceptionMessage = exMessage;
flowLogDetail.IsComplete = false;
flowLogDetail.IsSuccess = false;
await tenantDb.CopyNew().Insertable(flowLogDetail).ExecuteCommandAsync();
if (executeConfig.IsExceptionContinue)
{
continue;
}
else
{
break;
}
}
}
await tenantDb.CopyNew().Insertable(flowLog).ExecuteCommandAsync();
return (flowLog.Id, flowLog.IsComplete, flowLog.IsSuccess);
}
/// <summary>
/// 执行通过业务Id
/// </summary>
/// <param name="taskBaseType">任务类型</param>
/// <param name="bsno">业务Id</param>
/// <param name="dataContext">起始入参数据上下文</param>
/// <returns>(执行日志Id,模块是否全部执行完成,模块执行结果是否全部为success)</returns>
public async Task<(long? flowLogId, bool isAllComplete, bool isAllSuccess)> RunWithBsno(TaskBaseTypeEnum taskBaseType, long bsno, TaskFlowDataContext dataContext)
{
long taskId = await tenantDb.Queryable<TaskBaseInfo>()
.Where(t => t.OUT_BS_NO == bsno && t.TASK_TYPE == taskBaseType.ToString())
.OrderByDescending(a => a.Id)
.Select(t => t.Id)
.FirstAsync();
return await Run(taskBaseType, taskId, dataContext);
}
/// <summary>
/// 根据当前节点Id获取工作流下一个任务类型
/// </summary>
/// <param name="workFlowType">工作流流程类型</param>
/// <param name="dataContext"></param>
/// <param name="currentTaskType">当前执行的任务类型(如果为空,则返回首个任务类型)</param>
/// <returns>(下一个任务类型)</returns>
public async Task<TaskBaseTypeEnum?> GetWorkFlowNextConfigByTaskType(TaskBaseTypeEnum workFlowType, TaskFlowDataContext dataContext, TaskBaseTypeEnum? currentTaskType)
{
var allConfigList = await tenantDb.Queryable<TaskFlowConfig>()
.Where(t => t.MainConfigId == SqlFunc.Subqueryable<TaskFlowConfig>().Where(x => x.IsMain && x.TaskType == workFlowType.ToString()).Select(x => x.Id))
.OrderBy(t => t.Id)
.ToListAsync();
long configId;
if (currentTaskType == null)
{
configId = allConfigList.First(x => x.IsMain).Id;
}
else
{
var allConfigModuleIdList = allConfigList.Select(x => x.ExecuteModuleId).ToList();
var currentModuleId = await tenantDb.Queryable<TaskFlowModule>()
.Where(x => allConfigModuleIdList.Contains(x.Id) && x.TaskType == currentTaskType.ToString())
.Select(x => x.Id)
.FirstAsync();
var currentConfig = allConfigList.FirstOrDefault(x => x.ExecuteModuleId == currentModuleId);
if (currentConfig == null)
{
return null;
}
configId = currentConfig.Id;
}
List<TaskFlowConfig> waitMatchConfigList = new();
for (int i = 0; i < allConfigList.Count; i++)
{
var currentConfig = allConfigList.FirstOrDefault(x => x.Id == configId);
if (currentConfig == null || string.IsNullOrEmpty(currentConfig.NextConfigId))
return null;
var nextIds = currentConfig.NextConfigId.Split(',').Where(x => !string.IsNullOrEmpty(x)).Select(x => Convert.ToInt64(x));
waitMatchConfigList = allConfigList.Where(x => nextIds.Contains(x.Id)).ToList();
if (waitMatchConfigList.Count == 0) return null; // 如果走了这一步的return说明配置有问题配置了下一节点的id但是却查不到
var configIdList = waitMatchConfigList.Select(x => x.Id);
var conditionList = await tenantDb.Queryable<TaskFlowCondition>()
.Where(x => configIdList.Contains(x.ConfigId))
.ToListAsync();
Dictionary<TaskFlowConfig, int> matchedConfigList = new();
foreach (var item in waitMatchConfigList)
{
var condition = conditionList.FirstOrDefault(x => x.ConfigId == item.Id);
if (condition == null || string.IsNullOrEmpty(condition.Content))
{
matchedConfigList.Add(item, 0);
}
else
{
var conditionContent = JsonConvert.DeserializeObject<ConditionContent>(condition.Content)!;
if (ConditionHelper.IsPass(conditionContent, dataContext))
{
var conditionCount = ConditionHelper.GetConditionCount(conditionContent);
matchedConfigList.Add(item, conditionCount);
}
}
}
TaskFlowConfig? executeConfig = null;
if (matchedConfigList.Count == 1)
{
executeConfig = matchedConfigList.First().Key;
}
else if (matchedConfigList.Count > 1)
{
var maxMatchNum = matchedConfigList.OrderByDescending(x => x.Value).First().Value; // 取匹配条件数最多的一条
executeConfig = matchedConfigList.Where(x => x.Value == maxMatchNum).OrderBy(x => x.Key.Id).First().Key;
}
if (executeConfig == null)
{
executeConfig = waitMatchConfigList.FirstOrDefault(x => x.IsUnMatchDefault);
}
if (executeConfig == null)
{
// 如果最终还是没有匹配到,则需要判断情况
// 如果待匹配的分支只有1个则继续循环判断下一节点否则如果待匹配的分支有多个则无法判断下一节点
if (waitMatchConfigList.Count == 1)
{
configId = waitMatchConfigList[0].Id;
continue;
}
else
{
return null;
}
}
else
{
var taskType = await tenantDb.Queryable<TaskFlowModule>().Where(x => x.Id == executeConfig.ExecuteModuleId && x.ModuleType == 2).Select(x => x.TaskType).FirstAsync();
if (taskType != null && Enum.TryParse(typeof(TaskBaseTypeEnum), taskType, out object? temp))
{
return (TaskBaseTypeEnum)temp;
}
}
}
return null;
}
static string WriteLog(string throwMsg, Exception ex)
{
return string.Format("【自定义错误】:{0} \r\n【异常类型】{1} \r\n【异常信息】{2} \r\n【堆栈调用】{3}", new object[]
{
throwMsg,
ex.GetType().Name,
ex.Message,
ex.StackTrace
});
}
}
}