You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

511 lines
24 KiB
C#

5 months ago
using DS.Module.Core;
using DS.Module.Core.Condition;
5 months ago
using DS.Module.Core.Data;
using DS.WMS.Core.TaskPlat.Entity;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
5 months ago
using Newtonsoft.Json;
using SqlSugar;
using System.Diagnostics;
using System.Reflection;
namespace DS.WMS.Core.TaskPlat
{
/// <summary>
/// 任务编排执行器
/// </summary>
public class TaskFlowRuner
{
// SqlSugar provider used for every query and insert issued by this class.
private readonly SqlSugarScopeProvider tenantDb = null!;

// Container from which the logger and the reflected module implementations are resolved.
private readonly IServiceProvider serviceProvider = null!;

// Logger resolved from serviceProvider in the public constructor.
private readonly ILogger<TaskFlowRuner> logger = null!;

// Private parameterless constructor; it leaves all three fields unassigned (null!).
private TaskFlowRuner() { }

/// <summary>
/// Creates a runner that reads flow configuration from <paramref name="tenantDb"/> and resolves
/// its logger and module implementations from <paramref name="serviceProvider"/>.
/// </summary>
/// <param name="tenantDb">SqlSugar provider used for all configuration queries and log inserts.</param>
/// <param name="serviceProvider">Service provider; must have <c>ILogger&lt;TaskFlowRuner&gt;</c> registered.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="tenantDb"/> or <paramref name="serviceProvider"/> is null.
/// </exception>
public TaskFlowRuner(SqlSugarScopeProvider tenantDb, IServiceProvider serviceProvider)
{
    // Fail at construction time instead of with a NullReferenceException on the first query.
    this.tenantDb = tenantDb ?? throw new ArgumentNullException(nameof(tenantDb));
    this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    logger = serviceProvider.GetRequiredService<ILogger<TaskFlowRuner>>();
}
4 months ago
/// <summary>
/// Picks the main flow configured for <paramref name="taskBaseType"/> that best matches
/// <paramref name="dataContext"/>, executes that flow's nodes in sequence and persists the execution logs.
/// </summary>
/// <remarks>
/// Among the flows (and, later, the candidate nodes) that pass their conditions, the one with the highest
/// condition count wins; ties are resolved by the smallest Id. A flow or node that has no condition always
/// qualifies with a count of 0.
/// </remarks>
/// <param name="taskBaseType">Task type; its name is compared with the flow configuration's TaskType.</param>
/// <param name="taskId">Task id recorded on the flow log.</param>
/// <param name="dataContext">Initial data context; node parameters are written into it before each module runs.</param>
/// <param name="taskFlowLogId">Primary key to use for the flow log; a new snowflake id is generated when null.</param>
/// <returns>
/// (flow log id, whether every module ran to completion, whether every module reported success).
/// The id is null when no main flow is configured for the task type.
/// </returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="dataContext"/> is null.</exception>
/// <exception cref="Exception">Thrown when main flows exist but none of them passes its conditions.</exception>
public async Task<(long? flowLogId, bool isAllComplete, bool isAllSuccess)> Run(TaskBaseTypeEnum taskBaseType, long taskId, TaskFlowDataContext dataContext, long? taskFlowLogId = null)
{
    if (dataContext == null)
    {
        throw new ArgumentNullException(nameof(dataContext));
    }

    var allConfigList = await tenantDb.Queryable<TaskFlowConfig>()
        .Where(t => t.IsMain && t.TaskType == taskBaseType.ToString())
        .OrderBy(t => t.Id)
        .ToListAsync();
    if (allConfigList.Count == 0)
    {
        // Nothing is configured for this task type: record an unmatched log entry and stop.
        string msg = $"未找到任何用于【{taskBaseType}】业务的处理流程";
        TaskFlowLog log = new()
        {
            Id = taskFlowLogId ?? SnowFlakeSingle.Instance.NextId(),
            IsMatch = false,
            TaskId = taskId,
            TaskType = taskBaseType.ToString(),
            IsComplete = false,
            ExceptionMessage = msg,
            IsSuccess = false,
        };
        await tenantDb.Insertable(log).ExecuteCommandAsync();
        logger.LogInformation(msg);
        return (null, false, false);
    }

    var allConfigIdList = allConfigList.Select(x => x.Id).ToList();
    var allConditionList = await tenantDb.Queryable<TaskFlowCondition>()
        .Where(x => allConfigIdList.Contains(x.ConfigId) && x.Type == 1)
        .ToListAsync();

    // Number of conditions satisfied by each main flow that qualifies.
    Dictionary<long, int> configMatchCount = new(allConfigList.Count);
    foreach (var configItem in allConfigList)
    {
        var conditionItem = allConditionList.FirstOrDefault(x => x.ConfigId == configItem.Id);
        if (conditionItem == null || string.IsNullOrEmpty(conditionItem.Content))
        {
            // A flow without conditions always qualifies, with the lowest priority.
            configMatchCount.Add(configItem.Id, 0);
            continue;
        }

        var conditionContent = JsonConvert.DeserializeObject<ConditionContent>(conditionItem.Content)!;
        if (ConditionHelper.IsPass(conditionContent, dataContext))
        {
            configMatchCount.Add(configItem.Id, ConditionHelper.GetConditionCount(conditionContent));
        }
    }

    // Best match: most conditions satisfied, smallest Id on a tie.
    TaskFlowConfig? targetConfig = null;
    if (configMatchCount.Count > 0)
    {
        int maxMatchNum = configMatchCount.Values.Max();
        long targetConfigId = configMatchCount.Where(x => x.Value == maxMatchNum).Min(x => x.Key);
        targetConfig = allConfigList.First(x => x.Id == targetConfigId);
    }
    if (targetConfig == null)
    {
        // Reached when every main flow carries conditions and none of them passes.
        throw new Exception("targetConfig目标流程意外为null");
    }

    // Nodes of the selected flow.
    var configList = await tenantDb.Queryable<TaskFlowConfig>().Where(x => x.MainConfigId == targetConfig.Id).ToListAsync();
    var configIdList = configList.Select(x => x.Id).ToList();

    // Parameters to inject into the data context, per node.
    var paramList = await tenantDb.Queryable<TaskFlowParam>().Where(x => configIdList.Contains(x.ConfigId)).ToListAsync();

    // Modules the nodes may execute.
    var moduleIdList = configList.Select(x => x.ExecuteModuleId).ToList();
    var moduleList = await tenantDb.Queryable<TaskFlowModule>().Where(x => moduleIdList.Contains(x.Id) && x.ModuleType == 1).ToListAsync();

    // Conditions evaluated when choosing between candidate nodes.
    var conditionList = await tenantDb.Queryable<TaskFlowCondition>().Where(x => configIdList.Contains(x.ConfigId) && x.Type == 2).ToListAsync();

    TaskFlowLog flowLog = new()
    {
        Id = taskFlowLogId ?? SnowFlakeSingle.Instance.NextId(),
        IsMatch = true,
        TaskId = taskId,
        TaskType = taskBaseType.ToString(),
        MatchMainConfigId = targetConfig.Id,
        ConfigList = JsonConvert.SerializeObject(configList),
        ModuleList = JsonConvert.SerializeObject(moduleList),
        IsComplete = true,
        IsSuccess = true,
    };

    // Execute the flow. The iteration count is capped by the number of nodes.
    List<TaskFlowConfig> waitMatchConfigList = [targetConfig];
    for (int i = 0; i < configList.Count; i++)
    {
        TaskFlowLogDetail flowLogDetail = new()
        {
            PId = flowLog.Id,
            IsSuccess = true,
        };
        TaskFlowConfig? executeConfig = null;
        try
        {
            if (waitMatchConfigList.Count == 0)
            {
                break;
            }

            // Evaluate every candidate node and keep those that qualify.
            Dictionary<TaskFlowConfig, int> matchedConfigList = new();
            foreach (var waitMatchConfigItem in waitMatchConfigList)
            {
                var condition = conditionList.FirstOrDefault(x => x.ConfigId == waitMatchConfigItem.Id);
                if (condition == null || string.IsNullOrEmpty(condition.Content))
                {
                    matchedConfigList.Add(waitMatchConfigItem, 0);
                    continue;
                }

                var conditionContent = JsonConvert.DeserializeObject<ConditionContent>(condition.Content)!;
                if (ConditionHelper.IsPass(conditionContent, dataContext))
                {
                    matchedConfigList.Add(waitMatchConfigItem, ConditionHelper.GetConditionCount(conditionContent));
                }
            }

            if (matchedConfigList.Count > 0)
            {
                // Most conditions satisfied wins; smallest Id on a tie.
                int maxMatchNum = matchedConfigList.Values.Max();
                executeConfig = matchedConfigList.Where(x => x.Value == maxMatchNum).OrderBy(x => x.Key.Id).First().Key;
            }

            // Fall back to the node flagged as the default for the unmatched case.
            executeConfig ??= waitMatchConfigList.FirstOrDefault(x => x.IsUnMatchDefault);
            if (executeConfig == null)
            {
                // Not finding a node to execute is deliberately not treated as a failed or
                // incomplete flow; only a note is recorded.
                flowLog.Note += $"【执行时未找到符合执行条件的节点经判断条件的节点Id如下{string.Join(',', waitMatchConfigList.Select(x => x.Id))}】";
                break;
            }
            flowLogDetail.ConfigId = executeConfig.Id;

            // Work out the candidates for the next iteration before running the module, so that a
            // failing module with IsExceptionContinue still advances to its successors.
            waitMatchConfigList = new();
            if (!string.IsNullOrEmpty(executeConfig.NextConfigId))
            {
                var nextIds = executeConfig.NextConfigId
                    .Split(',', StringSplitOptions.RemoveEmptyEntries)
                    .Select(x => Convert.ToInt64(x))
                    .ToHashSet();
                waitMatchConfigList = configList.Where(x => nextIds.Contains(x.Id)).ToList();
            }

            // Inject the node's parameters.
            foreach (var paramItem in paramList.Where(x => x.ConfigId == executeConfig.Id))
            {
                dataContext.Set(paramItem.FieldName!, paramItem.FieldValue!);
            }

            if (executeConfig.ExecuteModuleId == 0 || executeConfig.ExecuteModuleId == null)
            {
                // A node without a module is logged as a terminating module and nothing is invoked.
                flowLogDetail.ElapsedMillisecond = 0;
                flowLogDetail.IsComplete = true;
                flowLogDetail.ModuleId = 0;
                flowLogDetail.ModuleName = "(终止模块)";
                await tenantDb.CopyNew().Insertable(flowLogDetail).ExecuteCommandAsync();
                continue;
            }

            var module = moduleList.FirstOrDefault(x => x.Id == executeConfig.ExecuteModuleId);
            if (module == null)
            {
                throw new Exception($"未找到指定流程配置ExecuteModuleId{executeConfig.ExecuteModuleId}");
            }
            flowLogDetail.ModuleId = module.Id;
            flowLogDetail.ModuleName = module.Name;

            // Modules whose assembly name starts with "DS.WMS.Core" are looked up in the executing
            // assembly; everything else is loaded by name.
            Assembly assembly = module.AssemblyName!.StartsWith("DS.WMS.Core", StringComparison.Ordinal)
                ? Assembly.GetExecutingAssembly()
                : Assembly.Load(module.AssemblyName!);

            var interfaceType = assembly.DefinedTypes.FirstOrDefault(x => x.Name == module.InterfaceName!);
            if (interfaceType == null)
            {
                throw new Exception($"在程序集【{module.AssemblyName}】下未找到Interface【{module.InterfaceName}】");
            }

            object? imp = serviceProvider.GetService(interfaceType);
            if (imp == null)
            {
                throw new Exception($"在依赖注入容器未找到Interface【{module.InterfaceName}】的实现类");
            }

            MethodInfo? method = imp.GetType().GetMethod(module.MethodName!, BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
            if (method == null)
            {
                throw new Exception($"在Interface【{module.InterfaceName}】的实现类中未找到Method【{module.MethodName}】");
            }

            Stopwatch stopwatch = Stopwatch.StartNew();
            var taskObj = method.Invoke(imp, new object[] { dataContext })!;
            if (taskObj is not Task task)
            {
                // Synchronous modules are not supported yet.
                throw new Exception("模块执行结果不是Task类型请检查");
            }

            // Awaiting rethrows the module's exception, so a faulted or cancelled task never
            // gets past this line.
            await task;

            // A plain Task has no Result property at all, and an async method declared as Task
            // exposes a VoidTaskResult; both mean "no return value".
            var resultProperty = taskObj.GetType().GetProperty("Result");
            if (resultProperty == null || resultProperty.PropertyType.Name == "VoidTaskResult")
            {
                flowLogDetail.ExecuteReturn = "执行返回结果为void";
            }
            else
            {
                dynamic dynamicTask = task;
                if (dynamicTask.Result != null)
                {
                    flowLogDetail.ExecuteReturn = JsonConvert.SerializeObject(dynamicTask.Result);
                }
                else
                {
                    flowLogDetail.ExecuteReturn = "执行返回结果为null";
                }

                if (dynamicTask.Result is DataResult)
                {
                    var isSuccess = dynamicTask.Result.Succeeded;
                    flowLogDetail.IsSuccess = isSuccess;
                    if (isSuccess == false)
                    {
                        flowLog.IsSuccess = false;
                        if (!executeConfig.IsExceptionContinue)
                        {
                            throw new Exception("模块执行结果为失败并且IsExceptionContinue配置为false已跳出流程");
                        }
                    }
                }
            }

            stopwatch.Stop();
            flowLogDetail.ElapsedMillisecond = stopwatch.ElapsedMilliseconds;
            flowLogDetail.IsComplete = true;
            await tenantDb.CopyNew().Insertable(flowLogDetail).ExecuteCommandAsync();
        }
        catch (Exception ex)
        {
            string exMessage;
            if (ex is TargetInvocationException ex2 && ex2.InnerException != null)
            {
                exMessage = WriteLog("模块内部执行过程中发生异常", ex2.InnerException);
            }
            else
            {
                exMessage = WriteLog("模块外部调用过程中发生异常", ex);
            }

            flowLog.IsComplete = false;
            flowLog.IsSuccess = false;
            flowLogDetail.ExceptionMessage = exMessage;
            flowLogDetail.IsComplete = false;
            flowLogDetail.IsSuccess = false;
            await tenantDb.CopyNew().Insertable(flowLogDetail).ExecuteCommandAsync();

            // executeConfig is still null when the failure happened while choosing the node
            // (for example a condition that cannot be deserialized). There is no node setting to
            // consult in that case, so the flow stops instead of dereferencing null.
            if (executeConfig?.IsExceptionContinue == true)
            {
                continue;
            }
            break;
        }
    }

    await tenantDb.CopyNew().Insertable(flowLog).ExecuteCommandAsync();
    return (flowLog.Id, flowLog.IsComplete, flowLog.IsSuccess);
}
4 months ago
/// <summary>
/// Looks up a task by business id and task type, then runs the flow for it.
/// </summary>
/// <param name="taskBaseType">Task type; its name is compared with the task's TASK_TYPE.</param>
/// <param name="bsno">Business id, compared with the task's OUT_BS_NO.</param>
/// <param name="dataContext">Initial data context handed to <c>Run</c>.</param>
/// <returns>(flow log id, whether every module ran to completion, whether every module reported success)</returns>
public async Task<(long? flowLogId, bool isAllComplete, bool isAllSuccess)> RunWithBsno(TaskBaseTypeEnum taskBaseType, long bsno, TaskFlowDataContext dataContext)
{
    // Ordered by Id descending, so the first row is the matching task with the largest Id.
    var matchingTaskIds = tenantDb.Queryable<TaskBaseInfo>()
        .Where(item => item.OUT_BS_NO == bsno && item.TASK_TYPE == taskBaseType.ToString())
        .OrderByDescending(item => item.Id)
        .Select(item => item.Id);

    long resolvedTaskId = await matchingTaskIds.FirstAsync();

    var outcome = await Run(taskBaseType, resolvedTaskId, dataContext);
    return outcome;
}
/// <summary>
/// Walks the node graph of the workflow configured for <paramref name="workFlowType"/> and returns the
/// task type of the node that follows the current one.
/// </summary>
/// <remarks>
/// Candidate successors are ranked like this: among the nodes that pass their conditions the one with the
/// highest condition count wins (smallest Id on a tie); if none passes, the node flagged as the unmatched
/// default is used; if there is still none and only a single candidate exists, the walk continues from
/// that candidate.
/// </remarks>
/// <param name="workFlowType">Workflow type; its name is compared with the main configuration's TaskType.</param>
/// <param name="dataContext">Data context the node conditions are evaluated against.</param>
/// <param name="currentTaskType">Task type currently executing; when null the walk starts at the main node.</param>
/// <returns>The next task type, or null when it cannot be determined.</returns>
public async Task<TaskBaseTypeEnum?> GetWorkFlowNextConfigByTaskType(TaskBaseTypeEnum workFlowType, TaskFlowDataContext dataContext, TaskBaseTypeEnum? currentTaskType)
{
    var allConfigList = await tenantDb.Queryable<TaskFlowConfig>()
        .Where(t => t.MainConfigId == SqlFunc.Subqueryable<TaskFlowConfig>().Where(x => x.IsMain && x.TaskType == workFlowType.ToString()).Select(x => x.Id))
        .OrderBy(t => t.Id)
        .ToListAsync();

    long configId;
    if (currentTaskType == null)
    {
        // No workflow (or no main node) configured: report "no next task" like every other
        // configuration problem in this method, instead of throwing from First().
        var mainConfig = allConfigList.FirstOrDefault(x => x.IsMain);
        if (mainConfig == null)
        {
            return null;
        }
        configId = mainConfig.Id;
    }
    else
    {
        var allConfigModuleIdList = allConfigList.Select(x => x.ExecuteModuleId).ToList();
        var currentModuleId = await tenantDb.Queryable<TaskFlowModule>()
            .Where(x => allConfigModuleIdList.Contains(x.Id) && x.TaskType == currentTaskType.ToString())
            .Select(x => x.Id)
            .FirstAsync();

        // SqlSugar's FirstAsync yields the default value when no row matches. An id of 0 must not
        // be looked up below, because a node with ExecuteModuleId == 0 is a terminating node, not
        // the node of the current task.
        if (currentModuleId == 0)
        {
            return null;
        }

        var startConfig = allConfigList.FirstOrDefault(x => x.ExecuteModuleId == currentModuleId);
        if (startConfig == null)
        {
            return null;
        }
        configId = startConfig.Id;
    }

    // The iteration count is capped by the number of nodes.
    for (int i = 0; i < allConfigList.Count; i++)
    {
        var currentConfig = allConfigList.FirstOrDefault(x => x.Id == configId);
        if (currentConfig == null || string.IsNullOrEmpty(currentConfig.NextConfigId))
        {
            return null;
        }

        var nextIds = currentConfig.NextConfigId
            .Split(',', StringSplitOptions.RemoveEmptyEntries)
            .Select(x => Convert.ToInt64(x))
            .ToHashSet();
        var waitMatchConfigList = allConfigList.Where(x => nextIds.Contains(x.Id)).ToList();
        if (waitMatchConfigList.Count == 0)
        {
            // Configuration problem: successor ids are configured but none of them exists.
            return null;
        }

        // NOTE(review): Run() restricts node conditions to Type == 2; this query does not filter
        // on Type. Confirm whether that difference is intended.
        var configIdList = waitMatchConfigList.Select(x => x.Id).ToList();
        var conditionList = await tenantDb.Queryable<TaskFlowCondition>()
            .Where(x => configIdList.Contains(x.ConfigId))
            .ToListAsync();

        Dictionary<TaskFlowConfig, int> matchedConfigList = new();
        foreach (var item in waitMatchConfigList)
        {
            var condition = conditionList.FirstOrDefault(x => x.ConfigId == item.Id);
            if (condition == null || string.IsNullOrEmpty(condition.Content))
            {
                // A node without conditions always qualifies, with the lowest priority.
                matchedConfigList.Add(item, 0);
                continue;
            }

            var conditionContent = JsonConvert.DeserializeObject<ConditionContent>(condition.Content)!;
            if (ConditionHelper.IsPass(conditionContent, dataContext))
            {
                matchedConfigList.Add(item, ConditionHelper.GetConditionCount(conditionContent));
            }
        }

        TaskFlowConfig? executeConfig = null;
        if (matchedConfigList.Count > 0)
        {
            // Most conditions satisfied wins; smallest Id on a tie.
            int maxMatchNum = matchedConfigList.Values.Max();
            executeConfig = matchedConfigList.Where(x => x.Value == maxMatchNum).OrderBy(x => x.Key.Id).First().Key;
        }
        executeConfig ??= waitMatchConfigList.FirstOrDefault(x => x.IsUnMatchDefault);

        if (executeConfig == null)
        {
            // Nothing qualified. With a single candidate branch the walk can continue from it;
            // with several branches the successor is ambiguous.
            if (waitMatchConfigList.Count != 1)
            {
                return null;
            }
            configId = waitMatchConfigList[0].Id;
            continue;
        }

        var nextConfig = executeConfig;
        var taskType = await tenantDb.Queryable<TaskFlowModule>()
            .Where(x => x.Id == nextConfig.ExecuteModuleId && x.ModuleType == 2)
            .Select(x => x.TaskType)
            .FirstAsync();
        if (taskType != null && Enum.TryParse<TaskBaseTypeEnum>(taskType, out var nextTaskType))
        {
            return nextTaskType;
        }

        // The chosen node does not resolve to a task type. configId is unchanged at this point, so
        // further iterations would repeat the same queries and end with null anyway.
        // NOTE(review): if the walk is meant to continue past such a node, set configId to the
        // chosen node's Id and continue instead — confirm the intended behaviour.
        return null;
    }

    return null;
}
5 months ago
/// <summary>
/// Builds a multi-line description of an exception: a caller-supplied message followed by the
/// exception's type name, message and stack trace.
/// </summary>
/// <param name="throwMsg">Caller-supplied text placed on the first line.</param>
/// <param name="ex">Exception to describe.</param>
/// <returns>The formatted text.</returns>
static string WriteLog(string throwMsg, Exception ex)
{
    const string template = "【自定义错误】:{0} \r\n【异常类型】{1} \r\n【异常信息】{2} \r\n【堆栈调用】{3}";

    string exceptionTypeName = ex.GetType().Name;
    string? stackTrace = ex.StackTrace; // null for an exception that was never thrown; formatted as empty

    return string.Format(template, throwMsg, exceptionTypeName, ex.Message, stackTrace);
}
}
}