Skip to content

Commit

Permalink
Add support for MongoDB as a sink (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulimo authored Dec 6, 2023
1 parent c08684c commit b47016b
Show file tree
Hide file tree
Showing 15 changed files with 540 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,7 @@ jobs:
run: dotnet pack -c Release -o . src/FlowtideDotNet.Connector.CosmosDB/FlowtideDotNet.Connector.CosmosDB.csproj
- name: Package elasticsearch
run: dotnet pack -c Release -o . src/FlowtideDotNet.Connector.ElasticSearch/FlowtideDotNet.Connector.ElasticSearch.csproj
- name: Package mongodb
run: dotnet pack -c Release -o . src/FlowtideDotNet.Connector.MongoDB/FlowtideDotNet.Connector.MongoDB.csproj
- name: Publish
run: dotnet nuget push *.nupkg -k ${{ secrets.NUGET_KEY }} -s https://api.nuget.org/v3/index.json
14 changes: 14 additions & 0 deletions Flowtide.sln
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FlowtideDotNet.Connector.El
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FlowtideDotNet.Connector.SqlServer", "src\FlowtideDotNet.Connector.SqlServer\FlowtideDotNet.Connector.SqlServer.csproj", "{B80C06C0-B88D-44D7-958B-AD41CB3CE16B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FlowtideDotNet.Connector.MongoDB", "src\FlowtideDotNet.Connector.MongoDB\FlowtideDotNet.Connector.MongoDB.csproj", "{197D0AAD-A6ED-412A-A164-905145A796D5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FlowtideDotNet.Connector.MongoDB.Tests", "tests\FlowtideDotNet.Connector.MongoDB.Tests\FlowtideDotNet.Connector.MongoDB.Tests.csproj", "{45516C5A-FB7A-4EB2-9F84-48A58F838A0A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -141,6 +145,14 @@ Global
{B80C06C0-B88D-44D7-958B-AD41CB3CE16B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B80C06C0-B88D-44D7-958B-AD41CB3CE16B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B80C06C0-B88D-44D7-958B-AD41CB3CE16B}.Release|Any CPU.Build.0 = Release|Any CPU
{197D0AAD-A6ED-412A-A164-905145A796D5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{197D0AAD-A6ED-412A-A164-905145A796D5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{197D0AAD-A6ED-412A-A164-905145A796D5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{197D0AAD-A6ED-412A-A164-905145A796D5}.Release|Any CPU.Build.0 = Release|Any CPU
{45516C5A-FB7A-4EB2-9F84-48A58F838A0A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{45516C5A-FB7A-4EB2-9F84-48A58F838A0A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{45516C5A-FB7A-4EB2-9F84-48A58F838A0A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{45516C5A-FB7A-4EB2-9F84-48A58F838A0A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -167,6 +179,8 @@ Global
{F57CC67D-A066-4B30-BBFC-517C4E35A7E8} = {2E41A9E4-8F33-4708-81F2-1626EBEE6114}
{155EE952-A173-4DAA-91C5-6424B7F5EF9E} = {16CED581-7A8D-4806-B6F6-C544D030CC95}
{B80C06C0-B88D-44D7-958B-AD41CB3CE16B} = {2E41A9E4-8F33-4708-81F2-1626EBEE6114}
{197D0AAD-A6ED-412A-A164-905145A796D5} = {2E41A9E4-8F33-4708-81F2-1626EBEE6114}
{45516C5A-FB7A-4EB2-9F84-48A58F838A0A} = {16CED581-7A8D-4806-B6F6-C544D030CC95}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {314C9503-DE99-42AA-8FB0-E355C91986C4}
Expand Down
31 changes: 31 additions & 0 deletions docs/docs/connectors/mongodb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
sidebar_position: 2
---

# MongoDB Connector

The MongoDB connector allows you to insert data into a MongoDB collection.
At this time only a sink is implemented, there is no support yet to have MongoDB as a source.

## Sink

The MongoDB sink allows the insertion of data into a MongoDB collection.

The nuget package for the connector is:

* FlowtideDotNet.Connector.MongoDB

Its implementation waits fully until the stream has reached a steady state at a time T until it writes data to the collection.
This means that its table output can always be traced back to a state from the source systems.

To use the *MongoDB Sink* add the following line to the *ReadWriteFactory*:

```csharp
factory.AddMongoDbSink("regex pattern for tablename", new FlowtideMongoDBSinkOptions()
{
Collection = collection, //MongoDB collection
Database = databaseName, // MongoDB database
ConnectionString = connectionString, //Connection string to MongoDB
PrimaryKeys = primaryKeys //List of columns that will be treated as primary keys in the collection
});
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using FlowtideDotNet.Connector.MongoDB.Internal;
using FlowtideDotNet.Core.Engine;
using FlowtideDotNet.Substrait.Relations;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

namespace FlowtideDotNet.Connector.MongoDB.Extensions
{
public static class FlowtideMongoDbReadWriteFactoryExtensions
{
public static ReadWriteFactory AddMongoDbSink(this ReadWriteFactory factory, string regexPattern, FlowtideMongoDBSinkOptions options, Action<WriteRelation>? transform = null)
{
if (regexPattern == "*")
{
regexPattern = ".*";
}
factory.AddWriteResolver((writeRel, opt) =>
{
var regexResult = Regex.Match(writeRel.NamedObject.DotSeperated, regexPattern, RegexOptions.IgnoreCase);
if (!regexResult.Success)
{
return null;
}
transform?.Invoke(writeRel);

return new MongoDBSink(options, writeRel, Core.Operators.Write.ExecutionMode.OnCheckpoint, opt);
});
return factory;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MongoDB.Driver" Version="2.22.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\FlowtideDotNet.Core\FlowtideDotNet.Core.csproj" />
</ItemGroup>

</Project>
31 changes: 31 additions & 0 deletions src/FlowtideDotNet.Connector.MongoDB/FlowtideMongoDBSinkOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FlowtideDotNet.Connector.MongoDB
{
public class FlowtideMongoDBSinkOptions
{
public string ConnectionString { get; set; }

public string Database { get; set; }

public string Collection { get; set; }

public List<string> PrimaryKeys { get; set; }
}
}
109 changes: 109 additions & 0 deletions src/FlowtideDotNet.Connector.MongoDB/Internal/MongoDBSink.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using FlowtideDotNet.Core.Operators.Write;
using FlowtideDotNet.Substrait.Relations;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace FlowtideDotNet.Connector.MongoDB.Internal
{
internal class MongoDBSink : SimpleGroupedWriteOperator
{
private readonly FlowtideMongoDBSinkOptions options;
private readonly WriteRelation writeRelation;
private readonly StreamEventToBson streamEventToBson;
private IMongoCollection<BsonDocument> collection;
List<int> primaryKeys;

public MongoDBSink(FlowtideMongoDBSinkOptions options, WriteRelation writeRelation, ExecutionMode executionMode, ExecutionDataflowBlockOptions executionDataflowBlockOptions) : base(executionMode, executionDataflowBlockOptions)
{
this.options = options;
this.writeRelation = writeRelation;
streamEventToBson = new StreamEventToBson(writeRelation.TableSchema.Names);
}

public override string DisplayName => "MongoDB Sink";

protected override Task<MetadataResult> SetupAndLoadMetadataAsync()
{
var connection = new MongoUrl(options.ConnectionString);
var client = new MongoClient(connection);
var database = client.GetDatabase(options.Database);
collection = database.GetCollection<BsonDocument>(options.Collection);
primaryKeys = new List<int>();
foreach (var primaryKey in options.PrimaryKeys)
{
var index = writeRelation.TableSchema.Names.FindIndex(x => x.Equals(primaryKey, StringComparison.OrdinalIgnoreCase));
if (index < 0)
{
throw new InvalidOperationException($"Primary key '{primaryKey}' not found in table schema");
}
primaryKeys.Add(index);
}
return Task.FromResult(new MetadataResult(primaryKeys));
}

protected override async Task UploadChanges(IAsyncEnumerable<SimpleChangeEvent> rows)
{
List<WriteModel<BsonDocument>> writes = new List<WriteModel<BsonDocument>>();
await foreach(var row in rows)
{
FilterDefinition<BsonDocument>[] filters = new FilterDefinition<BsonDocument>[primaryKeys.Count];
for (int i = 0; i < primaryKeys.Count; i++)
{
var pkname = options.PrimaryKeys[i];
var col = row.Row.GetColumn(i);
// Need to take the row value into a bson value
filters[i] = Builders<BsonDocument>.Filter.Eq(pkname, StreamEventToBson.ToBsonValue(col));
}
FilterDefinition<BsonDocument>? filter = null;
if (filters.Length > 1)
{
filter = Builders<BsonDocument>.Filter.And(filters);
}
else
{
filter = filters[0];
}
if (row.IsDeleted)
{
writes.Add(new DeleteOneModel<BsonDocument>(filter));
}
else
{
var doc = streamEventToBson.ToBson(row.Row);
writes.Add(new ReplaceOneModel<BsonDocument>(filter, doc) { IsUpsert = true });
}

if (writes.Count >= 1000)
{
await collection.BulkWriteAsync(writes);
writes.Clear();
}
}

if (writes.Count > 0)
{
await collection.BulkWriteAsync(writes);
writes.Clear();
}
}
}
}
91 changes: 91 additions & 0 deletions src/FlowtideDotNet.Connector.MongoDB/Internal/StreamEventToBson.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using FlexBuffers;
using FlowtideDotNet.Core;
using MongoDB.Bson;

namespace FlowtideDotNet.Connector.MongoDB.Internal
{
internal class StreamEventToBson
{
private readonly List<string> names;

public StreamEventToBson(List<string> names)
{
this.names = names;
}

public BsonDocument ToBson(in StreamEvent streamEvent)
{
var doc = new BsonDocument();

for (int i = 0; i < names.Count; i++)
{
doc.Add(names[i], ToBsonValue(streamEvent.GetColumn(i)));
}

return doc;
}

public static BsonValue ToBsonValue(FlxValue flxValue)
{
switch (flxValue.ValueType)
{
case FlexBuffers.Type.Null:
return BsonNull.Value;
case FlexBuffers.Type.Bool:
return new BsonBoolean(flxValue.AsBool);
case FlexBuffers.Type.Int:
return new BsonInt64(flxValue.AsLong);
case FlexBuffers.Type.Uint:
return new BsonInt64((long)flxValue.AsULong);
case FlexBuffers.Type.Float:
return new BsonDouble((long)flxValue.AsULong);
case FlexBuffers.Type.String:
return new BsonString(flxValue.AsString);
case FlexBuffers.Type.Key:
return new BsonString(flxValue.AsString);
case FlexBuffers.Type.Blob:
return new BsonBinaryData(flxValue.AsBlob.ToArray());
case FlexBuffers.Type.Decimal:
return new BsonDecimal128(flxValue.AsDecimal);
case FlexBuffers.Type.Vector:
return ToBsonArray(flxValue.AsVector);
case FlexBuffers.Type.Map:
return ToBsonDocument(flxValue.AsMap);
default:
throw new ArgumentOutOfRangeException();
}
}

private static BsonValue ToBsonArray(FlxVector values)
{
var arr = new BsonArray();
for (int i = 0; i < values.Length; i++)
{
arr.Add(ToBsonValue(values[i]));
}
return arr;
}

private static BsonValue ToBsonDocument(FlxMap map)
{
BsonDocument doc = new BsonDocument();
foreach (var kv in map)
{
doc.Add(kv.Key, ToBsonValue(kv.Value));
}
return doc;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ private async Task SendData()
{
var rowIterator = GetChangedRows();
await UploadChanges(rowIterator);
await m_modified.Clear();
m_hasModified = false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ public void EgressCrashOnCheckpoint(int times)
_egressCrashOnCheckpointCount = times;
}

public async Task SchedulerTick()
{
var scheduler = _stream.Scheduler as DefaultStreamScheduler;
await scheduler!.Tick();
}

public async Task WaitForUpdate()
{
Debug.Assert(_stream != null);
Expand Down
Loading

0 comments on commit b47016b

Please sign in to comment.