diff --git a/src/FlowtideDotNet.Connector.ElasticSearch/Extensions/FlowtideElasticsearchReadWriteFactoryExtensions.cs b/src/FlowtideDotNet.Connector.ElasticSearch/Extensions/FlowtideElasticsearchReadWriteFactoryExtensions.cs index 20444fcba..b75bcdc90 100644 --- a/src/FlowtideDotNet.Connector.ElasticSearch/Extensions/FlowtideElasticsearchReadWriteFactoryExtensions.cs +++ b/src/FlowtideDotNet.Connector.ElasticSearch/Extensions/FlowtideElasticsearchReadWriteFactoryExtensions.cs @@ -38,7 +38,7 @@ public static ReadWriteFactory AddElasticsearchSink( } transform?.Invoke(writeRel); - var sink = new ElasticSearchSink(writeRel, options, Operators.Write.ExecutionMode.OnCheckpoint, opt); + var sink = new ElasticSearchSink(writeRel, options, options.ExecutionMode, opt); sink.CreateIndexAndMappings(); return sink; }); diff --git a/src/FlowtideDotNet.Connector.ElasticSearch/FlowtideElasticsearchOptions.cs b/src/FlowtideDotNet.Connector.ElasticSearch/FlowtideElasticsearchOptions.cs index ce5f73011..c516adee1 100644 --- a/src/FlowtideDotNet.Connector.ElasticSearch/FlowtideElasticsearchOptions.cs +++ b/src/FlowtideDotNet.Connector.ElasticSearch/FlowtideElasticsearchOptions.cs @@ -1,4 +1,5 @@ -using FlowtideDotNet.Substrait.Relations; +using FlowtideDotNet.Core.Operators.Write; +using FlowtideDotNet.Substrait.Relations; using Nest; namespace FlowtideDotNet.Connector.ElasticSearch @@ -24,5 +25,7 @@ public class FlowtideElasticsearchOptions /// This function can be used for instance to create an alias to the index. /// public Func? OnInitialDataSent { get; set; } + + public ExecutionMode ExecutionMode { get; set; } = ExecutionMode.Hybrid; } } \ No newline at end of file diff --git a/src/FlowtideDotNet.Connector.MongoDB/Extensions/FlowtideMongoDbReadWriteFactoryExtensions.cs b/src/FlowtideDotNet.Connector.MongoDB/Extensions/FlowtideMongoDbReadWriteFactoryExtensions.cs index 6a8aa6daf..0e44a9a50 100644 --- a/src/FlowtideDotNet.Connector.MongoDB/Extensions/FlowtideMongoDbReadWriteFactoryExtensions.cs +++ b/src/FlowtideDotNet.Connector.MongoDB/Extensions/FlowtideMongoDbReadWriteFactoryExtensions.cs @@ -34,7 +34,7 @@ public static ReadWriteFactory AddMongoDbSink(this ReadWriteFactory factory, str } transform?.Invoke(writeRel); - return new MongoDBSink(options, writeRel, Core.Operators.Write.ExecutionMode.OnCheckpoint, opt); + return new MongoDBSink(options, writeRel, options.ExecutionMode, opt); }); return factory; } diff --git a/src/FlowtideDotNet.Connector.MongoDB/FlowtideMongoDBSinkOptions.cs b/src/FlowtideDotNet.Connector.MongoDB/FlowtideMongoDBSinkOptions.cs index a7874043c..9b0bf2d8f 100644 --- a/src/FlowtideDotNet.Connector.MongoDB/FlowtideMongoDBSinkOptions.cs +++ b/src/FlowtideDotNet.Connector.MongoDB/FlowtideMongoDBSinkOptions.cs @@ -11,6 +11,7 @@ // limitations under the License. using FlowtideDotNet.Base; +using FlowtideDotNet.Core.Operators.Write; using MongoDB.Bson; using MongoDB.Driver; @@ -41,5 +42,7 @@ public class FlowtideMongoDBSinkOptions /// Set the amount of batches that will be sent in parallel to mongodb. /// public int ParallelBatches { get; set; } = 10; + + public ExecutionMode ExecutionMode { get; set; } = ExecutionMode.Hybrid; } } diff --git a/src/FlowtideDotNet.Connector.OpenFGA/Extensions/OpenFGAReadWriteFactoryExtensions.cs b/src/FlowtideDotNet.Connector.OpenFGA/Extensions/OpenFGAReadWriteFactoryExtensions.cs index b3525ab34..954bb6de8 100644 --- a/src/FlowtideDotNet.Connector.OpenFGA/Extensions/OpenFGAReadWriteFactoryExtensions.cs +++ b/src/FlowtideDotNet.Connector.OpenFGA/Extensions/OpenFGAReadWriteFactoryExtensions.cs @@ -45,7 +45,7 @@ public static ReadWriteFactory AddOpenFGASink( } transform?.Invoke(writeRel); - var sink = new FlowtideOpenFgaSink(options, writeRel, ExecutionMode.OnCheckpoint, opt); + var sink = new FlowtideOpenFgaSink(options, writeRel, options.ExecutionMode, opt); return sink; }); return factory; diff --git a/src/FlowtideDotNet.Connector.OpenFGA/OpenFGASinkOptions.cs b/src/FlowtideDotNet.Connector.OpenFGA/OpenFGASinkOptions.cs index 0bd68285f..0cd8ed3f6 100644 --- a/src/FlowtideDotNet.Connector.OpenFGA/OpenFGASinkOptions.cs +++ b/src/FlowtideDotNet.Connector.OpenFGA/OpenFGASinkOptions.cs @@ -11,6 +11,7 @@ // limitations under the License. using FlowtideDotNet.Base; +using FlowtideDotNet.Core.Operators.Write; using OpenFga.Sdk.Client; using OpenFga.Sdk.Client.Model; using OpenFga.Sdk.Model; @@ -72,5 +73,7 @@ public class OpenFgaSinkOptions /// will be deleted after the initial loading of data has completed. /// public Func>? DeleteExistingDataFetcher { get; set; } + + public ExecutionMode ExecutionMode { get; set; } = ExecutionMode.Hybrid; } } diff --git a/src/FlowtideDotNet.Connector.SpiceDB/Extensions/SpiceDbReadWriteFactoryExtensions.cs b/src/FlowtideDotNet.Connector.SpiceDB/Extensions/SpiceDbReadWriteFactoryExtensions.cs index 850d2e72d..b6916bbda 100644 --- a/src/FlowtideDotNet.Connector.SpiceDB/Extensions/SpiceDbReadWriteFactoryExtensions.cs +++ b/src/FlowtideDotNet.Connector.SpiceDB/Extensions/SpiceDbReadWriteFactoryExtensions.cs @@ -39,7 +39,7 @@ public static ReadWriteFactory AddSpiceDbSink(this ReadWriteFactory factory, str return null; } transform?.Invoke(writeRel); - return new SpiceDbSink(spiceDbSinkOptions, writeRel, Core.Operators.Write.ExecutionMode.OnCheckpoint, opt); + return new SpiceDbSink(spiceDbSinkOptions, writeRel, spiceDbSinkOptions.ExecutionMode, opt); }); return factory; } diff --git a/src/FlowtideDotNet.Connector.SpiceDB/SpiceDbSinkOptions.cs b/src/FlowtideDotNet.Connector.SpiceDB/SpiceDbSinkOptions.cs index bb385e8ff..70786b84c 100644 --- a/src/FlowtideDotNet.Connector.SpiceDB/SpiceDbSinkOptions.cs +++ b/src/FlowtideDotNet.Connector.SpiceDB/SpiceDbSinkOptions.cs @@ -12,6 +12,7 @@ using Authzed.Api.V1; using FlowtideDotNet.Base; +using FlowtideDotNet.Core.Operators.Write; using Grpc.Core; using System; using System.Collections.Generic; @@ -58,5 +59,7 @@ public class SpiceDbSinkOptions public Func? OnInitialDataSentFunc { get; set; } public int MaxParallellCalls { get; set; } = 4; + + public ExecutionMode ExecutionMode { get; set; } = ExecutionMode.Hybrid; } } diff --git a/src/FlowtideDotNet.Core/Operators/Write/SimpleGroupedWriteOperator.cs b/src/FlowtideDotNet.Core/Operators/Write/SimpleGroupedWriteOperator.cs index 09e6bba60..89bcea2ec 100644 --- a/src/FlowtideDotNet.Core/Operators/Write/SimpleGroupedWriteOperator.cs +++ b/src/FlowtideDotNet.Core/Operators/Write/SimpleGroupedWriteOperator.cs @@ -43,7 +43,11 @@ public MetadataResult(IReadOnlyList primaryKeyColumns) public enum ExecutionMode { OnCheckpoint = 0, - OnWatermark = 1 + OnWatermark = 1, + /// + /// Hybrid mode starts with on checkpoint and then switches to on watermark after initial data + /// + Hybrid = 2 } public readonly struct SimpleChangeEvent @@ -81,7 +85,8 @@ protected SimpleGroupedWriteOperator(ExecutionMode executionMode, ExecutionDataf protected override async Task Checkpoint(long checkpointTime) { Debug.Assert(_state != null); - if (m_executionMode == ExecutionMode.OnCheckpoint) + if (m_executionMode == ExecutionMode.OnCheckpoint || + (m_executionMode == ExecutionMode.Hybrid && !_state.SentInitialData)) { await SendData(); } @@ -215,8 +220,10 @@ protected virtual IAsyncEnumerable GetExistingData() protected override Task OnWatermark(Watermark watermark) { + Debug.Assert(_state != null); _latestWatermark = watermark; - if (m_executionMode == ExecutionMode.OnWatermark) + if (m_executionMode == ExecutionMode.OnWatermark || + (m_executionMode == ExecutionMode.Hybrid && _state.SentInitialData)) { return SendData(); }