diff --git a/src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs b/src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs index 04901b48d..90857136a 100644 --- a/src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs +++ b/src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs @@ -12,6 +12,7 @@ using FlowtideDotNet.Core.Operators.Write; using FlowtideDotNet.Substrait.Relations; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using MongoDB.Bson; using MongoDB.Driver; @@ -43,8 +44,11 @@ public MongoDBSink(FlowtideMongoDBSinkOptions options, WriteRelation writeRelati protected override Task SetupAndLoadMetadataAsync() { - var connection = new MongoUrl(options.ConnectionString); + var urlBuilder = new MongoUrlBuilder(options.ConnectionString); + var connection = urlBuilder.ToMongoUrl(); var client = new MongoClient(connection); + + var database = client.GetDatabase(options.Database); collection = database.GetCollection(options.Collection); primaryKeys = new List(); @@ -60,6 +64,46 @@ protected override Task SetupAndLoadMetadataAsync() return Task.FromResult(new MetadataResult(primaryKeys)); } + private Task WriteData(List> writes, CancellationToken cancellationToken) + { + if (writes.Count > 0) + { + return Task.Factory.StartNew((state) => + { + var w = state as List>; + return WriteDataTask(w, cancellationToken); + }, writes) + .Unwrap(); + } + return Task.CompletedTask; + } + + private async Task WriteDataTask(List> writes, CancellationToken cancellationToken) + { + if (writes.Count > 0) + { + int retryCount = 0; + while (retryCount < 10) + { + try + { + await collection.BulkWriteAsync(writes); + return; + } + catch (Exception e) + { + if (retryCount == 10) + { + throw; + } + Logger.LogWarning("Failed to write to mongoDB, will retry"); + retryCount++; + await Task.Delay(TimeSpan.FromSeconds(5)); + } + } + } + } + protected override async Task UploadChanges(IAsyncEnumerable rows, CancellationToken cancellationToken) { List> writes = new List>(); @@ -102,6 +146,18 @@ protected override async Task UploadChanges(IAsyncEnumerable { if (writeTasks[i].IsCompleted) { + if (writeTasks[i].IsFaulted) + { + var exception = writeTasks[i].Exception; + if (exception != null) + { + throw exception; + } + else + { + throw new InvalidOperationException("MongoDB write failed without exception"); + } + } writeTasks.RemoveAt(i); } } @@ -110,7 +166,8 @@ protected override async Task UploadChanges(IAsyncEnumerable await Task.WhenAny(writeTasks); } } - writeTasks.Add(collection.BulkWriteAsync(writes)); + + writeTasks.Add(WriteData(writes, cancellationToken)); writes = new List>(); } }