diff --git a/package.props b/package.props index 41e756d..74e0b0a 100644 --- a/package.props +++ b/package.props @@ -1,6 +1,6 @@  - 1.1.2 + 1.1.3 A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。 diff --git a/src/FluentWorkflow.Core/Extensions/WorkflowContextExtensions.cs b/src/FluentWorkflow.Core/Extensions/WorkflowContextExtensions.cs index 4d9bdee..3395fd5 100644 --- a/src/FluentWorkflow.Core/Extensions/WorkflowContextExtensions.cs +++ b/src/FluentWorkflow.Core/Extensions/WorkflowContextExtensions.cs @@ -140,5 +140,28 @@ public static bool TryGetCurrentStageState(this IWorkflowContext workflowContext #endregion StageState + #region Forwarded + + /// + /// 添加上下文流转信息 + /// + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public static void AppendForwarded(this IWorkflowContext workflowContext) + { + var currentValue = workflowContext.GetValue(FluentWorkflowConstants.ContextKeys.Forwarded); + + if (string.IsNullOrEmpty(currentValue)) + { + workflowContext.SetValue(FluentWorkflowConstants.ContextKeys.Forwarded, FluentWorkflowEnvironment.Description); + } + else + { + workflowContext.SetValue(FluentWorkflowConstants.ContextKeys.Forwarded, $"{currentValue}, {FluentWorkflowEnvironment.Description}"); + } + } + + #endregion Forwarded + #endregion Public 方法 } diff --git a/src/FluentWorkflow.Core/FluentWorkflowConstants.cs b/src/FluentWorkflow.Core/FluentWorkflowConstants.cs index 5b4301a..43b5854 100644 --- a/src/FluentWorkflow.Core/FluentWorkflowConstants.cs +++ b/src/FluentWorkflow.Core/FluentWorkflowConstants.cs @@ -43,6 +43,13 @@ public static class ContextKeys /// public const string FailureStage = "-x-context-failure-stage"; + /// + /// 上下文发送信息,记录上下文的流转记录
+ /// 其值正确情况下应当形如
+ /// xx@host1, xx@host2, xx@host1, ... + ///
+ public const string Forwarded = "-x-context-forwarded"; + /// /// Id /// diff --git a/src/FluentWorkflow.Core/Implement/WorkflowMessageDispatcher.cs b/src/FluentWorkflow.Core/Implement/WorkflowMessageDispatcher.cs index 28850d8..9bc9bff 100644 --- a/src/FluentWorkflow.Core/Implement/WorkflowMessageDispatcher.cs +++ b/src/FluentWorkflow.Core/Implement/WorkflowMessageDispatcher.cs @@ -1,4 +1,5 @@ using FluentWorkflow.Diagnostics; +using FluentWorkflow.Extensions; using FluentWorkflow.Interface; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -42,6 +43,10 @@ public virtual Task PublishAsync(TMessage message, CancellationToken c { _diagnosticSource.MessagePublish(message); Logger.LogTrace("Publish [{EventName}] message - {{{Id}}}[{Message}].", TMessage.EventName, message.Id, message); + + message.Context.SetCurrentStageState(WorkflowStageState.Scheduled); + message.Context.AppendForwarded(); + return Task.CompletedTask; } diff --git a/src/FluentWorkflow.Core/Implement/WorkflowStateMachine.cs b/src/FluentWorkflow.Core/Implement/WorkflowStateMachine.cs index f858e73..651f28e 100644 --- a/src/FluentWorkflow.Core/Implement/WorkflowStateMachine.cs +++ b/src/FluentWorkflow.Core/Implement/WorkflowStateMachine.cs @@ -170,7 +170,7 @@ public Task PublishStageMessageAsync(TStageMessage message, Cance where TStageMessage : class, IWorkflowStageMessage, IWorkflowContextCarrier, TWorkflowBoundary, IEventNameDeclaration { InvokeCheck(); - message.Context.SetCurrentStageState(WorkflowStageState.Scheduled); + return messageDispatcher.PublishAsync(message, cancellationToken); } diff --git a/src/FluentWorkflow.Generator/Providers/Workflow/SourceProvider.StateMachine.cs b/src/FluentWorkflow.Generator/Providers/Workflow/SourceProvider.StateMachine.cs index 67049d6..9eb59bf 100644 --- a/src/FluentWorkflow.Generator/Providers/Workflow/SourceProvider.StateMachine.cs +++ b/src/FluentWorkflow.Generator/Providers/Workflow/SourceProvider.StateMachine.cs @@ -91,7 +91,6 @@ public override async Task MoveNextAsync(CancellationToken cancellationToken) {{ Context.TryGetFailureMessage(out var failureMessage); var finishedMessage = new {WorkflowName}FinishedMessage(TypedContext, false, failureMessage); - finishedMessage.Context.SetCurrentStageState(WorkflowStageState.Scheduled); await _messageDispatcher.PublishAsync(finishedMessage, cancellationToken); return; }} @@ -104,7 +103,6 @@ public override async Task MoveNextAsync(CancellationToken cancellationToken) || !Context.Flag.HasFlag(WorkflowFlag.NotNotifyOnFinish)) {{ var finishedMessage = new {WorkflowName}FinishedMessage(TypedContext, true, ""SUCCESS""); - finishedMessage.Context.SetCurrentStageState(WorkflowStageState.Scheduled); await _messageDispatcher.PublishAsync(finishedMessage, cancellationToken); }} return; diff --git a/src/FluentWorkflow.RabbitMQ/RabbitMQBootstrapper.cs b/src/FluentWorkflow.RabbitMQ/RabbitMQBootstrapper.cs index 8472031..82cb014 100644 --- a/src/FluentWorkflow.RabbitMQ/RabbitMQBootstrapper.cs +++ b/src/FluentWorkflow.RabbitMQ/RabbitMQBootstrapper.cs @@ -40,6 +40,12 @@ internal class RabbitMQBootstrapper : IFluentWorkflowBootstrapper #endregion Private 字段 + #region Public 属性 + + public string ObjectTag { get; } + + #endregion Public 属性 + #region Public 构造函数 public RabbitMQBootstrapper(IRabbitMQConnectionProvider connectionProvider, @@ -61,6 +67,8 @@ public RabbitMQBootstrapper(IRabbitMQConnectionProvider connectionProvider, _runningCancellationToken = _runningCancellationTokenSource.Token; _logger = loggerFactory.CreateLogger(); _consumeLogger = loggerFactory.CreateLogger(FluentWorkflowConstants.DefaultConsumerLoggerName); + + ObjectTag = ObjectTagUtil.GetHashCodeTag(this); } #endregion Public 构造函数 @@ -166,11 +174,17 @@ public async Task InitAsync(CancellationToken cancellationToken) channel.BasicConsume(queue: defaultConsumeQueueName, autoAck: false, - consumerTag: $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTagUtil.GetHashCodeTag(this)}", + consumerTag: $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTag}", consumer: consumer); } } + /// + public override string ToString() + { + return $"{nameof(RabbitMQBootstrapper)}-{ObjectTag}"; + } + #endregion Public 方法 #region Private 方法 @@ -224,7 +238,7 @@ private void SetupStandAloneConsumer(IModel channel, string standaloneQueueName, channel.BasicConsume(queue: standaloneQueueName, autoAck: false, - consumerTag: $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTagUtil.GetHashCodeTag(this)}", + consumerTag: $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTag}", consumer: consumer); } diff --git a/src/FluentWorkflow.RabbitMQ/RabbitMQConnectionProvider.cs b/src/FluentWorkflow.RabbitMQ/RabbitMQConnectionProvider.cs index 60bf6e7..da7aedc 100644 --- a/src/FluentWorkflow.RabbitMQ/RabbitMQConnectionProvider.cs +++ b/src/FluentWorkflow.RabbitMQ/RabbitMQConnectionProvider.cs @@ -12,6 +12,12 @@ internal class RabbitMQConnectionProvider : IRabbitMQConnectionProvider #endregion Private 字段 + #region Public 属性 + + public string ObjectTag { get; } + + #endregion Public 属性 + #region Public 构造函数 public RabbitMQConnectionProvider(IOptions optionsAccessor) @@ -21,6 +27,8 @@ public RabbitMQConnectionProvider(IOptions optionsAccessor) throw new ArgumentNullException(nameof(optionsAccessor)); } + ObjectTag = ObjectTagUtil.GetHashCodeTag(this); + var options = optionsAccessor.Value; if (options.ConnectionFactory is not null) { @@ -37,16 +45,26 @@ public RabbitMQConnectionProvider(IOptions optionsAccessor) DispatchConsumersAsync = true, AutomaticRecoveryEnabled = true, Uri = options.Uri, - ClientProvidedName = $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTagUtil.GetHashCodeTag(this)}", + ClientProvidedName = $"fwf:{FluentWorkflowEnvironment.Description}-{ObjectTag}", }; } } + #endregion Public 构造函数 + + #region Public 方法 + public Task GetAsync(CancellationToken cancellationToken) { var connection = _connectionFactory.CreateConnection(); return Task.FromResult(connection); } - #endregion Public 构造函数 + /// + public override string ToString() + { + return $"{nameof(RabbitMQConnectionProvider)}-{ObjectTag}"; + } + + #endregion Public 方法 } diff --git a/src/FluentWorkflow.RabbitMQ/RabbitMQWorkflowMessageDispatcher.cs b/src/FluentWorkflow.RabbitMQ/RabbitMQWorkflowMessageDispatcher.cs index fb5ad92..597a29c 100644 --- a/src/FluentWorkflow.RabbitMQ/RabbitMQWorkflowMessageDispatcher.cs +++ b/src/FluentWorkflow.RabbitMQ/RabbitMQWorkflowMessageDispatcher.cs @@ -60,29 +60,20 @@ private IModel Channel #region Public 构造函数 -#pragma warning disable CS8618 - public RabbitMQWorkflowMessageDispatcher(IRabbitMQConnectionProvider connectionProvider, IWorkflowDiagnosticSource diagnosticSource, IObjectSerializer objectSerializer, IOptionsMonitor rabbitMQOptionsMonitor, - ILogger logger) : base(diagnosticSource, logger) + ILogger logger) + : base(diagnosticSource, logger) { _connectionProvider = connectionProvider ?? throw new ArgumentNullException(nameof(connectionProvider)); _objectSerializer = objectSerializer ?? throw new ArgumentNullException(nameof(objectSerializer)); - RefreshRabbitMQOptions(rabbitMQOptionsMonitor.CurrentValue); - - _optionsMonitorDisposer = rabbitMQOptionsMonitor.OnChange(RefreshRabbitMQOptions); - - void RefreshRabbitMQOptions(RabbitMQOptions rabbitMQOptions) - { - _rabbitMQOptions = rabbitMQOptionsMonitor.CurrentValue; - } + _optionsMonitorDisposer = rabbitMQOptionsMonitor.OnChange(options => _rabbitMQOptions = options); + _rabbitMQOptions = rabbitMQOptionsMonitor.CurrentValue; } -#pragma warning restore CS8618 - #endregion Public 构造函数 #region Public 方法 diff --git a/template/FluentWorkflow.Template/TemplateWorkflow.StateMachine.cs b/template/FluentWorkflow.Template/TemplateWorkflow.StateMachine.cs index 0825036..a10b7f8 100644 --- a/template/FluentWorkflow.Template/TemplateWorkflow.StateMachine.cs +++ b/template/FluentWorkflow.Template/TemplateWorkflow.StateMachine.cs @@ -75,7 +75,6 @@ public override async Task MoveNextAsync(CancellationToken cancellationToken) { Context.TryGetFailureMessage(out var failureMessage); var finishedMessage = new TemplateWorkflowFinishedMessage(TypedContext, false, failureMessage ?? "Unknown error"); - finishedMessage.Context.SetCurrentStageState(WorkflowStageState.Scheduled); await _messageDispatcher.PublishAsync(finishedMessage, cancellationToken); return; } @@ -88,7 +87,6 @@ public override async Task MoveNextAsync(CancellationToken cancellationToken) || !Context.Flag.HasFlag(WorkflowFlag.NotNotifyOnFinish)) { var finishedMessage = new TemplateWorkflowFinishedMessage(TypedContext, true, "SUCCESS"); - finishedMessage.Context.SetCurrentStageState(WorkflowStageState.Scheduled); await _messageDispatcher.PublishAsync(finishedMessage, cancellationToken); } return;