Skip to content

Commit

Permalink
Added simple retry if not single chunk was not delivered, default is …
Browse files Browse the repository at this point in the history
…no retry, attempts configurable
  • Loading branch information
KnollNi committed Dec 16, 2024
1 parent 349f55c commit 23dd0b2
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 16 deletions.
38 changes: 26 additions & 12 deletions src/Toolbox/SAF.Toolbox.Tests/Filetransfer/FileSenderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//
// SPDX-License-Identifier: MPL-2.0

using Microsoft.Extensions.Options;
using NSubstitute;
using SAF.Common;
using SAF.Toolbox.FileTransfer;
Expand All @@ -15,9 +16,9 @@ public class FileSenderTests
[Theory]
[InlineData(1)] // 1 byte
[InlineData(1024)] // 1 kByte
[InlineData(1024 * 3)] // 3 kByte
[InlineData(1024 * 3)] // 3 kByte
[InlineData(1024 * 1024)] // 1 MByte
[InlineData(1024 * 1024 * 3)] // 3 MByte
[InlineData(1024 * 1024 * 3)] // 3 MByte
[InlineData(FileSender.MaxChunkSize - 1)] // excact chunk size - 1
[InlineData(FileSender.MaxChunkSize)] // excact chunk size
[InlineData(FileSender.MaxChunkSize + 1)] // excact chunk size + 1
Expand All @@ -30,7 +31,9 @@ public async Task SendInChunksCallsPublishOk(int fileSizeInBytes)
Action<Message>? senderHandler = null;
var messaging = Substitute.For<IMessagingInfrastructure>();
messaging.When(m => m.Subscribe(Arg.Any<string>(), Arg.Any<Action<Message>>()))
.Do(args => senderHandler = args.Arg<Action<Message>>());
.Do(args => senderHandler = args.Arg<Action<Message>>());
var options = Substitute.For<IOptions<FileSenderConfiguration>>();
options.Value.Returns(new FileSenderConfiguration());

var testChannel = $"tests/fileSender/{fileSizeInBytes}";
var buffer = new byte[fileSizeInBytes];
Expand All @@ -43,7 +46,7 @@ public async Task SendInChunksCallsPublishOk(int fileSizeInBytes)
});
using (var tempFile = new TemporaryFile($"file{fileSizeInBytes}.tmp", buffer))
{
var fileSender = new FileSender(messaging, null);
var fileSender = new FileSender(messaging, null, options);
var sendResult = await fileSender.SendInChunks(testChannel, tempFile.TempFilePath);
Assert.Equal(FileTransferStatus.Delivered, sendResult);
}
Expand All @@ -56,9 +59,9 @@ public async Task SendInChunksCallsPublishOk(int fileSizeInBytes)
[Theory]
[InlineData(1)] // 1 byte
[InlineData(1024)] // 1 kByte
[InlineData(1024 * 3)] // 3 kByte
[InlineData(1024 * 3)] // 3 kByte
[InlineData(1024 * 1024)] // 1 MByte
[InlineData(1024 * 1024 * 3)] // 3 MByte
[InlineData(1024 * 1024 * 3)] // 3 MByte
[InlineData(FileSender.MaxChunkSize - 1)] // excact chunk size - 1
[InlineData(FileSender.MaxChunkSize)] // excact chunk size
[InlineData(FileSender.MaxChunkSize + 1)] // excact chunk size + 1
Expand All @@ -71,6 +74,8 @@ public async Task SendInChunksAllowsWriteAccessToFileAfterSendingLastChunkOk(int
var messaging = Substitute.For<IMessagingInfrastructure>();
messaging.When(m => m.Subscribe(Arg.Any<string>(), Arg.Any<Action<Message>>()))
.Do(args => senderHandler = args.Arg<Action<Message>>());
var options = Substitute.For<IOptions<FileSenderConfiguration>>();
options.Value.Returns(new FileSenderConfiguration());

var testChannel = $"tests/fileSender/{fileSizeInBytes}";
var buffer = new byte[fileSizeInBytes];
Expand All @@ -93,7 +98,7 @@ public async Task SendInChunksAllowsWriteAccessToFileAfterSendingLastChunkOk(int
senderHandler?.Invoke(new Message { Topic = req.ReplyTo, Payload = "OK" });
});

var fileSender = new FileSender(messaging, null);
var fileSender = new FileSender(messaging, null, options);
var sendResult = await fileSender.SendInChunks(testChannel, tempFile.TempFilePath);
Assert.Equal(FileTransferStatus.Delivered, sendResult);
}
Expand Down Expand Up @@ -121,6 +126,8 @@ public async Task SendInChunksUsesSameUniqueTransferIdForEachChunkOk(int fileSiz
var messaging = Substitute.For<IMessagingInfrastructure>();
messaging.When(m => m.Subscribe(Arg.Any<string>(), Arg.Any<Action<Message>>()))
.Do(args => senderHandler = args.Arg<Action<Message>>());
var options = Substitute.For<IOptions<FileSenderConfiguration>>();
options.Value.Returns(new FileSenderConfiguration());

var testChannel = $"tests/fileSender/{fileSizeInBytes}";
var buffer = new byte[fileSizeInBytes];
Expand All @@ -139,7 +146,7 @@ public async Task SendInChunksUsesSameUniqueTransferIdForEachChunkOk(int fileSiz
});
using (var tempFile = new TemporaryFile($"file{fileSizeInBytes}.tmp", buffer))
{
var fileSender = new FileSender(messaging, null);
var fileSender = new FileSender(messaging, null, options);
var sendResult = await fileSender.SendInChunks(testChannel, tempFile.TempFilePath);
Assert.Equal(FileTransferStatus.Delivered, sendResult);
}
Expand All @@ -149,23 +156,30 @@ public async Task SendInChunksUsesSameUniqueTransferIdForEachChunkOk(int fileSiz
messaging.Received(Convert.ToInt32(expectedCalls)).Publish(Arg.Is<Message>(msg => msg.Topic == testChannel));
}

[Fact]
public async Task SendInChunksWithMissingAnswerReturnsTimedOutOk()
[Theory]
[InlineData(1)]
[InlineData(4)]
public async Task SendInChunksWithMissingAnswerReturnsTimedOutOk(int expectedCalls)
{
const int fileSizeInBytes = 1024;

var messaging = Substitute.For<IMessagingInfrastructure>();
var options = Substitute.For<IOptions<FileSenderConfiguration>>();
options.Value.Returns(new FileSenderConfiguration
{
RetryAttemptsForFailedChunks = expectedCalls - 1
});

var testChannel = $"tests/fileSender/{fileSizeInBytes}";
var buffer = new byte[fileSizeInBytes];
using (var tempFile = new TemporaryFile($"file{fileSizeInBytes}.tst", buffer))
{
var fileSender = new FileSender(messaging, null);
var fileSender = new FileSender(messaging, null, options);
fileSender.Timeout = 2000;
var sendResult = await fileSender.SendInChunks(testChannel, tempFile.TempFilePath);
Assert.Equal(FileTransferStatus.TimedOut, sendResult);
}

var expectedCalls = 1;
messaging.Received(expectedCalls).Publish(Arg.Is<Message>(msg => msg.Topic == testChannel));
}
}
11 changes: 11 additions & 0 deletions src/Toolbox/SAF.Toolbox/FileSenderConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// *****************************************************************************
// Copyright (c) 2024 TRUMPF Laser- und Systemtechnik GmbH
// All Rights Reserved, see LICENSE.TXT for further details
// *****************************************************************************

namespace SAF.Toolbox;

public class FileSenderConfiguration
{
public int RetryAttemptsForFailedChunks { get; set; } = 0;
}
51 changes: 48 additions & 3 deletions src/Toolbox/SAF.Toolbox/Filetransfer/FileSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.IO.MemoryMappedFiles;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using SAF.Common;
using SAF.Toolbox.Serialization;

Expand All @@ -21,6 +22,7 @@ private sealed class SendRequest
private readonly CancellationTokenSource _cancelTokenSource = new();
private readonly IMessagingInfrastructure _messaging;
private readonly ILogger<FileSender> _log;
private readonly FileSenderConfiguration _options;

private readonly ConcurrentDictionary<string, SendRequest> _sendRequests = new();
private readonly object _subscription;
Expand All @@ -35,10 +37,11 @@ private sealed class SendRequest

public ulong Timeout { get; set; } = 10_000; // 10 sec.

public FileSender(IMessagingInfrastructure messaging, ILogger<FileSender>? log)
public FileSender(IMessagingInfrastructure messaging, ILogger<FileSender>? log, IOptions<FileSenderConfiguration> options)
{
_messaging = messaging ?? throw new ArgumentNullException(nameof(messaging));
_log = log ?? NullLogger<FileSender>.Instance;
_options = options.Value;

_subscription = _messaging.Subscribe($"{ReplyToPrefix}/*", msg =>
{
Expand Down Expand Up @@ -157,7 +160,12 @@ public async Task<FileTransferStatus> SendInChunks(string topic, string filePath
for (; pos < length - chunkSize; pos += chunkSize, n += 1)
{
await ReadChunkFromFile(mmf, offset, buffer);
status = await TransferFileChunk(topic, filePath, properties, offset, buffer, length, n, false, uniqueTransferId, timeoutMs);
status = await RetryAsync(
action: () => TransferFileChunk(topic, filePath, properties, offset, buffer, length, n, false,
uniqueTransferId, timeoutMs),
isDesiredResult: s => s == FileTransferStatus.Delivered,
intervalFactory: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
retryAttempts: _options.RetryAttemptsForFailedChunks);
if (status != FileTransferStatus.Delivered)
{
_log.LogError($"File {filePath} could not be sent. Status: {status}, chunk no. {n}");
Expand All @@ -174,7 +182,12 @@ public async Task<FileTransferStatus> SendInChunks(string topic, string filePath
}

// Send last chunk
status = await TransferFileChunk(topic, filePath, properties, offset, buffer, length, n, true, uniqueTransferId, timeoutMs);
status = await RetryAsync(
action: () => TransferFileChunk(topic, filePath, properties, offset, buffer, length, n, true,
uniqueTransferId, timeoutMs),
isDesiredResult: s => s == FileTransferStatus.Delivered,
intervalFactory: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
retryAttempts: _options.RetryAttemptsForFailedChunks);
if (status != FileTransferStatus.Delivered)
{
_log.LogError($"File {filePath} could not be sent. Status: {status}, last chunk no. {n}");
Expand All @@ -188,6 +201,38 @@ public async Task<FileTransferStatus> SendInChunks(string topic, string filePath
}
}

private static async Task<TResult> RetryAsync<TResult>(
Func<Task<TResult>> action,
Predicate<TResult> isDesiredResult,
Func<int, TimeSpan> intervalFactory,
int retryAttempts = 0)
{
// Perform the action once before retrying
var result = await action();

if (isDesiredResult(result))
{
return result;
}

for (var attempted = 0; attempted < retryAttempts; attempted++)
{
result = await action();

if (isDesiredResult(result))
{
return result;
}

var retryInterval = intervalFactory(attempted);
// Wait before retrying
await Task.Delay(retryInterval);
}

// Return the last result, even if it is not the desired one
return result;
}

private long GenerateUniqueTransferId()
{
lock (_syncTransferId)
Expand Down
3 changes: 2 additions & 1 deletion src/Toolbox/SAF.Toolbox/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ public static IServiceCollection AddRequestClient(this IServiceCollection servic
return services;
}

public static IServiceCollection AddFileSender(this IServiceCollection services)
public static IServiceCollection AddFileSender(this IServiceCollection services, IConfiguration? hostConfig = null)
{
services.AddTransient<IFileSender, FileSender>();
if (hostConfig != null) services.AddServiceConfiguration<FileSenderConfiguration>(hostConfig, nameof(FileSender));
return services;
}

Expand Down

0 comments on commit 23dd0b2

Please sign in to comment.