Skip to content

Commit

Permalink
Fix performance for merge join during left join (#239)
Browse files Browse the repository at this point in the history
* Fix performance for merge join during left join

The dictionary used to track left join weights caused long lookup times, so it was changed to a list.

* remove debug write
  • Loading branch information
Ulimo authored Dec 27, 2023
1 parent 3e26d96 commit 5156349
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 24 deletions.
5 changes: 5 additions & 0 deletions src/FlowtideDotNet.Core/CompactRowData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public CompactRowData(Memory<byte> memory)

public int Length => _vector.Length;

/// <summary>
/// Returns the JSON text exposed by the underlying vector's <c>ToJson</c> member.
/// </summary>
internal string ToJson() => _vector.ToJson;

public FlxValue GetColumn(int index)
{
return _vector.Get(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal class MergeJoinOperatorBase : MultipleInputVertex<StreamEventBatch, Joi
protected readonly MergeJoinRelation mergeJoinRelation;
protected IBPlusTree<JoinStreamEvent, JoinStorageValue>? _leftTree;
protected IBPlusTree<JoinStreamEvent, JoinStorageValue>? _rightTree;
private readonly Dictionary<JoinStreamEvent, int> leftJoinWeight = new Dictionary<JoinStreamEvent, int>();
private readonly List<KeyValuePair<JoinStreamEvent, int>> leftJoinWeight = new List<KeyValuePair<JoinStreamEvent, int>>();
private ICounter<long>? _eventsCounter;

protected readonly Func<JoinStreamEvent, JoinStreamEvent, bool> _keyCondition;
Expand Down Expand Up @@ -126,7 +126,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveLeft(StreamEventBatc
foreach (var e in msg.Events)
{
#if DEBUG_WRITE
leftInput.WriteLine($"{e.Weight} {e.Vector.ToJson}");
leftInput.WriteLine($"{e.Weight} {e.ToJson()}");
#endif

var joinEventCheck = new JoinStreamEvent(0, 1, e.RowData);
Expand Down Expand Up @@ -222,7 +222,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveLeft(StreamEventBatc
#if DEBUG_WRITE
foreach(var o in output)
{
outputWriter.WriteLine($"{o.Weight} {o.Vector.ToJson}");
outputWriter.WriteLine($"{o.Weight} {o.ToJson()}");
}
#endif
yield return new StreamEventBatch(output);
Expand All @@ -246,7 +246,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveRight(StreamEventBat
foreach (var e in msg.Events)
{
#if DEBUG_WRITE
rightInput.WriteLine($"{e.Weight} {e.Vector.ToJson}");
rightInput.WriteLine($"{e.Weight} {e.ToJson()}");
#endif
var joinEventCheck = new JoinStreamEvent(0, 1, e.RowData);

Expand All @@ -266,15 +266,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveRight(StreamEventBat

if (mergeJoinRelation.Type == JoinType.Left)
{
// If it is a left join, we need to always check the new weight of a row
if (leftJoinWeight.TryGetValue(kv.Key, out var currentWeight))
{
leftJoinWeight[kv.Key] = currentWeight + outputWeight;
}
else
{
leftJoinWeight.Add(kv.Key, outputWeight);
}
leftJoinWeight.Add(new KeyValuePair<JoinStreamEvent, int>(kv.Key, outputWeight));
}

if (output.Count > 100)
Expand Down Expand Up @@ -360,7 +352,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveRight(StreamEventBat
#if DEBUG_WRITE
foreach (var o in output)
{
outputWriter.WriteLine($"{o.Weight} {o.Vector.ToJson}");
outputWriter.WriteLine($"{o.Weight} {o.ToJson()}");
}
#endif
yield return new StreamEventBatch(output);
Expand All @@ -377,7 +369,7 @@ public override IAsyncEnumerable<StreamEventBatch> OnRecieve(int targetId, Strea
allInput.WriteLine("New batch");
foreach (var e in msg.Events)
{
allInput.WriteLine($"{targetId}, {e.Weight} {e.Vector.ToJson}");
allInput.WriteLine($"{targetId}, {e.Weight} {e.ToJson()}");
}
allInput.Flush();
#endif
Expand All @@ -395,17 +387,24 @@ public override IAsyncEnumerable<StreamEventBatch> OnRecieve(int targetId, Strea
protected override async Task InitializeOrRestore(JoinState? state, IStateManagerClient stateManagerClient)
{
#if DEBUG_WRITE
allInput = File.CreateText($"{Name}.all.txt");
leftInput = File.CreateText($"{Name}.left.txt");
rightInput = File.CreateText($"{Name}.right.txt");
outputWriter = File.CreateText($"{Name}.output.txt");
if (allInput != null)
{
allInput.WriteLine("Restart");
}
else
{
allInput = File.CreateText($"{StreamName}-{Name}.all.txt");
leftInput = File.CreateText($"{StreamName}-{Name}.left.txt");
rightInput = File.CreateText($"{StreamName}-{Name}.right.txt");
outputWriter = File.CreateText($"{StreamName}-{Name}.output.txt");
}
#endif
Logger.LogInformation("Initializing merge join operator.");
if(_eventsCounter == null)
{
_eventsCounter = Metrics.CreateCounter<long>("events");
}

leftJoinWeight.Clear();
_flexBuffer.Clear();
if (state == null)
{
Expand Down
11 changes: 11 additions & 0 deletions src/FlowtideDotNet.Core/RowEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ public FlxValueRef GetColumnRef(in int index)
return _rowData.GetColumnRef(index);
}

/// <summary>
/// Builds a JSON string for this row by compacting it first.
/// </summary>
/// <returns>
/// The JSON of the compacted row data when that data is a <see cref="CompactRowData"/>;
/// otherwise an empty string.
/// </returns>
internal string ToJson()
{
    // Compact into a new FlexBuffer backed by the shared byte array pool.
    var compacted = Compact(new FlexBuffer(ArrayPool<byte>.Shared));

    return compacted._rowData is CompactRowData compactData
        ? compactData.ToJson()
        : string.Empty;
}

/// <summary>
/// Returns a compact row event, is useful after many joins to reduce the recursive depth
/// to locate values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public void AddOrUpdateUser(User user)
flowtideTestStream.AddOrUpdateUser(user);
}

/// <summary>
/// Forwards the deletion of <paramref name="user"/> to the underlying test stream.
/// </summary>
/// <param name="user">The user to delete.</param>
public void DeleteUser(User user) => flowtideTestStream.DeleteUser(user);

public FlowtideAcceptanceBase(ITestOutputHelper testOutputHelper)
{
var baseType = this.GetType();
Expand Down
10 changes: 10 additions & 0 deletions tests/FlowtideDotNet.AcceptanceTests/Internal/DatasetGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ public void AddOrUpdateUser(User user)
mockTable.AddOrUpdate(new List<User>() { user });
}

/// <summary>
/// Removes the entry in <c>Users</c> that has the same <c>UserKey</c> as
/// <paramref name="user"/> and records a delete of <paramref name="user"/> on the
/// mock "users" table.
/// </summary>
/// <param name="user">The user to delete; matched against <c>Users</c> by <c>UserKey</c>.</param>
/// <exception cref="System.ArgumentOutOfRangeException">
/// No entry in <c>Users</c> has the same <c>UserKey</c> as <paramref name="user"/>.
/// </exception>
public void DeleteUser(User user)
{
    var index = Users.FindIndex(x => x.UserKey == user.UserKey);

    // FindIndex yields -1 when nothing matches. Passing that straight to RemoveAt
    // fails with a generic index error, so fail here with a message that names the
    // missing user instead (same exception type as before).
    if (index < 0)
    {
        throw new System.ArgumentOutOfRangeException(
            nameof(user),
            $"Cannot delete user with key '{user.UserKey}': no matching entry exists in Users.");
    }

    Users.RemoveAt(index);

    var mockTable = mockDatabase.GetOrCreateTable<User>("users");
    mockTable.Delete(new List<User>() { user });
}

private void GenerateUsers(int count)
{
string?[] nullableStrings = new string?[] { null, "value" };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ public class FlowtideTestStream : IAsyncDisposable

public FlowtideTestStream(string testName)
{
var streamName = testName.Replace("/", "_");
_db = new Internal.MockDatabase();
generator = new DatasetGenerator(_db);
sqlPlanBuilder = new SqlPlanBuilder();
sqlPlanBuilder.AddTableProvider(new DatasetTableProvider(_db));
flowtideBuilder = new FlowtideBuilder("stream")
flowtideBuilder = new FlowtideBuilder(streamName)
.WithLoggerFactory(new LoggerFactory(new List<ILoggerProvider>() { new DebugLoggerProvider() }));
this.testName = testName;
}
Expand All @@ -74,6 +75,11 @@ public void AddOrUpdateUser(User user)
generator.AddOrUpdateUser(user);
}

/// <summary>
/// Forwards the deletion of <paramref name="user"/> to the dataset generator.
/// </summary>
/// <param name="user">The user to delete.</param>
public void DeleteUser(User user) => generator.DeleteUser(user);

public async Task StartStream(string sql, int parallelism = 1, StateSerializeOptions? stateSerializeOptions = default, TimeSpan? timestampInterval = default)
{
if (stateSerializeOptions == null)
Expand Down
16 changes: 14 additions & 2 deletions tests/FlowtideDotNet.AcceptanceTests/Internal/MockDataSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// limitations under the License.

using FlexBuffers;
using FlowtideDotNet.Base;
using FlowtideDotNet.Core;
using FlowtideDotNet.Core.Operators.Set;
using FlowtideDotNet.Core.Operators.Write;
Expand All @@ -37,7 +38,7 @@ internal class MockDataSink : WriteBaseOperator<MockDataSinkState>
private readonly Action<List<byte[]>> onDataChange;
private int crashOnCheckpointCount;
private SortedDictionary<RowEvent, int> currentData;

private bool watermarkRecieved = false;
public MockDataSink(
ExecutionDataflowBlockOptions executionDataflowBlockOptions,
Action<List<byte[]>> onDataChange,
Expand Down Expand Up @@ -66,6 +67,12 @@ protected override Task InitializeOrRestore(long restoreTime, MockDataSinkState?
return Task.CompletedTask;
}

/// <summary>
/// Records that a watermark has arrived, then defers to the base implementation.
/// </summary>
/// <param name="watermark">The watermark passed on to the base operator.</param>
/// <returns>The task returned by the base <c>OnWatermark</c>.</returns>
protected override Task OnWatermark(Watermark watermark)
{
    // Set the flag before delegating so it is already raised when the base call runs.
    this.watermarkRecieved = true;

    return base.OnWatermark(watermark);
}

protected override Task<MockDataSinkState> OnCheckpoint(long checkpointTime)
{
if (crashOnCheckpointCount > 0)
Expand All @@ -90,7 +97,12 @@ protected override Task<MockDataSinkState> OnCheckpoint(long checkpointTime)
}

//var actualData = currentData.Where(x => x.Value > 0).Select(x => x.Key.Memory.ToArray()).ToList();
onDataChange(output);
if (watermarkRecieved)
{
onDataChange(output);
watermarkRecieved = false;
}

return Task.FromResult(new MockDataSinkState());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ protected override async Task SendInitial(IngressOutput<StreamEventBatch> output
await output.SendWatermark(new Base.Watermark(readRelation.NamedTable.DotSeperated, fetchedOffset));
output.ExitCheckpointLock();
await this.RegisterTrigger("changes", TimeSpan.FromMilliseconds(50));
this.ScheduleCheckpoint(TimeSpan.FromMilliseconds(1));
this.ScheduleCheckpoint(TimeSpan.FromMilliseconds(100));
}
}
}
8 changes: 8 additions & 0 deletions tests/FlowtideDotNet.AcceptanceTests/Internal/MockTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public void AddOrUpdate<T>(IEnumerable<T> rows)
}
}

/// <summary>
/// Appends one <c>RowOperation</c> to the change list for every row in
/// <paramref name="rows"/>, each created with <c>true</c> as its second argument.
/// </summary>
/// <typeparam name="T">The row type.</typeparam>
/// <param name="rows">The rows to record operations for.</param>
public void Delete<T>(IEnumerable<T> rows)
{
    foreach (var deletedRow in rows)
    {
        // NOTE(review): the 'true' argument presumably marks the operation as a delete — confirm against RowOperation.
        var operation = new RowOperation(deletedRow, true);
        _changes.Add(operation);
    }
}

//public void AddOrUpdate(IEnumerable<byte[]> rows)
//{
// lock (_lock)
Expand Down
42 changes: 42 additions & 0 deletions tests/FlowtideDotNet.AcceptanceTests/JoinTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,48 @@ from subuser in gj.DefaultIfEmpty()
});
}

/// <summary>
/// Runs a left join of orders against users, checks the initial output, then deletes
/// the user referenced by the first order and checks the output again.
/// </summary>
[Fact]
public async Task LeftJoinMergeJoinWithUpdate()
{
    GenerateData(100);
    await StartStream(@"
INSERT INTO output
SELECT
o.orderkey, u.firstName, u.LastName
FROM orders o
LEFT JOIN users u
ON o.userkey = u.userkey");
    await WaitForUpdate();

    // DefaultIfEmpty() yields null for an order with no matching user, so the
    // projection uses ?. here as well, consistent with the assertion after the delete.
    AssertCurrentDataEqual(
        from order in Orders
        join user in Users on order.UserKey equals user.UserKey into gj
        from subuser in gj.DefaultIfEmpty()
        select new
        {
            order.OrderKey,
            subuser?.FirstName,
            subuser?.LastName
        });

    // Delete the user that the first order points at.
    var uKey = Orders.First().UserKey;
    var firstUser = Users.First(x => x.UserKey == uKey);
    DeleteUser(firstUser);

    await WaitForUpdate();

    AssertCurrentDataEqual(
        from order in Orders
        join user in Users on order.UserKey equals user.UserKey into gj
        from subuser in gj.DefaultIfEmpty()
        select new
        {
            order.OrderKey,
            subuser?.FirstName,
            subuser?.LastName
        });
}

[Fact]
public async Task LeftJoinMergeJoinNullCondition()
{
Expand Down

0 comments on commit 5156349

Please sign in to comment.