diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8d7e883c8..9c9c9bdbb 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -48,5 +48,7 @@ jobs: run: dotnet pack -c Release -o . src/FlowtideDotNet.Connector.CosmosDB/FlowtideDotNet.Connector.CosmosDB.csproj - name: Package elasticsearch run: dotnet pack -c Release -o . src/FlowtideDotNet.Connector.ElasticSearch/FlowtideDotNet.Connector.ElasticSearch.csproj + - name: Package mongodb + run: dotnet pack -c Release -o . src/FlowtideDotNet.Connector.MongoDB/FlowtideDotNet.Connector.MongoDB.csproj - name: Publish run: dotnet nuget push *.nupkg -k ${{ secrets.NUGET_KEY }} -s https://api.nuget.org/v3/index.json diff --git a/Flowtide.sln b/Flowtide.sln index baff32783..c099a3967 100644 --- a/Flowtide.sln +++ b/Flowtide.sln @@ -51,6 +51,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FlowtideDotNet.Connector.El EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FlowtideDotNet.Connector.SqlServer", "src\FlowtideDotNet.Connector.SqlServer\FlowtideDotNet.Connector.SqlServer.csproj", "{B80C06C0-B88D-44D7-958B-AD41CB3CE16B}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FlowtideDotNet.Connector.MongoDB", "src\FlowtideDotNet.Connector.MongoDB\FlowtideDotNet.Connector.MongoDB.csproj", "{197D0AAD-A6ED-412A-A164-905145A796D5}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FlowtideDotNet.Connector.MongoDB.Tests", "tests\FlowtideDotNet.Connector.MongoDB.Tests\FlowtideDotNet.Connector.MongoDB.Tests.csproj", "{45516C5A-FB7A-4EB2-9F84-48A58F838A0A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -141,6 +145,14 @@ Global {B80C06C0-B88D-44D7-958B-AD41CB3CE16B}.Debug|Any CPU.Build.0 = Debug|Any CPU {B80C06C0-B88D-44D7-958B-AD41CB3CE16B}.Release|Any CPU.ActiveCfg = Release|Any CPU {B80C06C0-B88D-44D7-958B-AD41CB3CE16B}.Release|Any CPU.Build.0 = Release|Any CPU + {197D0AAD-A6ED-412A-A164-905145A796D5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {197D0AAD-A6ED-412A-A164-905145A796D5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {197D0AAD-A6ED-412A-A164-905145A796D5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {197D0AAD-A6ED-412A-A164-905145A796D5}.Release|Any CPU.Build.0 = Release|Any CPU + {45516C5A-FB7A-4EB2-9F84-48A58F838A0A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {45516C5A-FB7A-4EB2-9F84-48A58F838A0A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {45516C5A-FB7A-4EB2-9F84-48A58F838A0A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {45516C5A-FB7A-4EB2-9F84-48A58F838A0A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -167,6 +179,8 @@ Global {F57CC67D-A066-4B30-BBFC-517C4E35A7E8} = {2E41A9E4-8F33-4708-81F2-1626EBEE6114} {155EE952-A173-4DAA-91C5-6424B7F5EF9E} = {16CED581-7A8D-4806-B6F6-C544D030CC95} {B80C06C0-B88D-44D7-958B-AD41CB3CE16B} = {2E41A9E4-8F33-4708-81F2-1626EBEE6114} + {197D0AAD-A6ED-412A-A164-905145A796D5} = {2E41A9E4-8F33-4708-81F2-1626EBEE6114} + {45516C5A-FB7A-4EB2-9F84-48A58F838A0A} = {16CED581-7A8D-4806-B6F6-C544D030CC95} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {314C9503-DE99-42AA-8FB0-E355C91986C4} diff --git a/docs/docs/connectors/mongodb.md b/docs/docs/connectors/mongodb.md new file mode 100644 index 000000000..c8c1bdbc1 --- /dev/null +++ b/docs/docs/connectors/mongodb.md @@ -0,0 +1,31 @@ +--- +sidebar_position: 2 +--- + +# MongoDB Connector + +The MongoDB connector allows you to insert data into a MongoDB collection. +At this time only a sink is implemented, there is no support yet to have MongoDB as a source. + +## Sink + +The MongoDB sink allows the insertion of data into a MongoDB collection. + +The nuget package for the connector is: + +* FlowtideDotNet.Connector.MongoDB + +Its implementation waits fully until the stream has reached a steady state at a time T until it writes data to the collection. +This means that its table output can always be traced back to a state from the source systems. + +To use the *MongoDB Sink* add the following line to the *ReadWriteFactory*: + +```csharp +factory.AddMongoDbSink("regex pattern for tablename", new FlowtideMongoDBSinkOptions() + { + Collection = collection, //MongoDB collection + Database = databaseName, // MongoDB database + ConnectionString = connectionString, //Connection string to MongoDB + PrimaryKeys = primaryKeys //List of columns that will be treated as primary keys in the collection + }); +``` \ No newline at end of file diff --git a/src/FlowtideDotNet.Connector.MongoDB/Extensions/FlowtideMongoDbReadWriteFactoryExtensions.cs b/src/FlowtideDotNet.Connector.MongoDB/Extensions/FlowtideMongoDbReadWriteFactoryExtensions.cs new file mode 100644 index 000000000..770bc38c6 --- /dev/null +++ b/src/FlowtideDotNet.Connector.MongoDB/Extensions/FlowtideMongoDbReadWriteFactoryExtensions.cs @@ -0,0 +1,47 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using FlowtideDotNet.Connector.MongoDB.Internal; +using FlowtideDotNet.Core.Engine; +using FlowtideDotNet.Substrait.Relations; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading.Tasks; + +namespace FlowtideDotNet.Connector.MongoDB.Extensions +{ + public static class FlowtideMongoDbReadWriteFactoryExtensions + { + public static ReadWriteFactory AddMongoDbSink(this ReadWriteFactory factory, string regexPattern, FlowtideMongoDBSinkOptions options, Action? transform = null) + { + if (regexPattern == "*") + { + regexPattern = ".*"; + } + factory.AddWriteResolver((writeRel, opt) => + { + var regexResult = Regex.Match(writeRel.NamedObject.DotSeperated, regexPattern, RegexOptions.IgnoreCase); + if (!regexResult.Success) + { + return null; + } + transform?.Invoke(writeRel); + + return new MongoDBSink(options, writeRel, Core.Operators.Write.ExecutionMode.OnCheckpoint, opt); + }); + return factory; + } + } +} diff --git a/src/FlowtideDotNet.Connector.MongoDB/FlowtideDotNet.Connector.MongoDB.csproj b/src/FlowtideDotNet.Connector.MongoDB/FlowtideDotNet.Connector.MongoDB.csproj new file mode 100644 index 000000000..7c0321a21 --- /dev/null +++ b/src/FlowtideDotNet.Connector.MongoDB/FlowtideDotNet.Connector.MongoDB.csproj @@ -0,0 +1,17 @@ + + + + net7.0 + enable + enable + + + + + + + + + + + diff --git a/src/FlowtideDotNet.Connector.MongoDB/FlowtideMongoDBSinkOptions.cs b/src/FlowtideDotNet.Connector.MongoDB/FlowtideMongoDBSinkOptions.cs new file mode 100644 index 000000000..d871dbba8 --- /dev/null +++ b/src/FlowtideDotNet.Connector.MongoDB/FlowtideMongoDBSinkOptions.cs @@ -0,0 +1,31 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace FlowtideDotNet.Connector.MongoDB +{ + public class FlowtideMongoDBSinkOptions + { + public string ConnectionString { get; set; } + + public string Database { get; set; } + + public string Collection { get; set; } + + public List PrimaryKeys { get; set; } + } +} diff --git a/src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs b/src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs new file mode 100644 index 000000000..fb6f19c8e --- /dev/null +++ b/src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs @@ -0,0 +1,109 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using FlowtideDotNet.Core.Operators.Write; +using FlowtideDotNet.Substrait.Relations; +using Microsoft.Extensions.Options; +using MongoDB.Bson; +using MongoDB.Driver; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace FlowtideDotNet.Connector.MongoDB.Internal +{ + internal class MongoDBSink : SimpleGroupedWriteOperator + { + private readonly FlowtideMongoDBSinkOptions options; + private readonly WriteRelation writeRelation; + private readonly StreamEventToBson streamEventToBson; + private IMongoCollection collection; + List primaryKeys; + + public MongoDBSink(FlowtideMongoDBSinkOptions options, WriteRelation writeRelation, ExecutionMode executionMode, ExecutionDataflowBlockOptions executionDataflowBlockOptions) : base(executionMode, executionDataflowBlockOptions) + { + this.options = options; + this.writeRelation = writeRelation; + streamEventToBson = new StreamEventToBson(writeRelation.TableSchema.Names); + } + + public override string DisplayName => "MongoDB Sink"; + + protected override Task SetupAndLoadMetadataAsync() + { + var connection = new MongoUrl(options.ConnectionString); + var client = new MongoClient(connection); + var database = client.GetDatabase(options.Database); + collection = database.GetCollection(options.Collection); + primaryKeys = new List(); + foreach (var primaryKey in options.PrimaryKeys) + { + var index = writeRelation.TableSchema.Names.FindIndex(x => x.Equals(primaryKey, StringComparison.OrdinalIgnoreCase)); + if (index < 0) + { + throw new InvalidOperationException($"Primary key '{primaryKey}' not found in table schema"); + } + primaryKeys.Add(index); + } + return Task.FromResult(new MetadataResult(primaryKeys)); + } + + protected override async Task UploadChanges(IAsyncEnumerable rows) + { + List> writes = new List>(); + await foreach(var row in rows) + { + FilterDefinition[] filters = new FilterDefinition[primaryKeys.Count]; + for (int i = 0; i < primaryKeys.Count; i++) + { + var pkname = options.PrimaryKeys[i]; + var col = row.Row.GetColumn(i); + // Need to take the row value into a bson value + filters[i] = Builders.Filter.Eq(pkname, StreamEventToBson.ToBsonValue(col)); + } + FilterDefinition? filter = null; + if (filters.Length > 1) + { + filter = Builders.Filter.And(filters); + } + else + { + filter = filters[0]; + } + if (row.IsDeleted) + { + writes.Add(new DeleteOneModel(filter)); + } + else + { + var doc = streamEventToBson.ToBson(row.Row); + writes.Add(new ReplaceOneModel(filter, doc) { IsUpsert = true }); + } + + if (writes.Count >= 1000) + { + await collection.BulkWriteAsync(writes); + writes.Clear(); + } + } + + if (writes.Count > 0) + { + await collection.BulkWriteAsync(writes); + writes.Clear(); + } + } + } +} diff --git a/src/FlowtideDotNet.Connector.MongoDB/Internal/StreamEventToBson.cs b/src/FlowtideDotNet.Connector.MongoDB/Internal/StreamEventToBson.cs new file mode 100644 index 000000000..57fe59ac8 --- /dev/null +++ b/src/FlowtideDotNet.Connector.MongoDB/Internal/StreamEventToBson.cs @@ -0,0 +1,91 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using FlexBuffers; +using FlowtideDotNet.Core; +using MongoDB.Bson; + +namespace FlowtideDotNet.Connector.MongoDB.Internal +{ + internal class StreamEventToBson + { + private readonly List names; + + public StreamEventToBson(List names) + { + this.names = names; + } + + public BsonDocument ToBson(in StreamEvent streamEvent) + { + var doc = new BsonDocument(); + + for (int i = 0; i < names.Count; i++) + { + doc.Add(names[i], ToBsonValue(streamEvent.GetColumn(i))); + } + + return doc; + } + + public static BsonValue ToBsonValue(FlxValue flxValue) + { + switch (flxValue.ValueType) + { + case FlexBuffers.Type.Null: + return BsonNull.Value; + case FlexBuffers.Type.Bool: + return new BsonBoolean(flxValue.AsBool); + case FlexBuffers.Type.Int: + return new BsonInt64(flxValue.AsLong); + case FlexBuffers.Type.Uint: + return new BsonInt64((long)flxValue.AsULong); + case FlexBuffers.Type.Float: + return new BsonDouble((long)flxValue.AsULong); + case FlexBuffers.Type.String: + return new BsonString(flxValue.AsString); + case FlexBuffers.Type.Key: + return new BsonString(flxValue.AsString); + case FlexBuffers.Type.Blob: + return new BsonBinaryData(flxValue.AsBlob.ToArray()); + case FlexBuffers.Type.Decimal: + return new BsonDecimal128(flxValue.AsDecimal); + case FlexBuffers.Type.Vector: + return ToBsonArray(flxValue.AsVector); + case FlexBuffers.Type.Map: + return ToBsonDocument(flxValue.AsMap); + default: + throw new ArgumentOutOfRangeException(); + } + } + + private static BsonValue ToBsonArray(FlxVector values) + { + var arr = new BsonArray(); + for (int i = 0; i < values.Length; i++) + { + arr.Add(ToBsonValue(values[i])); + } + return arr; + } + + private static BsonValue ToBsonDocument(FlxMap map) + { + BsonDocument doc = new BsonDocument(); + foreach (var kv in map) + { + doc.Add(kv.Key, ToBsonValue(kv.Value)); + } + return doc; + } + } +} diff --git a/src/FlowtideDotNet.Core/Operators/Write/SimpleGroupedWriteOperator.cs b/src/FlowtideDotNet.Core/Operators/Write/SimpleGroupedWriteOperator.cs index b77f118bb..abc4c667e 100644 --- a/src/FlowtideDotNet.Core/Operators/Write/SimpleGroupedWriteOperator.cs +++ b/src/FlowtideDotNet.Core/Operators/Write/SimpleGroupedWriteOperator.cs @@ -91,6 +91,8 @@ private async Task SendData() { var rowIterator = GetChangedRows(); await UploadChanges(rowIterator); + await m_modified.Clear(); + m_hasModified = false; } } diff --git a/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs b/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs index 42209a799..8d22fc72b 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs @@ -142,6 +142,12 @@ public void EgressCrashOnCheckpoint(int times) _egressCrashOnCheckpointCount = times; } + public async Task SchedulerTick() + { + var scheduler = _stream.Scheduler as DefaultStreamScheduler; + await scheduler!.Tick(); + } + public async Task WaitForUpdate() { Debug.Assert(_stream != null); diff --git a/tests/FlowtideDotNet.Connector.MongoDB.Tests/FlowtideDotNet.Connector.MongoDB.Tests.csproj b/tests/FlowtideDotNet.Connector.MongoDB.Tests/FlowtideDotNet.Connector.MongoDB.Tests.csproj new file mode 100644 index 000000000..33c3a1e25 --- /dev/null +++ b/tests/FlowtideDotNet.Connector.MongoDB.Tests/FlowtideDotNet.Connector.MongoDB.Tests.csproj @@ -0,0 +1,31 @@ + + + + net7.0 + enable + enable + + false + true + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + diff --git a/tests/FlowtideDotNet.Connector.MongoDB.Tests/GlobalUsings.cs b/tests/FlowtideDotNet.Connector.MongoDB.Tests/GlobalUsings.cs new file mode 100644 index 000000000..8c927eb74 --- /dev/null +++ b/tests/FlowtideDotNet.Connector.MongoDB.Tests/GlobalUsings.cs @@ -0,0 +1 @@ +global using Xunit; \ No newline at end of file diff --git a/tests/FlowtideDotNet.Connector.MongoDB.Tests/MongoDBFixture.cs b/tests/FlowtideDotNet.Connector.MongoDB.Tests/MongoDBFixture.cs new file mode 100644 index 000000000..b5166d952 --- /dev/null +++ b/tests/FlowtideDotNet.Connector.MongoDB.Tests/MongoDBFixture.cs @@ -0,0 +1,42 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Testcontainers.MongoDb; + +namespace FlowtideDotNet.Connector.MongoDB.Tests +{ + public class MongoDBFixture : IAsyncLifetime + { + private readonly MongoDbContainer _mongoDbContainer = + new MongoDbBuilder().Build(); + + public string GetConnectionString() + { + return _mongoDbContainer.GetConnectionString(); + } + + public async Task DisposeAsync() + { + await _mongoDbContainer.DisposeAsync(); + } + + public async Task InitializeAsync() + { + await _mongoDbContainer.StartAsync(); + } + } +} diff --git a/tests/FlowtideDotNet.Connector.MongoDB.Tests/MongoDBTestStream.cs b/tests/FlowtideDotNet.Connector.MongoDB.Tests/MongoDBTestStream.cs new file mode 100644 index 000000000..752aacb8c --- /dev/null +++ b/tests/FlowtideDotNet.Connector.MongoDB.Tests/MongoDBTestStream.cs @@ -0,0 +1,50 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using FlowtideDotNet.AcceptanceTests.Internal; +using FlowtideDotNet.Connector.MongoDB.Extensions; +using FlowtideDotNet.Core.Engine; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace FlowtideDotNet.Connector.MongoDB.Tests +{ + internal class MongoDBTestStream : FlowtideTestStream + { + private readonly MongoDBFixture mongoDBFixture; + private readonly string databaseName; + private readonly string collection; + private readonly List primaryKeys; + + public MongoDBTestStream(MongoDBFixture mongoDBFixture, string databaseName, string collection, List primaryKeys, string testName) : base(testName) + { + this.mongoDBFixture = mongoDBFixture; + this.databaseName = databaseName; + this.collection = collection; + this.primaryKeys = primaryKeys; + } + + protected override void AddWriteResolvers(ReadWriteFactory factory) + { + factory.AddMongoDbSink("*", new FlowtideMongoDBSinkOptions() + { + Collection = collection, + Database = databaseName, + ConnectionString = mongoDBFixture.GetConnectionString(), + PrimaryKeys = primaryKeys + }); + } + } +} diff --git a/tests/FlowtideDotNet.Connector.MongoDB.Tests/SinkTests.cs b/tests/FlowtideDotNet.Connector.MongoDB.Tests/SinkTests.cs new file mode 100644 index 000000000..306dbb3e4 --- /dev/null +++ b/tests/FlowtideDotNet.Connector.MongoDB.Tests/SinkTests.cs @@ -0,0 +1,66 @@ +using MongoDB.Bson; +using MongoDB.Driver; +using System.IO; + +namespace FlowtideDotNet.Connector.MongoDB.Tests +{ + public class SinkTests : IClassFixture + { + private readonly MongoDBFixture mongoDBFixture; + + public SinkTests(MongoDBFixture mongoDBFixture) + { + this.mongoDBFixture = mongoDBFixture; + } + + [Fact] + public async Task TestInsert() + { + MongoDBTestStream testStream = new MongoDBTestStream( + mongoDBFixture, + "test", + "test", + new List() { "UserKey" }, "test"); + + testStream.Generate(); + await testStream.StartStream(@" + INSERT INTO testindex + SELECT + UserKey, + FirstName, + LastName + FROM users + "); + + var mongoClient = new MongoClient(mongoDBFixture.GetConnectionString()); + var database = mongoClient.GetDatabase("test"); + var collection = database.GetCollection("test"); + + + while (true) + { + var count = await collection.CountDocumentsAsync(new BsonDocument()); + if (count == 1000) + { + break; + } + await Task.Delay(100); + } + + var user = testStream.Users.First(); + user.FirstName = "updated"; + testStream.AddOrUpdateUser(user); + + while (true) + { + var doc = collection.Find(Builders.Filter.Eq("UserKey", new BsonInt64(user.UserKey))).FirstOrDefault(); + if (doc?.GetElement("FirstName").Value.AsString == "updated") + { + break; + } + await testStream.SchedulerTick(); + await Task.Delay(100); + } + } + } +} \ No newline at end of file