Skip to content

Commit

Permalink
Fix invalid restores in storage system on crash (#179)
Browse files Browse the repository at this point in the history
This removes the change to pass in a function for state options.
It fixes so compaction cannot cause an invalid state.
It also makes sure all state metadata is restored correctly.
  • Loading branch information
Ulimo authored Dec 4, 2023
1 parent c7d7e0f commit 763fbaf
Show file tree
Hide file tree
Showing 24 changed files with 133 additions and 55 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ builder.Services.AddFlowtideStream(b =>
{
b.AddPlan(plan)
.AddReadWriteFactory(factory)
.WithStateOptions(() => new StateManagerOptions()
.WithStateOptions(new StateManagerOptions()
{
// This is non persistent storage, use FasterKV persistence storage instead if you want persistent storage
PersistentStorage = new FileCachePersistentStorage(new FlowtideDotNet.Storage.FileCacheOptions()
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ builder.Services.AddFlowtideStream(b =>
{
b.AddPlan(plan)
.AddReadWriteFactory(factory)
.WithStateOptions(() => new StateManagerOptions()
.WithStateOptions(new StateManagerOptions()
{
// This is non persistent storage, use FasterKV persistence storage instead if you want persistent storage
PersistentStorage = new FileCachePersistentStorage(new FlowtideDotNet.Storage.FileCacheOptions()
Expand Down Expand Up @@ -146,7 +146,7 @@ builder.Services.AddFlowtideStream(b =>
{
b.AddPlan(plan)
.AddReadWriteFactory(factory)
.WithStateOptions(() => new StateManagerOptions()
.WithStateOptions(new StateManagerOptions()
{
// This is non persistent storage, use FasterKV persistence storage instead if you want persistent storage
PersistentStorage = new FileCachePersistentStorage(new FlowtideDotNet.Storage.FileCacheOptions()
Expand Down
2 changes: 1 addition & 1 deletion samples/MonitoringAzureMonitor/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ LEFT JOIN other o
{
b.AddPlan(plan)
.AddReadWriteFactory(factory)
.WithStateOptions(() => new StateManagerOptions()
.WithStateOptions(new StateManagerOptions()
{
// This is non persistent storage, use FasterKV persistence storage instead if you want persistent storage
PersistentStorage = new FileCachePersistentStorage(new FlowtideDotNet.Storage.FileCacheOptions()
Expand Down
2 changes: 1 addition & 1 deletion samples/MonitoringPrometheus/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ LEFT JOIN other o
{
b.AddPlan(plan)
.AddReadWriteFactory(factory)
.WithStateOptions(() => new StateManagerOptions()
.WithStateOptions(new StateManagerOptions()
{
// This is non persistent storage, use FasterKV persistence storage instead if you want persistent storage
PersistentStorage = new FileCachePersistentStorage(new FlowtideDotNet.Storage.FileCacheOptions()
Expand Down
2 changes: 1 addition & 1 deletion samples/SqlSampleWithUI/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ LEFT JOIN other o
{
b.AddPlan(plan)
.AddReadWriteFactory(factory)
.WithStateOptions(() => new StateManagerOptions()
.WithStateOptions(new StateManagerOptions()
{
// This is non persistent storage, use FasterKV persistence storage instead if you want persistent storage
PersistentStorage = new FileCachePersistentStorage(new FlowtideDotNet.Storage.FileCacheOptions()
Expand Down
4 changes: 2 additions & 2 deletions src/FlowtideDotNet.Base/Engine/DataflowStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class DataflowStreamBuilder
private readonly string _streamName;
private IStreamScheduler? _streamScheduler;
private IStreamNotificationReciever? _streamNotificationReciever;
private Func<StateManagerOptions>? _stateManagerOptions;
private StateManagerOptions? _stateManagerOptions;
private ILoggerFactory? _loggerFactory;
private StreamVersionInformation? _streamVersionInformation;

Expand Down Expand Up @@ -78,7 +78,7 @@ public DataflowStreamBuilder WithStateHandler(IStateHandler stateHandler)
return this;
}

public DataflowStreamBuilder WithStateOptions(Func<StateManagerOptions> stateManagerOptions)
public DataflowStreamBuilder WithStateOptions(StateManagerOptions stateManagerOptions)
{
_stateManagerOptions = stateManagerOptions;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public StreamContext(
StreamState? fromState,
IStreamScheduler streamScheduler,
IStreamNotificationReciever? notificationReciever,
Func<StateManagerOptions> stateManagerOptions,
StateManagerOptions stateManagerOptions,
ILoggerFactory? loggerFactory,
StreamVersionInformation? streamVersionInformation)
{
Expand Down
2 changes: 1 addition & 1 deletion src/FlowtideDotNet.Core/Engine/FlowtideBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public FlowtideBuilder AddReadWriteFactory(IReadWriteFactory readWriteFactory)
return this;
}

public FlowtideBuilder WithStateOptions(Func<StateManagerOptions> stateManagerOptions)
public FlowtideBuilder WithStateOptions(StateManagerOptions stateManagerOptions)
{
dataflowStreamBuilder.WithStateOptions(stateManagerOptions);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public Task Delete(long key)
return Task.CompletedTask;
}

public void Dispose()
{
}

public ValueTask<byte[]> Read(long key)
{
return ValueTask.FromResult(fileCache.Read(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public async Task Delete(long key)
_ = result.Complete();
}

public void Dispose()
{
session.Dispose();
}

public async ValueTask<byte[]> Read(long key)
{
var tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ public async Task InitializeAsync()
public async ValueTask CompactAsync()
{
m_adminSession.Compact(m_persistentStorage.Log.SafeReadOnlyAddress, CompactionType.Lookup);
m_persistentStorage.Log.Truncate();
await m_persistentStorage.TakeFullCheckpointAsync(CheckpointType.Snapshot);
}

public ValueTask ResetAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace FlowtideDotNet.Storage.Persistence
{
public interface IPersistentStorageSession
public interface IPersistentStorageSession : IDisposable
{
ValueTask<byte[]> Read(long key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal class SyncStateClient<V, TMetadata> : StateClient, IStateClient<V, TMet
private bool disposedValue;
private readonly StateManagerSync stateManager;
private readonly long metadataId;
private readonly StateClientMetadata<TMetadata> metadata;
private StateClientMetadata<TMetadata> metadata;
private readonly IPersistentStorageSession session;
private readonly StateClientOptions<V> options;
private Dictionary<long, int> m_modified;
Expand Down Expand Up @@ -197,7 +197,7 @@ public void Dispose()
GC.SuppressFinalize(this);
}

public override ValueTask Reset(bool clearMetadata)
public override async ValueTask Reset(bool clearMetadata)
{
lock (m_lock)
{
Expand All @@ -207,12 +207,16 @@ public override ValueTask Reset(bool clearMetadata)
m_fileCache.Free(kv.Key);
}
m_modified.Clear();
if (clearMetadata)
{
Metadata = default;
}
}
return ValueTask.CompletedTask;
if (clearMetadata)
{
Metadata = default;
}
else
{
var bytes = await session.Read(metadataId);
metadata = StateClientMetadataSerializer.Instance.Deserialize<TMetadata>(new ByteMemoryOwner(bytes), bytes.Length);
}
}

public void Evict(List<(LinkedListNode<LruTableSync.LinkedListValue>, long)> valuesToEvict)
Expand Down
14 changes: 8 additions & 6 deletions src/FlowtideDotNet.Storage/StateManager/StateManagerSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace FlowtideDotNet.Storage.StateManager
{
public class StateManagerSync<TMetadata> : StateManagerSync
{
public StateManagerSync(Func<StateManagerOptions> getOptions, ILogger logger) : base(new StateManagerMetadataSerializer<TMetadata>(), getOptions, logger)
public StateManagerSync(StateManagerOptions options, ILogger logger) : base(new StateManagerMetadataSerializer<TMetadata>(), options, logger)
{
}

Expand Down Expand Up @@ -58,9 +58,8 @@ public abstract class StateManagerSync : IStateManager, IDisposable
private LruTableSync? m_lruTable;
//private readonly FasterKV<long, SpanByte> m_persistentStorage;
private readonly IStateSerializer<StateManagerMetadata> m_metadataSerializer;
private readonly Func<StateManagerOptions> getOptions;
private readonly StateManagerOptions options;
private readonly ILogger logger;
private StateManagerOptions? options;
private readonly object m_lock = new object();
internal StateManagerMetadata? m_metadata;
//private Functions m_functions;
Expand All @@ -84,16 +83,15 @@ public abstract class StateManagerSync : IStateManager, IDisposable

public long PageCount => m_metadata != null ? Volatile.Read(ref m_metadata.PageCount) : throw new InvalidOperationException("Manager must be initialized before getting page count");

internal StateManagerSync(IStateSerializer<StateManagerMetadata> metadataSerializer, Func<StateManagerOptions> getOptions, ILogger logger)
internal StateManagerSync(IStateSerializer<StateManagerMetadata> metadataSerializer, StateManagerOptions options, ILogger logger)
{
this.m_metadataSerializer = metadataSerializer;
this.getOptions = getOptions;
this.options = options;
this.logger = logger;
}

private void Setup()
{
this.options = getOptions();
if (m_lruTable == null)
{
m_lruTable = new LruTableSync(options.CachePageCount, logger, options.MaxProcessMemory);
Expand Down Expand Up @@ -215,6 +213,10 @@ internal async ValueTask<IStateClient<TValue, TMetadata>> CreateClientAsync<TVal
var metadata = StateClientMetadataSerializer.Instance.Deserialize<TMetadata>(new ByteMemoryOwner(bytes), bytes.Length);
var persistentSession = m_persistentStorage.CreateSession();
var stateClient = new SyncStateClient<TValue, TMetadata>(this, client, location, metadata, persistentSession, options, m_fileCacheOptions);
lock (m_lock)
{
_stateClients.Add(client, stateClient);
}
return stateClient;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public async Task StartStream(string sql, int parallelism = 1, StateSerializeOpt
.AddPlan(plan)
.SetParallelism(parallelism)
.AddReadWriteFactory(factory)
.WithStateOptions(() => new Storage.StateManager.StateManagerOptions()
.WithStateOptions(new Storage.StateManager.StateManagerOptions()
{
SerializeOptions = stateSerializeOptions,
PersistentStorage = _fileCachePersistence,
Expand Down
2 changes: 1 addition & 1 deletion tests/FlowtideDotNet.AspNetCore.WebTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
{
b.AddPlan(plan);
b.AddReadWriteFactory(factory);
b.WithStateOptions(() => new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
b.WithStateOptions(new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
{
PersistentStorage = new FileCachePersistentStorage(new FlowtideDotNet.Storage.FileCacheOptions()
{
Expand Down
4 changes: 2 additions & 2 deletions tests/FlowtideDotNet.Core.Tests/BuilderValidationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public async Task ValidateSamePlan()
var stream = new FlowtideBuilder("test")
.AddPlan(plan)
.AddReadWriteFactory(factory)
.WithStateOptions(() => new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
.WithStateOptions(new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
{
PersistentStorage = cache
})
Expand All @@ -141,7 +141,7 @@ public async Task ValidateSamePlan()
var stream2 = new FlowtideBuilder("test")
.AddPlan(plan2)
.AddReadWriteFactory(factory)
.WithStateOptions(() => new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
.WithStateOptions(new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
{
PersistentStorage = cache
})
Expand Down
2 changes: 1 addition & 1 deletion tests/FlowtideDotNet.Core.Tests/Failure/FailureTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public async Task TestEgressFailure()
var tmpStorage = new InMemoryDeviceFactory();
tmpStorage.Initialize("./data/tmp");
FlowtideBuilder differentialComputeBuilder = new FlowtideBuilder("teststream")
.WithStateOptions(() => new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
.WithStateOptions(new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
{
CachePageCount = 100,
CheckpointDir = "./data",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private async Task InitTests()

var localStorage = new LocalStorageNamedDeviceFactory(deleteOnClose: true);
localStorage.Initialize("./data/temp");
stateManager = new StateManagerSync<object>(() => new StateManagerOptions()
stateManager = new StateManagerSync<object>(new StateManagerOptions()
{
CachePageCount = 1000,
PersistentStorage = new FileCachePersistentStorage(new FlowtideDotNet.Storage.FileCacheOptions())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private async Task StartStream<TResult>(string planLocation, Action<List<TResult
.AddPlan(modifiedPlan, false)
.AddReadWriteFactory(readWriteFactory)
.WithScheduler(_streamScheduler)
.WithStateOptions(() => new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
.WithStateOptions(new FlowtideDotNet.Storage.StateManager.StateManagerOptions()
{
CachePageCount = 100000,
LogDevice = logDevice,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void TestChangeTrackingError()
var stream = new FlowtideBuilder("stream")
.AddPlan(plan)
.AddReadWriteFactory(readWriteFactory)
.WithStateOptions(() => new Storage.StateManager.StateManagerOptions()
.WithStateOptions(new Storage.StateManager.StateManagerOptions()
{
PersistentStorage = new FileCachePersistentStorage(new Storage.FileCacheOptions())
})
Expand Down Expand Up @@ -198,7 +198,7 @@ public async Task PrimaryKeyOnlyColumnInSink()
}
};

var stateManager = new StateManagerSync<object>(() => new StateManagerOptions()
var stateManager = new StateManagerSync<object>(new StateManagerOptions()
{
CachePageCount = 1000,
PersistentStorage = new FileCachePersistentStorage(new FlowtideDotNet.Storage.FileCacheOptions())
Expand Down
2 changes: 1 addition & 1 deletion tests/FlowtideDotNet.Storage.Tests/BPlusTreeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private async Task<IBPlusTree<long, string>> Init()
{
var localStorage = new LocalStorageNamedDeviceFactory(deleteOnClose: true);
localStorage.Initialize("./data/temp");
stateManager = new StateManager.StateManagerSync<object>(() => new StateManagerOptions()
stateManager = new StateManager.StateManagerSync<object>(new StateManagerOptions()
{
CachePageCount = 10,
PersistentStorage = new FileCachePersistentStorage(new FileCacheOptions())
Expand Down
Loading

0 comments on commit 763fbaf

Please sign in to comment.