You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

423 lines
18 KiB
C#

4 months ago
using DS.Module.Core;
using DS.Module.Core.Data;
using DS.WMS.Core.TaskPlat.Entity;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
4 months ago
using Newtonsoft.Json;
using SqlSugar;
using System.Diagnostics;
using System.Reflection;
namespace DS.WMS.Core.TaskPlat
{
/// <summary>
/// 任务编排执行器
/// </summary>
public class TaskFlowRuner
{
/// <summary>SqlSugar provider used for every query and log insert issued by this class.</summary>
private readonly SqlSugarScopeProvider tenantDb = null!;

/// <summary>Container from which the logger and the flow module implementations are resolved.</summary>
private readonly IServiceProvider serviceProvider = null!;

/// <summary>Logger resolved from <see cref="serviceProvider"/> in the public constructor.</summary>
private readonly ILogger<TaskFlowRuner> logger = null!;

/// <summary>
/// Private parameterless constructor; instances must be created through the public constructor
/// so that all dependencies are assigned.
/// </summary>
private TaskFlowRuner() { }

/// <summary>
/// Creates a runner bound to the given database provider and service container.
/// </summary>
/// <param name="tenantDb">SqlSugar provider used to read flow configuration and write execution logs.</param>
/// <param name="serviceProvider">Container used to resolve the logger and the module implementations.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="tenantDb"/> or <paramref name="serviceProvider"/> is null.
/// </exception>
public TaskFlowRuner(SqlSugarScopeProvider tenantDb, IServiceProvider serviceProvider)
{
    // Fail fast: a null dependency would otherwise surface later as a NullReferenceException inside Run.
    this.tenantDb = tenantDb ?? throw new ArgumentNullException(nameof(tenantDb));
    this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    this.logger = this.serviceProvider.GetRequiredService<ILogger<TaskFlowRuner>>();
}
/// <summary>
/// Picks the main flow configuration that best matches <paramref name="dataContext"/> for the given
/// task type, executes its modules in chain order, and persists the execution log.
/// </summary>
/// <remarks>
/// <para>
/// Matching: every main configuration of the task type is scored against its conditions. A satisfied
/// condition adds 1, an unsatisfied one subtracts 1, and a condition whose field cannot be resolved
/// from the data context is ignored. The highest score wins; ties go to the lowest configuration Id.
/// </para>
/// <para>
/// Execution: starting at the winning configuration's Id, each step is looked up among the
/// configurations whose MainConfigId equals that Id and the chain follows NextExecuteConfigId.
/// A failing step stops the chain unless the step's IsExceptionContinue flag is set.
/// NOTE(review): because the chain starts at the main configuration's own Id, that configuration
/// apparently has to be part of its own child list for anything to execute — confirm against the data.
/// </para>
/// </remarks>
/// <param name="taskBaseType">Task type; its name selects the candidate flow configurations.</param>
/// <param name="taskId">Task Id recorded on the flow log.</param>
/// <param name="dataContext">Initial data context; also receives each step's configured parameters.</param>
/// <returns>
/// (flow log Id — null when no flow exists for the task type, whether every module ran to completion,
/// whether every module reported success).
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="dataContext"/> is null.</exception>
public async Task<(long? flowLogId, bool isAllComplete, bool isAllSuccess)> Run(TaskBaseTypeEnum taskBaseType, long taskId, TaskFlowDataContext dataContext)
{
    if (dataContext == null)
    {
        throw new ArgumentNullException(nameof(dataContext));
    }

    var allConfigList = await tenantDb.Queryable<TaskFlowConfig>()
        .Where(t => t.IsMain && t.TaskType == taskBaseType.ToString())
        .OrderBy(t => t.Id)
        .ToListAsync();
    if (allConfigList.Count == 0)
    {
        // Nothing is configured for this task type: record the miss and report "not run".
        string msg = $"未找到任何用于【{taskBaseType}】业务的处理流程";
        TaskFlowLog log = new()
        {
            IsMatch = false,
            TaskId = taskId,
            TaskType = taskBaseType.ToString(),
            IsComplete = false,
            ExceptionMessage = msg,
            IsSuccess = false,
        };
        await tenantDb.Insertable(log).ExecuteCommandAsync();
        logger.LogInformation(msg);
        return (null, false, false);
    }

    var allConfigIdList = allConfigList.Select(x => x.Id).ToList();
    var allConditionList = await tenantDb.Queryable<TaskFlowCondition>()
        .Where(x => allConfigIdList.Contains(x.ConfigId))
        .ToListAsync();

    // Score every candidate and keep the best one; on equal scores the lowest Id wins.
    var conditionsByConfig = allConditionList.ToLookup(x => x.ConfigId);
    TaskFlowConfig targetConfig = allConfigList[0];
    int bestScore = int.MinValue;
    foreach (var configItem in allConfigList)
    {
        int score = ScoreConditions(conditionsByConfig[configItem.Id], dataContext);
        if (score > bestScore || (score == bestScore && configItem.Id < targetConfig.Id))
        {
            targetConfig = configItem;
            bestScore = score;
        }
    }

    var configList = await tenantDb.Queryable<TaskFlowConfig>().Where(x => x.MainConfigId == targetConfig.Id).ToListAsync();
    var configIdList = configList.Select(x => x.Id).ToList();
    var moduleIdList = configList.Select(x => x.ExecuteModuleId).ToList();
    var paramList = await tenantDb.Queryable<TaskFlowParam>().Where(x => configIdList.Contains(x.ConfigId)).ToListAsync();
    var moduleList = await tenantDb.Queryable<TaskFlowModule>().Where(x => moduleIdList.Contains(x.Id)).ToListAsync();

    // The flow log starts optimistic; any failing step flips the flags below.
    TaskFlowLog flowLog = new()
    {
        Id = SnowFlakeSingle.Instance.NextId(),
        IsMatch = true,
        TaskId = taskId,
        TaskType = taskBaseType.ToString(),
        MatchMainConfigId = targetConfig.Id,
        ConfigList = JsonConvert.SerializeObject(configList),
        ModuleList = JsonConvert.SerializeObject(moduleList),
        IsComplete = true,
        IsSuccess = true,
    };

    // Walk the chain. The iteration count is capped at the number of steps so that a cyclic
    // NextExecuteConfigId chain cannot loop forever.
    long? nextExecuteConfigId = targetConfig.Id;
    for (int i = 0; i < configList.Count; i++)
    {
        if (nextExecuteConfigId == null || nextExecuteConfigId == 0)
        {
            break;
        }

        var executeConfig = configList.FirstOrDefault(x => x.Id == nextExecuteConfigId);
        if (executeConfig == null)
        {
            break;
        }

        nextExecuteConfigId = executeConfig.NextExecuteConfigId;

        TaskFlowLogDetail flowLogDetail = new()
        {
            ConfigId = executeConfig.Id,
            PId = flowLog.Id,
            IsSuccess = true,
        };
        try
        {
            // Publish this step's configured parameters into the shared context before invoking it.
            foreach (var paramItem in paramList.Where(x => x.ConfigId == executeConfig.Id))
            {
                dataContext.Set(paramItem.FieldName!, paramItem.FieldValue!);
            }

            var module = moduleList.FirstOrDefault(x => x.Id == executeConfig.ExecuteModuleId);
            if (module == null)
            {
                throw new Exception($"未找到指定流程配置ExecuteModuleId{executeConfig.ExecuteModuleId}");
            }

            flowLogDetail.ModuleId = module.Id;
            flowLogDetail.ModuleName = module.Name;

            var (imp, method) = ResolveModuleTarget(module);

            Stopwatch stopwatch = Stopwatch.StartNew();
            var taskObj = method.Invoke(imp, new object[] { dataContext })!;
            if (taskObj is not Task task)
            {
                // Synchronous modules are not supported yet.
                throw new Exception("模块执行结果不是Task类型请检查");
            }

            // Awaiting rethrows any module failure, so no separate faulted-state check is needed.
            // NOTE(review): failures surfacing here are logged by the catch block as "external call"
            // errors, because only synchronous throws arrive as TargetInvocationException — confirm
            // whether that labelling is intended.
            await task;

            object? result = ReadTaskResult(task, method.ReturnType);
            if (result != null)
            {
                flowLogDetail.ExecuteReturn = JsonConvert.SerializeObject(result);

                // A result without a boolean "Succeeded" member leaves the step marked successful.
                bool? succeeded = ReadSucceededFlag(result);
                if (succeeded.HasValue)
                {
                    flowLogDetail.IsSuccess = succeeded.Value;
                    if (!succeeded.Value)
                    {
                        flowLog.IsSuccess = false;
                    }
                }
            }

            stopwatch.Stop();
            flowLogDetail.ElapsedMillisecond = stopwatch.ElapsedMilliseconds;
            flowLogDetail.IsComplete = true;

            await tenantDb.Insertable(flowLogDetail).ExecuteCommandAsync();
        }
        catch (Exception ex)
        {
            // Unwrap reflection's wrapper so the log shows the module's own exception.
            string exMessage;
            if (ex is TargetInvocationException ex2 && ex2.InnerException != null)
            {
                exMessage = WriteLog("模块内部执行过程中发生异常", ex2.InnerException);
            }
            else
            {
                exMessage = WriteLog("模块外部调用过程中发生异常", ex);
            }

            flowLogDetail.ExceptionMessage = exMessage;
            flowLog.IsComplete = false;
            flowLog.IsSuccess = false;

            flowLogDetail.IsComplete = false;
            flowLogDetail.IsSuccess = false;

            await tenantDb.Insertable(flowLogDetail).ExecuteCommandAsync();

            if (!executeConfig.IsExceptionContinue)
            {
                break;
            }
        }
    }

    await tenantDb.Insertable(flowLog).ExecuteCommandAsync();
    return (flowLog.Id, flowLog.IsComplete, flowLog.IsSuccess);
}

/// <summary>
/// Scores one configuration's conditions against the data context: +1 per satisfied condition,
/// -1 per unsatisfied one; unresolvable fields and unknown match types contribute nothing.
/// </summary>
private static int ScoreConditions(IEnumerable<TaskFlowCondition> conditions, TaskFlowDataContext dataContext)
{
    const StringComparison comparison = StringComparison.CurrentCultureIgnoreCase;

    int score = 0;
    foreach (var condition in conditions)
    {
        if (!TryResolveConditionValue(dataContext, condition.FieldName, out string? actual))
        {
            continue;
        }

        // A null on either side can never match; guarding here avoids ArgumentNullException
        // from Contains/StartsWith/EndsWith when MatchValue is missing.
        string? expected = condition.MatchValue;
        bool comparable = actual != null && expected != null;

        // Match types: 1 = equals, 2 = contains, 3 = starts with, 4 = ends with.
        bool? matched = condition.MatchType switch
        {
            1 => comparable && actual!.Equals(expected, comparison),
            2 => comparable && actual!.Contains(expected!, comparison),
            3 => comparable && actual!.StartsWith(expected!, comparison),
            4 => comparable && actual!.EndsWith(expected!, comparison),
            _ => (bool?)null,
        };

        if (matched.HasValue)
        {
            score += matched.Value ? 1 : -1;
        }
    }

    return score;
}

/// <summary>
/// Resolves a condition field ("key" or "key.Property.Path") from the data context to its string form.
/// Returns false when the key is absent or any value on the path is null.
/// </summary>
private static bool TryResolveConditionValue(TaskFlowDataContext dataContext, string? fieldName, out string? value)
{
    value = null;
    if (string.IsNullOrEmpty(fieldName))
    {
        return false;
    }

    int dotIndex = fieldName.IndexOf('.');
    string rootKey = dotIndex < 0 ? fieldName : fieldName.Substring(0, dotIndex);
    if (!dataContext.ContainsKey(rootKey))
    {
        return false;
    }

    object? root = dataContext.Get<object>(rootKey);
    if (root == null)
    {
        return false;
    }

    if (dotIndex < 0)
    {
        value = root.ToString();
        return true;
    }

    var (_, nested, _) = GetPropertyValue(root, fieldName.Substring(dotIndex + 1));
    if (nested == null)
    {
        return false;
    }

    value = nested.ToString();
    return true;
}

/// <summary>
/// Locates the module's interface type, resolves its implementation from the container and finds
/// the configured method on that implementation. Throws when any of the three cannot be found.
/// </summary>
private (object implementation, MethodInfo method) ResolveModuleTarget(TaskFlowModule module)
{
    // Modules living in this assembly are resolved without a load-by-name round trip.
    Assembly assembly = module.AssemblyName!.StartsWith("DS.WMS.Core", StringComparison.Ordinal)
        ? Assembly.GetExecutingAssembly()
        : Assembly.Load(module.AssemblyName!);

    var interfaceType = assembly.DefinedTypes.FirstOrDefault(x => x.Name == module.InterfaceName!);
    if (interfaceType == null)
    {
        throw new Exception($"在程序集【{module.AssemblyName}】下未找到Interface【{module.InterfaceName}】");
    }

    object? imp = serviceProvider.GetService(interfaceType);
    if (imp == null)
    {
        throw new Exception($"在依赖注入容器未找到Interface【{module.InterfaceName}】的实现类");
    }

    MethodInfo? method = imp.GetType().GetMethod(module.MethodName!, BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
    if (method == null)
    {
        throw new Exception($"在Interface【{module.InterfaceName}】的实现类中未找到Method【{module.MethodName}】");
    }

    return (imp, method);
}

/// <summary>
/// Returns the result of a completed task when the invoked method is declared to return Task&lt;T&gt;;
/// returns null for a plain Task, which carries no result.
/// </summary>
private static object? ReadTaskResult(Task completedTask, Type declaredReturnType)
{
    bool hasResult = declaredReturnType.IsGenericType
        && declaredReturnType.GetGenericTypeDefinition() == typeof(Task<>);
    if (!hasResult)
    {
        return null;
    }

    return declaredReturnType.GetProperty("Result")?.GetValue(completedTask);
}

/// <summary>
/// Reads a public boolean member named "Succeeded" (property first, then field) from a module
/// result; returns null when the result has no such boolean member.
/// </summary>
private static bool? ReadSucceededFlag(object result)
{
    const BindingFlags flags = BindingFlags.Instance | BindingFlags.Public;

    Type resultType = result.GetType();
    object? raw = resultType.GetProperty("Succeeded", flags)?.GetValue(result)
        ?? resultType.GetField("Succeeded", flags)?.GetValue(result);
    return raw as bool?;
}
/// <summary>
/// Looks up the most recent task (highest Id) for a business number and task type, then runs the
/// flow for that task via <see cref="Run"/>.
/// </summary>
/// <remarks>
/// NOTE(review): what <c>FirstAsync</c> yields when no task row matches is not visible here;
/// confirm whether <c>taskId</c> can come back as 0 and whether Run should be called in that case.
/// </remarks>
/// <param name="taskBaseType">Task type; its name is compared with the stored TASK_TYPE.</param>
/// <param name="bsno">Business Id compared with the stored OUT_BS_NO.</param>
/// <param name="dataContext">Initial data context handed to <see cref="Run"/>.</param>
/// <returns>The tuple returned by <see cref="Run"/>: (flow log Id, all complete, all success).</returns>
public async Task<(long? flowLogId, bool isAllComplete, bool isAllSuccess)> RunWithBsno(TaskBaseTypeEnum taskBaseType, long bsno, TaskFlowDataContext dataContext)
{
    // Newest matching task first, projected down to its Id.
    var latestTaskIdQuery = tenantDb.Queryable<TaskBaseInfo>()
        .Where(t => t.OUT_BS_NO == bsno && t.TASK_TYPE == taskBaseType.ToString())
        .OrderByDescending(a => a.Id)
        .Select(t => t.Id);

    long taskId = await latestTaskIdQuery.FirstAsync();

    var outcome = await Run(taskBaseType, taskId, dataContext);
    return outcome;
}
/// <summary>
/// Resolves a dot-separated property path against an object using reflection.
/// </summary>
/// <param name="obj">Object on which the first path segment is looked up.</param>
/// <param name="propertyPath">Property names separated by '.', for example "Customer.Name".</param>
/// <returns>
/// A tuple whose <c>isOk</c> is always true and whose <c>otherPropertyPath</c> is always null.
/// <c>val</c> is the resolved value, or null when a segment names no property or a class-typed
/// value on the path is null. Descent only continues through class-typed properties other than
/// string; for any other property type the value is returned and the rest of the path is ignored.
/// </returns>
static (bool isOk, object? val, string? otherPropertyPath) GetPropertyValue(object obj, string propertyPath)
{
    int dotIndex = propertyPath.IndexOf('.');
    string propertyName = dotIndex < 0 ? propertyPath : propertyPath.Substring(0, dotIndex);

    var property = obj.GetType().GetProperty(propertyName);
    if (property == null)
    {
        return (true, null, null);
    }

    var val = property.GetValue(obj);

    bool isNestedObject = property.PropertyType.IsClass && property.PropertyType != typeof(string);

    // Stop when the value is a leaf, is null, or the path is exhausted. Without the last check a
    // class-typed final segment would be looked up again, by the same name, on its own value.
    if (!isNestedObject || val == null || dotIndex < 0)
    {
        return (true, val, null);
    }

    return GetPropertyValue(val, propertyPath.Substring(dotIndex + 1));
}
/// <summary>
/// Formats an exception into the multi-line text stored on a log record: the caller's message,
/// the exception's type name, its message and its stack trace.
/// </summary>
/// <param name="throwMsg">Caller-supplied description of where the failure happened.</param>
/// <param name="ex">Exception to describe.</param>
/// <returns>The formatted text; a null stack trace is rendered as an empty string.</returns>
static string WriteLog(string throwMsg, Exception ex)
{
    const string template = "【自定义错误】:{0} \r\n【异常类型】{1} \r\n【异常信息】{2} \r\n【堆栈调用】{3}";

    string exceptionTypeName = ex.GetType().Name;
    return string.Format(template, throwMsg, exceptionTypeName, ex.Message, ex.StackTrace);
}
}
}