From 5156349f2346d5f03aebf515d88a580d0b30de27 Mon Sep 17 00:00:00 2001 From: Ulimo Date: Wed, 27 Dec 2023 12:06:42 +0100 Subject: [PATCH] Fix performance for merge join during left join (#239) * Fix performance for merge join during left join The dictionary left join weight caused long lookup times, changed to a list. * remove debug write --- src/FlowtideDotNet.Core/CompactRowData.cs | 5 +++ .../Join/MergeJoin/MergeJoinOperatorBase.cs | 39 +++++++++-------- src/FlowtideDotNet.Core/RowEvent.cs | 11 +++++ .../FlowtideAcceptanceBase.cs | 5 +++ .../Internal/DatasetGenerator.cs | 10 +++++ .../Internal/FlowtideTestStream.cs | 8 +++- .../Internal/MockDataSink.cs | 16 ++++++- .../Internal/MockDataSourceOperator.cs | 2 +- .../Internal/MockTable.cs | 8 ++++ .../JoinTests.cs | 42 +++++++++++++++++++ 10 files changed, 122 insertions(+), 24 deletions(-) diff --git a/src/FlowtideDotNet.Core/CompactRowData.cs b/src/FlowtideDotNet.Core/CompactRowData.cs index 822285efd..6ba78546c 100644 --- a/src/FlowtideDotNet.Core/CompactRowData.cs +++ b/src/FlowtideDotNet.Core/CompactRowData.cs @@ -42,6 +42,11 @@ public CompactRowData(Memory memory) public int Length => _vector.Length; + internal string ToJson() + { + return _vector.ToJson; + } + public FlxValue GetColumn(int index) { return _vector.Get(index); diff --git a/src/FlowtideDotNet.Core/Operators/Join/MergeJoin/MergeJoinOperatorBase.cs b/src/FlowtideDotNet.Core/Operators/Join/MergeJoin/MergeJoinOperatorBase.cs index 87a9d76ec..450f8250d 100644 --- a/src/FlowtideDotNet.Core/Operators/Join/MergeJoin/MergeJoinOperatorBase.cs +++ b/src/FlowtideDotNet.Core/Operators/Join/MergeJoin/MergeJoinOperatorBase.cs @@ -33,7 +33,7 @@ internal class MergeJoinOperatorBase : MultipleInputVertex? _leftTree; protected IBPlusTree? _rightTree; - private readonly Dictionary leftJoinWeight = new Dictionary(); + private readonly List> leftJoinWeight = new List>(); private ICounter? _eventsCounter; protected readonly Func _keyCondition; @@ -126,7 +126,7 @@ protected async IAsyncEnumerable OnRecieveLeft(StreamEventBatc foreach (var e in msg.Events) { #if DEBUG_WRITE - leftInput.WriteLine($"{e.Weight} {e.Vector.ToJson}"); + leftInput.WriteLine($"{e.Weight} {e.ToJson()}"); #endif var joinEventCheck = new JoinStreamEvent(0, 1, e.RowData); @@ -222,7 +222,7 @@ protected async IAsyncEnumerable OnRecieveLeft(StreamEventBatc #if DEBUG_WRITE foreach(var o in output) { - outputWriter.WriteLine($"{o.Weight} {o.Vector.ToJson}"); + outputWriter.WriteLine($"{o.Weight} {o.ToJson()}"); } #endif yield return new StreamEventBatch(output); @@ -246,7 +246,7 @@ protected async IAsyncEnumerable OnRecieveRight(StreamEventBat foreach (var e in msg.Events) { #if DEBUG_WRITE - rightInput.WriteLine($"{e.Weight} {e.Vector.ToJson}"); + rightInput.WriteLine($"{e.Weight} {e.ToJson()}"); #endif var joinEventCheck = new JoinStreamEvent(0, 1, e.RowData); @@ -266,15 +266,7 @@ protected async IAsyncEnumerable OnRecieveRight(StreamEventBat if (mergeJoinRelation.Type == JoinType.Left) { - // If it is a left join, we need to always check the new weight of a row - if (leftJoinWeight.TryGetValue(kv.Key, out var currentWeight)) - { - leftJoinWeight[kv.Key] = currentWeight + outputWeight; - } - else - { - leftJoinWeight.Add(kv.Key, outputWeight); - } + leftJoinWeight.Add(new KeyValuePair(kv.Key, outputWeight)); } if (output.Count > 100) @@ -360,7 +352,7 @@ protected async IAsyncEnumerable OnRecieveRight(StreamEventBat #if DEBUG_WRITE foreach (var o in output) { - outputWriter.WriteLine($"{o.Weight} {o.Vector.ToJson}"); + outputWriter.WriteLine($"{o.Weight} {o.ToJson()}"); } #endif yield return new StreamEventBatch(output); @@ -377,7 +369,7 @@ public override IAsyncEnumerable OnRecieve(int targetId, Strea allInput.WriteLine("New batch"); foreach (var e in msg.Events) { - allInput.WriteLine($"{targetId}, {e.Weight} {e.Vector.ToJson}"); + allInput.WriteLine($"{targetId}, {e.Weight} {e.ToJson()}"); } allInput.Flush(); #endif @@ -395,17 +387,24 @@ public override IAsyncEnumerable OnRecieve(int targetId, Strea protected override async Task InitializeOrRestore(JoinState? state, IStateManagerClient stateManagerClient) { #if DEBUG_WRITE - allInput = File.CreateText($"{Name}.all.txt"); - leftInput = File.CreateText($"{Name}.left.txt"); - rightInput = File.CreateText($"{Name}.right.txt"); - outputWriter = File.CreateText($"{Name}.output.txt"); + if (allInput != null) + { + allInput.WriteLine("Restart"); + } + else + { + allInput = File.CreateText($"{StreamName}-{Name}.all.txt"); + leftInput = File.CreateText($"{StreamName}-{Name}.left.txt"); + rightInput = File.CreateText($"{StreamName}-{Name}.right.txt"); + outputWriter = File.CreateText($"{StreamName}-{Name}.output.txt"); + } #endif Logger.LogInformation("Initializing merge join operator."); if(_eventsCounter == null) { _eventsCounter = Metrics.CreateCounter("events"); } - + leftJoinWeight.Clear(); _flexBuffer.Clear(); if (state == null) { diff --git a/src/FlowtideDotNet.Core/RowEvent.cs b/src/FlowtideDotNet.Core/RowEvent.cs index 7df3f7420..ff0764307 100644 --- a/src/FlowtideDotNet.Core/RowEvent.cs +++ b/src/FlowtideDotNet.Core/RowEvent.cs @@ -72,6 +72,17 @@ public FlxValueRef GetColumnRef(in int index) return _rowData.GetColumnRef(index); } + internal string ToJson() + { + var c = Compact(new FlexBuffer(ArrayPool.Shared)); + + if (c._rowData is CompactRowData cc) + { + return cc.ToJson(); + } + return string.Empty; + } + /// /// Returns a compact row event, is useful after many joins to reduce the recursive depth /// to locate values diff --git a/tests/FlowtideDotNet.AcceptanceTests/FlowtideAcceptanceBase.cs b/tests/FlowtideDotNet.AcceptanceTests/FlowtideAcceptanceBase.cs index ef5c891d1..3ec1e259a 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/FlowtideAcceptanceBase.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/FlowtideAcceptanceBase.cs @@ -71,6 +71,11 @@ public void AddOrUpdateUser(User user) flowtideTestStream.AddOrUpdateUser(user); } + public void DeleteUser(User user) + { + flowtideTestStream.DeleteUser(user); + } + public FlowtideAcceptanceBase(ITestOutputHelper testOutputHelper) { var baseType = this.GetType(); diff --git a/tests/FlowtideDotNet.AcceptanceTests/Internal/DatasetGenerator.cs b/tests/FlowtideDotNet.AcceptanceTests/Internal/DatasetGenerator.cs index fc052d4cc..4fc554187 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/Internal/DatasetGenerator.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/Internal/DatasetGenerator.cs @@ -60,6 +60,16 @@ public void AddOrUpdateUser(User user) mockTable.AddOrUpdate(new List() { user }); } + public void DeleteUser(User user) + { + var index = Users.FindIndex(x => x.UserKey == user.UserKey); + + Users.RemoveAt(index); + + var mockTable = mockDatabase.GetOrCreateTable("users"); + mockTable.Delete(new List() { user }); + } + private void GenerateUsers(int count) { string?[] nullableStrings = new string?[] { null, "value" }; diff --git a/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs b/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs index 6f49c0c17..38ff8e641 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs @@ -55,11 +55,12 @@ public class FlowtideTestStream : IAsyncDisposable public FlowtideTestStream(string testName) { + var streamName = testName.Replace("/", "_"); _db = new Internal.MockDatabase(); generator = new DatasetGenerator(_db); sqlPlanBuilder = new SqlPlanBuilder(); sqlPlanBuilder.AddTableProvider(new DatasetTableProvider(_db)); - flowtideBuilder = new FlowtideBuilder("stream") + flowtideBuilder = new FlowtideBuilder(streamName) .WithLoggerFactory(new LoggerFactory(new List() { new DebugLoggerProvider() })); this.testName = testName; } @@ -74,6 +75,11 @@ public void AddOrUpdateUser(User user) generator.AddOrUpdateUser(user); } + public void DeleteUser(User user) + { + generator.DeleteUser(user); + } + public async Task StartStream(string sql, int parallelism = 1, StateSerializeOptions? stateSerializeOptions = default, TimeSpan? timestampInterval = default) { if (stateSerializeOptions == null) diff --git a/tests/FlowtideDotNet.AcceptanceTests/Internal/MockDataSink.cs b/tests/FlowtideDotNet.AcceptanceTests/Internal/MockDataSink.cs index 2feffde58..40292ffbf 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/Internal/MockDataSink.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/Internal/MockDataSink.cs @@ -11,6 +11,7 @@ // limitations under the License. using FlexBuffers; +using FlowtideDotNet.Base; using FlowtideDotNet.Core; using FlowtideDotNet.Core.Operators.Set; using FlowtideDotNet.Core.Operators.Write; @@ -37,7 +38,7 @@ internal class MockDataSink : WriteBaseOperator private readonly Action> onDataChange; private int crashOnCheckpointCount; private SortedDictionary currentData; - + private bool watermarkRecieved = false; public MockDataSink( ExecutionDataflowBlockOptions executionDataflowBlockOptions, Action> onDataChange, @@ -66,6 +67,12 @@ protected override Task InitializeOrRestore(long restoreTime, MockDataSinkState? return Task.CompletedTask; } + protected override Task OnWatermark(Watermark watermark) + { + watermarkRecieved = true; + return base.OnWatermark(watermark); + } + protected override Task OnCheckpoint(long checkpointTime) { if (crashOnCheckpointCount > 0) @@ -90,7 +97,12 @@ protected override Task OnCheckpoint(long checkpointTime) } //var actualData = currentData.Where(x => x.Value > 0).Select(x => x.Key.Memory.ToArray()).ToList(); - onDataChange(output); + if (watermarkRecieved) + { + onDataChange(output); + watermarkRecieved = false; + } + return Task.FromResult(new MockDataSinkState()); } diff --git a/tests/FlowtideDotNet.AcceptanceTests/Internal/MockDataSourceOperator.cs b/tests/FlowtideDotNet.AcceptanceTests/Internal/MockDataSourceOperator.cs index d40a1132f..c736dbbce 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/Internal/MockDataSourceOperator.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/Internal/MockDataSourceOperator.cs @@ -148,7 +148,7 @@ protected override async Task SendInitial(IngressOutput output await output.SendWatermark(new Base.Watermark(readRelation.NamedTable.DotSeperated, fetchedOffset)); output.ExitCheckpointLock(); await this.RegisterTrigger("changes", TimeSpan.FromMilliseconds(50)); - this.ScheduleCheckpoint(TimeSpan.FromMilliseconds(1)); + this.ScheduleCheckpoint(TimeSpan.FromMilliseconds(100)); } } } diff --git a/tests/FlowtideDotNet.AcceptanceTests/Internal/MockTable.cs b/tests/FlowtideDotNet.AcceptanceTests/Internal/MockTable.cs index e9a823f96..5378c0477 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/Internal/MockTable.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/Internal/MockTable.cs @@ -53,6 +53,14 @@ public void AddOrUpdate(IEnumerable rows) } } + public void Delete(IEnumerable rows) + { + foreach(var row in rows) + { + _changes.Add(new RowOperation(row, true)); + } + } + //public void AddOrUpdate(IEnumerable rows) //{ // lock (_lock) diff --git a/tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs b/tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs index db1016b24..7cc4a2d74 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs @@ -82,6 +82,48 @@ from subuser in gj.DefaultIfEmpty() }); } + [Fact] + public async Task LeftJoinMergeJoinWithUpdate() + { + GenerateData(100); + await StartStream(@" + INSERT INTO output + SELECT + o.orderkey, u.firstName, u.LastName + FROM orders o + LEFT JOIN users u + ON o.userkey = u.userkey"); + await WaitForUpdate(); + + AssertCurrentDataEqual( + from order in Orders + join user in Users on order.UserKey equals user.UserKey into gj + from subuser in gj.DefaultIfEmpty() + select new + { + order.OrderKey, + subuser.FirstName, + subuser.LastName + }); + + var uKey = Orders.First().UserKey; + var firstUser = Users.First(x => x.UserKey == uKey); + DeleteUser(firstUser); + + await WaitForUpdate(); + + AssertCurrentDataEqual( + from order in Orders + join user in Users on order.UserKey equals user.UserKey into gj + from subuser in gj.DefaultIfEmpty() + select new + { + order.OrderKey, + subuser?.FirstName, + subuser?.LastName + }); + } + [Fact] public async Task LeftJoinMergeJoinNullCondition() {