diff --git a/src/MassTransit.TestFramework/Futures/BatchCompleted.cs b/src/MassTransit.TestFramework/Futures/BatchCompleted.cs new file mode 100644 index 00000000000..7f41a5b2129 --- /dev/null +++ b/src/MassTransit.TestFramework/Futures/BatchCompleted.cs @@ -0,0 +1,11 @@ +namespace MassTransit.TestFramework.Futures; + +using System; +using System.Collections.Generic; + + +public interface BatchCompleted +{ + public Guid CorrelationId { get; } + public IReadOnlyList ProcessedJobsNumbers { get; } +} diff --git a/src/MassTransit.TestFramework/Futures/BatchFaulted.cs b/src/MassTransit.TestFramework/Futures/BatchFaulted.cs new file mode 100644 index 00000000000..7e7e2bbad8d --- /dev/null +++ b/src/MassTransit.TestFramework/Futures/BatchFaulted.cs @@ -0,0 +1,11 @@ +namespace MassTransit.TestFramework.Futures; + +using System; +using System.Collections.Generic; + + +public interface BatchFaulted +{ + public Guid CorrelationId { get; } + public IReadOnlyList ProcessedJobsNumbers { get; } +} diff --git a/src/MassTransit.TestFramework/Futures/BatchFuture.cs b/src/MassTransit.TestFramework/Futures/BatchFuture.cs new file mode 100644 index 00000000000..4e074bd0428 --- /dev/null +++ b/src/MassTransit.TestFramework/Futures/BatchFuture.cs @@ -0,0 +1,46 @@ +namespace MassTransit.TestFramework.Futures; + +using System.Collections.Generic; +using System.Linq; + + +public class BatchFuture : + Future +{ + public BatchFuture() + { + ConfigureCommand(x => x.CorrelateById(context => context.Message.CorrelationId)); + + SendRequests(x => x.JobNumbers, + x => + { + x.UsingRequestInitializer(context => new + { + CorrelationId = InVar.Id, + JobNumber = context.Message + }); + x.TrackPendingRequest(message => message.CorrelationId); + }) + .OnResponseReceived(x => + { + x.CompletePendingRequest(y => y.CorrelationId); + }); + + WhenAllCompleted(r => r.SetCompletedUsingInitializer(MapResponse)); + WhenAllCompletedOrFaulted(r => r.SetFaultedUsingInitializer(MapResponse)); + } + + object MapResponse(BehaviorContext context) + { + var command = context.GetCommand(); + List processedJobNumbers = context + .SelectResults() + .Select(r => r.JobNumber).ToList(); + + return new + { + command.CorrelationId, + ProcessedJobsNumbers = processedJobNumbers + }; + } +} diff --git a/src/MassTransit.TestFramework/Futures/BatchRequest.cs b/src/MassTransit.TestFramework/Futures/BatchRequest.cs new file mode 100644 index 00000000000..1b3be1a7c24 --- /dev/null +++ b/src/MassTransit.TestFramework/Futures/BatchRequest.cs @@ -0,0 +1,12 @@ +namespace MassTransit.TestFramework.Futures; + +using System; +using System.Collections.Generic; + + +public interface BatchRequest +{ + public DateTime? BatchExpiry { get; } + public Guid CorrelationId { get; } + public IReadOnlyList JobNumbers { get; } +} diff --git a/src/MassTransit.TestFramework/Futures/ProcessBatchItem.cs b/src/MassTransit.TestFramework/Futures/ProcessBatchItem.cs new file mode 100644 index 00000000000..2e0d09c206d --- /dev/null +++ b/src/MassTransit.TestFramework/Futures/ProcessBatchItem.cs @@ -0,0 +1,10 @@ +namespace MassTransit.TestFramework.Futures; + +using System; + + +public interface ProcessBatchItem +{ + public Guid CorrelationId { get; } + public string JobNumber { get; } +} diff --git a/src/MassTransit.TestFramework/Futures/ProcessBatchItemCompleted.cs b/src/MassTransit.TestFramework/Futures/ProcessBatchItemCompleted.cs new file mode 100644 index 00000000000..111d2774987 --- /dev/null +++ b/src/MassTransit.TestFramework/Futures/ProcessBatchItemCompleted.cs @@ -0,0 +1,10 @@ +namespace MassTransit.TestFramework.Futures; + +using System; + + +public interface ProcessBatchItemCompleted +{ + public Guid CorrelationId { get; } + public string JobNumber { get; } +} diff --git a/src/MassTransit.TestFramework/Futures/ProcessBatchItemConsumer.cs b/src/MassTransit.TestFramework/Futures/ProcessBatchItemConsumer.cs new file mode 100644 index 00000000000..9df157480fb --- /dev/null +++ b/src/MassTransit.TestFramework/Futures/ProcessBatchItemConsumer.cs @@ -0,0 +1,29 @@ +namespace MassTransit.TestFramework.Futures; + +using System; +using System.Threading.Tasks; + + +public class ProcessBatchItemConsumer : + IConsumer +{ + public Task Consume(ConsumeContext context) + { + async Task WaitAndRespond(int milliSecond) + { + await Task.Delay(milliSecond); + await context.RespondAsync(new + { + context.Message.CorrelationId, + context.Message.JobNumber + }); + } + + return context.Message.JobNumber switch + { + "Delay" => WaitAndRespond(2000), + "Error" => throw new InvalidOperationException(), + _ => WaitAndRespond(0) + }; + } +} diff --git a/src/MassTransit.TestFramework/Futures/Tests/BatchFuture_Specs.cs b/src/MassTransit.TestFramework/Futures/Tests/BatchFuture_Specs.cs new file mode 100644 index 00000000000..8c1f6d374bb --- /dev/null +++ b/src/MassTransit.TestFramework/Futures/Tests/BatchFuture_Specs.cs @@ -0,0 +1,20 @@ +namespace MassTransit.TestFramework.Futures.Tests; + +using NUnit.Framework; + + +[TestFixture] +public class BatchFuture_Specs : + FutureTestFixture +{ + public BatchFuture_Specs(IFutureTestFixtureConfigurator testFixtureConfigurator) + : base(testFixtureConfigurator) + { + } + + protected override void ConfigureMassTransit(IBusRegistrationConfigurator configurator) + { + configurator.AddConsumer(); + configurator.AddFuture(); + } +} diff --git a/src/MassTransit/Futures/Future.cs b/src/MassTransit/Futures/Future.cs index b9b091b5bda..6928f301390 100644 --- a/src/MassTransit/Futures/Future.cs +++ b/src/MassTransit/Futures/Future.cs @@ -417,6 +417,18 @@ protected void WhenAnyFaulted(Action> configure configure?.Invoke(configurator); } + /// + /// When all requests have either completed or faulted, Set the future Faulted + /// + /// + protected void WhenAllCompletedOrFaulted(Action> configure) + { + _fault.WaitForPending = true; + var configurator = new FutureFaultConfigurator(_fault); + + configure?.Invoke(configurator); + } + static Task GetResult(BehaviorContext context) { if (context.TryGetResult(context.Saga.CorrelationId, out TResult completed)) diff --git a/src/MassTransit/Futures/Futures/FutureFault.cs b/src/MassTransit/Futures/Futures/FutureFault.cs index 52039513dfa..e28c71edfc8 100644 --- a/src/MassTransit/Futures/Futures/FutureFault.cs +++ b/src/MassTransit/Futures/Futures/FutureFault.cs @@ -26,6 +26,8 @@ public ContextMessageFactory, TFault> Facto set => _factory = value; } + public bool WaitForPending { get; set; } + public IEnumerable Validate() { yield break; @@ -33,12 +35,15 @@ public IEnumerable Validate() public async Task SetFaulted(BehaviorContext context) { - context.SetFaulted(context.Saga.CorrelationId); + if (!WaitForPending || !context.Saga.HasPending()) + { + context.SetFaulted(context.Saga.CorrelationId); - var fault = await context.SendMessageToSubscriptions(_factory, - context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty()); + var fault = await context.SendMessageToSubscriptions(_factory, + context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty()); - context.SetFault(context.Saga.CorrelationId, fault); + context.SetFault(context.Saga.CorrelationId, fault); + } } static Task> DefaultFactory(BehaviorContext context) @@ -86,6 +91,8 @@ public ContextMessageFactory, TFault> Factory set => _factory = value; } + public bool WaitForPending { get; set; } + public IEnumerable Validate() { yield break; @@ -93,12 +100,15 @@ public IEnumerable Validate() public async Task SetFaulted(BehaviorContext context) { - context.SetFaulted(context.Saga.CorrelationId); + if (!WaitForPending || !context.Saga.HasPending()) + { + context.SetFaulted(context.Saga.CorrelationId); - var fault = await context.SendMessageToSubscriptions(_factory, - context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty()); + var fault = await context.SendMessageToSubscriptions(_factory, + context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty()); - context.SetFault(context.Saga.CorrelationId, fault); + context.SetFault(context.Saga.CorrelationId, fault); + } } static Task> DefaultFactory(BehaviorContext context) diff --git a/tests/MassTransit.Tests/ContainerTests/Scenarios/WhenAllCompletedOrFaulted.cs b/tests/MassTransit.Tests/ContainerTests/Scenarios/WhenAllCompletedOrFaulted.cs new file mode 100644 index 00000000000..c36c56535e8 --- /dev/null +++ b/tests/MassTransit.Tests/ContainerTests/Scenarios/WhenAllCompletedOrFaulted.cs @@ -0,0 +1,84 @@ +namespace MassTransit.Tests.ContainerTests.Scenarios; + +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using NUnit.Framework; +using TestFramework.Futures; +using TestFramework.Futures.Tests; + + +[TestFixture] +public class WhenAllCompletedOrFaulted : + BatchFuture_Specs +{ + [Test] + public async Task Delayed_success() + { + var batchId = NewId.NextGuid(); + var jobNumbers = new[] { "C12345", "Delay" }; + + var scope = Provider.CreateScope(); + + var client = scope.ServiceProvider.GetRequiredService>(); + + Response response = await client.GetResponse(new + { + CorrelationId = batchId, + JobNumbers = jobNumbers + }, timeout: RequestTimeout.After(s: 5)); + + Assert.That(response.Message.ProcessedJobsNumbers, Is.EqualTo(jobNumbers)); + } + + [Test] + public async Task Error_partially_uploaded() + { + var batchId = NewId.NextGuid(); + var jobNumbers = new[] { "C12345", "Error", "C54321", "Error", "C33454" }; + + var scope = Provider.CreateScope(); + + var client = scope.ServiceProvider.GetRequiredService>(); + + Response response = await client.GetResponse(new + { + CorrelationId = batchId, + JobNumbers = jobNumbers + }); + + switch (response) + { + case (_, BatchFaulted faulted): + //Batch is partially successful, downstream consumers are notified of succeeded uploads + Assert.That(faulted.ProcessedJobsNumbers, Is.EquivalentTo(new[] { "C12345", "C54321", "C33454" })); + break; + default: + Assert.Fail("Unexpected response"); + break; + } + } + + [Test] + public async Task Should_succeed() + { + var batchId = NewId.NextGuid(); + var jobNumbers = new[] { "C12345", "C54321" }; + + var scope = Provider.CreateScope(); + + var client = scope.ServiceProvider.GetRequiredService>(); + + Response response = await client.GetResponse(new + { + CorrelationId = batchId, + JobNumbers = jobNumbers + }, timeout: RequestTimeout.After(s: 5)); + + Assert.That(response.Message.ProcessedJobsNumbers, Is.EquivalentTo(jobNumbers)); + } + + public WhenAllCompletedOrFaulted() + : base(new InMemoryFutureTestFixtureConfigurator()) + { + } +}