Skip to content

Commit

Permalink
opt: 调整RabbitMQ实现细节,避免可能出现的异常;
Browse files Browse the repository at this point in the history
  • Loading branch information
stratosblue committed Jun 13, 2024
1 parent 2f0d2aa commit 38539fa
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 18 deletions.
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ A message driven distributed asynchronous workflow framework. 消息驱动的分

### NOTE:
- 更新包时应当`尽可能``全链路更新`,避免导致的未知问题;
- `WorkflowContext` 核心为 `字符串字典` 其属性在赋值时进行序列化存放,对象后续的修改不会反应到上下文中;

- `WorkflowContext` 核心为 `字符串字典` 其属性在`赋值时`进行`序列化存放`,对象后续的修改`不会`反应到上下文中;
- `Workflow` 中重写`各个阶段的触发事件`方法时,方法内`不能往外抛出异常`,会导致该阶段消息重新进入队列,再次执行;
- 默认分发器 `FluentWorkflow.RabbitMQ` 依赖 `交换机``队列` 进行消息分发,当存在`多套环境`需要`隔离`时,确保 `交换机``队列` 都不相同,否则将会出现消息重复消费;
- 默认分发器 `FluentWorkflow.RabbitMQ``绑定信息``交换机``队列`)变更时不能完全自动调整,需要人工修正,如手动移除队列错误的`交换机绑定``RoutingKey绑定`,否则将会出现消息重复消费;

## 3. 开始使用

### 3.1 引用 `FluentWorkflow.Core`
```xml
<ItemGroup>
<PackageReference Include="FluentWorkflow.Core" Version="1.0.0-*" />
<PackageReference Include="FluentWorkflow.Core" Version="1.1.4" />
</ItemGroup>
```

Expand Down Expand Up @@ -153,7 +156,7 @@ await workflow.StartAsync(default);

- 启动工作流程的服务可以不是配置工作流程调度器 - `WorkflowScheduler`的服务,但需要接入`消息分发器`并在配置时使用 `Add****Workflow()` 添加对应的工作流程构造器;
- 源代码生成器生成的绝大部分类型都是`partial`的,可以声明`partial`类进行拓展,不可使用`partial`类拓展的功能基本上都可以继承后重写,在配置服务时替换默认实现即可;
- 定义的 `Workflow` 类会添加生命周期各个阶段的触发事件方法,可以继承后重写其逻辑以在各个阶段执行相关的逻辑(注意每次触发可能不在同一个服务实例中);
- 定义的 `Workflow` 类会添加生命周期`各个阶段的触发事件`方法,可以`继承后重写`其逻辑以在各个阶段执行相关的逻辑(注意每次触发可能不在同一个服务实例中。重写后应当捕获并处理所有异常,不要抛出);
- `WorkflowContext` 核心为`字符串字典`,对其修改理论上只对后续可见并在整个执行周期可用,可以将执行参数、结果、中间值等存放其中;
- 消息的分发、重试等逻辑由具体使用的消息分发器`IWorkflowMessageDispatcher`控制(默认提供了基于`CAP``Abp`以及基础的`FluentWorkflow.RabbitMQ`可选);
- 默认情况下 `StageHandler` 出现异常则认为工作流程失败,不会将异常抛给上层 `IWorkflowMessageDispatcher`(消息分发的重试不会触发),可以重写 `StageHandler``OnException` 方法来将异常向上抛出;
Expand Down Expand Up @@ -186,7 +189,7 @@ await workflow.StartAsync(default);
#### 引用 `FluentWorkflow.RabbitMQ`
```xml
<ItemGroup>
<PackageReference Include="FluentWorkflow.RabbitMQ" Version="1.0.0-*" />
<PackageReference Include="FluentWorkflow.RabbitMQ" Version="1.1.4" />
</ItemGroup>
```
#### 配置
Expand Down Expand Up @@ -216,6 +219,8 @@ services.Configure<RabbitMQOptions>(options =>
#### *消息确认超时

RabbitMQ消息的消费ack超时时间默认为30分钟,进行长时间处理时可能会出现意外情况,可参照 [acknowledgement-timeout](https://www.rabbitmq.com/docs/consumers#acknowledgement-timeout) 进行调整
- 框架已默认尝试设置队列参数 `x-consumer-timeout` 为 1 小时(如果RabbitMQ版本支持的话);
- 可使用 `RabbitMQOptions.QueueArgumentsSetup` 对队列的 `x-consumer-timeout` 参数进行调整;

-------

Expand Down
2 changes: 1 addition & 1 deletion package.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>1.1.3</Version>
<Version>1.1.4</Version>

<Description>A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。</Description>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Task IWorkflowMessageDispatcher.PublishAsync<TMessage>(TMessage message, Cancell
{
var handler = serviceProvider.GetRequiredService(invokerDescriptor.TargetType);
return invokerDescriptor.HandlerInvokeDelegate(handler, messageClone, CancellationToken.None);
}).ToArray();
}).ToList();

await Task.WhenAll(tasks);
}
Expand Down
8 changes: 7 additions & 1 deletion src/FluentWorkflow.RabbitMQ/Consumers/ConsumeDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,10 @@

namespace FluentWorkflow.RabbitMQ;

internal record class ConsumeDescriptor(string EventName, WorkflowEventInvokerDescriptor[] InvokerDescriptors, MessageRequeuePolicy RequeuePolicy, TimeSpan RequeueDelay);
internal record class ConsumeDescriptor(string EventName, WorkflowEventInvokerDescriptor[] InvokerDescriptors, MessageRequeuePolicy RequeuePolicy, TimeSpan RequeueDelay)
{
/// <summary>
/// 单工作流程事件执行程序
/// </summary>
public bool SingleWorkflowEventInvoker { get; } = InvokerDescriptors.Length == 1;
}
28 changes: 26 additions & 2 deletions src/FluentWorkflow.RabbitMQ/Consumers/EventMessageBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ public EventMessageBasicConsumer(IModel model,

#endregion Public 构造函数

#region Public 方法

/// <inheritdoc/>
public override sealed async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
try
{
await InternalHandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
}
catch (Exception ex)
{
Model.BasicReject(deliveryTag, true);

Logger.LogError(ex, "Error at rabbitmq HandleBasicDeliver. DeliveryTag {DeliveryTag} [{ConsumerTag}] Routing: {Exchange} -> {RoutingKey}. requeued.", deliveryTag, consumerTag, exchange, routingKey);

throw;
}
}

#endregion Public 方法

#region Protected 方法

public Task HandleErrorMessageAsync(string? eventName,
Expand Down Expand Up @@ -118,7 +139,7 @@ protected async Task ConsumeMessageAsync(ConsumeDescriptor consumeDescriptor, st
{
await using var serviceScope = ServiceScopeFactory.CreateAsyncScope();
var serviceProvider = serviceScope.ServiceProvider;
if (invokerDescriptors.Length == 1)
if (consumeDescriptor.SingleWorkflowEventInvoker)
{
await InvokeHandlerAsync(body, serviceProvider, invokerDescriptors[0], ObjectSerializer, RunningCancellationToken);
}
Expand All @@ -127,7 +148,7 @@ protected async Task ConsumeMessageAsync(ConsumeDescriptor consumeDescriptor, st
var tasks = invokerDescriptors.Select([StackTraceHidden][DebuggerStepThrough] (invokerDescriptor) =>
{
return InvokeHandlerAsync(body, serviceProvider, invokerDescriptor, ObjectSerializer, RunningCancellationToken);
}).ToArray();
}).ToList();

await Task.WhenAll(tasks);
}
Expand All @@ -148,6 +169,9 @@ protected async Task ConsumeMessageAsync(ConsumeDescriptor consumeDescriptor, st
}
}

/// <inheritdoc cref="HandleBasicDeliver(string, ulong, bool, string, string, IBasicProperties, ReadOnlyMemory{byte})"/>
protected abstract Task InternalHandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body);

#endregion Protected 方法

#region Private 方法
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal sealed class MultipleEventMessageConsumer : EventMessageBasicConsumer

public ImmutableDictionary<string, ConsumeDescriptor> ConsumeDescriptors { get; }

public HashSet<string> StandaloneEventNames { get; }
public ImmutableHashSet<string> StandaloneEventNames { get; }

#endregion Public 属性

Expand All @@ -47,16 +47,16 @@ public MultipleEventMessageConsumer(ImmutableDictionary<string, ConsumeDescripto
}

ConsumeDescriptors = consumeDescriptors ?? throw new ArgumentNullException(nameof(consumeDescriptors));
StandaloneEventNames = standaloneEventNames ?? throw new ArgumentNullException(nameof(standaloneEventNames));
StandaloneEventNames = standaloneEventNames?.ToImmutableHashSet() ?? throw new ArgumentNullException(nameof(standaloneEventNames));
_errorMessageRequeuePolicy = options.ErrorMessageRequeuePolicy;
_errorMessageRequeueDelay = options.ErrorMessageRequeueDelay;
}

#endregion Public 构造函数

#region Public 方法
#region Protected 方法

public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
protected override Task InternalHandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
var eventName = "UnknownEventName";
if (properties?.Headers is { } headers
Expand Down Expand Up @@ -84,5 +84,5 @@ public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, b
forceNotRequeue: forceNotRequeue);
}

#endregion Public 方法
#endregion Protected 方法
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public StandAloneEventMessageConsumer(ConsumeDescriptor consumeDescriptor,

#endregion Public 构造函数

#region Public 方法
#region Protected 方法

public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
protected override Task InternalHandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
var eventName = "UnknownEventName";
if (properties?.Headers is { } headers
Expand All @@ -56,5 +56,5 @@ public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, b
forceNotRequeue: false);
}

#endregion Public 方法
#endregion Protected 方法
}

0 comments on commit 38539fa

Please sign in to comment.