Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/koralium/flowtide
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulimo committed Dec 21, 2023
2 parents 5efd0fb + 2ee04c3 commit 1f2c57a
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ internal interface IStateClient<V, TMetadata>
{
TMetadata? Metadata { get; set; }
long GetNewPageId();
ValueTask AddOrUpdate(in long key, in V value);
bool AddOrUpdate(in long key, in V value);
Task WaitForNotFullAsync();
ValueTask<V?> GetValue(in long key, string from);
ValueTask Commit();
void Delete(in long key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,20 @@ public bool TryGetValue(long key, out ICacheObject? cacheObject)
return false;
}

public void Add(long key, ICacheObject value, ILruEvictHandler evictHandler)
public async Task Wait()
{
logger.LogWarning("LRU Table is full, waiting for cleanup to finish.");
await _fullLock.WaitAsync().ConfigureAwait(false);
_fullLock.Release();
logger.LogInformation("LRU Table is no longer full.");
}

public bool Add(long key, ICacheObject value, ILruEvictHandler evictHandler)
{
bool full = false;
if (Volatile.Read(ref m_count) > maxSize)
{
logger.LogWarning("LRU Table is full, waiting for cleanup to finish.");
_fullLock.Wait();
_fullLock.Release();
full = true;
}
cache.AddOrUpdate(key, (key) =>
{
Expand Down Expand Up @@ -205,6 +212,8 @@ public void Add(long key, ICacheObject value, ILruEvictHandler evictHandler)
}
}
});

return full;
}

private async Task Cleanup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,20 @@ public TMetadata? Metadata
}
}

public ValueTask AddOrUpdate(in long key, in V value)
public bool AddOrUpdate(in long key, in V value)
{
lock (m_lock)
{
m_modified[key] = 0;
stateManager.AddOrUpdate(key, value, this);
return ValueTask.CompletedTask;
return stateManager.AddOrUpdate(key, value, this);
}
}

public Task WaitForNotFullAsync()
{
return stateManager.WaitForNotFullAsync();
}

public async ValueTask Commit()
{
foreach (var kv in m_modified)
Expand Down
10 changes: 8 additions & 2 deletions src/FlowtideDotNet.Storage/StateManager/StateManagerSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,17 @@ private long GetNewPageId_Internal()
return id;
}

internal void AddOrUpdate<V>(in long key, in V value, in ILruEvictHandler evictHandler)
internal bool AddOrUpdate<V>(in long key, in V value, in ILruEvictHandler evictHandler)
where V : ICacheObject
{
Debug.Assert(m_lruTable != null);
m_lruTable.Add(key, value, evictHandler);
return m_lruTable.Add(key, value, evictHandler);
}

internal Task WaitForNotFullAsync()
{
Debug.Assert(m_lruTable != null);
return m_lruTable.Wait();
}

internal void DeleteFromCache(in long key)
Expand Down
165 changes: 78 additions & 87 deletions src/FlowtideDotNet.Storage/Tree/Internal/BPlusTree.GenericWrite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// limitations under the License.

using System.Diagnostics;
using System.Runtime.InteropServices;

namespace FlowtideDotNet.Storage.Tree.Internal
{
Expand Down Expand Up @@ -83,24 +84,23 @@ private ValueTask<GenericWriteOperation> GenericWriteRoot_AfterGetRoot(

var (newNode, _) = SplitLeafNode(newParentNode, 0, leafNode);

var upsertNewParentStatus = m_stateClient.AddOrUpdate(newParentNode.Id, newParentNode);
var upsertLeafNodeStatus = m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
var upsertNewNodeStatus = m_stateClient.AddOrUpdate(newNode.Id, newNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(newParentNode.Id, newParentNode);
isFull |= m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
isFull |= m_stateClient.AddOrUpdate(newNode.Id, newNode);

if (!upsertNewParentStatus.IsCompletedSuccessfully ||
!upsertLeafNodeStatus.IsCompletedSuccessfully ||
!upsertNewNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertNewNodeStatus, upsertLeafNodeStatus, upsertNewNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
else
{
var upsertLeafStatus = m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
var isFull = m_stateClient.AddOrUpdate(leafNode.Id, leafNode);

if (!upsertLeafStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertLeafStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
return ValueTask.FromResult(result);
Expand Down Expand Up @@ -143,15 +143,14 @@ in InternalNode<K, V> internalNode

var (newNode, _) = SplitInternalNode(newParentNode, 0, internalNode);

var upsertNewNodeStatus = m_stateClient.AddOrUpdate(newNode.Id, newNode);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(newParentNode.Id, newParentNode);
var upsertInternalNodeStatus = m_stateClient.AddOrUpdate(internalNode.Id, internalNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(newNode.Id, newNode);
isFull |= m_stateClient.AddOrUpdate(newParentNode.Id, newParentNode);
isFull |= m_stateClient.AddOrUpdate(internalNode.Id, internalNode);

if (!upsertNewNodeStatus.IsCompletedSuccessfully ||
!upsertParentNodeStatus.IsCompletedSuccessfully ||
!upsertInternalNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertNewNodeStatus, upsertParentNodeStatus, upsertInternalNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
if (internalNode.children.Count == 1)
Expand Down Expand Up @@ -219,15 +218,14 @@ in GenericWriteFunction<V> function
{
var (newNode, _) = SplitLeafNode(parentNode, index, leafNode);
// Save all the nodes that was changed
var upsertNewNodeStatus = m_stateClient.AddOrUpdate(newNode.Id, newNode);
var upsertLeafNodeStatus = m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(newNode.Id, newNode);
isFull |= m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);

if (!upsertNewNodeStatus.IsCompletedSuccessfully ||
!upsertLeafNodeStatus.IsCompletedSuccessfully ||
!upsertParentNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertNewNodeStatus, upsertLeafNodeStatus, upsertParentNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
// Check if the node is too small
Expand Down Expand Up @@ -262,11 +260,11 @@ in GenericWriteFunction<V> function
}
else
{
var leafUpsertStatus = m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
var isFull = m_stateClient.AddOrUpdate(leafNode.Id, leafNode);

if (!leafUpsertStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, leafUpsertStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
return ValueTask.FromResult(result);
Expand Down Expand Up @@ -308,15 +306,14 @@ in GenericWriteOperation result
{
var (newNode, splitKey) = SplitInternalNode(parentNode, index, internalNode);

var upsertNewNodeStatus = m_stateClient.AddOrUpdate(newNode.Id, newNode);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var upsertInternalNodeStatus = m_stateClient.AddOrUpdate(internalNode.Id, internalNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(newNode.Id, newNode);
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
isFull |= m_stateClient.AddOrUpdate(internalNode.Id, internalNode);

if (!upsertNewNodeStatus.IsCompletedSuccessfully ||
!upsertParentNodeStatus.IsCompletedSuccessfully ||
!upsertInternalNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertNewNodeStatus, upsertParentNodeStatus, upsertInternalNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
// Check if the node is too small
Expand Down Expand Up @@ -379,15 +376,14 @@ in GenericWriteOperation result
parentNode.keys[index] = newSplitKey;
parentNode.ExitWriteLock();

var upsertRightNodeStatus = m_stateClient.AddOrUpdate(rightNode.Id, rightNode);
var upsertInternalNodeStatus = m_stateClient.AddOrUpdate(internalNode.Id, internalNode);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(rightNode.Id, rightNode);
isFull |= m_stateClient.AddOrUpdate(internalNode.Id, internalNode);
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);

if (!upsertRightNodeStatus.IsCompletedSuccessfully ||
!upsertInternalNodeStatus.IsCompletedSuccessfully ||
!upsertParentNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertRightNodeStatus, upsertInternalNodeStatus, upsertParentNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
else
Expand All @@ -401,13 +397,13 @@ in GenericWriteOperation result
parentNode.ExitWriteLock();

m_stateClient.Delete(rightNode.Id);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var upsertInternalNodeStatus = m_stateClient.AddOrUpdate(internalNode.Id, internalNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
isFull |= m_stateClient.AddOrUpdate(internalNode.Id, internalNode);

if (!upsertParentNodeStatus.IsCompletedSuccessfully ||
!upsertInternalNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertParentNodeStatus, upsertInternalNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
return ValueTask.FromResult(result);
Expand Down Expand Up @@ -443,15 +439,14 @@ in GenericWriteOperation result
parentNode.ExitWriteLock();

// Save all changes
var upsertLeafNodeStatus = m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
var upsertRightNodeStatus = m_stateClient.AddOrUpdate(rightNode.Id, rightNode);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
isFull |= m_stateClient.AddOrUpdate(rightNode.Id, rightNode);
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);

if (!upsertLeafNodeStatus.IsCompletedSuccessfully ||
!upsertRightNodeStatus.IsCompletedSuccessfully ||
!upsertParentNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertLeafNodeStatus, upsertRightNodeStatus, upsertParentNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
else
Expand All @@ -464,13 +459,13 @@ in GenericWriteOperation result
parentNode.ExitWriteLock();

m_stateClient.Delete(rightNode.Id);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var upsertLeafNodeStatus = m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
isFull |= m_stateClient.AddOrUpdate(leafNode.Id, leafNode);

if (!upsertParentNodeStatus.IsCompletedSuccessfully ||
!upsertLeafNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertParentNodeStatus, upsertLeafNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
return ValueTask.FromResult(result);
Expand Down Expand Up @@ -508,15 +503,14 @@ in GenericWriteOperation result
parentNode.ExitWriteLock();

// Save all changes
var upsertLeafNodeStatus = m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
var upsertLeftNodeStatus = m_stateClient.AddOrUpdate(leftNode.Id, leftNode);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(leafNode.Id, leafNode);
isFull |= m_stateClient.AddOrUpdate(leftNode.Id, leftNode);
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);

if (!upsertLeafNodeStatus.IsCompletedSuccessfully ||
!upsertLeftNodeStatus.IsCompletedSuccessfully ||
!upsertParentNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertLeafNodeStatus, upsertLeftNodeStatus, upsertParentNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
else
Expand All @@ -530,13 +524,13 @@ in GenericWriteOperation result

// Save all changes
m_stateClient.Delete(leafNode.Id);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var upsertLeftNodeStatus = m_stateClient.AddOrUpdate(leftNode.Id, leftNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
isFull |= m_stateClient.AddOrUpdate(leftNode.Id, leftNode);

if (!upsertParentNodeStatus.IsCompletedSuccessfully ||
!upsertLeftNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertParentNodeStatus, upsertLeftNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
return ValueTask.FromResult(result);
Expand Down Expand Up @@ -573,15 +567,14 @@ in GenericWriteOperation result
parentNode.keys[index - 1] = newSplitKey;
parentNode.ExitWriteLock();

var upsertLeftNodeStatus = m_stateClient.AddOrUpdate(leftNode.Id, leftNode);
var upsertInternalNodeStatus = m_stateClient.AddOrUpdate(internalNode.Id, internalNode);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(leftNode.Id, leftNode);
isFull |= m_stateClient.AddOrUpdate(internalNode.Id, internalNode);
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);

if (!upsertLeftNodeStatus.IsCompletedSuccessfully ||
!upsertInternalNodeStatus.IsCompletedSuccessfully ||
!upsertParentNodeStatus.IsCompletedSuccessfully)
if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertLeftNodeStatus, upsertInternalNodeStatus, upsertParentNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
else
Expand All @@ -596,13 +589,14 @@ in GenericWriteOperation result

// Save all changes
m_stateClient.Delete(internalNode.Id);
var upsertParentNodeStatus = m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
var upsertLeftNodeStatus = m_stateClient.AddOrUpdate(leftNode.Id, leftNode);

if (!upsertParentNodeStatus.IsCompletedSuccessfully ||
!upsertLeftNodeStatus.IsCompletedSuccessfully)
var isFull = false;
isFull |= m_stateClient.AddOrUpdate(parentNode.Id, parentNode);
isFull |= m_stateClient.AddOrUpdate(leftNode.Id, leftNode);

if (isFull)
{
return GenericWrite_SlowUpsert(result, upsertParentNodeStatus, upsertLeftNodeStatus);
return GenericWrite_SlowUpsert(result, m_stateClient.WaitForNotFullAsync());
}
}
return ValueTask.FromResult(result);
Expand Down Expand Up @@ -692,13 +686,10 @@ private GenericWriteOperation GenericWrite_Leaf(

private async ValueTask<GenericWriteOperation> GenericWrite_SlowUpsert(
GenericWriteOperation result,
params ValueTask[] tasks
Task task
)
{
foreach (var task in tasks)
{
await task;
}
await task;
return result;
}

Expand Down
Loading

0 comments on commit 1f2c57a

Please sign in to comment.