Skip to content

Commit

Permalink
Merge pull request #926 from dlcs/feature/batchCompletedNotification
Browse files Browse the repository at this point in the history
Adding ability for engine to notify on batch completion
  • Loading branch information
JackLewis-digirati authored Dec 3, 2024
2 parents ec1a4b9 + c930673 commit 34560bc
Show file tree
Hide file tree
Showing 16 changed files with 262 additions and 11 deletions.
27 changes: 27 additions & 0 deletions src/protagonist/API.Tests/Integration/CustomerQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading;
using API.Client;
using API.Tests.Integration.Infrastructure;
using DLCS.AWS.SNS.Messaging;
using DLCS.Core.Types;
using DLCS.Model.Assets;
using DLCS.Model.Policies;
Expand Down Expand Up @@ -35,6 +36,7 @@ public class CustomerQueueTests : IClassFixture<ProtagonistAppFactory<Startup>>
private readonly DlcsContext dbContext;
private readonly HttpClient httpClient;
private static readonly IEngineClient EngineClient = A.Fake<IEngineClient>();
private static readonly IBatchCompletedNotificationSender NotificationSender = A.Fake<IBatchCompletedNotificationSender>();

public CustomerQueueTests(DlcsDatabaseFixture dbFixture, ProtagonistAppFactory<Startup> factory)
{
Expand All @@ -43,6 +45,7 @@ public CustomerQueueTests(DlcsDatabaseFixture dbFixture, ProtagonistAppFactory<S
.WithConnectionString(dbFixture.ConnectionString)
.WithTestServices(services =>
{
services.AddSingleton(NotificationSender);
services.AddScoped<IEngineClient>(_ => EngineClient);
services.AddAuthentication("API-Test")
.AddScheme<AuthenticationSchemeOptions, TestAuthHandler>(
Expand Down Expand Up @@ -1172,6 +1175,12 @@ public async Task Post_TestBatch_MarksBatchAsSuperseded_IfNoImagesFound()

var dbBatch = await dbContext.Batches.SingleAsync(b => b.Id == 201);
dbBatch.Superseded.Should().BeTrue();

A.CallTo(() =>
NotificationSender.SendBatchCompletedMessage(
A<Batch>.That.Matches(b => b.Id == dbBatch.Id),
A<CancellationToken>._))
.MustHaveHappened();
}

[Fact]
Expand Down Expand Up @@ -1202,6 +1211,12 @@ await dbContext.Images.AddTestAsset(AssetId.FromString("2/1/fake"), batch: batch
dbBatch.Count.Should().Be(4);
dbBatch.Errors.Should().Be(1);
dbBatch.Completed.Should().Be(3);

A.CallTo(() =>
NotificationSender.SendBatchCompletedMessage(
A<Batch>.That.Matches(b => b.Id == dbBatch.Id),
A<CancellationToken>._))
.MustHaveHappened();
}

[Fact]
Expand All @@ -1228,6 +1243,12 @@ public async Task Post_TestBatch_ReturnsFalse_IfNotSupersededOrFinished()
dbBatch.Superseded.Should().BeFalse();
dbBatch.Finished.Should().BeNull();
dbBatch.Count.Should().Be(100);

A.CallTo(() =>
NotificationSender.SendBatchCompletedMessage(
A<Batch>._,
A<CancellationToken>._))
.MustNotHaveHappened();
}

[Fact]
Expand Down Expand Up @@ -1255,6 +1276,12 @@ public async Task Post_TestBatch_DoesNotChangeBatchFinished_IfImagesFoundAndAllF
dbBatch.Superseded.Should().BeFalse();
dbBatch.Finished.Should().BeCloseTo(finished, TimeSpan.FromMinutes((1)));
dbBatch.Count.Should().Be(3);

A.CallTo(() =>
NotificationSender.SendBatchCompletedMessage(
A<Batch>.That.Matches(b => b.Id == dbBatch.Id),
A<CancellationToken>._))
.MustNotHaveHappened();
}

[Fact]
Expand Down
6 changes: 6 additions & 0 deletions src/protagonist/API/Features/Queues/Requests/TestBatch.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Collections;
using System.Collections.Generic;
using DLCS.AWS.SNS;
using DLCS.AWS.SNS.Messaging;
using DLCS.Model.Assets;
using DLCS.Repository;
using MediatR;
Expand Down Expand Up @@ -27,12 +29,15 @@ public class TestBatchHandler : IRequestHandler<TestBatch, bool?>
{
private readonly DlcsContext dlcsContext;
private readonly ILogger<TestBatchHandler> logger;
private readonly IBatchCompletedNotificationSender batchCompletedNotificationSender;

public TestBatchHandler(
DlcsContext dlcsContext,
IBatchCompletedNotificationSender batchCompletedNotificationSender,
ILogger<TestBatchHandler> logger)
{
this.dlcsContext = dlcsContext;
this.batchCompletedNotificationSender = batchCompletedNotificationSender;
this.logger = logger;
}

Expand Down Expand Up @@ -63,6 +68,7 @@ public TestBatchHandler(
logger.LogInformation("Batch {BatchId} complete but not finished. Setting Finished.", request.BatchId);
changesMade = true;
batch.Finished = DateTime.UtcNow;
await batchCompletedNotificationSender.SendBatchCompletedMessage(batch, cancellationToken);
}

if (batch.Count != batchImages.Count)
Expand Down
15 changes: 14 additions & 1 deletion src/protagonist/API/Infrastructure/ServiceCollectionX.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
using System.Reflection;
using API.Features.Assets;
using API.Features.Customer;
using API.Features.DeliveryChannels;
using API.Features.DeliveryChannels.DataAccess;
using API.Infrastructure.Messaging;
using API.Infrastructure.Requests.Pipelines;
using DLCS.AWS.Configuration;
using DLCS.AWS.ElasticTranscoder;
using DLCS.AWS.S3;
using DLCS.AWS.SNS;
using DLCS.AWS.SNS.Messaging;
using DLCS.AWS.SQS;
using DLCS.Core.Caching;
using DLCS.Mediatr.Behaviours;
Expand Down Expand Up @@ -154,4 +155,16 @@ public static IServiceCollection ConfigureSwagger(this IServiceCollection servic
var xmlPath = Path.Combine(AppContext.BaseDirectory, xmlFile);
c.IncludeXmlComments(xmlPath);
});

/// <summary>
/// Add topic notifiers
/// </summary>
public static IServiceCollection AddTopicNotifiers(this IServiceCollection services)
{
services
.AddScoped<ICustomerNotificationSender, CustomerNotificationSender>()
.AddScoped<IBatchCompletedNotificationSender, BatchCompletedNotificationSender>();

return services;
}
}
3 changes: 1 addition & 2 deletions src/protagonist/API/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Security.Claims;
using API.Auth;
using API.Features.DeliveryChannels.Converters;
using API.Features.DeliveryChannels.Validation;
using API.Features.Image.Ingest;
using API.Features.OriginStrategies.Credentials;
Expand Down Expand Up @@ -75,7 +74,6 @@ public void ConfigureServices(IServiceCollection services)
.AddDataAccess(configuration)
.AddScoped<IIngestNotificationSender, IngestNotificationSender>()
.AddScoped<IAssetNotificationSender, AssetNotificationSender>()
.AddScoped<ICustomerNotificationSender, CustomerNotificationSender>()
.AddScoped<AssetProcessor>()
.AddScoped<DeliveryChannelProcessor>()
.AddTransient<TimingHandler>()
Expand All @@ -85,6 +83,7 @@ public void ConfigureServices(IServiceCollection services)
.AddNamedQueriesCore()
.AddAws(configuration, webHostEnvironment)
.AddCorrelationIdHeaderPropagation()
.AddTopicNotifiers()
.ConfigureSwagger();

services.AddHttpClient<IEngineClient, EngineClient>(client =>
Expand Down
4 changes: 3 additions & 1 deletion src/protagonist/API/appsettings-Development-Example.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
"FileQueueName": "dlcs-file"
},
"SNS": {
"AssetModifiedNotificationTopicArn": "arn:aws:sns:eu-west-1:{AWS account}:dlcsspinup-asset-modified-notifications"
"AssetModifiedNotificationTopicArn": "arn:aws:sns:eu-west-1:{AWS account}:dlcsspinup-asset-modified-notifications",
"CustomerCreatedTopicArn": "arn:aws:sns:{region}:{account}:{prefix}-customer-created",
"BatchCompletedTopicArn": "arn:aws:sns:{region}:{account}:{prefix}-batch-completion"
}
},
"DLCS": {
Expand Down
34 changes: 34 additions & 0 deletions src/protagonist/DLCS.AWS/SNS/BatchCompletedNotification.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using DLCS.Model.Assets;

namespace DLCS.AWS.SNS;

public class BatchCompletedNotification
{
public int Id { get; private set; }

public int CustomerId { get; private set; }

public int Total { get; private set; }

public int Success { get; private set; }

public int Errors { get; private set; }

public bool Superseded { get; private set; }

public DateTime Started { get; private set; }

public DateTime? Finished { get; private set; }

public BatchCompletedNotification(Batch completedBatch)
{
Id = completedBatch.Id;
CustomerId = completedBatch.Customer;
Total = completedBatch.Count;
Success = completedBatch.Completed;
Errors = completedBatch.Errors;
Superseded = completedBatch.Superseded;
Started = completedBatch.Submitted;
Finished = completedBatch.Finished;
}
}
7 changes: 7 additions & 0 deletions src/protagonist/DLCS.AWS/SNS/ITopicPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ public Task<bool> PublishToAssetModifiedTopic(IReadOnlyList<AssetModifiedNotific
/// <returns>Boolean representing the overall success/failure status of request</returns>
public Task<bool> PublishToCustomerCreatedTopic(CustomerCreatedNotification message,
CancellationToken cancellationToken);

/// <summary>
/// Asynchronously publishes a message to the Batch completed topic
/// </summary>
/// <returns>Boolean representing the overall success/failure status of request</returns>
public Task<bool> PublishToBatchCompletedTopic(BatchCompletedNotification message,
CancellationToken cancellationToken);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using DLCS.Model.Assets;
using Microsoft.Extensions.Logging;

namespace DLCS.AWS.SNS.Messaging;

public class BatchCompletedNotificationSender : IBatchCompletedNotificationSender
{
private readonly ITopicPublisher topicPublisher;
private readonly ILogger<BatchCompletedNotificationSender> logger;

public BatchCompletedNotificationSender(ITopicPublisher topicPublisher,
ILogger<BatchCompletedNotificationSender> logger)
{
this.topicPublisher = topicPublisher;
this.logger = logger;
}

public async Task SendBatchCompletedMessage(Batch batch, CancellationToken cancellationToken = default)
{
logger.LogDebug("Sending notification of creation of batch {Batch}", batch.Id);

var batchCompletedNotification = new BatchCompletedNotification(batch);
await topicPublisher.PublishToBatchCompletedTopic(batchCompletedNotification, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using DLCS.Model.Assets;

namespace DLCS.AWS.SNS.Messaging;

public interface IBatchCompletedNotificationSender
{
/// <summary>
/// Broadcast batch completed notification
/// </summary>
Task SendBatchCompletedMessage(Batch completedBatch, CancellationToken cancellationToken = default);
}
26 changes: 26 additions & 0 deletions src/protagonist/DLCS.AWS/SNS/TopicPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,32 @@ public async Task<bool> PublishToCustomerCreatedTopic(CustomerCreatedNotificatio
return await TryPublishRequest(request, cancellationToken);
}

/// <inheritdoc />
public async Task<bool> PublishToBatchCompletedTopic(BatchCompletedNotification message, CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(snsSettings.BatchCompletedTopicArn))
{
logger.LogWarning("Customer Created Topic Arn is not set - cannot send CustomerCreatedNotification");
return false;
}

var request = new PublishRequest
{
TopicArn = snsSettings.BatchCompletedTopicArn,
Message = JsonSerializer.Serialize(message, settings),
MessageAttributes = new Dictionary<string, MessageAttributeValue>()
{
{"CustomerId", new MessageAttributeValue
{
StringValue = message.CustomerId.ToString(),
DataType = "String"
}}
}
};

return await TryPublishRequest(request, cancellationToken);
}

private Task<bool> PublishToAssetModifiedTopic(AssetModifiedNotification message,
CancellationToken cancellationToken = default)
{
Expand Down
5 changes: 5 additions & 0 deletions src/protagonist/DLCS.AWS/Settings/SNSSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ public class SNSSettings
/// </summary>
public string? CustomerCreatedTopicArn { get; set; }

/// <summary>
/// Name of the SNS topic for notifying that
/// </summary>
public string? BatchCompletedTopicArn { get; set; }

/// <summary>
/// Service root for SNS. Only used if running LocalStack
/// </summary>
Expand Down
Loading

0 comments on commit 34560bc

Please sign in to comment.