Skip to content

Commit

Permalink
Enable state compaction (#118)
Browse files Browse the repository at this point in the history
State compaction happens when more than 30% of pages have been modified.
  • Loading branch information
Ulimo authored Nov 14, 2023
1 parent 1817741 commit 440b6a0
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,14 @@ private void StartCheckpointDoneTask()

await run._context.stateHandler.WriteLatestState(run._context.streamName, run._context._lastState);

// TODO: Enable, For now, compact every 100 checkpoints
//if (_currentCheckpoint.CheckpointTime % 100 == 0)
//{
// await run._context._stateManager.Compact();
//}

// Compaction: if more than 30% of the pages has been changed since last compaction, do compaction
long changesSinceLastCompaction = run._context._stateManager.PageCommitsSinceLastCompaction;
var compactionThreshold = (long)(run._context._stateManager.PageCount * 0.3);
if (changesSinceLastCompaction > compactionThreshold)
{
await run._context._stateManager.Compact();
}


// After writing do compaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ public StateManagerMetadata()

public Dictionary<string, long> ClientMetadataLocations { get; set; }

/// <summary>
/// Incremental counter for all commited pages.
/// Can be used to detect changes
/// </summary>
public ulong PageCommits;

/// <summary>
/// Total amount of pages in the state manager
/// </summary>
public long PageCount;

/// <summary>
/// The page commit number at the last compaction.
/// Can be used together with page commits to see how many changes have happened.
/// </summary>
public ulong PageCommitsAtLastCompaction { get; set; }

public void EnterWriteLock()
{
throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// limitations under the License.

using FlowtideDotNet.Storage.Persistence;
using System.Diagnostics;

namespace FlowtideDotNet.Storage.StateManager.Internal.Sync
{
Expand All @@ -27,6 +28,11 @@ internal class SyncStateClient<V, TMetadata> : StateClient, IStateClient<V, TMet
private readonly object m_lock = new object();
private readonly FlowtideDotNet.Storage.FileCache.FileCache m_fileCache;

/// <summary>
/// Value of how many pages have changed since last commit.
/// </summary>
private long newPages;

public SyncStateClient(
StateManagerSync stateManager,
string name,
Expand Down Expand Up @@ -71,12 +77,16 @@ public ValueTask Commit()
{
lock (m_lock)
{

foreach(var kv in m_modified)
{
if (kv.Value == -1)
{
// deleted
session.Delete(kv.Key);

// Remove a page from the new pages counter
Interlocked.Decrement(ref newPages);
//stateManager.DeleteFromPersistentStore(kv.Key, session);
continue;
}
Expand Down Expand Up @@ -104,6 +114,13 @@ public ValueTask Commit()
//stateManager.WriteToPersistentStore(kv.Key, bytes, session);
}
}
var modifiedPagesCount = m_modified.Count;
Debug.Assert(stateManager.m_metadata != null);
// Add modified page count to the page commits counter
Interlocked.Add(ref stateManager.m_metadata.PageCommits, (ulong)modifiedPagesCount);
// Modify active pages
Interlocked.Add(ref stateManager.m_metadata.PageCount, newPages);
newPages = 0;
m_modified.Clear();
{
var bytes = StateClientMetadataSerializer.Instance.Serialize(metadata);
Expand All @@ -126,6 +143,8 @@ public void Delete(in long key)

public long GetNewPageId()
{
// Add to the new pages counter
Interlocked.Increment(ref newPages);
return stateManager.GetNewPageId();
}

Expand Down
91 changes: 10 additions & 81 deletions src/FlowtideDotNet.Storage/StateManager/StateManagerSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ public abstract class StateManagerSync : IStateManager, IDisposable

public bool Initialized { get; private set; }

public ulong PageCommits => m_metadata != null ? Volatile.Read(ref m_metadata.PageCommits) : throw new InvalidOperationException("Manager must be initialized before getting page commits");

public ulong PageCommitsAtLastCompaction => m_metadata != null ? m_metadata.PageCommitsAtLastCompaction : throw new InvalidOperationException("Manager must be initialized before getting page commits");

public long PageCommitsSinceLastCompaction => (long)(PageCommits - PageCommitsAtLastCompaction);

public long PageCount => m_metadata != null ? Volatile.Read(ref m_metadata.PageCount) : throw new InvalidOperationException("Manager must be initialized before getting page count");

internal StateManagerSync(IStateSerializer<StateManagerMetadata> metadataSerializer, StateManagerOptions options, ILogger logger)
{
m_lruTable = new LruTableSync(options.CachePageCount, logger);
Expand All @@ -87,25 +95,8 @@ internal StateManagerSync(IStateSerializer<StateManagerMetadata> metadataSeriali
{
m_persistentStorage = options.PersistentStorage;
}


//m_persistentStorage = new FasterKvPersistentStorage(new FasterKVSettings<long, SpanByte>()
//{
// RemoveOutdatedCheckpoints = true,
// //MutableFraction = 0.1,
// MemorySize = 1024 * 1024 * 128,
// PageSize = 1024 * 1024 * 16,
// LogDevice = options.LogDevice,
// CheckpointManager = options.CheckpointManager,
// CheckpointDir = options.CheckpointDir,
// //ReadCacheEnabled = true,
// //ReadCopyOptions = ReadCopyOptions.None
// //ConcurrencyControlMode = ConcurrencyControlMode.RecordIsolation
//});
this.m_metadataSerializer = metadataSerializer;
this.options = options;
//m_functions = new Functions();
//m_adminSession = m_persistentStorage.For(m_functions).NewSession(m_functions);
m_fileCacheOptions = options.TemporaryStorageOptions ?? new FileCacheOptions()
{
DirectoryPath = "./data/tempFiles"
Expand Down Expand Up @@ -152,49 +143,6 @@ internal bool TryGetValueFromCache<T>(in long key, out T? value)
return false;
}

internal void WriteToPersistentStore(in long key, in byte[] bytes, in ClientSession<long, SpanByte, SpanByte, byte[], long, Functions> session)
{
var spanByte = SpanByte.FromPinnedMemory(bytes);
var status = session.Upsert(key, spanByte);
if (status.IsCompleted || status.IsPending)
{
session.CompletePending(true);
}
}

internal void DeleteFromPersistentStore(in long key, in ClientSession<long, SpanByte, SpanByte, byte[], long, Functions> session)
{
var status = session.Delete(key);
if (!status.IsCompleted || status.IsPending)
{
session.CompletePending(true);
}
}

internal byte[] ReadFromPersistentStore(in long key, in ClientSession<long, SpanByte, SpanByte, byte[], long, Functions> session)
{
var result = session.Read(key);
if (result.status.IsCompleted && !result.status.IsPending)
{
return result.output;
}
if (session.CompletePendingWithOutputs(out var completedOutputs, true))
{
var hasNext = completedOutputs.Next();
var bytes = completedOutputs.Current.Output;
hasNext = completedOutputs.Next();
if (hasNext)
{
throw new Exception();
}
return bytes;
}
else
{
throw new Exception();
}
}

public async ValueTask CheckpointAsync()
{
byte[] bytes;
Expand All @@ -205,32 +153,13 @@ public async ValueTask CheckpointAsync()
}

await m_persistentStorage.CheckpointAsync(bytes);
//var status = m_adminSession.Upsert(1, SpanByte.FromFixedSpan(bytes));
//if (status.IsCompleted || status.IsPending)
//{
// m_adminSession.CompletePending(true);
//}

//var guid = await TakeCheckpointAsync();
}

//internal async Task<Guid> TakeCheckpointAsync()
//{
// bool success = false;
// Guid token;
// do
// {
// (success, token) = await m_persistentStorage.TakeHybridLogCheckpointAsync(CheckpointType.FoldOver).ConfigureAwait(false);
// } while (!success);
// return token;
//}

public async Task Compact()
{
Debug.Assert(m_metadata != null);
await m_persistentStorage.CompactAsync();
//m_adminSession.Compact(m_persistentStorage.Log.SafeReadOnlyAddress, CompactionType.Lookup);
//m_persistentStorage.Log.Truncate();
//await m_persistentStorage.TakeFullCheckpointAsync(CheckpointType.Snapshot);
m_metadata.PageCommitsAtLastCompaction = m_metadata.PageCommits;
}

internal async ValueTask<IStateClient<TValue, TMetadata>> CreateClientAsync<TValue, TMetadata>(string client, StateClientOptions<TValue> options)
Expand Down

0 comments on commit 440b6a0

Please sign in to comment.