Skip to content

Commit

Permalink
Add support to configure default page size in bplustree (#414)
Browse files Browse the repository at this point in the history
* Add support to configure default page size in bplustree

* Default page size to 1024
  • Loading branch information
Ulimo authored Mar 19, 2024
1 parent b888a1c commit 3f164e1
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ internal interface IStateClient<V, TMetadata>
ValueTask Commit();
void Delete(in long key);
ValueTask Reset(bool clearMetadata);
int BPlusTreePageSize { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ public IStateManagerClient GetChildManager(string name)
public async ValueTask<IBPlusTree<K, V>> GetOrCreateTree<K, V>(string name, BPlusTreeOptions<K, V> options)
{
var stateClient = await CreateStateClient<IBPlusTreeNode, BPlusTreeMetadata>(name, new BPlusTreeSerializer<K, V>(options.KeySerializer, options.ValueSerializer));

if (options.BucketSize == null)
{
options.BucketSize = stateClient.BPlusTreePageSize;
}

var tree = new BPlusTree<K, V>(stateClient, options);
await tree.InitializeAsync();
return tree;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal class SyncStateClient<V, TMetadata> : StateClient, IStateClient<V, TMet
private readonly IPersistentStorageSession session;
private readonly StateClientOptions<V> options;
private readonly bool useReadCache;
private readonly int m_bplusTreePageSize;
private readonly ConcurrentDictionary<long, int> m_modified;
private readonly object m_lock = new object();
private readonly FlowtideDotNet.Storage.FileCache.FileCache m_fileCache;
Expand All @@ -50,14 +51,16 @@ public SyncStateClient(
StateClientOptions<V> options,
FileCacheOptions fileCacheOptions,
Meter meter,
bool useReadCache)
bool useReadCache,
int bplusTreePageSize)
{
this.stateManager = stateManager;
this.metadataId = metadataId;
this.metadata = metadata;
this.session = session;
this.options = options;
this.useReadCache = useReadCache;
this.m_bplusTreePageSize = bplusTreePageSize;
m_fileCache = new FlowtideDotNet.Storage.FileCache.FileCache(fileCacheOptions, name);
m_modified = new ConcurrentDictionary<long, int>();
m_fileCacheVersion = new ConcurrentDictionary<long, int>();
Expand All @@ -79,6 +82,8 @@ public TMetadata? Metadata
}
}

public int BPlusTreePageSize => m_bplusTreePageSize;

public bool AddOrUpdate(in long key, V value)
{
lock (m_lock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,7 @@ public class StateManagerOptions
/// Useful if persistent storage is not on disk and instead in a remote location.
/// </summary>
public bool UseReadCache { get; set; } = false;

public int DefaultBPlusTreePageSize { get; set; } = 1024;
}
}
4 changes: 2 additions & 2 deletions src/FlowtideDotNet.Storage/StateManager/StateManagerSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ internal async ValueTask<IStateClient<TValue, TMetadata>> CreateClientAsync<TVal
{
var metadata = StateClientMetadataSerializer.Deserialize<TMetadata>(new ByteMemoryOwner(bytes), bytes.Length);
var persistentSession = m_persistentStorage.CreateSession();
var stateClient = new SyncStateClient<TValue, TMetadata>(this, client, location, metadata, persistentSession, options, m_fileCacheOptions, meter, this.options.UseReadCache);
var stateClient = new SyncStateClient<TValue, TMetadata>(this, client, location, metadata, persistentSession, options, m_fileCacheOptions, meter, this.options.UseReadCache, this.options.DefaultBPlusTreePageSize);
lock (m_lock)
{
_stateClients.Add(client, stateClient);
Expand All @@ -257,7 +257,7 @@ internal async ValueTask<IStateClient<TValue, TMetadata>> CreateClientAsync<TVal
lock (m_lock)
{
var session = m_persistentStorage.CreateSession();
var stateClient = new SyncStateClient<TValue, TMetadata>(this, client, clientMetadataPageId, clientMetadata, session, options, m_fileCacheOptions, meter, this.options.UseReadCache);
var stateClient = new SyncStateClient<TValue, TMetadata>(this, client, clientMetadataPageId, clientMetadata, session, options, m_fileCacheOptions, meter, this.options.UseReadCache, this.options.DefaultBPlusTreePageSize);
_stateClients.Add(client, stateClient);
return stateClient;
}
Expand Down
5 changes: 4 additions & 1 deletion src/FlowtideDotNet.Storage/Tree/BPlusTreeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ namespace FlowtideDotNet.Storage.Tree
{
public class BPlusTreeOptions<K, V>
{
public int BucketSize { get; set; } = 64;
/// <summary>
/// Override the default page size. This should only be set if the operator works best with a specific size.
/// </summary>
public int? BucketSize { get; set; }

public required IComparer<K> Comparer { get; set; }

Expand Down
10 changes: 7 additions & 3 deletions src/FlowtideDotNet.Storage/Tree/Internal/BPlusTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// limitations under the License.

using FlowtideDotNet.Storage.StateManager.Internal;
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Text;

Expand All @@ -25,22 +26,24 @@ internal partial class BPlusTree<K, V> : IBPlusTree<K, V>

public BPlusTree(IStateClient<IBPlusTreeNode, BPlusTreeMetadata> stateClient, BPlusTreeOptions<K, V> options)
{
Debug.Assert(options.BucketSize.HasValue);
this.m_stateClient = stateClient;
this.m_options = options;
minSize = options.BucketSize / 3;
minSize = options.BucketSize.Value / 3;
this.m_keyComparer = options.Comparer;
}

public Task InitializeAsync()
{
Debug.Assert(m_options.BucketSize.HasValue);
if (m_stateClient.Metadata == null)
{
var rootId = m_stateClient.GetNewPageId();
var root = new LeafNode<K, V>(rootId);
m_stateClient.Metadata = new BPlusTreeMetadata()
{
Root = rootId,
BucketLength = m_options.BucketSize,
BucketLength = m_options.BucketSize.Value,
Left = rootId
};
m_stateClient.AddOrUpdate(rootId, root);
Expand Down Expand Up @@ -172,6 +175,7 @@ private sealed class RMWContainer

public async ValueTask Clear()
{
Debug.Assert(m_options.BucketSize.HasValue);
// Clear the current state from the state storage
await m_stateClient.Reset(true);

Expand All @@ -181,7 +185,7 @@ public async ValueTask Clear()
m_stateClient.Metadata = new BPlusTreeMetadata()
{
Root = rootId,
BucketLength = m_options.BucketSize,
BucketLength = m_options.BucketSize.Value,
Left = rootId
};
m_stateClient.AddOrUpdate(rootId, root);
Expand Down

0 comments on commit 3f164e1

Please sign in to comment.