You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

383 lines
16 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using DS.Module.Core;
using DS.Module.Core.Data;
using DS.WMS.Core.TaskPlat.Entity;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using SqlSugar;
using System.Diagnostics;
using System.Reflection;
namespace DS.WMS.Core.TaskPlat
{
/// <summary>
/// 任务编排执行器
/// </summary>
public class TaskFlowRuner
{
private readonly SqlSugarScopeProvider tenantDb = null!;
private readonly IServiceProvider serviceProvider = null!;
private readonly ILogger<TaskFlowRuner> logger = null!;
private TaskFlowRuner() { }
/// <summary>
///
/// </summary>
/// <param name="tenantDb"></param>
/// <param name="serviceProvider"></param>
public TaskFlowRuner(SqlSugarScopeProvider tenantDb, IServiceProvider serviceProvider)
{
this.tenantDb = tenantDb;
this.serviceProvider = serviceProvider;
this.logger = serviceProvider.GetRequiredService<ILogger<TaskFlowRuner>>();
}
/// <summary>
/// 执行
/// </summary>
/// <param name="taskBaseType">任务类型</param>
/// <param name="taskId">任务Id</param>
/// <param name="dataContext">起始入参数据上下文</param>
/// <returns>(执行日志Id,模块是否全部执行完成,模块执行结果是否全部为success)</returns>
public async Task<(long? flowLogId, bool isAllComplete, bool isAllSuccess)> Run(TaskBaseTypeEnum taskBaseType, long taskId, TaskFlowDataContext dataContext)
{
if (dataContext == null)
{
throw new ArgumentNullException(nameof(dataContext));
}
var allConfigList = await tenantDb.Queryable<TaskFlowConfig>()
.Where(t => t.IsMain && t.TaskType == taskBaseType.ToString())
.OrderBy(t => t.Id)
.ToListAsync();
if (allConfigList.Count == 0)
{
string msg = $"未找到任何用于【{taskBaseType}】业务的处理流程";
TaskFlowLog log = new()
{
IsMatch = false,
TaskId = taskId,
TaskType = taskBaseType.ToString(),
IsComplete = false,
ExceptionMessage = msg,
IsSuccess = false,
};
await tenantDb.Insertable(log).ExecuteCommandAsync();
logger.LogInformation(msg);
return (null, false, false);
//throw new Exception(msg);
}
var allConfigIdList = allConfigList.Select(x => x.Id);
var allConditionList = await tenantDb.Queryable<TaskFlowCondition>()
.Where(x => allConfigIdList.Contains(x.ConfigId))
.ToListAsync();
// 判断每项流程达成的条件数量
Dictionary<long, int> configMatchCount = allConfigList.ToDictionary(x => x.Id, x => 0);
foreach (var configItem in allConfigList)
{
var conditionListGroup = allConditionList.Where(x => x.ConfigId == configItem.Id);
foreach (var conditionItem in conditionListGroup)
{
string? valStr;
if (conditionItem.FieldName.Contains('.'))
{
var firstKey = conditionItem.FieldName.Split('.').First();
if (!dataContext.ContainsKey(firstKey)) continue;
var obj = dataContext.Get<object>(firstKey)!;
var propertyPath = conditionItem.FieldName.Substring(conditionItem.FieldName.IndexOf('.') + 1);
var (_, val, _) = GetPropertyValue(obj, propertyPath);
if (val == null) continue;
valStr = val.ToString();
}
else
{
if (!dataContext.ContainsKey(conditionItem.FieldName)) continue;
var obj = dataContext.Get<object>(conditionItem.FieldName)!;
valStr = obj.ToString();
}
var oldValue = configMatchCount[configItem.Id];
if (conditionItem.MatchType == 1)
{
if (valStr?.Equals(conditionItem.MatchValue, StringComparison.CurrentCultureIgnoreCase) == true)
{
configMatchCount[configItem.Id] = oldValue + 1;
}
else
{
configMatchCount[configItem.Id] = oldValue - 1;
}
}
else if (conditionItem.MatchType == 2)
{
if (valStr?.Contains(conditionItem.MatchValue, StringComparison.CurrentCultureIgnoreCase) == true)
{
configMatchCount[configItem.Id] = oldValue + 1;
}
else
{
configMatchCount[configItem.Id] = oldValue - 1;
}
}
else if (conditionItem.MatchType == 3)
{
if (valStr?.StartsWith(conditionItem.MatchValue, StringComparison.CurrentCultureIgnoreCase) == true)
{
configMatchCount[configItem.Id] = oldValue + 1;
}
else
{
configMatchCount[configItem.Id] = oldValue - 1;
}
}
else if (conditionItem.MatchType == 4)
{
if (valStr?.EndsWith(conditionItem.MatchValue, StringComparison.CurrentCultureIgnoreCase) == true)
{
configMatchCount[configItem.Id] = oldValue + 1;
}
else
{
configMatchCount[configItem.Id] = oldValue - 1;
}
}
}
}
// 取出最匹配的流程
KeyValuePair<long, int>? bestMatched = null!;
foreach (var item in configMatchCount)
{
if (bestMatched == null)
{
bestMatched = new KeyValuePair<long, int>(item.Key, item.Value);
continue;
}
if (item.Value > bestMatched.Value.Value)
{
bestMatched = new KeyValuePair<long, int>(item.Key, item.Value);
continue;
}
if (item.Value == bestMatched.Value.Value)
{
// 如果达成的条件数量相等,则取最早的流程
if (item.Key < bestMatched.Value.Key)
{
bestMatched = new KeyValuePair<long, int>(item.Key, item.Value);
continue;
}
}
}
var targetConfig = allConfigList.First(x => x.Id == bestMatched.Value.Key);
var configList = await tenantDb.Queryable<TaskFlowConfig>().Where(x => x.MainConfigId == targetConfig.Id).ToListAsync();
var configIdList = configList.Select(x => x.Id).ToList();
var moduleIdList = configList.Select(x => x.ExecuteModuleId).ToList();
var paramList = await tenantDb.Queryable<TaskFlowParam>().Where(x => configIdList.Contains(x.ConfigId)).ToListAsync();
var moduleList = await tenantDb.Queryable<TaskFlowModule>().Where(x => moduleIdList.Contains(x.Id)).ToListAsync();
TaskFlowLog flowLog = new()
{
Id = SnowFlakeSingle.Instance.NextId(),
IsMatch = true,
TaskId = taskId,
TaskType = taskBaseType.ToString(),
MatchMainConfigId = targetConfig.Id,
ConfigList = JsonConvert.SerializeObject(configList),
ModuleList = JsonConvert.SerializeObject(moduleList),
IsComplete = true,
IsSuccess = true,
};
// 执行流程
foreach (var configItem in configList)
{
TaskFlowLogDetail flowLogDetail = new()
{
ConfigId = configItem.Id,
PId = flowLog.Id,
IsSuccess = true,
};
try
{
var paramItemList = paramList.Where(x => x.ConfigId == configItem.Id).ToList();
foreach (var paramItem in paramItemList)
{
dataContext.Set(paramItem.FieldName!, paramItem.FieldValue!);
//if (dataContext.ContainsKey(paramItem.FieldName!))
//{
// dataContext[paramItem.FieldName!] = paramItem.FieldValue!;
//}
//else
//{
// dataContext.TryAdd(paramItem.FieldName!, paramItem.FieldValue!);
//}
}
var module = moduleList.FirstOrDefault(x => x.Id == configItem.ExecuteModuleId);
if (module == null)
{
throw new Exception($"未找到指定流程配置ExecuteModuleId{configItem.ExecuteModuleId}");
}
flowLogDetail.ModuleId = module.Id;
flowLogDetail.ModuleName = module.Name;
Assembly assembly;
if (module.AssemblyName!.StartsWith("DS.WMS.Core"))
{
assembly = Assembly.GetExecutingAssembly();
}
else
{
assembly = Assembly.Load(module.AssemblyName!);
}
var interfaceType = assembly.DefinedTypes.FirstOrDefault(x => x.Name == module.InterfaceName!);
if (interfaceType == null)
{
throw new Exception($"在程序集【{module.AssemblyName}】下未找到Interface【{module.InterfaceName}】");
}
object? imp = serviceProvider.GetService(interfaceType);
if (imp == null)
{
throw new Exception($"在依赖注入容器未找到Interface【{module.InterfaceName}】的实现类");
}
var impType = imp.GetType();
MethodInfo? method = impType.GetMethod(module.MethodName!, BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
if (method == null)
{
throw new Exception($"在Interface【{module.InterfaceName}】的实现类中未找到Method【{module.MethodName}】");
}
Stopwatch stopwatch = Stopwatch.StartNew();
var taskObj = method.Invoke(imp, new object[] { dataContext })!;
if (taskObj is Task task)
{
await task;
if (task.IsFaulted)
{
if (task.Exception != null)
{
throw task.Exception;
}
else
{
throw new Exception("模块执行过程中发生未知异常但是Exception为null请检查");
}
}
dynamic dynamicTask = task;
if (dynamicTask.Result != null)
{
flowLogDetail.ExecuteReturn = JsonConvert.SerializeObject(dynamicTask.Result);
var isSuccess = dynamicTask.Result.Succeeded;
if (isSuccess != null && isSuccess is bool)
{
flowLogDetail.IsSuccess = isSuccess;
if (isSuccess == false)
{
flowLog.IsSuccess = false;
}
}
}
}
else
{
throw new Exception("模块执行结果不是Task类型请检查");
// 后面支持非异步
}
stopwatch.Stop();
flowLogDetail.ElapsedMillisecond = stopwatch.ElapsedMilliseconds;
flowLogDetail.IsComplete = true;
await tenantDb.Insertable(flowLogDetail).ExecuteCommandAsync();
}
catch (Exception ex)
{
flowLog.IsComplete = false;
flowLog.IsSuccess = false;
flowLogDetail.IsComplete = false;
flowLogDetail.IsSuccess = false;
flowLogDetail.ExceptionMessage = WriteLog("模块执行过程中发生异常", ex);
await tenantDb.Insertable(flowLogDetail).ExecuteCommandAsync();
if (configItem.IsExceptionContinue)
{
continue;
}
else
{
break;
}
}
}
await tenantDb.Insertable(flowLog).ExecuteCommandAsync();
return (flowLog.Id, flowLog.IsComplete, flowLog.IsSuccess);
}
/// <summary>
/// 深度获取对象属性值
/// </summary>
/// <param name="obj">对象</param>
/// <param name="propertyPath">属性路径</param>
static (bool isOk, object? val, string? otherPropertyPath) GetPropertyValue(object obj, string propertyPath)
{
var propertyName = propertyPath.Split('.').FirstOrDefault();
var type = obj.GetType();
var property = type.GetProperty(propertyName);
if (property == null)
{
return (true, null, null);
}
if (property.PropertyType.IsClass && property.PropertyType != typeof(string))
{
var val = property.GetValue(obj);
if (val == null)
{
return (true, null, null);
}
var otherPropertyPath = propertyPath.Substring(propertyPath.IndexOf('.') + 1);
return GetPropertyValue(val, otherPropertyPath);
}
else
{
var val = property.GetValue(obj);
return (true, val, null);
}
}
static string WriteLog(string throwMsg, Exception ex)
{
return string.Format("【自定义错误】:{0} \r\n【异常类型】{1} \r\n【异常信息】{2} \r\n【堆栈调用】{3}", new object[]
{
throwMsg,
ex.GetType().Name,
ex.Message,
ex.StackTrace
});
}
}
}