Skip to content

Commit

Permalink
Change mongodb connector to write in parallel (#197)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulimo authored Dec 15, 2023
1 parent ec656b5 commit 14a96b1
Showing 1 changed file with 59 additions and 2 deletions.
61 changes: 59 additions & 2 deletions src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

using FlowtideDotNet.Core.Operators.Write;
using FlowtideDotNet.Substrait.Relations;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;
Expand Down Expand Up @@ -43,8 +44,11 @@ public MongoDBSink(FlowtideMongoDBSinkOptions options, WriteRelation writeRelati

protected override Task<MetadataResult> SetupAndLoadMetadataAsync()
{
var connection = new MongoUrl(options.ConnectionString);
var urlBuilder = new MongoUrlBuilder(options.ConnectionString);
var connection = urlBuilder.ToMongoUrl();
var client = new MongoClient(connection);


var database = client.GetDatabase(options.Database);
collection = database.GetCollection<BsonDocument>(options.Collection);
primaryKeys = new List<int>();
Expand All @@ -60,6 +64,46 @@ protected override Task<MetadataResult> SetupAndLoadMetadataAsync()
return Task.FromResult(new MetadataResult(primaryKeys));
}

private Task WriteData(List<WriteModel<BsonDocument>> writes, CancellationToken cancellationToken)
{
if (writes.Count > 0)
{
return Task.Factory.StartNew((state) =>
{
var w = state as List<WriteModel<BsonDocument>>;
return WriteDataTask(w, cancellationToken);
}, writes)
.Unwrap();
}
return Task.CompletedTask;
}

private async Task WriteDataTask(List<WriteModel<BsonDocument>> writes, CancellationToken cancellationToken)
{
if (writes.Count > 0)
{
int retryCount = 0;
while (retryCount < 10)
{
try
{
await collection.BulkWriteAsync(writes);
return;
}
catch (Exception e)
{
if (retryCount == 10)
{
throw;
}
Logger.LogWarning("Failed to write to mongoDB, will retry");
retryCount++;
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
}
}

protected override async Task UploadChanges(IAsyncEnumerable<SimpleChangeEvent> rows, CancellationToken cancellationToken)
{
List<WriteModel<BsonDocument>> writes = new List<WriteModel<BsonDocument>>();
Expand Down Expand Up @@ -102,6 +146,18 @@ protected override async Task UploadChanges(IAsyncEnumerable<SimpleChangeEvent>
{
if (writeTasks[i].IsCompleted)
{
if (writeTasks[i].IsFaulted)
{
var exception = writeTasks[i].Exception;
if (exception != null)
{
throw exception;
}
else
{
throw new InvalidOperationException("MongoDB write failed without exception");
}
}
writeTasks.RemoveAt(i);
}
}
Expand All @@ -110,7 +166,8 @@ protected override async Task UploadChanges(IAsyncEnumerable<SimpleChangeEvent>
await Task.WhenAny(writeTasks);
}
}
writeTasks.Add(collection.BulkWriteAsync(writes));

writeTasks.Add(WriteData(writes, cancellationToken));
writes = new List<WriteModel<BsonDocument>>();
}
}
Expand Down

0 comments on commit 14a96b1

Please sign in to comment.