From 440b6a0172a6c2c86dae8d3c607f1d7241854783 Mon Sep 17 00:00:00 2001 From: Ulimo Date: Tue, 14 Nov 2023 16:15:39 +0100 Subject: [PATCH] Enable state compaction (#118) State compaction happens when more than 30% of pages have been modified. --- .../StateMachine/RunningStreamState.cs | 13 ++- .../Internal/StateManagerMetadata.cs | 17 ++++ .../Internal/Sync/SyncStateClient.cs | 19 ++++ .../StateManager/StateManagerSync.cs | 91 ++----------------- 4 files changed, 54 insertions(+), 86 deletions(-) diff --git a/src/FlowtideDotNet.Base/Engine/Internal/StateMachine/RunningStreamState.cs b/src/FlowtideDotNet.Base/Engine/Internal/StateMachine/RunningStreamState.cs index c4e7aef58..6a36c5116 100644 --- a/src/FlowtideDotNet.Base/Engine/Internal/StateMachine/RunningStreamState.cs +++ b/src/FlowtideDotNet.Base/Engine/Internal/StateMachine/RunningStreamState.cs @@ -66,11 +66,14 @@ private void StartCheckpointDoneTask() await run._context.stateHandler.WriteLatestState(run._context.streamName, run._context._lastState); - // TODO: Enable, For now, compact every 100 checkpoints - //if (_currentCheckpoint.CheckpointTime % 100 == 0) - //{ - // await run._context._stateManager.Compact(); - //} + + // Compaction: if more than 30% of the pages has been changed since last compaction, do compaction + long changesSinceLastCompaction = run._context._stateManager.PageCommitsSinceLastCompaction; + var compactionThreshold = (long)(run._context._stateManager.PageCount * 0.3); + if (changesSinceLastCompaction > compactionThreshold) + { + await run._context._stateManager.Compact(); + } // After writing do compaction diff --git a/src/FlowtideDotNet.Storage/StateManager/Internal/StateManagerMetadata.cs b/src/FlowtideDotNet.Storage/StateManager/Internal/StateManagerMetadata.cs index fd7014648..e39d926fd 100644 --- a/src/FlowtideDotNet.Storage/StateManager/Internal/StateManagerMetadata.cs +++ b/src/FlowtideDotNet.Storage/StateManager/Internal/StateManagerMetadata.cs @@ -37,6 +37,23 @@ public StateManagerMetadata() public Dictionary ClientMetadataLocations { get; set; } + /// + /// Incremental counter for all commited pages. + /// Can be used to detect changes + /// + public ulong PageCommits; + + /// + /// Total amount of pages in the state manager + /// + public long PageCount; + + /// + /// The page commit number at the last compaction. + /// Can be used together with page commits to see how many changes have happened. + /// + public ulong PageCommitsAtLastCompaction { get; set; } + public void EnterWriteLock() { throw new NotImplementedException(); diff --git a/src/FlowtideDotNet.Storage/StateManager/Internal/Sync/SyncStateClient.cs b/src/FlowtideDotNet.Storage/StateManager/Internal/Sync/SyncStateClient.cs index cded80509..eb859ea36 100644 --- a/src/FlowtideDotNet.Storage/StateManager/Internal/Sync/SyncStateClient.cs +++ b/src/FlowtideDotNet.Storage/StateManager/Internal/Sync/SyncStateClient.cs @@ -11,6 +11,7 @@ // limitations under the License. using FlowtideDotNet.Storage.Persistence; +using System.Diagnostics; namespace FlowtideDotNet.Storage.StateManager.Internal.Sync { @@ -27,6 +28,11 @@ internal class SyncStateClient : StateClient, IStateClient + /// Value of how many pages have changed since last commit. + /// + private long newPages; + public SyncStateClient( StateManagerSync stateManager, string name, @@ -71,12 +77,16 @@ public ValueTask Commit() { lock (m_lock) { + foreach(var kv in m_modified) { if (kv.Value == -1) { // deleted session.Delete(kv.Key); + + // Remove a page from the new pages counter + Interlocked.Decrement(ref newPages); //stateManager.DeleteFromPersistentStore(kv.Key, session); continue; } @@ -104,6 +114,13 @@ public ValueTask Commit() //stateManager.WriteToPersistentStore(kv.Key, bytes, session); } } + var modifiedPagesCount = m_modified.Count; + Debug.Assert(stateManager.m_metadata != null); + // Add modified page count to the page commits counter + Interlocked.Add(ref stateManager.m_metadata.PageCommits, (ulong)modifiedPagesCount); + // Modify active pages + Interlocked.Add(ref stateManager.m_metadata.PageCount, newPages); + newPages = 0; m_modified.Clear(); { var bytes = StateClientMetadataSerializer.Instance.Serialize(metadata); @@ -126,6 +143,8 @@ public void Delete(in long key) public long GetNewPageId() { + // Add to the new pages counter + Interlocked.Increment(ref newPages); return stateManager.GetNewPageId(); } diff --git a/src/FlowtideDotNet.Storage/StateManager/StateManagerSync.cs b/src/FlowtideDotNet.Storage/StateManager/StateManagerSync.cs index d89255585..e70b6e6ff 100644 --- a/src/FlowtideDotNet.Storage/StateManager/StateManagerSync.cs +++ b/src/FlowtideDotNet.Storage/StateManager/StateManagerSync.cs @@ -72,6 +72,14 @@ public abstract class StateManagerSync : IStateManager, IDisposable public bool Initialized { get; private set; } + public ulong PageCommits => m_metadata != null ? Volatile.Read(ref m_metadata.PageCommits) : throw new InvalidOperationException("Manager must be initialized before getting page commits"); + + public ulong PageCommitsAtLastCompaction => m_metadata != null ? m_metadata.PageCommitsAtLastCompaction : throw new InvalidOperationException("Manager must be initialized before getting page commits"); + + public long PageCommitsSinceLastCompaction => (long)(PageCommits - PageCommitsAtLastCompaction); + + public long PageCount => m_metadata != null ? Volatile.Read(ref m_metadata.PageCount) : throw new InvalidOperationException("Manager must be initialized before getting page count"); + internal StateManagerSync(IStateSerializer metadataSerializer, StateManagerOptions options, ILogger logger) { m_lruTable = new LruTableSync(options.CachePageCount, logger); @@ -87,25 +95,8 @@ internal StateManagerSync(IStateSerializer metadataSeriali { m_persistentStorage = options.PersistentStorage; } - - - //m_persistentStorage = new FasterKvPersistentStorage(new FasterKVSettings() - //{ - // RemoveOutdatedCheckpoints = true, - // //MutableFraction = 0.1, - // MemorySize = 1024 * 1024 * 128, - // PageSize = 1024 * 1024 * 16, - // LogDevice = options.LogDevice, - // CheckpointManager = options.CheckpointManager, - // CheckpointDir = options.CheckpointDir, - // //ReadCacheEnabled = true, - // //ReadCopyOptions = ReadCopyOptions.None - // //ConcurrencyControlMode = ConcurrencyControlMode.RecordIsolation - //}); this.m_metadataSerializer = metadataSerializer; this.options = options; - //m_functions = new Functions(); - //m_adminSession = m_persistentStorage.For(m_functions).NewSession(m_functions); m_fileCacheOptions = options.TemporaryStorageOptions ?? new FileCacheOptions() { DirectoryPath = "./data/tempFiles" @@ -152,49 +143,6 @@ internal bool TryGetValueFromCache(in long key, out T? value) return false; } - internal void WriteToPersistentStore(in long key, in byte[] bytes, in ClientSession session) - { - var spanByte = SpanByte.FromPinnedMemory(bytes); - var status = session.Upsert(key, spanByte); - if (status.IsCompleted || status.IsPending) - { - session.CompletePending(true); - } - } - - internal void DeleteFromPersistentStore(in long key, in ClientSession session) - { - var status = session.Delete(key); - if (!status.IsCompleted || status.IsPending) - { - session.CompletePending(true); - } - } - - internal byte[] ReadFromPersistentStore(in long key, in ClientSession session) - { - var result = session.Read(key); - if (result.status.IsCompleted && !result.status.IsPending) - { - return result.output; - } - if (session.CompletePendingWithOutputs(out var completedOutputs, true)) - { - var hasNext = completedOutputs.Next(); - var bytes = completedOutputs.Current.Output; - hasNext = completedOutputs.Next(); - if (hasNext) - { - throw new Exception(); - } - return bytes; - } - else - { - throw new Exception(); - } - } - public async ValueTask CheckpointAsync() { byte[] bytes; @@ -205,32 +153,13 @@ public async ValueTask CheckpointAsync() } await m_persistentStorage.CheckpointAsync(bytes); - //var status = m_adminSession.Upsert(1, SpanByte.FromFixedSpan(bytes)); - //if (status.IsCompleted || status.IsPending) - //{ - // m_adminSession.CompletePending(true); - //} - - //var guid = await TakeCheckpointAsync(); } - //internal async Task TakeCheckpointAsync() - //{ - // bool success = false; - // Guid token; - // do - // { - // (success, token) = await m_persistentStorage.TakeHybridLogCheckpointAsync(CheckpointType.FoldOver).ConfigureAwait(false); - // } while (!success); - // return token; - //} - public async Task Compact() { + Debug.Assert(m_metadata != null); await m_persistentStorage.CompactAsync(); - //m_adminSession.Compact(m_persistentStorage.Log.SafeReadOnlyAddress, CompactionType.Lookup); - //m_persistentStorage.Log.Truncate(); - //await m_persistentStorage.TakeFullCheckpointAsync(CheckpointType.Snapshot); + m_metadata.PageCommitsAtLastCompaction = m_metadata.PageCommits; } internal async ValueTask> CreateClientAsync(string client, StateClientOptions options)