diff --git a/src/FlowtideDotNet.Core/Optimizer/EmitPushdown/EmitPushdownVisitor.cs b/src/FlowtideDotNet.Core/Optimizer/EmitPushdown/EmitPushdownVisitor.cs index fe75d9315..2c86ec4d0 100644 --- a/src/FlowtideDotNet.Core/Optimizer/EmitPushdown/EmitPushdownVisitor.cs +++ b/src/FlowtideDotNet.Core/Optimizer/EmitPushdown/EmitPushdownVisitor.cs @@ -12,6 +12,7 @@ using FlowtideDotNet.Substrait.Relations; using System.ComponentModel.DataAnnotations; +using static SqlParser.Ast.DataType; namespace FlowtideDotNet.Core.Optimizer.EmitPushdown { @@ -498,9 +499,41 @@ public override Relation VisitMergeJoinRelation(MergeJoinRelation mergeJoinRelat List leftEmit = new List(); List rightEmit = new List(); + Dictionary leftEmitToInternal = new Dictionary(); + if (mergeJoinRelation.Left.EmitSet) + { + for (int i = 0; i < mergeJoinRelation.Left.Emit.Count; i++) + { + leftEmitToInternal.Add(i, mergeJoinRelation.Left.Emit[i]); + } + } + else + { + for (int i = 0; i < mergeJoinRelation.Left.OutputLength; i++) + { + leftEmitToInternal.Add(i, i); + } + } + + Dictionary rightEmitToInternal = new Dictionary(); + if (mergeJoinRelation.Right.EmitSet) + { + for (int i = 0; i < mergeJoinRelation.Right.Emit.Count; i++) + { + rightEmitToInternal.Add(i, mergeJoinRelation.Right.Emit[i]); + } + } + else + { + for (int i = 0; i < mergeJoinRelation.Right.OutputLength; i++) + { + rightEmitToInternal.Add(i, i); + } + } + foreach (var field in leftUsage) { - leftEmit.Add(field); + leftEmit.Add(leftEmitToInternal[field]); oldToNew.Add(field, replacementCounter); replacementCounter += 1; } @@ -509,7 +542,7 @@ public override Relation VisitMergeJoinRelation(MergeJoinRelation mergeJoinRelat { var rightIndex = field- mergeJoinRelation.Left.OutputLength; - rightEmit.Add(rightIndex); + rightEmit.Add(rightEmitToInternal[rightIndex]); oldToNew.Add(field, replacementCounter); replacementCounter += 1; } diff --git a/tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs b/tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs index cf86f25d6..091cf161d 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs @@ -207,5 +207,35 @@ from subcompany in gj.DefaultIfEmpty() companyName = subcompany?.Name ?? default(string) }); } + + /// + /// Special case for optimizer + /// + /// + [Fact] + public async Task JoinJoinWithPushdown() + { + GenerateData(100); + await StartStream(@" + INSERT INTO output + SELECT + u.userkey, c.name + FROM users u + LEFT JOIN companies c + ON trim(u.companyid) = trim(c.companyid) + LEFT JOIN companies c2 + ON u.companyid = c2.companyid"); + await WaitForUpdate(); + + AssertCurrentDataEqual( + from user in Users + join company in Companies on user.CompanyId equals company.CompanyId into gj + from subcompany in gj.DefaultIfEmpty() + select new + { + user.UserKey, + companyName = subcompany?.Name ?? default(string) + }); + } } }