Skip to content

Commit

Permalink
push emit list through buffer (#626)
Browse files Browse the repository at this point in the history
* push emit list through buffer

* Fix emit output

* Add test case
  • Loading branch information
Ulimo authored Nov 29, 2024
1 parent 9d18257 commit 89b480f
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,41 @@ public override Relation VisitJoinRelation(JoinRelation joinRelation, object sta
List<int> leftEmit = new List<int>();
List<int> rightEmit = new List<int>();

Dictionary<int, int> leftEmitToInternal = new Dictionary<int, int>();
if (joinRelation.Left.EmitSet)
{
for (int i = 0; i < joinRelation.Left.Emit.Count; i++)
{
leftEmitToInternal.Add(i, joinRelation.Left.Emit[i]);
}
}
else
{
for (int i = 0; i < joinRelation.Left.OutputLength; i++)
{
leftEmitToInternal.Add(i, i);
}
}

Dictionary<int, int> rightEmitToInternal = new Dictionary<int, int>();
if (joinRelation.Right.EmitSet)
{
for (int i = 0; i < joinRelation.Right.Emit.Count; i++)
{
rightEmitToInternal.Add(i, joinRelation.Right.Emit[i]);
}
}
else
{
for (int i = 0; i < joinRelation.Right.OutputLength; i++)
{
rightEmitToInternal.Add(i, i);
}
}

foreach (var field in leftUsage)
{
leftEmit.Add(field);
leftEmit.Add(leftEmitToInternal[field]);
oldToNew.Add(field, replacementCounter);
replacementCounter += 1;
}
Expand All @@ -410,10 +442,11 @@ public override Relation VisitJoinRelation(JoinRelation joinRelation, object sta
{
var rightIndex = field - joinRelation.Left.OutputLength;

rightEmit.Add(rightIndex);
rightEmit.Add(rightEmitToInternal[rightIndex]);
oldToNew.Add(field, replacementCounter);
replacementCounter += 1;
}

if (leftEmit.Count < joinRelation.Left.OutputLength)
{
joinRelation.Left.Emit = leftEmit;
Expand Down Expand Up @@ -512,6 +545,7 @@ public override Relation VisitMergeJoinRelation(MergeJoinRelation mergeJoinRelat
}
}


leftUsage = leftUsage.Distinct().OrderBy(x => x).ToList();
rightUsage = rightUsage.Distinct().OrderBy(x => x).ToList();

Expand Down Expand Up @@ -696,5 +730,33 @@ public override Relation VisitFilterRelation(FilterRelation filterRelation, obje
}
return base.VisitFilterRelation(filterRelation, state);
}

public override Relation VisitBufferRelation(BufferRelation bufferRelation, object state)
{
if (bufferRelation.Input is ReferenceRelation referenceRelation)
{
return bufferRelation;
}
if (bufferRelation.Input is IterationReferenceReadRelation)
{
return bufferRelation;
}
if (bufferRelation.Input is IterationRelation)
{
return bufferRelation;
}
if (bufferRelation.EmitSet)
{
List<int> emitList = new List<int>();
for (int i = 0; i < bufferRelation.Emit.Count; i++)
{
emitList.Add(i);
}
bufferRelation.Input.Emit = bufferRelation.Emit;
bufferRelation.Emit = emitList;
}

return base.VisitBufferRelation(bufferRelation, state);
}
}
}
3 changes: 2 additions & 1 deletion src/FlowtideDotNet.Core/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@
[assembly: InternalsVisibleTo("SqlSampleWithUI")]
// added only for the purpose of debugging
[assembly: InternalsVisibleTo("FlowtideDotNet.StateDiagnostics")]
[assembly: InternalsVisibleTo("FlowtideDotNet.ComputeTests")]
[assembly: InternalsVisibleTo("FlowtideDotNet.ComputeTests")]
[assembly: InternalsVisibleTo("FlowtideDotNet.AcceptanceTests")]
27 changes: 27 additions & 0 deletions tests/FlowtideDotNet.AcceptanceTests/DatetimeFunctionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,33 @@ FROM Orders
);
}

/// <summary>
/// Special case test when gettimestamp creates buffer operator before the join and the join is followed by a filter
/// </summary>
/// <returns></returns>
[Fact]
public async Task GetTimestampInFilterJoinFilterPushedInfront()
{
GenerateData();
await StartStream(@"
INSERT INTO output
SELECT
orderkey, orderdate < gettimestamp() as active
FROM Orders o
JOIN users u2 ON o.userkey = u2.userkey
JOIN users u ON o.userkey = u.userkey AND o.userkey = u2.userkey
where orderdate < gettimestamp()
");
await WaitForUpdate();

AssertCurrentDataEqual(
Orders.Where(x => x.Orderdate < DateTime.UtcNow).Select(o => new {
o.OrderKey,
Active = o.Orderdate < DateTime.UtcNow
})
);
}

[Fact]
public async Task GetTimestampInBufferedView()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using FlowtideDotNet.Core.Optimizer;
using FlowtideDotNet.Substrait.Relations;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;

namespace FlowtideDotNet.AcceptanceTests.Internal
{
class EmitLengthValidatorVisitor : OptimizerBaseVisitor
{
public override Relation VisitAggregateRelation(AggregateRelation aggregateRelation, object state)
{
if (aggregateRelation.EmitSet && aggregateRelation.Input.EmitSet &&
aggregateRelation.Emit.Count > (aggregateRelation.Input.Emit.Count + aggregateRelation.Measures?.Count ?? 0))
{
Assert.Fail();
}
return base.VisitAggregateRelation(aggregateRelation, state);
}

public override Relation VisitBufferRelation(BufferRelation bufferRelation, object state)
{
if (bufferRelation.EmitSet && bufferRelation.Input.EmitSet &&
bufferRelation.Emit.Count > bufferRelation.Input.Emit.Count)
{
Assert.Fail();
}
return base.VisitBufferRelation(bufferRelation, state);
}

public override Relation VisitFilterRelation(FilterRelation filterRelation, object state)
{
if (filterRelation.EmitSet && filterRelation.Input.EmitSet &&
filterRelation.Emit.Count > filterRelation.Input.Emit.Count)
{
Assert.Fail();
}
return base.VisitFilterRelation(filterRelation, state);
}

public override Relation VisitJoinRelation(JoinRelation joinRelation, object state)
{
if (joinRelation.OutputLength > joinRelation.Left.OutputLength + joinRelation.Right.OutputLength)
{
Assert.Fail($"Join output length {joinRelation.OutputLength} bigger than input left: {joinRelation.Left.OutputLength} right {joinRelation.Right.OutputLength}");
}
return base.VisitJoinRelation(joinRelation, state);
}

public override Relation VisitMergeJoinRelation(MergeJoinRelation mergeJoinRelation, object state)
{
if (mergeJoinRelation.OutputLength > mergeJoinRelation.Left.OutputLength + mergeJoinRelation.Right.OutputLength)
{
Assert.Fail();
}
return base.VisitMergeJoinRelation(mergeJoinRelation, state);
}

public override Relation VisitNormalizationRelation(NormalizationRelation normalizationRelation, object state)
{
if (normalizationRelation.OutputLength > normalizationRelation.Input.OutputLength)
{
Assert.Fail();
}
return base.VisitNormalizationRelation(normalizationRelation, state);
}

public override Relation VisitProjectRelation(ProjectRelation projectRelation, object state)
{
if (projectRelation.OutputLength > (projectRelation.Input.OutputLength + projectRelation.Expressions.Count))
{
if (projectRelation.EmitSet)
{
if (projectRelation.Emit.Distinct().Count() > (projectRelation.OutputLength + projectRelation.Expressions.Count))
{
Assert.Fail();
}
}
else
{
Assert.Fail();
}
}
return base.VisitProjectRelation(projectRelation, state);
}

public override Relation VisitSetRelation(SetRelation setRelation, object state)
{
if (setRelation.Inputs.Any(x => setRelation.OutputLength > x.OutputLength))
{
Assert.Fail();
}
return base.VisitSetRelation(setRelation, state);
}

public override Relation VisitSortRelation(SortRelation sortRelation, object state)
{
if (sortRelation.OutputLength > sortRelation.Input.OutputLength)
{
Assert.Fail();
}
return base.VisitSortRelation(sortRelation, state);
}

public override Relation VisitTableFunctionRelation(TableFunctionRelation tableFunctionRelation, object state)
{
if (tableFunctionRelation.OutputLength > tableFunctionRelation.Input?.OutputLength)
{
Assert.Fail();
}
return base.VisitTableFunctionRelation(tableFunctionRelation, state);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,16 @@ public async Task StartStream(
_persistentStorage = CreatePersistentStorage(testName, ignoreSameDataCheck);
_notificationReciever = new NotificationReciever(CheckpointComplete);

plan = Core.Optimizer.PlanOptimizer.Optimize(plan);

var emitValidationVisitor = new EmitLengthValidatorVisitor();
foreach (var relation in plan.Relations)
{
emitValidationVisitor.Visit(relation, default!);
}

flowtideBuilder
.AddPlan(plan)
.AddPlan(plan, false)
.SetParallelism(parallelism)
#if DEBUG_WRITE
.WithLoggerFactory(loggerFactory)
Expand Down

1 comment on commit 89b480f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 89b480f Previous: 7db1083 Ratio
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.InnerJoin 561913660 ns (± 8885168.59868799) 588638700 ns (± 9231369.002121696) 0.95
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.LeftJoin 633086910 ns (± 28179291.329566035) 632393477.7777778 ns (± 23156158.76418074) 1.00
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.ProjectionAndNormalization 206965510 ns (± 11974743.987005875) 171268600 ns (± 6644227.006958748) 1.21
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.SumAggregation 220732430 ns (± 15248542.067999667) 190822830 ns (± 6415026.852121856) 1.16
FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.ListAggWithMapAggregation 2355444120 ns (± 117635795.91867246) 2656394200 ns (± 136074715.58524996) 0.89

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.