Skip to content

Commit

Permalink
Add support for serializers to write and read metadata pages (#643)
Browse files Browse the repository at this point in the history
This is a prestep to be able to store schema information in a global page, and also potentially compression dictionaries to help reduce the state storage size.
  • Loading branch information
Ulimo authored Dec 20, 2024
1 parent ba84961 commit 12f5248
Show file tree
Hide file tree
Showing 35 changed files with 578 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public DoubleValueSerializer(IMemoryAllocator memoryAllocator)
{
this.memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public DoubleValueContainer CreateEmpty()
{
return new DoubleValueContainer(memoryAllocator);
Expand All @@ -42,6 +48,11 @@ public DoubleValueContainer Deserialize(in BinaryReader reader)
return new DoubleValueContainer(new PrimitiveList<double>(nativeMemory, count / 8, memoryAllocator));
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in DoubleValueContainer values)
{
var mem = values._list.SlicedMemory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ public TimestampKeySerializer(IMemoryAllocator memoryAllocator)
{
this.memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public TimestampKeyContainer CreateEmpty()
{
return new TimestampKeyContainer(memoryAllocator);
Expand All @@ -41,6 +47,11 @@ public TimestampKeyContainer Deserialize(in BinaryReader reader)
return new TimestampKeyContainer(new PrimitiveList<long>(nativeMemory, count / 8, memoryAllocator));
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in TimestampKeyContainer values)
{
var mem = values._list.SlicedMemory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,15 @@ public void Serialize(in BinaryWriter writer, in ColumnKeyStorageContainer value
var batchWriter = new ArrowStreamWriter(writer.BaseStream, recordBatch.Schema, true);
batchWriter.WriteRecordBatch(recordBatch);
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public ColumnValueSerializer(int columnCount, IMemoryAllocator memoryAllocator)
_columnCount = columnCount;
_memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public ColumnValueStorageContainer CreateEmpty()
{
return new ColumnValueStorageContainer(_columnCount, _memoryAllocator);
Expand All @@ -40,6 +46,11 @@ public ColumnValueStorageContainer Deserialize(in BinaryReader reader)
throw new NotImplementedException();
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in ColumnValueStorageContainer values)
{
throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ public ListAggKeyStorageSerializer(int groupingKeyLength, IMemoryAllocator memor
_groupingKeyLength = groupingKeyLength;
_memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public ListAggKeyStorageContainer CreateEmpty()
{
return new ListAggKeyStorageContainer(_groupingKeyLength, _memoryAllocator);
Expand All @@ -50,6 +56,11 @@ public ListAggKeyStorageContainer Deserialize(in BinaryReader reader)
return new ListAggKeyStorageContainer(_groupingKeyLength, eventBatch);
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in ListAggKeyStorageContainer values)
{
var recordBatch = EventArrowSerializer.BatchToArrow(values._data, values.Count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,15 @@ public void Serialize(in BinaryWriter writer, in AggregateKeyStorageContainer va
var batchWriter = new ArrowStreamWriter(writer.BaseStream, recordBatch.Schema, true);
batchWriter.WriteRecordBatch(recordBatch);
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ public ColumnAggregateValueSerializer(int measureCount, IMemoryAllocator memoryA
this.measureCount = measureCount;
this.memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public ColumnAggregateValueContainer CreateEmpty()
{
return new ColumnAggregateValueContainer(measureCount, memoryAllocator);
Expand Down Expand Up @@ -63,6 +69,11 @@ public ColumnAggregateValueContainer Deserialize(in BinaryReader reader)
return new ColumnAggregateValueContainer(measureCount, eventBatch, weightsList, previousValueList);
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in ColumnAggregateValueContainer values)
{
var previousValueMemory = values._previousValueSent.SlicedMemory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public JoinWeightsSerializer(IMemoryAllocator memoryAllocator)
_memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public JoinWeightsValueContainer CreateEmpty()
{
return new JoinWeightsValueContainer(_memoryAllocator);
Expand All @@ -48,6 +53,11 @@ public JoinWeightsValueContainer Deserialize(in BinaryReader reader)
return new JoinWeightsValueContainer(nativeMemory, count, memoryAllocator);
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in JoinWeightsValueContainer values)
{
var memory = values.Memory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public NormalizeKeyStorageSerializer(List<int> columnsToStore, IMemoryAllocator
_memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public NormalizeKeyStorage CreateEmpty()
{
return new NormalizeKeyStorage(_columnsToStore, _memoryAllocator);
Expand All @@ -49,6 +54,11 @@ public NormalizeKeyStorage Deserialize(in BinaryReader reader)
return new NormalizeKeyStorage(_columnsToStore, eventBatch);
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in NormalizeKeyStorage values)
{
var recordBatch = EventArrowSerializer.BatchToArrow(values._data, values.Count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public NormalizeValueSerializer(List<int> columnsToStore, IMemoryAllocator memor
_memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public NormalizeValueStorage CreateEmpty()
{
return new NormalizeValueStorage(_columnsToStore, _memoryAllocator);
Expand All @@ -49,6 +54,11 @@ public NormalizeValueStorage Deserialize(in BinaryReader reader)
return new NormalizeValueStorage(_columnsToStore, eventBatch, recordBatch.Length);
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in NormalizeValueStorage values)
{
var recordBatch = EventArrowSerializer.BatchToArrow(values._data, values.Count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public ModifiedKeyStorageSerializer(List<int> columnsToStore, IMemoryAllocator m
this._memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public ModifiedKeyStorage CreateEmpty()
{
return new ModifiedKeyStorage(_columnsToStore, _memoryAllocator);
Expand All @@ -50,6 +55,11 @@ public ModifiedKeyStorage Deserialize(in BinaryReader reader)
return new ModifiedKeyStorage(_columnsToStore, eventBatch);
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in ModifiedKeyStorage values)
{
var recordBatch = EventArrowSerializer.BatchToArrow(values._data, values.Count);
Expand Down
5 changes: 5 additions & 0 deletions src/FlowtideDotNet.Storage/Serializers/IntSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ namespace FlowtideDotNet.Storage.Serializers
{
public class IntSerializer : IBplusTreeSerializer<int>
{
public Task CheckpointAsync()
{
return Task.CompletedTask;
}

public void Deserialize(in BinaryReader reader, in List<int> values)
{
var count = reader.ReadInt32();
Expand Down
11 changes: 11 additions & 0 deletions src/FlowtideDotNet.Storage/Serializers/KeyListSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public KeyListSerializer(IBplusTreeSerializer<K> serializer)
{
this.serializer = serializer;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public ListKeyContainer<K> CreateEmpty()
{
return new ListKeyContainer<K>();
Expand All @@ -39,6 +45,11 @@ public ListKeyContainer<K> Deserialize(in BinaryReader reader)
return container;
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in ListKeyContainer<K> values)
{
serializer.Serialize(writer, values._list);
Expand Down
5 changes: 5 additions & 0 deletions src/FlowtideDotNet.Storage/Serializers/LongSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ namespace FlowtideDotNet.Storage.Serializers
{
public class LongSerializer : IBplusTreeSerializer<long>
{
public Task CheckpointAsync()
{
return Task.CompletedTask;
}

public void Deserialize(in BinaryReader reader, in List<long> values)
{
var count = reader.ReadInt32();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public PrimitiveListValueContainerSerializer(IMemoryAllocator memoryAllocator)
_memoryAllocator = memoryAllocator;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public PrimitiveListValueContainer<T> CreateEmpty()
{
return new PrimitiveListValueContainer<T>(_memoryAllocator);
Expand All @@ -49,6 +54,11 @@ public PrimitiveListValueContainer<T> Deserialize(in BinaryReader reader)
return new PrimitiveListValueContainer<T>(nativeMemory, count, memoryAllocator);
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in PrimitiveListValueContainer<T> values)
{
var memory = values.Memory;
Expand Down
5 changes: 5 additions & 0 deletions src/FlowtideDotNet.Storage/Serializers/StringSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ namespace FlowtideDotNet.Storage.Serializers
{
public class StringSerializer : IBplusTreeSerializer<string>
{
public Task CheckpointAsync()
{
return Task.CompletedTask;
}

public void Deserialize(in BinaryReader reader, in List<string> values)
{
var count = reader.ReadInt32();
Expand Down
11 changes: 11 additions & 0 deletions src/FlowtideDotNet.Storage/Serializers/ValueListSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public ValueListSerializer(IBplusTreeSerializer<V> serializer)
{
this.serializer = serializer;
}

public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
{
return Task.CompletedTask;
}

public ListValueContainer<V> CreateEmpty()
{
return new ListValueContainer<V>();
Expand All @@ -39,6 +45,11 @@ public ListValueContainer<V> Deserialize(in BinaryReader reader)
return container;
}

public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
{
return Task.CompletedTask;
}

public void Serialize(in BinaryWriter writer, in ListValueContainer<V> values)
{
serializer.Serialize(writer, values._values);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ internal interface IStateClient<V, TMetadata>
int BPlusTreePageSizeBytes { get; }

long CacheMisses { get; }

Task InitializeSerializerAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ internal interface IStateSerializer
byte[] Serialize(in ICacheObject value, in StateSerializeOptions stateSerializeOptions);

ICacheObject DeserializeCacheObject(IMemoryOwner<byte> bytes, int length, StateSerializeOptions stateSerializeOptions);

Task CheckpointAsync<TMetadata>(IStateSerializerCheckpointWriter checkpointWriter, StateClientMetadata<TMetadata> metadata)
where TMetadata : IStorageMetadata;

Task InitializeAsync<TMetadata>(IStateSerializerInitializeReader reader, StateClientMetadata<TMetadata> metadata)
where TMetadata : IStorageMetadata;
}
internal interface IStateSerializer<T> : IStateSerializer
where T: ICacheObject
Expand Down
Loading

1 comment on commit 12f5248

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 12f5248 Previous: 7db1083 Ratio
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.InnerJoin 576679120 ns (± 10424333.064709704) 588638700 ns (± 9231369.002121696) 0.98
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.LeftJoin 677500750 ns (± 49240664.11090308) 632393477.7777778 ns (± 23156158.76418074) 1.07
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.ProjectionAndNormalization 212547440 ns (± 11163655.84516709) 171268600 ns (± 6644227.006958748) 1.24
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.SumAggregation 205907880 ns (± 21534431.800506525) 190822830 ns (± 6415026.852121856) 1.08
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.ListAggWithMapAggregation 2371215770 ns (± 141437160.23166573) 2656394200 ns (± 136074715.58524996) 0.89

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.