Skip to content

Commit

Permalink
Add settings to mongodb to configure write batches (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulimo authored Dec 24, 2023
1 parent 21899b9 commit 6a75bfe
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
10 changes: 10 additions & 0 deletions src/FlowtideDotNet.Connector.MongoDB/FlowtideMongoDBSinkOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,15 @@ public class FlowtideMongoDBSinkOptions
public Func<IMongoCollection<BsonDocument>, Task>? OnInitialDataSent { get; set; }

public Func<Watermark, Task>? OnWatermarkUpdate { get; set; }

/// <summary>
/// Set the amount of documents that will be sent per batch to mongodb.
/// </summary>
public int DocumentsPerBatch { get; set; } = 100;

/// <summary>
/// Set the amount of batches that will be sent in parallel to mongodb.
/// </summary>
public int ParallelBatches { get; set; } = 10;
}
}
6 changes: 3 additions & 3 deletions src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ protected override async Task UploadChanges(IAsyncEnumerable<SimpleChangeEvent>
writes.Add(new ReplaceOneModel<BsonDocument>(filter, doc) { IsUpsert = true });
}

if (writes.Count >= 100)
if (writes.Count >= options.DocumentsPerBatch)
{
while (writeTasks.Count >= 100)
while (writeTasks.Count >= options.ParallelBatches)
{
for(int i = 0; i < writeTasks.Count; i++)
{
Expand All @@ -178,7 +178,7 @@ protected override async Task UploadChanges(IAsyncEnumerable<SimpleChangeEvent>
writeTasks.RemoveAt(i);
}
}
if (writeTasks.Count >= 100)
if (writeTasks.Count >= options.ParallelBatches)
{
await Task.WhenAny(writeTasks);
}
Expand Down

0 comments on commit 6a75bfe

Please sign in to comment.