using DS.Module.Core; using DS.Module.Core.Condition; using DS.Module.Core.Data; using DS.WMS.Core.TaskPlat.Entity; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using SqlSugar; using System.Diagnostics; using System.Reflection; namespace DS.WMS.Core.TaskPlat { /// /// 任务编排执行器 /// public class TaskFlowRuner { private readonly SqlSugarScopeProvider tenantDb = null!; private readonly IServiceProvider serviceProvider = null!; private readonly ILogger logger = null!; private TaskFlowRuner() { } /// /// /// /// /// public TaskFlowRuner(SqlSugarScopeProvider tenantDb, IServiceProvider serviceProvider) { this.tenantDb = tenantDb; this.serviceProvider = serviceProvider; this.logger = serviceProvider.GetRequiredService>(); } /// /// 执行 /// /// 任务类型 /// 任务Id /// 起始入参数据上下文 /// (执行日志Id,模块是否全部执行完成,模块执行结果是否全部为success) public async Task<(long? flowLogId, bool isAllComplete, bool isAllSuccess)> Run(TaskBaseTypeEnum taskBaseType, long taskId, TaskFlowDataContext dataContext) { if (dataContext == null) { throw new ArgumentNullException(nameof(dataContext)); } var allConfigList = await tenantDb.Queryable() .Where(t => t.IsMain && t.TaskType == taskBaseType.ToString()) .OrderBy(t => t.Id) .ToListAsync(); if (allConfigList.Count == 0) { string msg = $"未找到任何用于【{taskBaseType}】业务的处理流程"; TaskFlowLog log = new() { IsMatch = false, TaskId = taskId, TaskType = taskBaseType.ToString(), IsComplete = false, ExceptionMessage = msg, IsSuccess = false, }; await tenantDb.Insertable(log).ExecuteCommandAsync(); logger.LogInformation(msg); return (null, false, false); } var allConfigIdList = allConfigList.Select(x => x.Id); var allConditionList = await tenantDb.Queryable() .Where(x => allConfigIdList.Contains(x.ConfigId) && x.Type == 1) .ToListAsync(); // 判断每项流程达成的条件数量 Dictionary configMatchCount = allConfigList.ToDictionary(x => x.Id, x => 0); foreach (var configItem in allConfigList) { var conditionItem = allConditionList.FirstOrDefault(x => x.ConfigId == configItem.Id); if (conditionItem != null && !string.IsNullOrEmpty(conditionItem.Content)) { var contitionContent = JsonConvert.DeserializeObject(conditionItem.Content)!; var oldValue = configMatchCount[configItem.Id]; if (ConditionHelper.IsPass(contitionContent, dataContext)) { configMatchCount[configItem.Id] = oldValue + 1; } else { configMatchCount[configItem.Id] = oldValue - 1; } } } // 取出最匹配的流程 KeyValuePair? bestMatched = null!; foreach (var item in configMatchCount) { if (bestMatched == null) { bestMatched = new KeyValuePair(item.Key, item.Value); continue; } if (item.Value > bestMatched.Value.Value) { bestMatched = new KeyValuePair(item.Key, item.Value); continue; } if (item.Value == bestMatched.Value.Value) { // 如果达成的条件数量相等,则取最早的流程 if (item.Key < bestMatched.Value.Key) { bestMatched = new KeyValuePair(item.Key, item.Value); continue; } } } var targetConfig = allConfigList.First(x => x.Id == bestMatched.Value.Key); // 待执行的流程(节点列表) var configList = await tenantDb.Queryable().Where(x => x.MainConfigId == targetConfig.Id).ToListAsync(); var configIdList = configList.Select(x => x.Id).ToList(); // 待注入的参数列表 var paramList = await tenantDb.Queryable().Where(x => configIdList.Contains(x.ConfigId)).ToListAsync(); // 待执行的模块列表 var moduleIdList = configList.Select(x => x.ExecuteModuleId).ToList(); var moduleList = await tenantDb.Queryable().Where(x => moduleIdList.Contains(x.Id) && x.ModuleType == 1).ToListAsync(); // 待判断的条件列表 var conditionList = await tenantDb.Queryable().Where(x => configIdList.Contains(x.ConfigId) && x.Type == 2).ToListAsync(); TaskFlowLog flowLog = new() { Id = SnowFlakeSingle.Instance.NextId(), IsMatch = true, TaskId = taskId, TaskType = taskBaseType.ToString(), MatchMainConfigId = targetConfig.Id, ConfigList = JsonConvert.SerializeObject(configList), ModuleList = JsonConvert.SerializeObject(moduleList), IsComplete = true, IsSuccess = true, }; // 执行流程 List waitMatchConfigList = new([targetConfig]); for (int i = 0; i < configList.Count; i++) { TaskFlowLogDetail flowLogDetail = new() { PId = flowLog.Id, IsSuccess = true, }; TaskFlowConfig? executeConfig = null; try { if (waitMatchConfigList == null || waitMatchConfigList.Count == 0) { break; } // 对节点列表里面的节点进行依次判断,取出要执行的节点 var matchedConfigList = new List<(TaskFlowConfig config, bool isHasCondition)>(); foreach (var waitMatchConfigItem in waitMatchConfigList) { var condition = conditionList.FirstOrDefault(x => x.ConfigId == waitMatchConfigItem.Id); if (condition == null || string.IsNullOrEmpty(condition.Content)) { matchedConfigList.Add((waitMatchConfigItem, false)); continue; } var contitionContent = JsonConvert.DeserializeObject(condition.Content)!; if (ConditionHelper.IsPass(contitionContent, dataContext)) { matchedConfigList.Add((waitMatchConfigItem, true)); continue; } } if (matchedConfigList.Count == 1) { executeConfig = matchedConfigList[0].config; } else if (matchedConfigList.Count > 1) { var temp = matchedConfigList.OrderBy(x => x.config.Id).Where(x => x.isHasCondition).ToList(); executeConfig = temp.FirstOrDefault().config; } if (executeConfig == null) { executeConfig = waitMatchConfigList.FirstOrDefault(x => x.IsUnMatchDefault); } if (executeConfig == null) { flowLog.IsComplete = false; flowLog.IsSuccess = false; flowLog.Note += $"【执行时未找到符合执行条件的节点,经判断条件的节点Id如下:{string.Join(',', waitMatchConfigList.Select(x => x.Id))}】"; break; } flowLogDetail.ConfigId = executeConfig.Id; // 如果当前节点要执行(或者配置了默认执行节点)取出下一批要进行条件判断的节点列表 waitMatchConfigList.Clear(); if (!string.IsNullOrEmpty(executeConfig.NextConfigId)) { var ids = executeConfig.NextConfigId.Split(',').Where(x => !string.IsNullOrEmpty(x)).Select(x => Convert.ToInt64(x)); waitMatchConfigList = configList.Where(x => ids.Contains(x.Id)).ToList(); } // 注入参数 var paramItemList = paramList.Where(x => x.ConfigId == executeConfig.Id).ToList(); foreach (var paramItem in paramItemList) { dataContext.Set(paramItem.FieldName!, paramItem.FieldValue!); } if (executeConfig.ExecuteModuleId == 0) { flowLogDetail.ElapsedMillisecond = 0; flowLogDetail.IsComplete = true; flowLogDetail.ModuleId = 0; flowLogDetail.ModuleName = "(终止模块)"; await tenantDb.CopyNew().Insertable(flowLogDetail).ExecuteCommandAsync(); continue; } var module = moduleList.FirstOrDefault(x => x.Id == executeConfig.ExecuteModuleId); if (module == null) { throw new Exception($"未找到指定流程配置,ExecuteModuleId:{executeConfig.ExecuteModuleId}"); } flowLogDetail.ModuleId = module.Id; flowLogDetail.ModuleName = module.Name; Assembly assembly; if (module.AssemblyName!.StartsWith("DS.WMS.Core")) { assembly = Assembly.GetExecutingAssembly(); } else { assembly = Assembly.Load(module.AssemblyName!); } var interfaceType = assembly.DefinedTypes.FirstOrDefault(x => x.Name == module.InterfaceName!); if (interfaceType == null) { throw new Exception($"在程序集【{module.AssemblyName}】下未找到Interface【{module.InterfaceName}】"); } object? imp = serviceProvider.GetService(interfaceType); if (imp == null) { throw new Exception($"在依赖注入容器未找到Interface【{module.InterfaceName}】的实现类"); } var impType = imp.GetType(); MethodInfo? method = impType.GetMethod(module.MethodName!, BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); if (method == null) { throw new Exception($"在Interface【{module.InterfaceName}】的实现类中未找到Method【{module.MethodName}】"); } Stopwatch stopwatch = Stopwatch.StartNew(); var taskObj = method.Invoke(imp, new object[] { dataContext })!; if (taskObj is Task task) { await task; if (task.IsFaulted) { if (task.Exception != null) { throw task.Exception; } else { throw new Exception("模块执行过程中发生未知异常,但是Exception为null,请检查"); } } dynamic dynamicTask = task; if (taskObj.GetType().GetProperty("Result")?.PropertyType?.Name == "VoidTaskResult") { flowLogDetail.ExecuteReturn = "执行返回结果为void"; } else { if (dynamicTask.Result != null) { flowLogDetail.ExecuteReturn = JsonConvert.SerializeObject(dynamicTask.Result); } else { flowLogDetail.ExecuteReturn = "执行返回结果为null"; } if (dynamicTask.Result is DataResult dataResult && dataResult != null) { var isSuccess = dynamicTask.Result.Succeeded; flowLogDetail.IsSuccess = isSuccess; if (isSuccess == false) { flowLog.IsSuccess = false; } } } } else { throw new Exception("模块执行结果不是Task类型,请检查"); // 后面支持非异步 } stopwatch.Stop(); flowLogDetail.ElapsedMillisecond = stopwatch.ElapsedMilliseconds; flowLogDetail.IsComplete = true; await tenantDb.CopyNew().Insertable(flowLogDetail).ExecuteCommandAsync(); } catch (Exception ex) { string exMessage; if (ex is TargetInvocationException ex2 && ex2.InnerException != null) { exMessage = WriteLog("模块内部执行过程中发生异常", ex2.InnerException); } else { exMessage = WriteLog("模块外部调用过程中发生异常", ex); } flowLogDetail.ExceptionMessage = exMessage; flowLog.IsComplete = false; flowLog.IsSuccess = false; flowLogDetail.IsComplete = false; flowLogDetail.IsSuccess = false; await tenantDb.CopyNew().Insertable(flowLogDetail).ExecuteCommandAsync(); if (executeConfig.IsExceptionContinue) { continue; } else { break; } } } await tenantDb.CopyNew().Insertable(flowLog).ExecuteCommandAsync(); return (flowLog.Id, flowLog.IsComplete, flowLog.IsSuccess); } /// /// 执行(通过业务Id) /// /// 任务类型 /// 业务Id /// 起始入参数据上下文 /// (执行日志Id,模块是否全部执行完成,模块执行结果是否全部为success) public async Task<(long? flowLogId, bool isAllComplete, bool isAllSuccess)> RunWithBsno(TaskBaseTypeEnum taskBaseType, long bsno, TaskFlowDataContext dataContext) { long taskId = await tenantDb.Queryable() .Where(t => t.OUT_BS_NO == bsno && t.TASK_TYPE == taskBaseType.ToString()) .OrderByDescending(a => a.Id) .Select(t => t.Id) .FirstAsync(); return await Run(taskBaseType, taskId, dataContext); } /// /// 根据当前节点Id,获取工作流下一个任务类型 /// /// 工作流流程类型 /// /// 当前执行的任务类型(如果为空,则返回首个任务类型) /// (下一个任务类型,下一节点Id) public async Task GetWorkFlowNextConfigByTaskType(TaskBaseTypeEnum workFlowType, TaskFlowDataContext dataContext, TaskBaseTypeEnum? currentTaskType) { var allConfigList = await tenantDb.Queryable() .Where(t => t.MainConfigId == SqlFunc.Subqueryable().Where(x => x.IsMain && x.TaskType == workFlowType.ToString()).Select(x => x.Id)) .OrderBy(t => t.Id) .ToListAsync(); long configId; if (currentTaskType == null) { configId = allConfigList.First(x => x.IsMain).Id; } else { var allConfigModuleIdList = allConfigList.Select(x => x.ExecuteModuleId).ToList(); var currentModuleId = await tenantDb.Queryable() .Where(x => allConfigModuleIdList.Contains(x.Id) && x.TaskType == currentTaskType.ToString()) .Select(x => x.Id) .FirstAsync(); configId = allConfigList.First(x => x.ExecuteModuleId == currentModuleId).Id; } List waitMatchConfigList = new(); for (int i = 0; i < allConfigList.Count; i++) { var currentConfig = allConfigList.FirstOrDefault(x => x.Id == configId); if (currentConfig == null || string.IsNullOrEmpty(currentConfig.NextConfigId)) return null; var nextIds = currentConfig.NextConfigId.Split(',').Where(x => !string.IsNullOrEmpty(x)).Select(x => Convert.ToInt64(x)); waitMatchConfigList = allConfigList.Where(x => nextIds.Contains(x.Id)).ToList(); if (waitMatchConfigList.Count == 0) return null; // 如果走了这一步的return,说明配置有问题:配置了下一节点的id,但是却查不到 var configIdList = waitMatchConfigList.Select(x => x.Id); var conditionList = await tenantDb.Queryable() .Where(x => configIdList.Contains(x.ConfigId)) .ToListAsync(); var matchedConfigList = new List(); foreach (var item in waitMatchConfigList) { var condition = conditionList.FirstOrDefault(x => x.ConfigId == item.Id); if (condition == null || string.IsNullOrEmpty(condition.Content)) { matchedConfigList.Add(item); } else { var contitionContent = JsonConvert.DeserializeObject(condition.Content)!; if (ConditionHelper.IsPass(contitionContent, dataContext)) { matchedConfigList.Add(item); } } } TaskFlowConfig? executeConfig = null; if (matchedConfigList.Count == 1) { executeConfig = matchedConfigList[0]; } else if (matchedConfigList.Count > 1) { executeConfig = matchedConfigList.FirstOrDefault(x => x.IsMoreMatchDefault); } if (executeConfig == null) { executeConfig = waitMatchConfigList.FirstOrDefault(x => x.IsUnMatchDefault); } if (executeConfig == null) { // 如果最终还是没有匹配到,则需要判断情况 // 如果待匹配的分支只有1个,则继续循环判断下一节点;否则如果待匹配的分支有多个,则无法判断下一节点 if (waitMatchConfigList.Count == 1) { configId = waitMatchConfigList[0].Id; continue; } else { return null; } } else { var taskType = await tenantDb.Queryable().Where(x => x.Id == executeConfig.ExecuteModuleId && x.ModuleType == 2).Select(x => x.TaskType).FirstAsync(); if (taskType != null && Enum.TryParse(typeof(TaskBaseTypeEnum), taskType, out object? temp)) { return (TaskBaseTypeEnum)temp; } } } return null; } /// /// 根据当前节点Id,获取工作流下一个任务类型 /// /// 工作流流程类型 /// /// 当前执行的配置Id(如果为空,则返回首个任务类型) /// (下一个任务类型,下一节点Id) public async Task<(TaskBaseTypeEnum taskType, long configId)?> GetWorkFlowNextConfig(TaskBaseTypeEnum workFlowType, TaskFlowDataContext dataContext, long? currentConfigId = null) { var allConfigList = await tenantDb.Queryable() .Where(t => t.MainConfigId == SqlFunc.Subqueryable().Where(x => x.IsMain && x.TaskType == workFlowType.ToString()).Select(x => x.Id)) .OrderBy(t => t.Id) .ToListAsync(); if (allConfigList.Count == 0) return null; long configId; if (currentConfigId == null) { configId = allConfigList.First(x => x.IsMain).Id; } else { configId = currentConfigId.Value; } List waitMatchConfigList = new(); for (int i = 0; i < allConfigList.Count; i++) { var currentConfig = allConfigList.FirstOrDefault(x => x.Id == configId); if (currentConfig == null || string.IsNullOrEmpty(currentConfig.NextConfigId)) return null; var nextIds = currentConfig.NextConfigId.Split(',').Where(x => !string.IsNullOrEmpty(x)).Select(x => Convert.ToInt64(x)); waitMatchConfigList = allConfigList.Where(x => nextIds.Contains(x.Id)).ToList(); if (waitMatchConfigList.Count == 0) return null; // 如果走了这一步的return,说明配置有问题:配置了下一节点的id,但是却查不到 var configIdList = waitMatchConfigList.Select(x => x.Id); var conditionList = await tenantDb.Queryable() .Where(x => configIdList.Contains(x.ConfigId)) .ToListAsync(); var matchedConfigList = new List(); foreach (var item in waitMatchConfigList) { var condition = conditionList.FirstOrDefault(x => x.ConfigId == item.Id); if (condition == null || string.IsNullOrEmpty(condition.Content)) { matchedConfigList.Add(item); } else { var contitionContent = JsonConvert.DeserializeObject(condition.Content)!; if (ConditionHelper.IsPass(contitionContent, dataContext)) { matchedConfigList.Add(item); } } } TaskFlowConfig? executeConfig = null; if (matchedConfigList.Count == 1) { executeConfig = matchedConfigList[0]; } else if (matchedConfigList.Count > 1) { executeConfig = matchedConfigList.FirstOrDefault(x => x.IsMoreMatchDefault); } if (executeConfig == null) { executeConfig = waitMatchConfigList.FirstOrDefault(x => x.IsUnMatchDefault); } if (executeConfig == null) { // 如果最终还是没有匹配到,则需要判断情况 // 如果待匹配的分支只有1个,则继续循环判断下一节点;否则如果待匹配的分支有多个,则无法判断下一节点 if (waitMatchConfigList.Count == 1) { configId = waitMatchConfigList[0].Id; continue; } else { return null; } } else { var taskType = await tenantDb.Queryable().Where(x => x.Id == executeConfig.ExecuteModuleId && x.ModuleType == 2).Select(x => x.TaskType).FirstAsync(); if (taskType != null && Enum.TryParse(typeof(TaskBaseTypeEnum), taskType, out object? temp)) { return ((TaskBaseTypeEnum)temp, executeConfig.Id); } } } return null; } static string WriteLog(string throwMsg, Exception ex) { return string.Format("【自定义错误】:{0} \r\n【异常类型】:{1} \r\n【异常信息】:{2} \r\n【堆栈调用】:{3}", new object[] { throwMsg, ex.GetType().Name, ex.Message, ex.StackTrace }); } } }