Skip to content

Commit

Permalink
Add support for hybrid execution mode (#411)
Browse files Browse the repository at this point in the history
Hybrid execution mode starts with on checkpoint and then switches to on watermark.
  • Loading branch information
Ulimo authored Mar 17, 2024
1 parent dffdfe0 commit b888a1c
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static ReadWriteFactory AddElasticsearchSink(
}
transform?.Invoke(writeRel);

var sink = new ElasticSearchSink(writeRel, options, Operators.Write.ExecutionMode.OnCheckpoint, opt);
var sink = new ElasticSearchSink(writeRel, options, options.ExecutionMode, opt);
sink.CreateIndexAndMappings();
return sink;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using FlowtideDotNet.Substrait.Relations;
using FlowtideDotNet.Core.Operators.Write;
using FlowtideDotNet.Substrait.Relations;
using Nest;

namespace FlowtideDotNet.Connector.ElasticSearch
Expand All @@ -24,5 +25,7 @@ public class FlowtideElasticsearchOptions
/// This function can be used for instance to create an alias to the index.
/// </summary>
public Func<IElasticClient, WriteRelation, string, Task>? OnInitialDataSent { get; set; }

public ExecutionMode ExecutionMode { get; set; } = ExecutionMode.Hybrid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static ReadWriteFactory AddMongoDbSink(this ReadWriteFactory factory, str
}
transform?.Invoke(writeRel);

return new MongoDBSink(options, writeRel, Core.Operators.Write.ExecutionMode.OnCheckpoint, opt);
return new MongoDBSink(options, writeRel, options.ExecutionMode, opt);
});
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// limitations under the License.

using FlowtideDotNet.Base;
using FlowtideDotNet.Core.Operators.Write;
using MongoDB.Bson;
using MongoDB.Driver;

Expand Down Expand Up @@ -41,5 +42,7 @@ public class FlowtideMongoDBSinkOptions
/// Set the amount of batches that will be sent in parallel to mongodb.
/// </summary>
public int ParallelBatches { get; set; } = 10;

public ExecutionMode ExecutionMode { get; set; } = ExecutionMode.Hybrid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static ReadWriteFactory AddOpenFGASink(
}
transform?.Invoke(writeRel);

var sink = new FlowtideOpenFgaSink(options, writeRel, ExecutionMode.OnCheckpoint, opt);
var sink = new FlowtideOpenFgaSink(options, writeRel, options.ExecutionMode, opt);
return sink;
});
return factory;
Expand Down
3 changes: 3 additions & 0 deletions src/FlowtideDotNet.Connector.OpenFGA/OpenFGASinkOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// limitations under the License.

using FlowtideDotNet.Base;
using FlowtideDotNet.Core.Operators.Write;
using OpenFga.Sdk.Client;
using OpenFga.Sdk.Client.Model;
using OpenFga.Sdk.Model;
Expand Down Expand Up @@ -72,5 +73,7 @@ public class OpenFgaSinkOptions
/// will be deleted after the initial loading of data has completed.
/// </summary>
public Func<OpenFgaClient, IAsyncEnumerable<TupleKey>>? DeleteExistingDataFetcher { get; set; }

public ExecutionMode ExecutionMode { get; set; } = ExecutionMode.Hybrid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static ReadWriteFactory AddSpiceDbSink(this ReadWriteFactory factory, str
return null;
}
transform?.Invoke(writeRel);
return new SpiceDbSink(spiceDbSinkOptions, writeRel, Core.Operators.Write.ExecutionMode.OnCheckpoint, opt);
return new SpiceDbSink(spiceDbSinkOptions, writeRel, spiceDbSinkOptions.ExecutionMode, opt);
});
return factory;
}
Expand Down
3 changes: 3 additions & 0 deletions src/FlowtideDotNet.Connector.SpiceDB/SpiceDbSinkOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

using Authzed.Api.V1;
using FlowtideDotNet.Base;
using FlowtideDotNet.Core.Operators.Write;
using Grpc.Core;
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -58,5 +59,7 @@ public class SpiceDbSinkOptions
public Func<Task>? OnInitialDataSentFunc { get; set; }

public int MaxParallellCalls { get; set; } = 4;

public ExecutionMode ExecutionMode { get; set; } = ExecutionMode.Hybrid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public MetadataResult(IReadOnlyList<int> primaryKeyColumns)
public enum ExecutionMode
{
OnCheckpoint = 0,
OnWatermark = 1
OnWatermark = 1,
/// <summary>
/// Hybrid mode starts with on checkpoint and then switches to on watermark after initial data
/// </summary>
Hybrid = 2
}

public readonly struct SimpleChangeEvent
Expand Down Expand Up @@ -81,7 +85,8 @@ protected SimpleGroupedWriteOperator(ExecutionMode executionMode, ExecutionDataf
protected override async Task<SimpleWriteState> Checkpoint(long checkpointTime)
{
Debug.Assert(_state != null);
if (m_executionMode == ExecutionMode.OnCheckpoint)
if (m_executionMode == ExecutionMode.OnCheckpoint ||
(m_executionMode == ExecutionMode.Hybrid && !_state.SentInitialData))
{
await SendData();
}
Expand Down Expand Up @@ -215,8 +220,10 @@ protected virtual IAsyncEnumerable<RowEvent> GetExistingData()

protected override Task OnWatermark(Watermark watermark)
{
Debug.Assert(_state != null);
_latestWatermark = watermark;
if (m_executionMode == ExecutionMode.OnWatermark)
if (m_executionMode == ExecutionMode.OnWatermark ||
(m_executionMode == ExecutionMode.Hybrid && _state.SentInitialData))
{
return SendData();
}
Expand Down

0 comments on commit b888a1c

Please sign in to comment.