22. 事件总线
📝 模块更新日志
-
新特性
- 事件总线支持简单的
Order
编排规则 4.8.0 833c0d4 - 事件总线
.ReplaceStorerOrFallback
自定义事件源存储器方法,可在自定义初始失败时回退到默认值 4.7.6 #I602NU - 事件总线模块重试失败后支持回调 4.6.1 #I5UVMV
- 事件总线
LogEnabled
配置,可控制是否输出服务日志 #I5QLY5 - 事件总线
MessageCenter
静态类,解决从Fur v1.x
版本升级问题 a29fc7c - 事件总线工厂,支持运行时动态添加订阅程序和移除订阅程序 #I5NNQX
- 事件总线
[EventSubscribe]
事件Id
支持正则表达式匹配 #I5NNQX - 事件总线
[EventSubscribe]
支持局部失败重试配置 #I5NNQX - 事件总线
options.AddSubscriber(Type)
重载 42446078 - 事件总线
UseUtcTimestamp
选项配置,可选择使用DateTime.UtcNow
还是DateTime.Now
,默认是DateTime.Now
#I5JSEU - 事件总线模块事件
Id
支持枚举类型 2f328aa - 事件总线模块发布者
PublishAsync
和PublishDelayAsync
重载 2f328aa - 事件总线模块拓展方法:
Enum.ParseToString()
和String.ParseToEnum()
2f328aa
- 事件总线支持简单的
-
突破性变化
-
问题修复
-
其他调整
以下内容仅限 Furion 2.20.0 +
版本使用。
22.1 关于事件总线
事件总线是对发布-订阅模式的一种实现。它是一种 集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。
22.2 快速入门
22.2.1 定义事件处理程序
定义事件订阅者 ToDoEventSubscriber
:
// 实现 IEventSubscriber 接口
public class ToDoEventSubscriber : IEventSubscriber
{
private readonly ILogger<ToDoEventSubscriber> _logger;
public ToDoEventSubscriber(ILogger<ToDoEventSubscriber> logger)
{
_logger = logger;
}
[EventSubscribe("ToDo:Create")]
public async Task CreateToDo(EventHandlerExecutingContext context)
{
var todo = context.Source;
_logger.LogInformation("创建一个 ToDo:{Name}", todo.Payload);
await Task.CompletedTask;
}
// 支持多个
[EventSubscribe("ToDo:Create")]
[EventSubscribe("ToDo:Update")]
public async Task CreateOrUpdateToDo(EventHandlerExecutingContext context)
{
var todo = context.Source;
_logger.LogInformation("创建或更新一个 ToDo:{Name}", todo.Payload);
await Task.CompletedTask;
}
// 支持枚举类型,v3.4.3+ 版本支持
[EventSubscribe(YourEnum.Some)]
public async Task EnumHandler(EventHandlerExecutingContext context)
{
var eventEnum = context.Source.EventId.ParseToEnum(); // 将事件 Id 转换成枚举对象
await Task.CompletedTask;
}
// 支持正则表达式匹配,4.2.10+ 版本支持
[EventSubscribe("(^1[3456789][0-9]{9}$)|((^[0-9]{3,4}\\-[0-9]{3,8}$)|(^[0-9]{3,8}$)|(^\\([0-9]{3,4}\\)[0-9]{3,8}$)|(^0{0,1}13[0-9]{9}$))", FuzzyMatch = true)]
public async Task RegexHandler(EventHandlerExecutingContext context)
{
var eventId = context.Source.EventId;
await Task.CompletedTask;
}
// 支持多种异常重试配置,Furion 4.2.10+ 版本支持
[EventSubscribe("test:error", NumRetries = 3)]
[EventSubscribe("test:error", NumRetries = 3, RetryTimeout = 1000)] // 重试间隔时间
[EventSubscribe("test:error", NumRetries = 3, ExceptionTypes = new[] { typeof(ArgumentException) })] // 特定类型异常才重试
public async Task ExceptionHandler(EventHandlerExecutingContext context)
{
var eventId = context.Source.EventId;
await Task.CompletedTask;
}
// 支持简单 Order 编排,Furion 4.8.0+ 版本支持
[EventSubscribe("test:order", Order = 1)]
public async Task ExceptionHandler(EventHandlerExecutingContext context)
{
var eventId = context.Source.EventId;
await Task.CompletedTask;
}
}
22.2.2 发布事件消息
创建控制器 ToDoController
,依赖注入 IEventPublisher
服务:
public class ToDoController : ControllerBase
{
// 依赖注入事件发布者 IEventPublisher
private readonly IEventPublisher _eventPublisher;
public ToDoController(IEventPublisher eventPublisher)
{
_eventPublisher = eventPublisher;
}
// 发布 ToDo:Create 消息
public async Task CreateDoTo(string name)
{
await _eventPublisher.PublishAsync(new ChannelEventSource("ToDo:Create", name));
// 也可以延迟发布,比如延迟 3s
await _eventPublisher.PublishDelayAsync(new ChannelEventSource("ToDo:Create", name), 3000);
}
// v3.4.3+ 版本支持发送消息简化
public async Task CreateDoTo(string name)
{
await _eventPublisher.PublishAsync("ToDo:Create", name);
// 也可以延迟发布,比如延迟 3s
await _eventPublisher.PublishDelayAsync("ToDo:Create", 3000, name);
// 也支持枚举
await _eventPublisher.PublishAsync(YourEnum.Some);
}
}
22.2.3 注册事件服务
在 Startup.cs
注册 EventBus
服务:
// 注册 EventBus 服务
services.AddEventBus(builder =>
{
// 注册 ToDo 事件订阅者
builder.AddSubscriber<ToDoEventSubscriber>();
// 通过类型注册,Furion 4.2.1+ 版本
builder.AddSubscriber(typeof(ToDoEventSubscriber));
// 批量注册事件订阅者
builder.AddSubscribers(ass1, ass2, ....);
});
在 Furion
中可以不用通过 builder.AddSubscriber<T>()
方式一一注册,只需要实现 ISingleton
接口即可,如:
public class ToDoEventSubscriber : IEventSubscriber, ISingleton
{
}
这样就无需写 代码了,只需保留 builder.AddSubscriber<ToDoEventSubscriber>();
services.AddEventBus()
服务即可。
22.2.4 运行项目
运行项目并触发事件发布接口或方法。
info: Jaina.Samples.ToDoEventSubscriber[0]
创建一个 ToDo:Jaina
22.3 自定义事件源
Furion
使用 IEventSource
作为消息载体,任何实现该接口的类都可以充当消息载体。
如需自定义,只需实现 IEventSource
接口即可:
public class ToDoEventSource : IEventSource
{
public ToDoEventSource()
{
}
public ToDoEventSource(string eventId, string todoName)
{
EventId = eventId;
ToDoName = todoName;
}
// 自定义属性
public string ToDoName { get; set; }
/// <summary>
/// 事件 Id
/// </summary>
public string EventId { get; set; }
/// <summary>
/// 事件承载(携带)数据
/// </summary>
public object Payload { get; set; }
/// <summary>
/// 事件创建时间
/// </summary>
public DateTime CreatedTime { get; set; } = DateTime.UtcNow;
/// <summary>
/// 取消任务 Token
/// </summary>
/// <remarks>用于取消本次消息处理</remarks>
[Newtonsoft.Json.JsonIgnore]
[System.Text.Json.Serialization.JsonIgnore]
public CancellationToken CancellationToken { get; set; }
}
使用:
await _eventPublisher.PublishAsync(new ToDoEventSource ("ToDo:Create", "我的 ToDo Name"));
22.4 自定义事件源存储器
Furion
默认采用 Channel
作为事件源 IEventSource
存储器,开发者可以使用任何消息队列组件进行替换,如 Kafka、RabbitMQ、ActiveMQ
等,也可以使用部分数据库 Redis、SQL Server、MySql
实现。
如需自定义,只需实现 IEventSourceStorer
接口即可。
22.4.1 Redis
自定义指南
Windows
版本 Redis
安装包下载1. 安装 Redis
拓展包
Install-Package Microsoft.Extensions.Caching.StackExchangeRedis
也可以直接安装 StackExchange.Redis
包。
2. 创建 RedisEventSourceStorer
自定义存储器
using Furion.EventBus;
using StackExchange.Redis;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Furion.Core;
public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
{
/// <summary>
/// 内存通道事件源存储器
/// </summary>
private readonly Channel<IEventSource> _channel;
/// <summary>
/// Redis 连接对象
/// </summary>
private readonly ConnectionMultiplexer _connectionMultiplexer;
/// <summary>
/// 路由键
/// </summary>
private readonly string _routeKey;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="connectionMultiplexer">Redis 连接对象</param>
/// <param name="routeKey">路由键</param>
/// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
public RedisEventSourceStorer(ConnectionMultiplexer connectionMultiplexer, string routeKey, int capacity)
{
// 配置通道,设置超出默认容 量后进入等待
var boundedChannelOptions = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
// 创建有限容量通道
_channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
_connectionMultiplexer = connectionMultiplexer;
_routeKey = routeKey;
// 获取一个订阅对象
var subscriber = connectionMultiplexer.GetSubscriber();
// 订阅消息
subscriber.Subscribe(routeKey, (channel, data) =>
{
// 转换为 IEventSource,这里可以选择自己喜欢的序列化工具,如果自定义了 EventSource,注意属性是可读可写
var eventSource = JsonSerializer.Deserialize<ChannelEventSource>(data.ToString());
// 写入内存管道存储器
_channel.Writer.WriteAsync(eventSource);
});
}
/// <summary>
/// 将事件源写入存储器
/// </summary>
/// <param name="eventSource">事件源对象</param>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns><see cref="ValueTask"/></returns>
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
{
// 空检查
if (eventSource == default)
{
throw new ArgumentNullException(nameof(eventSource));
}
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
if (eventSource is ChannelEventSource source)
{
// 序列化,这里可以选择自己喜欢的序列化工具
var data = JsonSerializer.Serialize(source);
// 获取一个订阅对象
var subscriber = _connectionMultiplexer.GetSubscriber();
await subscriber.PublishAsync(_routeKey, data);
}
else
{
// 这里处理动态订阅问题
await _channel.Writer.WriteAsync(eventSource, cancellationToken);
}
}
/// <summary>
/// 从存储器中读取一条事件源
/// </summary>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns>事件源对象</returns>
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
{
// 读取一条事件源
var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
return eventSource;
}
/// <summary>
/// 释放非托管资源
/// </summary>
public void Dispose()
{
_connectionMultiplexer.Dispose();
}
}
3. 替换默认事件存储器
services.AddEventBus(options =>
{
// 创建 Redis 连接对象
var connectionMultiplexer = ConnectionMultiplexer.Connect("localhost");
// 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是 eventbus
var redisEventSourceStorer = new RedisEventSourceStorer(connectionMultiplexer, "eventbus", 3000);
// 替换默认事件总线存储器
options.ReplaceStorer(serviceProvider =>
{
return redisEventSourceStorer;
});
});
StackExchange.Redis
超时问题如遇到 StackExchange.Redis
超时(Timeout)问题,可编辑启动层的 .csproj
添加以下配置:
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ThreadPoolMinThreads>50</ThreadPoolMinThreads>
</PropertyGroup>
</Project>
如果以上配置依然存在超时问题,那么可以在 Program.cs/Startup.cs
中添加 ThreadPool.SetMinThreads(200, 200);
。
22.4.2 RabbitMQ
自定义指南
以下内容仅限 Furion 4.3.4 +
版本使用。
由于使用 RabbitMQ
作为事件总线存储器的比较多,所以这里提供了完整的使用例子。
1. 安装 RabbitMQ.Client
拓展包
Install-Package RabbitMQ.Client
2. 创建 RabbitMQEventSourceStorer
自定义存储器
using Furion.EventBus;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Furion.Core;
public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable
{
/// <summary>
/// 内存通道事件源存储器
/// </summary>
private readonly Channel<IEventSource> _channel;
/// <summary>
/// 通道对象
/// </summary>
private readonly IModel _model;
/// <summary>
/// 连接对象
/// </summary>
private readonly IConnection _connection;
/// <summary>
/// 路由键
/// </summary>
private readonly string _routeKey;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="factory">连接工厂</param>
/// <param name="routeKey">路由键</param>
/// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
public RabbitMQEventSourceStorer(ConnectionFactory factory, string routeKey, int capacity)
{
// 配置通道,设置超出默认容量后进入等待
var boundedChannelOptions = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
// 创建有限容量通道
_channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
// 创建连接
_connection = factory.CreateConnection();
_routeKey = routeKey;
// 创建通道
_model = _connection.CreateModel();
// 声明路由队列
_model.QueueDeclare(routeKey, false, false, false, null);
// 创建消息订阅者
var consumer = new EventingBasicConsumer(_model);
// 订阅消息并写入内存 Channel
consumer.Received += (ch, ea) =>
{
// 读取原始消息
var stringEventSource = Encoding.UTF8.GetString(ea.Body.ToArray());
// 转换为 IEventSource,这里可以选择自己喜欢的序列化工具,如果自定义了 EventSource,注意属性是可读可写
var eventSource = JsonSerializer.Deserialize<ChannelEventSource>(stringEventSource);
// 写入内存管道存储器
_channel.Writer.WriteAsync(eventSource);
// 确认该消息已被消费
_model.BasicAck(ea.DeliveryTag, false);
};
// 启动消费者 设置为手动应答消息
_model.BasicConsume(routeKey, false, consumer);
}
/// <summary>
/// 将事件源写入存储器
/// </summary>
/// <param name="eventSource">事件源对象</param>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns><see cref="ValueTask"/></returns>
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
{
// 空检查
if (eventSource == default)
{
throw new ArgumentNullException(nameof(eventSource));
}
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
if (eventSource is ChannelEventSource source)
{
// 序列化,这里可以选择自己喜欢的序列化工具
var data = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(source));
// 发布
_model.BasicPublish("", _routeKey, null, data);
}
else
{
// 这里处理动态订阅问题
await _channel.Writer.WriteAsync(eventSource, cancellationToken);
}
}
/// <summary>
/// 从存储器中读取一条事件源
/// </summary>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns>事件源对象</returns>
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
{
// 读取一条事件源
var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
return eventSource;
}
/// <summary>
/// 释放非托管资源
/// </summary>
public void Dispose()
{
_model.Dispose();
_connection.Dispose();
}
}
3. 替换默认事件存储器
services.AddEventBus(options =>
{
// 创建连接工厂
var factory = new ConnectionFactory
{
UserName = "admin",
Password = "q1w2e3",
};
// 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是 eventbus
var rbmqEventSourceStorer = new RabbitMQEventSourceStorer(factory, "eventbus", 3000);
// 替换默认事件总线存储器
options.ReplaceStorer(serviceProvider =>
{
return rbmqEventSourceStorer;
});
});
22.4.3 Kafka
自定义指南
1. 安装 Confluent.Kafka
拓展包
Install-Package Confluent.Kafka
2. 创建 EventConsumer
订阅类
using Confluent.Kafka;
namespace Furion.Core;
/// <summary>
/// Kafka 消息扩展
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public class EventConsumer<TKey, TValue> : IDisposable
{
private Task _consumerTask;
private CancellationTokenSource _consumerCts;
/// <summary>
/// 消费者
/// </summary>
public IConsumer<TKey, TValue> Consumer { get; }
/// <summary>
/// ConsumerBuilder
/// </summary>
public ConsumerBuilder<TKey, TValue> Builder { get; set; }
/// <summary>
/// 消息回调
/// </summary>
public event EventHandler<ConsumeResult<TKey, TValue>> Received;
/// <summary>
/// 异常回调
/// </summary>
public event EventHandler<ConsumeException> OnConsumeException;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="config"></param>
public EventConsumer(IEnumerable<KeyValuePair<string, string>> config)
{
Builder = new ConsumerBuilder<TKey, TValue>(config);
Consumer = Builder.Build();
}
/// <summary>
/// 启动
/// </summary>
/// <exception cref="InvalidOperationException"></exception>
public void Start()
{
if (Consumer.Subscription?.Any() != true)
{
throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
}
if (_consumerTask != null)
{
return;
}
_consumerCts = new CancellationTokenSource();
var ct = _consumerCts.Token;
_consumerTask = Task.Factory.StartNew(() =>
{
while (!ct.IsCancellationRequested)
{
try
{
var cr = Consumer.Consume(TimeSpan.FromSeconds(1));
if (cr == null) continue;
Received?.Invoke(this, cr);
}
catch (ConsumeException e)
{
OnConsumeException?.Invoke(this, e);
}
}
}, ct, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default);
}
/// <summary>
/// 停止
/// </summary>
/// <returns></returns>
public async Task Stop()
{
if (_consumerCts == null || _consumerTask == null) return;
_consumerCts.Cancel();
try
{
await _consumerTask;
}
finally
{
_consumerTask = null;
_consumerCts = null;
}
}
/// <summary>
/// 释放
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// 释放
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (_consumerTask != null)
{
Stop().Wait();
}
Consumer?.Dispose();
}
}
}
3. 创建 KafkaEventSourceStore
自定义存储器
using Confluent.Kafka;
using Furion.EventBus;
using Newtonsoft.Json;
using System.Threading.Channels;
namespace Furion.Core;
/// <summary>
/// Kafka 存储源
/// </summary>
public class KafkaEventSourceStore : IEventSourceStorer, IDisposable
{
/// <summary>
/// 内存通道事件源存储器
/// </summary>
private readonly Channel<IEventSource> _channel;
/// <summary>
/// 主题
/// </summary>
private readonly string _topic;
/// <summary>
/// 消费者
/// </summary>
private readonly EventConsumer<Null, string> _eventConsumer;
/// <summary>
/// 生产者
/// </summary>
private readonly IProducer<Null, string> _producer;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="consumerConf">消费者配置</param>
/// <param name="producerConf">生产者配置</param>
/// <param name="topic">主题</param>
/// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
public KafkaEventSourceStore(ConsumerConfig consumerConf, ProducerConfig producerConf, string topic, int capacity)
{
// 配置通道,设置超出默认容量后进入等待
var boundedChannelOptions = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
// 创建有限容量通道
_channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
// 主题
_topic = topic;
// 创建消息订阅者
_eventConsumer = new EventConsumer<Null, string>(consumerConf);
_eventConsumer.Consumer.Subscribe(new[] { topic });
// 订阅消息写入 Channel
_eventConsumer.Received += (send, cr) =>
{
// 反序列化消息
var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr.Message.Value);
// 写入内存管道存储器
_channel.Writer.WriteAsync(eventSource);
};
// 启动消费者
_eventConsumer.Start();
// 创建生产者
_producer = new ProducerBuilder<Null, string>(producerConf).Build();
}
/// <summary>
/// 将事件源写入存储器
/// </summary>
/// <param name="eventSource">事件源对象</param>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns><see cref="ValueTask"/></returns>
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
{
if (eventSource == default)
{
throw new ArgumentNullException(nameof(eventSource));
}
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
if (eventSource is ChannelEventSource source)
{
// 序列化
var data = JsonConvert.SerializeObject(source);
// 异步发布
await _producer.ProduceAsync(_topic, new Message<Null, string>
{
Value = data
}, cancellationToken);
}
else
{
// 这里处理动态订阅问题
await _channel.Writer.WriteAsync(eventSource, cancellationToken);
}
}
/// <summary>
/// 从存储器中读取一条事件源
/// </summary>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns>事件源对象</returns>
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
{
// 读取一条事件源
var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
return eventSource;
}
/// <summary>
/// 释放非托管资源
/// </summary>
public async void Dispose()
{
await _eventConsumer.Stop();
GC.SuppressFinalize(this);
}
}
4. 替换默认事件存储器
services.AddEventBus(options =>
{
var consumerConf = new ConsumerConfig
{
BootstrapServers = "xxx.xxx.xxx.xxx:9092",
GroupId = "Consumer",
AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起
};
var producerConf = new ProducerConfig
{
BootstrapServers = "xxx.xxx.xxx.xxx:9092",
BatchSize = 16384, // 修改批次大小为16K
LingerMs = 20 // 修改等待时间为20ms
};
// 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是 eventbus
var kafkaEventSourceStorer = new KafkaEventSourceStore(consumerConf, producerConf, "testTopic", 30000);
// 替换默认事件总线存储器
options.ReplaceStorer(serviceProvider =>
{
return kafkaEventSourceStorer;
});
});
22.5 自定义事件发布者
Furion
默认内置基于 Channel
的事件发布者 ChannelEventPublisher
。
如需自定义,只需实现 IEventPublisher
接口即可:
public class ToDoEventPublisher : IEventPublisher
{
private readonly IEventSourceStorer _eventSourceStorer;
public ToDoEventPublisher(IEventSourceStorer eventSourceStorer)
{
_eventSourceStorer = eventSourceStorer;
}
public async Task PublishAsync(IEventSource eventSource)
{
await _eventSourceStorer.WriteAsync(eventSource, eventSource.CancellationToken);
}
// .... 其他接口实现
}
最后,在注册 EventBus
服务中替换默认 IEventPublisher
:
services.AddEventBus(builder =>
{
// 替换事件源存储器
builder.ReplacePublisher<ToDoEventPublisher>();
});
22.6 添加事件执行监视器
Furion
提供了 IEventHandlerMonitor
监视器接口,实现该接口可以监视所有订阅事件,包括 执行之前、执行之后,执行异常,共享上下文数据
。
如添加 ToDoEventHandlerMonitor
:
public class ToDoEventHandlerMonitor : IEventHandlerMonitor
{
private readonly ILogger<ToDoEventHandlerMonitor> _logger;
public ToDoEventHandlerMonitor(ILogger<ToDoEventHandlerMonitor> logger)
{
_logger = logger;
}
public Task OnExecutingAsync(EventHandlerExecutingContext context)
{
_logger.LogInformation("执行之前:{EventId}", context.Source.EventId);
return Task.CompletedTask;
}
public Task OnExecutedAsync(EventHandlerExecutedContext context)
{
_logger.LogInformation("执行之后:{EventId}", context.Source.EventId);
if (context.Exception != null)
{
_logger.LogError(context.Exception, "执行出错啦:{EventId}", context.Source.EventId);
}
return Task.CompletedTask;
}
}
最后,在注册 EventBus
服务中注册 ToDoEventHandlerMonitor
:
services.AddEventBus(builder =>
{
// 添加事件执行监视器
builder.AddMonitor<ToDoEventHandlerMonitor>();
});
22.7 添加事件执行器
Furion
提供了 IEventHandlerExecutor
执行器接口,可以让开发者自定义事件处理函数执行策略,如 超时控制,失败重试、熔断等等
。
如添加 RetryEventHandlerExecutor
:
public class RetryEventHandlerExecutor : IEventHandlerExecutor
{
public async Task ExecuteAsync(EventHandlerExecutingContext context, Func<EventHandlerExecutingContext, Task> handler)
{
// 如果执行失败,每隔 1s 重试,最多三次
await Retry.InvokeAsync(async () => {
await handler(context);
}, 3, 1000);
}
}
最后,在注册 EventBus
服务中注册 RetryEventHandlerExecutor
:
services.AddEventBus(builder =>
{
// 添加事件执行器
builder.AddExecutor<RetryEventHandlerExecutor>();
});
22.8 使用有作用域的服务
在 Furion
中, Event Bus
所有服务均注册为单例,如需使用作用域服务(单例服务可直接注入),可通过依赖注入 IServiceScopeFactory
实例并通过 CreateScope()
创建一个作用域,如:
public class ToDoEventSubscriber : IEventSubscriber
{
private readonly ILogger<ToDoEventSubscriber> _logger;
public ToDoEventSubscriber(IServiceScopeFactory scopeFactory // 单例服务可以直接注入
, ILogger<ToDoEventSubscriber> logger) // 单例服务可以直接注入
{
ScopeFactory = scopeFactory;
_logger = logger;
// 避免在这里解析非单例服务
}
public IServiceScopeFactory ScopeFactory { get; }
[EventSubscribe("ToDo:Create")]
public async Task CreateToDo(EventHandlerExecutingContext context)
{
// 创建新的作用域
using var scope = ScopeFactory.CreateScope();
// 解析服务
var scopedProcessingService = scope.ServiceProvider.GetRequiredService<IScopedProcessingService>();
// ....
}
}
如果是非单例对象,应避免在构造函数中解析服务,如把 scope.ServiceProvider.GetRequiredService<T>()
放在构造函数中初始化会导致一些服务无法使用。
22.9 订阅执行任务意外异常
事件处理程序使用的是 Task
对象进行创建并执行,但可能存在一些意外且难以捕获的异常,这时候可以通过以下方式订阅:
services.AddEventBus(builder =>
{
// 订阅 EventBus 意外未捕获异常
builder.UnobservedTaskExceptionHandler = (obj, args) =>
{
// ....
};
});
22.10 事件总线工厂
以下内容仅限 Furion 4.2.10 +
版本使用。
在该版本中,Furion
提供了 IEventBusFactory
工厂服 务,可在运行时动态新增或删除订阅,如:
public class TestEventBus : IDynamicApiController
{
private readonly IEventPublisher _eventPublisher;
private readonly IEventBusFactory _eventBusFactory;
public TestEventBus(IEventPublisher eventPublisher, IEventBusFactory eventBusFactory)
{
_eventPublisher = eventPublisher;
_eventBusFactory = eventBusFactory;
}
// 运行时动态添加一个订阅器
public async Task AddSubscriber()
{
await _eventBusFactory.Subscribe("xxx", async (ctx) =>
{
Console.WriteLine("我是动态的");
await Task.CompletedTask;
});
}
// 运行时动态删除一个订阅器
public async Task RemoveDynamic(string eventId)
{
await _eventBusFactory.Unsubscribe(eventId);
}
}
22.11 MessageCenter
静态类
以下内容仅限 Furion 4.3.3 +
版本使用。
在 Furion 4.3.3
版本新增了 MessageCenter
静态类,可在任何地方发送事件消息或订阅消息。
// 发送消息(含诸多重载)
await MessageCenter.PublishAsync("messageId", new {});
// 动态订阅消息
MessageCenter.Subscribe("messageId", async (ctx) => {
Console.WriteLine("我是动态的");
await Task.CompletedTask;
});
// 取消订阅
MessageCenter.Unsubscribe("messageId");
22.12 配置重试失败回调
以下内容仅限 Furion 4.6.1 +
版本使用。
- 创建
IEventFallbackPolicy
实现类并实现Callback
方法,如EventFallbackPolicy
:
public class EventFallbackPolicy : IEventFallbackPolicy
{
private readonly ILogger<EventFallbackPolicy> _logger;
public EventFallbackPolicy(ILogger<EventFallbackPolicy> logger)
{
_logger = logger;
}
public async Task CallbackAsync(EventHandlerExecutingContext context, Exception ex)
{
_logger.LogError(ex, "重试了多次最终还是失败了");
await Task.CompletedTask;
}
}
- 注册
EventFallbackPolicy
类型服务
services.AddEventBus(options =>
{
options.AddFallbackPolicy<EventFallbackPolicy>();
});
- 通过
[EventSubscribe]
特性配置FallbackPolicy
属性使用
[EventSubscribe("test:error", NumRetries = 3, FallbackPolicy = typeof(EventFallbackPolicy))] // 重试三次
public async Task TestError(EventHandlerExecutingContext context)
{
Console.WriteLine("我执行啦~~");
throw new NotImplementedException();
}
可以定义多个 IEventFallbackPolicy
实现类,然后通过 options.AddFallbackPolicy<T>()
注册多个,这样实现不同的事件订阅程序执行不同的策略。如:
[EventSubscribe("test:error", NumRetries = 3, FallbackPolicy = typeof(EventFallbackPolicy))]
[EventSubscribe("test:error", NumRetries = 1010, FallbackPolicy = typeof(OtherEventFallbackPolicy))]
22.13 自定义事件源存储器不可用时回退
有时候我们自定义了事件源存储器如使用 Redis
、RabbitMQ
等第三方存储介质,但由于一些原因导致启动时不可用如 Redis
服务未启动,这时候我们可以配置选择回退到默认事件源存储器(内存)中,如:
services.AddEventBus(options =>
{
options.ReplaceStorerOrFallback(() => new RedisEventSourceStorer(......));
});
// 还可以解析服务传入
services.AddEventBus(options =>
{
options.ReplaceStorerOrFallback(serviceProvider => // 提供 IServiceProvider 可解析服务
{
// var someService = serviceProvider.GetRequiredService<ISomeSerivce>();
return new RedisEventSourceStorer(......);
});
});
目前只支持在应用启动时检查,不支持在运行时实现故障转移切换。
22.14 EventBusOptionsBuilder
配置
EventBusOptionsBuilder
是 AddEventBus
构建服务选项,该选项包含以下属性和方法:
- 属性
ChannelCapacity
:默认内存通道容量UnobservedTaskExceptionHandler
:订阅执行任务未察觉异常UseUtcTimestamp
:是否使用UTC
时间,bool
类型,默认false
FuzzyMatch
:是否开启全局模糊匹配(正则表达式)事件Id
,bool
类型,默认false
LogEnabled
:是否启用日志输出,bool
类型,默认true
- 方法
AddSubscriber<TEventSubscriber>
:添加订阅者ReplacePublisher<TEventPublisher>
:替换发布者ReplaceStorer(Func<IServiceProvider, IEventSourceStorer>)
:替换存储器ReplaceStorerOrFallback(Func<IServiceProvider, IEventSourceStorer>)
:替换存储器如果失败则回滚默认AddMonitor<TEventHandlerMonitor>
:添加监视器AddExecutor<TEventHandlerExecutor>
:添加执行器
22.15 关于高频消息处 理方式
在一些高频发送消息的场景如 IoT
、日志记录、数据采集,为避免频繁解析服务和创建作用域,可使用 类全局作用域 和所有服务都采取单例的方式:
public class ToDoEventSubscriber : IEventSubscriber, IDisposable
{
private readonly ILogger<ToDoEventSubscriber> _logger;
private readonly IServiceScope _serviceScope;
public ToDoEventSubscriber(IServiceScopeFactory scopeFactory
, ILogger<ToDoEventSubscriber> logger)
{
_serviceScope = scopeFactory.CreateScope();
_logger = logger;
}
[EventSubscribe("iot:log")]
public async Task LogFromIoT(EventHandlerExecutingContext context)
{
// 解析服务
var scopedProcessingService = _serviceScope.ServiceProvider.GetRequiredService<IScopedProcessingService>();
// ....
}
/// <summary>
/// 释放服务作用域
/// </summary>
public void Dispose()
{
_serviceScope.Dispose();
}
}
22.16 IIS 部署回收设置
如果在项目中使用了事件总线且部署到 IIS
中,那么需要设置 IIS
禁止回收,避免事件总线服务进入休眠,点击查看 IIS
回收问题解决方案
22.17 使用第三方事件总线 CAP
示例
builder.Services.AddCap(options =>
{
options.UseInMemoryStorage();
options.UseInMemoryMessageQueue();
}).AddSubscriberAssembly(App.Assemblies.ToArray());
22.18 在 WinForm/WPF
中使用
事件总线支持在 WinForm/WPF
中完整的使用。
- 发送事件
private async void button1_Click(object sender, EventArgs e)
{
await _eventPublisher.PublishAsync("update.textbox", textBox1); // 可将 TextBox 控件作为 Payload 传递
}
- 订阅事件并更新
UI
public class UIEventSubscriber : IEventSubscriber
{
[EventSubscribe("update.textbox")]
public async Task UpdateTextBox(EventHandlerExecutingContext context)
{
// 将 Palload 转 TextBox
var textbox = context.Source.Payload as TextBox;
// 多线程更新 UI 组件(WinForm)
textbox?.BeginInvoke(() =>
{
text.Text = "我是事件总线更新的";
});
// 多线程更新 UI 组件(WPF)
textbox?.Dispatcher?.BeginInvoke(() =>
{
text.Content = " 我是事件总线更新的";
});
await Task.CompletedTask;
}
}
22.19 反馈与建议
给 Furion 提 Issue。