From 9f7d7f3a3b80b79f776fd29b59ff3fc070ab758c Mon Sep 17 00:00:00 2001 From: Ulimo Date: Sat, 13 Jan 2024 21:38:19 +0100 Subject: [PATCH] Change aggregate operator to use array row data for keys (#303) --- .../Operators/Aggregate/AggregateOperator.cs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/FlowtideDotNet.Core/Operators/Aggregate/AggregateOperator.cs b/src/FlowtideDotNet.Core/Operators/Aggregate/AggregateOperator.cs index 1fd7abbcc..8e6f9eda6 100644 --- a/src/FlowtideDotNet.Core/Operators/Aggregate/AggregateOperator.cs +++ b/src/FlowtideDotNet.Core/Operators/Aggregate/AggregateOperator.cs @@ -274,16 +274,12 @@ public override async IAsyncEnumerable OnRecieve(StreamEventBa RowEvent? key = default; if (groupExpressions != null) { - _flexBufferNewValue.NewObject(); - var vectorStart = _flexBufferNewValue.StartVector(); - foreach (var groupExpr in groupExpressions) + FlxValue[] newVector = new FlxValue[groupExpressions.Count]; + for (int i = 0; i < groupExpressions.Count; i++) { - var result = groupExpr(e); - _flexBufferNewValue.Add(result); + newVector[i] = groupExpressions[i](e); } - _flexBufferNewValue.EndVector(vectorStart, false, false); - var keyBytes = _flexBufferNewValue.Finish(); - key = new RowEvent(e.Weight, 0, new CompactRowData(keyBytes)); + key = new RowEvent(e.Weight, 0, new ArrayRowData(newVector)); // Store the key in the temporary tree await _temporaryTree.RMW(key.Value, default, (_, current, exist) =>