Skip to content

Commit

Permalink
Add support to set minimum time between checkpoints. (#413)
Browse files Browse the repository at this point in the history
* Add support to set minimum time between checkpoints.

This is useful when uploading data on watermark or in hybrid mode.

* fix code smell

* Add check so initial checkpoint is always taken

* Add to flowtide builder
  • Loading branch information
Ulimo authored Mar 19, 2024
1 parent 3f164e1 commit e913728
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/FlowtideDotNet.Base/Engine/DataflowStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public DataflowStreamBuilder WaitForCheckpointAfterInitialData(bool wait)
return this;
}

public DataflowStreamBuilder SetMinimumTimeBetweenCheckpoint(TimeSpan timeSpan)
{
_dataflowStreamOptions.MinimumTimeBetweenCheckpoints = timeSpan;
return this;
}

public DataflowStream Build()
{
if (_stateManagerOptions == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ namespace FlowtideDotNet.Base.Engine.Internal
internal class DataflowStreamOptions
{
public bool WaitForCheckpointAfterInitialData { get; set; } = true;

public TimeSpan? MinimumTimeBetweenCheckpoints { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ private void CheckpointCompleted()

lock (_context._checkpointLock)
{
_context._initialCheckpointTaken = true;
if (_context.checkpointTask != null)
{
_context._scheduleCheckpointTask = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ internal class StreamContext : IStreamTriggerCaller, IAsyncDisposable

internal FlowtideDotNet.Storage.StateManager.StateManagerSync<StreamState> _stateManager;
internal readonly ILogger<StreamContext> _logger;

internal bool _initialCheckpointTaken = false;


public StreamContext(
Expand Down Expand Up @@ -336,6 +338,15 @@ internal void TryScheduleCheckpointIn(TimeSpan timeSpan)
internal bool TryScheduleCheckpointIn_NoLock(TimeSpan timeSpan)
{
Debug.Assert(Monitor.IsEntered(_checkpointLock));

// Check if minimum time has been set, if so default it to the minimum time.
if (_dataflowStreamOptions.MinimumTimeBetweenCheckpoints != null &&
_initialCheckpointTaken &&
_dataflowStreamOptions.MinimumTimeBetweenCheckpoints.Value.CompareTo(timeSpan) > 0)
{
timeSpan = _dataflowStreamOptions.MinimumTimeBetweenCheckpoints.Value;
}

var triggerTime = DateTime.Now.Add(timeSpan);

// Check if a checkpoint is already running, if so, add that a checkpoint is waiting
Expand Down
6 changes: 6 additions & 0 deletions src/FlowtideDotNet.Core/Engine/FlowtideBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public FlowtideBuilder SetGetTimestampUpdateInterval(TimeSpan interval)
return this;
}

public FlowtideBuilder SetMinimumTimeBetweenCheckpoint(TimeSpan timeSpan)
{
dataflowStreamBuilder.SetMinimumTimeBetweenCheckpoint(timeSpan);
return this;
}

private string ComputePlanHash()
{
Debug.Assert(_plan != null, "Plan should not be null.");
Expand Down

0 comments on commit e913728

Please sign in to comment.