Skip to content

Commit

Permalink
在上下文中添加-x-context-forwarded, 用于记录上下文流转记录
Browse files Browse the repository at this point in the history
  • Loading branch information
stratosblue committed Jun 2, 2024
1 parent 3f24dac commit 2f0d2aa
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 23 deletions.
2 changes: 1 addition & 1 deletion package.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>1.1.2</Version>
<Version>1.1.3</Version>

<Description>A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。</Description>

Expand Down
23 changes: 23 additions & 0 deletions src/FluentWorkflow.Core/Extensions/WorkflowContextExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,28 @@ public static bool TryGetCurrentStageState(this IWorkflowContext workflowContext

#endregion StageState

#region Forwarded

/// <summary>
/// 添加上下文流转信息 <see cref="FluentWorkflowConstants.ContextKeys.Forwarded"/>
/// </summary>
/// <param name="workflowContext"></param>
[EditorBrowsable(EditorBrowsableState.Never)]
public static void AppendForwarded(this IWorkflowContext workflowContext)
{
var currentValue = workflowContext.GetValue(FluentWorkflowConstants.ContextKeys.Forwarded);

if (string.IsNullOrEmpty(currentValue))
{
workflowContext.SetValue(FluentWorkflowConstants.ContextKeys.Forwarded, FluentWorkflowEnvironment.Description);
}
else
{
workflowContext.SetValue(FluentWorkflowConstants.ContextKeys.Forwarded, $"{currentValue}, {FluentWorkflowEnvironment.Description}");
}
}

#endregion Forwarded

#endregion Public 方法
}
7 changes: 7 additions & 0 deletions src/FluentWorkflow.Core/FluentWorkflowConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ public static class ContextKeys
/// </summary>
public const string FailureStage = "-x-context-failure-stage";

/// <summary>
/// 上下文发送信息,记录上下文的流转记录<br/>
/// 其值正确情况下应当形如<br/>
/// xx@host1, xx@host2, xx@host1, ...
/// </summary>
public const string Forwarded = "-x-context-forwarded";

/// <summary>
/// Id
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using FluentWorkflow.Diagnostics;
using FluentWorkflow.Extensions;
using FluentWorkflow.Interface;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
Expand Down Expand Up @@ -42,6 +43,10 @@ public virtual Task PublishAsync<TMessage>(TMessage message, CancellationToken c
{
_diagnosticSource.MessagePublish(message);
Logger.LogTrace("Publish [{EventName}] message - {{{Id}}}[{Message}].", TMessage.EventName, message.Id, message);

message.Context.SetCurrentStageState(WorkflowStageState.Scheduled);
message.Context.AppendForwarded();

return Task.CompletedTask;
}

Expand Down
2 changes: 1 addition & 1 deletion src/FluentWorkflow.Core/Implement/WorkflowStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public Task PublishStageMessageAsync<TStageMessage>(TStageMessage message, Cance
where TStageMessage : class, IWorkflowStageMessage, IWorkflowContextCarrier<IWorkflowContext>, TWorkflowBoundary, IEventNameDeclaration
{
InvokeCheck();
message.Context.SetCurrentStageState(WorkflowStageState.Scheduled);

return messageDispatcher.PublishAsync(message, cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public override async Task MoveNextAsync(CancellationToken cancellationToken)
{{
Context.TryGetFailureMessage(out var failureMessage);
var finishedMessage = new {WorkflowName}FinishedMessage(TypedContext, false, failureMessage);
finishedMessage.Context.SetCurrentStageState(WorkflowStageState.Scheduled);
await _messageDispatcher.PublishAsync(finishedMessage, cancellationToken);
return;
}}
Expand All @@ -104,7 +103,6 @@ public override async Task MoveNextAsync(CancellationToken cancellationToken)
|| !Context.Flag.HasFlag(WorkflowFlag.NotNotifyOnFinish))
{{
var finishedMessage = new {WorkflowName}FinishedMessage(TypedContext, true, ""SUCCESS"");
finishedMessage.Context.SetCurrentStageState(WorkflowStageState.Scheduled);
await _messageDispatcher.PublishAsync(finishedMessage, cancellationToken);
}}
return;
Expand Down
18 changes: 16 additions & 2 deletions src/FluentWorkflow.RabbitMQ/RabbitMQBootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ internal class RabbitMQBootstrapper : IFluentWorkflowBootstrapper

#endregion Private 字段

#region Public 属性

public string ObjectTag { get; }

#endregion Public 属性

#region Public 构造函数

public RabbitMQBootstrapper(IRabbitMQConnectionProvider connectionProvider,
Expand All @@ -61,6 +67,8 @@ public RabbitMQBootstrapper(IRabbitMQConnectionProvider connectionProvider,
_runningCancellationToken = _runningCancellationTokenSource.Token;
_logger = loggerFactory.CreateLogger<RabbitMQBootstrapper>();
_consumeLogger = loggerFactory.CreateLogger(FluentWorkflowConstants.DefaultConsumerLoggerName);

ObjectTag = ObjectTagUtil.GetHashCodeTag(this);
}

#endregion Public 构造函数
Expand Down Expand Up @@ -166,11 +174,17 @@ public async Task InitAsync(CancellationToken cancellationToken)

channel.BasicConsume(queue: defaultConsumeQueueName,
autoAck: false,
consumerTag: $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTagUtil.GetHashCodeTag(this)}",
consumerTag: $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTag}",
consumer: consumer);
}
}

/// <inheritdoc/>
public override string ToString()
{
return $"{nameof(RabbitMQBootstrapper)}-{ObjectTag}";
}

#endregion Public 方法

#region Private 方法
Expand Down Expand Up @@ -224,7 +238,7 @@ private void SetupStandAloneConsumer(IModel channel, string standaloneQueueName,

channel.BasicConsume(queue: standaloneQueueName,
autoAck: false,
consumerTag: $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTagUtil.GetHashCodeTag(this)}",
consumerTag: $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTag}",
consumer: consumer);
}

Expand Down
22 changes: 20 additions & 2 deletions src/FluentWorkflow.RabbitMQ/RabbitMQConnectionProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ internal class RabbitMQConnectionProvider : IRabbitMQConnectionProvider

#endregion Private 字段

#region Public 属性

public string ObjectTag { get; }

#endregion Public 属性

#region Public 构造函数

public RabbitMQConnectionProvider(IOptions<RabbitMQOptions> optionsAccessor)
Expand All @@ -21,6 +27,8 @@ public RabbitMQConnectionProvider(IOptions<RabbitMQOptions> optionsAccessor)
throw new ArgumentNullException(nameof(optionsAccessor));
}

ObjectTag = ObjectTagUtil.GetHashCodeTag(this);

var options = optionsAccessor.Value;
if (options.ConnectionFactory is not null)
{
Expand All @@ -37,16 +45,26 @@ public RabbitMQConnectionProvider(IOptions<RabbitMQOptions> optionsAccessor)
DispatchConsumersAsync = true,
AutomaticRecoveryEnabled = true,
Uri = options.Uri,
ClientProvidedName = $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTagUtil.GetHashCodeTag(this)}",
ClientProvidedName = $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTag}",
};
}
}

#endregion Public 构造函数

#region Public 方法

public Task<IConnection> GetAsync(CancellationToken cancellationToken)
{
var connection = _connectionFactory.CreateConnection();
return Task.FromResult(connection);
}

#endregion Public 构造函数
/// <inheritdoc/>
public override string ToString()
{
return $"{nameof(RabbitMQConnectionProvider)}-{ObjectTag}";
}

#endregion Public 方法
}
17 changes: 4 additions & 13 deletions src/FluentWorkflow.RabbitMQ/RabbitMQWorkflowMessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,29 +60,20 @@ private IModel Channel

#region Public 构造函数

#pragma warning disable CS8618

public RabbitMQWorkflowMessageDispatcher(IRabbitMQConnectionProvider connectionProvider,
IWorkflowDiagnosticSource diagnosticSource,
IObjectSerializer objectSerializer,
IOptionsMonitor<RabbitMQOptions> rabbitMQOptionsMonitor,
ILogger<RabbitMQWorkflowMessageDispatcher> logger) : base(diagnosticSource, logger)
ILogger<RabbitMQWorkflowMessageDispatcher> logger)
: base(diagnosticSource, logger)
{
_connectionProvider = connectionProvider ?? throw new ArgumentNullException(nameof(connectionProvider));
_objectSerializer = objectSerializer ?? throw new ArgumentNullException(nameof(objectSerializer));

RefreshRabbitMQOptions(rabbitMQOptionsMonitor.CurrentValue);

_optionsMonitorDisposer = rabbitMQOptionsMonitor.OnChange(RefreshRabbitMQOptions);

void RefreshRabbitMQOptions(RabbitMQOptions rabbitMQOptions)
{
_rabbitMQOptions = rabbitMQOptionsMonitor.CurrentValue;
}
_optionsMonitorDisposer = rabbitMQOptionsMonitor.OnChange(options => _rabbitMQOptions = options);
_rabbitMQOptions = rabbitMQOptionsMonitor.CurrentValue;
}

#pragma warning restore CS8618

#endregion Public 构造函数

#region Public 方法
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public override async Task MoveNextAsync(CancellationToken cancellationToken)
{
Context.TryGetFailureMessage(out var failureMessage);
var finishedMessage = new TemplateWorkflowFinishedMessage(TypedContext, false, failureMessage ?? "Unknown error");
finishedMessage.Context.SetCurrentStageState(WorkflowStageState.Scheduled);
await _messageDispatcher.PublishAsync(finishedMessage, cancellationToken);
return;
}
Expand All @@ -88,7 +87,6 @@ public override async Task MoveNextAsync(CancellationToken cancellationToken)
|| !Context.Flag.HasFlag(WorkflowFlag.NotNotifyOnFinish))
{
var finishedMessage = new TemplateWorkflowFinishedMessage(TypedContext, true, "SUCCESS");
finishedMessage.Context.SetCurrentStageState(WorkflowStageState.Scheduled);
await _messageDispatcher.PublishAsync(finishedMessage, cancellationToken);
}
return;
Expand Down

0 comments on commit 2f0d2aa

Please sign in to comment.