Skip to content

Commit

Permalink
Added WhenAllCompletedOrFaulted to Durable Future
Browse files Browse the repository at this point in the history
  • Loading branch information
zyofeng authored and phatboyg committed Apr 26, 2024
1 parent be70dec commit 1050df6
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 8 deletions.
11 changes: 11 additions & 0 deletions src/MassTransit.TestFramework/Futures/BatchCompleted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace MassTransit.TestFramework.Futures;

using System;
using System.Collections.Generic;


public interface BatchCompleted
{
public Guid CorrelationId { get; }
public IReadOnlyList<string> ProcessedJobsNumbers { get; }
}
11 changes: 11 additions & 0 deletions src/MassTransit.TestFramework/Futures/BatchFaulted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace MassTransit.TestFramework.Futures;

using System;
using System.Collections.Generic;


public interface BatchFaulted
{
public Guid CorrelationId { get; }
public IReadOnlyList<string> ProcessedJobsNumbers { get; }
}
46 changes: 46 additions & 0 deletions src/MassTransit.TestFramework/Futures/BatchFuture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
namespace MassTransit.TestFramework.Futures;

using System.Collections.Generic;
using System.Linq;


public class BatchFuture :
Future<BatchRequest, BatchCompleted, BatchFaulted>
{
public BatchFuture()
{
ConfigureCommand(x => x.CorrelateById(context => context.Message.CorrelationId));

SendRequests<string, ProcessBatchItem>(x => x.JobNumbers,
x =>
{
x.UsingRequestInitializer(context => new
{
CorrelationId = InVar.Id,
JobNumber = context.Message
});
x.TrackPendingRequest(message => message.CorrelationId);
})
.OnResponseReceived<ProcessBatchItemCompleted>(x =>
{
x.CompletePendingRequest(y => y.CorrelationId);
});

WhenAllCompleted(r => r.SetCompletedUsingInitializer(MapResponse));
WhenAllCompletedOrFaulted(r => r.SetFaultedUsingInitializer(MapResponse));
}

object MapResponse(BehaviorContext<FutureState> context)
{
var command = context.GetCommand<BatchRequest>();
List<string> processedJobNumbers = context
.SelectResults<ProcessBatchItemCompleted>()
.Select(r => r.JobNumber).ToList();

return new
{
command.CorrelationId,
ProcessedJobsNumbers = processedJobNumbers
};
}
}
12 changes: 12 additions & 0 deletions src/MassTransit.TestFramework/Futures/BatchRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace MassTransit.TestFramework.Futures;

using System;
using System.Collections.Generic;


public interface BatchRequest
{
public DateTime? BatchExpiry { get; }
public Guid CorrelationId { get; }
public IReadOnlyList<string> JobNumbers { get; }
}
10 changes: 10 additions & 0 deletions src/MassTransit.TestFramework/Futures/ProcessBatchItem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace MassTransit.TestFramework.Futures;

using System;


public interface ProcessBatchItem
{
public Guid CorrelationId { get; }
public string JobNumber { get; }
}
10 changes: 10 additions & 0 deletions src/MassTransit.TestFramework/Futures/ProcessBatchItemCompleted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace MassTransit.TestFramework.Futures;

using System;


public interface ProcessBatchItemCompleted
{
public Guid CorrelationId { get; }
public string JobNumber { get; }
}
29 changes: 29 additions & 0 deletions src/MassTransit.TestFramework/Futures/ProcessBatchItemConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace MassTransit.TestFramework.Futures;

using System;
using System.Threading.Tasks;


public class ProcessBatchItemConsumer :
IConsumer<ProcessBatchItem>
{
public Task Consume(ConsumeContext<ProcessBatchItem> context)
{
async Task WaitAndRespond(int milliSecond)
{
await Task.Delay(milliSecond);
await context.RespondAsync<ProcessBatchItemCompleted>(new
{
context.Message.CorrelationId,
context.Message.JobNumber
});
}

return context.Message.JobNumber switch
{
"Delay" => WaitAndRespond(2000),
"Error" => throw new InvalidOperationException(),
_ => WaitAndRespond(0)
};
}
}
20 changes: 20 additions & 0 deletions src/MassTransit.TestFramework/Futures/Tests/BatchFuture_Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace MassTransit.TestFramework.Futures.Tests;

using NUnit.Framework;


[TestFixture]
public class BatchFuture_Specs :
FutureTestFixture
{
public BatchFuture_Specs(IFutureTestFixtureConfigurator testFixtureConfigurator)
: base(testFixtureConfigurator)
{
}

protected override void ConfigureMassTransit(IBusRegistrationConfigurator configurator)
{
configurator.AddConsumer<ProcessBatchItemConsumer>();
configurator.AddFuture<BatchFuture>();
}
}
12 changes: 12 additions & 0 deletions src/MassTransit/Futures/Future.cs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,18 @@ protected void WhenAnyFaulted(Action<IFutureFaultConfigurator<TFault>> configure
configure?.Invoke(configurator);
}

/// <summary>
/// When all requests have either completed or faulted, Set the future Faulted
/// </summary>
/// <param name="configure"></param>
protected void WhenAllCompletedOrFaulted(Action<IFutureFaultConfigurator<TFault>> configure)
{
_fault.WaitForPending = true;
var configurator = new FutureFaultConfigurator<TFault>(_fault);

configure?.Invoke(configurator);
}

static Task<TResult> GetResult(BehaviorContext<FutureState> context)
{
if (context.TryGetResult(context.Saga.CorrelationId, out TResult completed))
Expand Down
26 changes: 18 additions & 8 deletions src/MassTransit/Futures/Futures/FutureFault.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,24 @@ public ContextMessageFactory<BehaviorContext<FutureState, TInput>, TFault> Facto
set => _factory = value;
}

public bool WaitForPending { get; set; }

public IEnumerable<ValidationResult> Validate()
{
yield break;
}

public async Task SetFaulted(BehaviorContext<FutureState, TInput> context)
{
context.SetFaulted(context.Saga.CorrelationId);
if (!WaitForPending || !context.Saga.HasPending())
{
context.SetFaulted(context.Saga.CorrelationId);

var fault = await context.SendMessageToSubscriptions(_factory,
context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty<FutureSubscription>());
var fault = await context.SendMessageToSubscriptions(_factory,
context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty<FutureSubscription>());

context.SetFault(context.Saga.CorrelationId, fault);
context.SetFault(context.Saga.CorrelationId, fault);
}
}

static Task<SendTuple<TFault>> DefaultFactory(BehaviorContext<FutureState, TInput> context)
Expand Down Expand Up @@ -86,19 +91,24 @@ public ContextMessageFactory<BehaviorContext<FutureState>, TFault> Factory
set => _factory = value;
}

public bool WaitForPending { get; set; }

public IEnumerable<ValidationResult> Validate()
{
yield break;
}

public async Task SetFaulted(BehaviorContext<FutureState> context)
{
context.SetFaulted(context.Saga.CorrelationId);
if (!WaitForPending || !context.Saga.HasPending())
{
context.SetFaulted(context.Saga.CorrelationId);

var fault = await context.SendMessageToSubscriptions(_factory,
context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty<FutureSubscription>());
var fault = await context.SendMessageToSubscriptions(_factory,
context.Saga.HasSubscriptions() ? context.Saga.Subscriptions.ToArray() : Array.Empty<FutureSubscription>());

context.SetFault(context.Saga.CorrelationId, fault);
context.SetFault(context.Saga.CorrelationId, fault);
}
}

static Task<SendTuple<TFault>> DefaultFactory(BehaviorContext<FutureState> context)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
namespace MassTransit.Tests.ContainerTests.Scenarios;

using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using TestFramework.Futures;
using TestFramework.Futures.Tests;


[TestFixture]
public class WhenAllCompletedOrFaulted :
BatchFuture_Specs
{
[Test]
public async Task Delayed_success()
{
var batchId = NewId.NextGuid();
var jobNumbers = new[] { "C12345", "Delay" };

var scope = Provider.CreateScope();

var client = scope.ServiceProvider.GetRequiredService<IRequestClient<BatchRequest>>();

Response<BatchCompleted> response = await client.GetResponse<BatchCompleted>(new
{
CorrelationId = batchId,
JobNumbers = jobNumbers
}, timeout: RequestTimeout.After(s: 5));

Assert.That(response.Message.ProcessedJobsNumbers, Is.EqualTo(jobNumbers));
}

[Test]
public async Task Error_partially_uploaded()
{
var batchId = NewId.NextGuid();
var jobNumbers = new[] { "C12345", "Error", "C54321", "Error", "C33454" };

var scope = Provider.CreateScope();

var client = scope.ServiceProvider.GetRequiredService<IRequestClient<BatchRequest>>();

Response response = await client.GetResponse<BatchCompleted, BatchFaulted>(new
{
CorrelationId = batchId,
JobNumbers = jobNumbers
});

switch (response)
{
case (_, BatchFaulted faulted):
//Batch is partially successful, downstream consumers are notified of succeeded uploads
Assert.That(faulted.ProcessedJobsNumbers, Is.EquivalentTo(new[] { "C12345", "C54321", "C33454" }));
break;
default:
Assert.Fail("Unexpected response");
break;
}
}

[Test]
public async Task Should_succeed()
{
var batchId = NewId.NextGuid();
var jobNumbers = new[] { "C12345", "C54321" };

var scope = Provider.CreateScope();

var client = scope.ServiceProvider.GetRequiredService<IRequestClient<BatchRequest>>();

Response<BatchCompleted> response = await client.GetResponse<BatchCompleted>(new
{
CorrelationId = batchId,
JobNumbers = jobNumbers
}, timeout: RequestTimeout.After(s: 5));

Assert.That(response.Message.ProcessedJobsNumbers, Is.EquivalentTo(jobNumbers));
}

public WhenAllCompletedOrFaulted()
: base(new InMemoryFutureTestFixtureConfigurator())
{
}
}

0 comments on commit 1050df6

Please sign in to comment.