Skip to content

Commit

Permalink
feat: 优化Activity细节;RabbitMQOptions添加PreferSingleConnection选项,用于使用更少的R…
Browse files Browse the repository at this point in the history
…abbitMQ链接;
  • Loading branch information
stratosblue committed Jun 16, 2024
1 parent 38539fa commit d178ad1
Show file tree
Hide file tree
Showing 15 changed files with 260 additions and 41 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ A message driven distributed asynchronous workflow framework. 消息驱动的分
- `Workflow` 中重写`各个阶段的触发事件`方法时,方法内`不能往外抛出异常`,会导致该阶段消息重新进入队列,再次执行;
- 默认分发器 `FluentWorkflow.RabbitMQ` 依赖 `交换机``队列` 进行消息分发,当存在`多套环境`需要`隔离`时,确保 `交换机``队列` 都不相同,否则将会出现消息重复消费;
- 默认分发器 `FluentWorkflow.RabbitMQ``绑定信息``交换机``队列`)变更时不能完全自动调整,需要人工修正,如手动移除队列错误的`交换机绑定``RoutingKey绑定`,否则将会出现消息重复消费;
- 框架暂时没有保证消息可靠性,即在`消息队列中间件`异常的情况下可能会出现流程中断、重复消费等情况;

## 3. 开始使用

Expand Down
2 changes: 1 addition & 1 deletion package.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>1.1.4</Version>
<Version>1.1.5</Version>

<Description>A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。</Description>

Expand Down
29 changes: 29 additions & 0 deletions src/FluentWorkflow.Core/ActivityExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ public static class ActivityExtensions
{
#region Public 方法

/// <summary>
/// 在任务 <paramref name="task"/> 完成时处理 <paramref name="activity"/>
/// </summary>
/// <param name="task"></param>
/// <param name="activity"></param>
[EditorBrowsable(EditorBrowsableState.Never)]
public static void DisposeActivityWhenTaskCompleted(this Task task, Activity? activity)
{
if (activity is null)
{
return;
}

task.ContinueWith(RecordTaskExceptionAndDisposeActivity, activity, CancellationToken.None);
}

/// <summary>
/// 记录异常异常
/// </summary>
Expand Down Expand Up @@ -46,5 +62,18 @@ private static void RecordExceptionEvent(this Activity activity, Exception excep
activity.AddEvent(new ActivityEvent("exception", timestamp ?? DateTimeOffset.UtcNow, tagsCollection));
}

private static void RecordTaskExceptionAndDisposeActivity(Task task, object? state)
{
if (state is Activity stateActivity)
{
if (task.Exception is not null)
{
stateActivity.RecordException(task.Exception);
}

stateActivity.Dispose();
}
}

#endregion Private 方法
}
12 changes: 11 additions & 1 deletion src/FluentWorkflow.Core/Diagnostics/PrettyJSONObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,17 @@ public PrettyJSONObject(T? value, IObjectSerializer objectSerializer)
#region Public 方法

/// <inheritdoc/>
public override string ToString() => _objectSerializer.PrettySerialize(Value);
public override string ToString()
{
try
{
return _objectSerializer.PrettySerialize(Value);
}
catch (Exception ex)
{
return $"Serialize \"{typeof(T)}\" failed: {ex}";
}
}

#endregion Public 方法
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System.Runtime.Serialization;
#pragma warning disable CS0618
#pragma warning disable CS8618

using System.Runtime.Serialization;
using FluentWorkflow.Interface;

namespace FluentWorkflow;
Expand Down Expand Up @@ -67,11 +70,8 @@ public WorkflowFailureException(string id, string stage, string message, string?

#region Protected 构造函数

#pragma warning disable CS8618

/// <inheritdoc cref="WorkflowFailureException"/>
protected WorkflowFailureException(SerializationInfo info, StreamingContext context) : base(info, context)
#pragma warning restore CS8618
{
}

Expand Down
3 changes: 2 additions & 1 deletion src/FluentWorkflow.Core/Implement/WorkflowScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.ComponentModel;
using FluentWorkflow.Diagnostics;
using FluentWorkflow.Interface;
using FluentWorkflow.Tracing;
using Microsoft.Extensions.DependencyInjection;

namespace FluentWorkflow;
Expand Down Expand Up @@ -65,7 +66,7 @@ public virtual Task StartAsync(TWorkflow workflow, CancellationToken cancellatio
if (activity != null)
{
activity.AddTag(DiagnosticConstants.ActivityNames.TagKeys.Context, PrettyJSONObject.Create(workflow.Context, ObjectSerializer));
task.ContinueWith(static (_, state) => ((IDisposable)state!).Dispose(), activity, CancellationToken.None);
task.DisposeActivityWhenTaskCompleted(activity);
}

return task;
Expand Down
6 changes: 4 additions & 2 deletions src/FluentWorkflow.Core/Implement/WorkflowStageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using FluentWorkflow.Diagnostics;
using FluentWorkflow.Extensions;
using FluentWorkflow.Interface;
using FluentWorkflow.Tracing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -86,13 +87,13 @@ public virtual async Task HandleAsync(TStageMessage stageMessage, CancellationTo
using var activity = ActivitySource.StartActivity($"{DiagnosticConstants.ActivityNames.StageProcessing} - {stageMessage.Stage}", System.Diagnostics.ActivityKind.Consumer);
activity?.AddTag(DiagnosticConstants.ActivityNames.TagKeys.Message, PrettyJSONObject.Create(stageMessage, ObjectSerializer));

_diagnosticSource.StageMessageHandleStart(stageMessage);

Exception? exception = null;
var notFiredDiagnostic = true;

try
{
_diagnosticSource.StageMessageHandleStart(stageMessage);

ThrowIfStageNotMatch(stageMessage.Context);

Logger.LogTrace("Start handle stage message {{{Id}}}[{Message}]", stageMessage.Id, stageMessage);
Expand Down Expand Up @@ -159,6 +160,7 @@ public virtual async Task HandleAsync(TStageMessage stageMessage, CancellationTo
}
catch (Exception ex)
{
activity?.RecordException(ex);
_diagnosticSource.StageMessageHandleEnd(stageMessage, false, ex);
notFiredDiagnostic = false;
throw;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Diagnostics.CodeAnalysis;
using FluentWorkflow.Diagnostics;
using FluentWorkflow.Interface;
using FluentWorkflow.Tracing;
using Microsoft.Extensions.DependencyInjection;

namespace FluentWorkflow;
Expand Down Expand Up @@ -65,7 +66,7 @@ public virtual Task HandleAsync(TStageCompletedMessage message, CancellationToke
{
activity.AddTag(DiagnosticConstants.ActivityNames.TagKeys.Message, PrettyJSONObject.Create(message, ObjectSerializer));
activity.AddTag(DiagnosticConstants.ActivityNames.TagKeys.StageState, "completed");
task.ContinueWith(static (_, state) => ((IDisposable)state!).Dispose(), activity, CancellationToken.None);
task.DisposeActivityWhenTaskCompleted(activity);
}
return task;
}
Expand All @@ -80,7 +81,7 @@ public virtual Task HandleAsync(TFailureMessage message, CancellationToken cance
activity.AddTag(DiagnosticConstants.ActivityNames.TagKeys.Message, PrettyJSONObject.Create(message, ObjectSerializer));
activity.AddTag(DiagnosticConstants.ActivityNames.TagKeys.StageState, "failure");
activity.AddTag(DiagnosticConstants.ActivityNames.TagKeys.FailureMessage, message.Message);
task.ContinueWith(static (_, state) => ((IDisposable)state!).Dispose(), activity, CancellationToken.None);
task.DisposeActivityWhenTaskCompleted(activity);
}
return task;
}
Expand Down
2 changes: 1 addition & 1 deletion src/FluentWorkflow.RabbitMQ/BootstrapperHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace FluentWorkflow.RabbitMQ;

internal class BootstrapperHostedService : IHostedService
internal sealed class BootstrapperHostedService : IHostedService
{
#region Private 字段

Expand Down
2 changes: 1 addition & 1 deletion src/FluentWorkflow.RabbitMQ/RabbitMQBootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace FluentWorkflow.RabbitMQ;

internal class RabbitMQBootstrapper : IFluentWorkflowBootstrapper
internal sealed class RabbitMQBootstrapper : IFluentWorkflowBootstrapper
{
#region Private 字段

Expand Down
96 changes: 87 additions & 9 deletions src/FluentWorkflow.RabbitMQ/RabbitMQConnectionProvider.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
using FluentWorkflow.Util;
using System.Diagnostics.CodeAnalysis;
using FluentWorkflow.Util;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

namespace FluentWorkflow.RabbitMQ;

internal class RabbitMQConnectionProvider : IRabbitMQConnectionProvider
internal sealed class RabbitMQConnectionProvider : IRabbitMQConnectionProvider, IDisposable
{
#region Private 字段

private readonly IAsyncConnectionFactory _connectionFactory;

private SemaphoreSlim? _connectionGetSemaphore;

private IConnection? _existedConnection;

private bool _isDisposed;

#endregion Private 字段

#region Public 属性

public string ObjectTag { get; }

[MemberNotNull(nameof(_connectionGetSemaphore))]
public bool PreferSingleConnection { get; }

#endregion Public 属性

#region Public 构造函数

public RabbitMQConnectionProvider(IOptions<RabbitMQOptions> optionsAccessor)
{
if (optionsAccessor is null)
{
throw new ArgumentNullException(nameof(optionsAccessor));
}
ArgumentNullException.ThrowIfNull(optionsAccessor);

ObjectTag = ObjectTagUtil.GetHashCodeTag(this);

Expand All @@ -48,16 +55,58 @@ public RabbitMQConnectionProvider(IOptions<RabbitMQOptions> optionsAccessor)
ClientProvidedName = $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTag}",
};
}

PreferSingleConnection = options.PreferSingleConnection;
if (PreferSingleConnection)
{
_connectionGetSemaphore = new(1, 1);
}
}

#endregion Public 构造函数

#region Public 方法

public Task<IConnection> GetAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public async Task<IConnection> GetAsync(CancellationToken cancellationToken)
{
var connection = _connectionFactory.CreateConnection();
return Task.FromResult(connection);
ObjectDisposedException.ThrowIf(_isDisposed, this);

if (PreferSingleConnection)
{
await _connectionGetSemaphore.WaitAsync(cancellationToken);
try
{
if (_existedConnection is { } existedConnection)
{
try
{
var shutdownEventArgs = existedConnection.CloseReason;
if (shutdownEventArgs is null
|| existedConnection is IAutorecoveringConnection)
{
return existedConnection;
}
_existedConnection = null;
}
catch (ObjectDisposedException)
{
_existedConnection = null;
}
}
var connection = _connectionFactory.CreateConnection();
_existedConnection = connection;
return connection;
}
finally
{
_connectionGetSemaphore.Release();
}
}
else
{
return _connectionFactory.CreateConnection();
}
}

/// <inheritdoc/>
Expand All @@ -67,4 +116,33 @@ public override string ToString()
}

#endregion Public 方法

#region IDisposable

/// <summary>
///
/// </summary>
~RabbitMQConnectionProvider()
{
Dispose(disposing: false);
}

/// <inheritdoc/>
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}

private void Dispose(bool disposing)
{
if (!_isDisposed)
{
_connectionGetSemaphore?.Dispose();
_connectionGetSemaphore = null;
_isDisposed = true;
}
}

#endregion IDisposable
}
6 changes: 6 additions & 0 deletions src/FluentWorkflow.RabbitMQ/RabbitMQOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public class RabbitMQOptions
/// </summary>
public ushort GlobalQos { get; set; } = 0;

/// <summary>
/// 更倾向于使用单一链接<br/>
/// 值为 <see langword="true"/> 时,会尽可能的使用单个 <see cref="IConnection"/>
/// </summary>
public bool PreferSingleConnection { get; set; } = false;

/// <summary>
/// 队列 Arguments 设置委托
/// </summary>
Expand Down
Loading

0 comments on commit d178ad1

Please sign in to comment.