Skip to content

Commit

Permalink
Add page version to reduce rewriting pages to disk (#219)
Browse files Browse the repository at this point in the history
This change should reduce the number of page writes to disk greatly when running with low memory.
It does not reduce page reads, but should allow the cache to evict pages faster.
  • Loading branch information
Ulimo authored Dec 22, 2023
1 parent 1f2c57a commit d10565b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal interface IStateClient<V, TMetadata>
{
TMetadata? Metadata { get; set; }
long GetNewPageId();
bool AddOrUpdate(in long key, in V value);
bool AddOrUpdate(in long key, V value);
Task WaitForNotFullAsync();
ValueTask<V?> GetValue(in long key, string from);
ValueTask Commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal class SyncStateClient<V, TMetadata> : StateClient, IStateClient<V, TMet
private ConcurrentDictionary<long, int> m_modified;
private readonly object m_lock = new object();
private readonly FlowtideDotNet.Storage.FileCache.FileCache m_fileCache;
private readonly ConcurrentDictionary<long, int> m_fileCacheVersion;

/// <summary>
/// Value of how many pages have changed since last commit.
Expand All @@ -50,6 +51,7 @@ public SyncStateClient(
this.options = options;
m_fileCache = new FlowtideDotNet.Storage.FileCache.FileCache(fileCacheOptions, name);
m_modified = new ConcurrentDictionary<long, int>();
m_fileCacheVersion = new ConcurrentDictionary<long, int>();
}

public TMetadata? Metadata
Expand All @@ -64,12 +66,22 @@ public TMetadata? Metadata
}
}

public bool AddOrUpdate(in long key, in V value)
public bool AddOrUpdate(in long key, V value)
{
lock (m_lock)
{
m_modified[key] = 0;
return stateManager.AddOrUpdate(key, value, this);
bool isFull = false;
m_modified.AddOrUpdate(key, (key) =>
{
isFull = stateManager.AddOrUpdate(key, value, this);
return 0;
},
(key, old) =>
{
isFull = stateManager.AddOrUpdate(key, value, this);
return old + 1;
});
return isFull;
}
}

Expand Down Expand Up @@ -123,9 +135,10 @@ public async ValueTask Commit()
// Modify active pages
Interlocked.Add(ref stateManager.m_metadata.PageCount, newPages);
newPages = 0;

m_modified.Clear();
m_fileCache.FreeAll();

m_fileCacheVersion.Clear();
{
var bytes = StateClientMetadataSerializer.Instance.Serialize(metadata);
await session.Write(metadataId, bytes);
Expand All @@ -137,6 +150,7 @@ public void Delete(in long key)
lock (m_lock)
{
m_modified[key] = -1;
m_fileCacheVersion.Remove(key, out _);
m_fileCache.Free(key);
stateManager.DeleteFromCache(key);
}
Expand Down Expand Up @@ -212,6 +226,7 @@ public override async ValueTask Reset(bool clearMetadata)
}
m_modified.Clear();
m_fileCache.FreeAll();
m_fileCacheVersion.Clear();
}
if (clearMetadata)
{
Expand All @@ -232,10 +247,15 @@ public void Evict(List<(LinkedListNode<LruTableSync.LinkedListValue>, long)> val
{
continue;
}
if (m_fileCacheVersion.TryGetValue(value.Item1.ValueRef.key, out var storedVersion) && storedVersion == val)
{
continue;
}
value.Item1.ValueRef.value.EnterWriteLock();
var bytes = options.ValueSerializer.Serialize(value.Item1.ValueRef.value, stateManager.SerializeOptions);
m_fileCache.WriteAsync(value.Item1.ValueRef.key, bytes);
value.Item1.ValueRef.value.ExitWriteLock();
m_fileCacheVersion[value.Item1.ValueRef.key] = val;
}
m_fileCache.Flush();

Expand Down

0 comments on commit d10565b

Please sign in to comment.