Skip to content

Commit

Permalink
Change aggregate operator to use array row data for keys (#303)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulimo authored Jan 13, 2024
1 parent 0c02e8f commit 9f7d7f3
Showing 1 changed file with 4 additions and 8 deletions.
12 changes: 4 additions & 8 deletions src/FlowtideDotNet.Core/Operators/Aggregate/AggregateOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,12 @@ public override async IAsyncEnumerable<StreamEventBatch> OnRecieve(StreamEventBa
RowEvent? key = default;
if (groupExpressions != null)
{
_flexBufferNewValue.NewObject();
var vectorStart = _flexBufferNewValue.StartVector();
foreach (var groupExpr in groupExpressions)
FlxValue[] newVector = new FlxValue[groupExpressions.Count];
for (int i = 0; i < groupExpressions.Count; i++)
{
var result = groupExpr(e);
_flexBufferNewValue.Add(result);
newVector[i] = groupExpressions[i](e);
}
_flexBufferNewValue.EndVector(vectorStart, false, false);
var keyBytes = _flexBufferNewValue.Finish();
key = new RowEvent(e.Weight, 0, new CompactRowData(keyBytes));
key = new RowEvent(e.Weight, 0, new ArrayRowData(newVector));

// Store the key in the temporary tree
await _temporaryTree.RMW(key.Value, default, (_, current, exist) =>
Expand Down

0 comments on commit 9f7d7f3

Please sign in to comment.