Skip to content

Commit

Permalink
fix: better consumer worker exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Jan 30, 2023
1 parent 7cb78d0 commit e7ffd29
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 65 deletions.
15 changes: 8 additions & 7 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,17 @@ public void Dispose()
private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate next)
{
await this.dispatchSemaphore.WaitAsync();
var localBatch = this.batch.ToList();

try
{
var batchContext = new BatchConsumeMessageContext(context.ConsumerContext, this.batch.ToList());
var batchContext = new BatchConsumeMessageContext(context.ConsumerContext, localBatch);

await next(batchContext).ConfigureAwait(false);

foreach (var messageContext in this.batch)
{
messageContext.ConsumerContext.StoreOffset();
}
}
catch (OperationCanceledException) when (context.ConsumerContext.WorkerStopped.IsCancellationRequested)
{
// Do nothing
return;
}
catch (Exception ex)
{
Expand All @@ -106,6 +102,11 @@ private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate nex
this.batch.Clear();
this.dispatchSemaphore.Release();
}

foreach (var messageContext in localBatch)
{
messageContext.ConsumerContext.StoreOffset();
}
}
}
}
114 changes: 64 additions & 50 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,31 @@ public Task StartAsync()
this.backgroundTask = Task.Run(
async () =>
{
var cancellationTokenSource = new CancellationTokenSource();

this.stopCancellationTokenSource.Token.Register(
() => cancellationTokenSource.CancelAfter(this.consumer.Configuration.WorkerStopTimeout));

try
{
while (await this.messagesBuffer.Reader.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
var cancellationTokenSource = new CancellationTokenSource();

this.stopCancellationTokenSource.Token.Register(
() => cancellationTokenSource.CancelAfter(this.consumer.Configuration.WorkerStopTimeout));

try
{
while (this.messagesBuffer.Reader.TryRead(out var message))
while (await this.messagesBuffer.Reader.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
{
await this.ProcessMessageAsync(message, cancellationTokenSource.Token).ConfigureAwait(false);
while (this.messagesBuffer.Reader.TryRead(out var message))
{
await this.ProcessMessageAsync(message, cancellationTokenSource.Token).ConfigureAwait(false);
}
}
}
catch (OperationCanceledException)
{
// Ignores the exception
}
}
catch (OperationCanceledException)
catch (Exception ex)
{
// Ignores the exception
this.logHandler.Error("KafkaFlow consumer worker fatal error", ex, null);
}
});

Expand All @@ -97,56 +104,63 @@ public void OnTaskCompleted(Action handler)

private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, CancellationToken cancellationToken)
{
var context = new MessageContext(
new Message(message.Message.Key, message.Message.Value),
new MessageHeaders(message.Message.Headers),
new ConsumerContext(
this.consumer,
this.offsetManager,
message,
cancellationToken,
this.Id),
null);

try
{
var scope = this.dependencyResolver.CreateScope();
var context = new MessageContext(
new Message(message.Message.Key, message.Message.Value),
new MessageHeaders(message.Message.Headers),
new ConsumerContext(
this.consumer,
this.offsetManager,
message,
cancellationToken,
this.Id),
null);

try
{
var scope = this.dependencyResolver.CreateScope();

this.offsetManager.OnOffsetProcessed(
message.TopicPartitionOffset,
() => scope.Dispose());
this.offsetManager.OnOffsetProcessed(
message.TopicPartitionOffset,
() => scope.Dispose());

await this.middlewareExecutor
.Execute(scope.Resolver, context, _ => Task.CompletedTask)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
if (cancellationToken.IsCancellationRequested)
await this.middlewareExecutor
.Execute(scope.Resolver, context, _ => Task.CompletedTask)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return;
if (cancellationToken.IsCancellationRequested)
{
return;
}
}
catch (Exception ex)
{
this.logHandler.Error(
"Error processing message",
ex,
new
{
context.Message,
context.ConsumerContext.Topic,
MessageKey = context.Message.Key,
context.ConsumerContext.ConsumerName,
});
}

if (this.consumer.Configuration.AutoStoreOffsets && context.ConsumerContext.ShouldStoreOffset)
{
this.offsetManager.MarkAsProcessed(message.TopicPartitionOffset);
}

this.onMessageFinishedHandler?.Invoke();
}
catch (Exception ex)
{
this.logHandler.Error(
"Error processing message",
ex,
new
{
context.Message,
context.ConsumerContext.Topic,
MessageKey = context.Message.Key,
context.ConsumerContext.ConsumerName,
});
}

if (this.consumer.Configuration.AutoStoreOffsets && context.ConsumerContext.ShouldStoreOffset)
{
this.offsetManager.MarkAsProcessed(message.TopicPartitionOffset);
this.logHandler.Error("KafkaFlow internal message error", ex, null);
}

this.onMessageFinishedHandler?.Invoke();
}
}
}
22 changes: 14 additions & 8 deletions src/KafkaFlow/Consumers/OffsetCommitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,19 @@ private void PendingOffsetsHandler(

private void CommitHandler()
{
if (!this.offsetsToCommit.Any())
{
return;
}

var offsets = this.offsetsToCommit;
this.offsetsToCommit = new ConcurrentDictionary<(string, int), TopicPartitionOffset>();
ConcurrentDictionary<(string, int), TopicPartitionOffset> offsets = null;

try
{
if (!this.offsetsToCommit.Any())
{
return;
}

offsets = Interlocked.Exchange(
ref this.offsetsToCommit,
new ConcurrentDictionary<(string, int), TopicPartitionOffset>());

this.consumer.Commit(offsets.Values);

if (!this.consumer.Configuration.ManagementDisabled)
Expand Down Expand Up @@ -113,7 +116,10 @@ private void CommitHandler()
"Error Commiting Offsets",
new { ErrorMessage = e.Message });

this.RequeueFailedOffsets(offsets.Values);
if (offsets is not null)
{
this.RequeueFailedOffsets(offsets.Values);
}
}
}

Expand Down

0 comments on commit e7ffd29

Please sign in to comment.