Skip to content

Commit

Permalink
Fix code smells (#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulimo authored Dec 27, 2023
1 parent b5f1e31 commit 3e26d96
Show file tree
Hide file tree
Showing 25 changed files with 58 additions and 132 deletions.
2 changes: 1 addition & 1 deletion samples/MonitoringAzureMonitor/DummyReadOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected override async Task SendInitial(IngressOutput<StreamEventBatch> output
}
}));
}
await output.SendAsync(new StreamEventBatch(null, o));
await output.SendAsync(new StreamEventBatch(o));
output.ExitCheckpointLock();
ScheduleCheckpoint(TimeSpan.FromSeconds(1));
}
Expand Down
2 changes: 1 addition & 1 deletion samples/MonitoringPrometheus/DummyReadOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected override async Task SendInitial(IngressOutput<StreamEventBatch> output
}
}));
}
await output.SendAsync(new StreamEventBatch(null, o));
await output.SendAsync(new StreamEventBatch(o));
output.ExitCheckpointLock();
ScheduleCheckpoint(TimeSpan.FromSeconds(1));
}
Expand Down
2 changes: 1 addition & 1 deletion samples/SqlSampleWithUI/DummyReadOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected override async Task SendInitial(IngressOutput<StreamEventBatch> output
}
}));
}
await output.SendAsync(new StreamEventBatch(null, o));
await output.SendAsync(new StreamEventBatch(o));
output.ExitCheckpointLock();
ScheduleCheckpoint(TimeSpan.FromSeconds(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private async Task LoadChangesTask(IngressOutput<StreamEventBatch> output, objec
waitTimeMs = 1;
if (rows.Count > 100)
{
await output.SendAsync(new StreamEventBatch(null, rows));
await output.SendAsync(new StreamEventBatch(rows));
rows = new List<RowEvent>();
await SendWatermark(output);
}
Expand All @@ -166,7 +166,7 @@ private async Task LoadChangesTask(IngressOutput<StreamEventBatch> output, objec
{
if (rows.Count > 0)
{
await output.SendAsync(new StreamEventBatch(null, rows));
await output.SendAsync(new StreamEventBatch(rows));
rows = new List<RowEvent>();
await SendWatermark(output);
}
Expand Down Expand Up @@ -240,7 +240,7 @@ protected override async Task SendInitial(IngressOutput<StreamEventBatch> output

if (result == null || rows.Count >= 100)
{
await output.SendAsync(new StreamEventBatch(null, rows));
await output.SendAsync(new StreamEventBatch(rows));
rows = new List<RowEvent>();
// Check offsets
bool offsetsReached = true;
Expand Down Expand Up @@ -268,7 +268,7 @@ protected override async Task SendInitial(IngressOutput<StreamEventBatch> output

if (rows.Count > 0)
{
await output.SendAsync(new StreamEventBatch(null, rows));
await output.SendAsync(new StreamEventBatch(rows));
}

// Send watermark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private async Task FetchChanges(IngressOutput<StreamEventBatch> output, object?
{
_eventsCounter.Add(result.Count);
Logger.LogInformation("{changeCount} Changes found from table, {tableName}", result.Count, readRelation.NamedTable.DotSeperated);
await output.SendAsync(new StreamEventBatch(null, result));
await output.SendAsync(new StreamEventBatch(result));
await output.SendWatermark(new FlowtideDotNet.Base.Watermark(readRelation.NamedTable.DotSeperated, _state.ChangeTrackingVersion));
this.ScheduleCheckpoint(TimeSpan.FromSeconds(1));
}
Expand Down Expand Up @@ -272,14 +272,14 @@ protected override async Task SendInitial(IngressOutput<StreamEventBatch> output
if (outdata.Count >= 100)
{
_eventsCounter.Add(outdata.Count);
await output.SendAsync(new StreamEventBatch(null, outdata));
await output.SendAsync(new StreamEventBatch(outdata));
outdata = new List<RowEvent>();
}
}
if (outdata.Count > 0)
{
_eventsCounter.Add(outdata.Count);
await output.SendAsync(new StreamEventBatch(null, outdata));
await output.SendAsync(new StreamEventBatch(outdata));
}
retryCount = 0;
SetHealth(true);
Expand Down
10 changes: 5 additions & 5 deletions src/FlowtideDotNet.Core/Operators/Aggregate/AggregateOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected override async IAsyncEnumerable<StreamEventBatch> OnWatermark(Watermar

if (outputs.Count > 100)
{
yield return new StreamEventBatch(null, outputs);
yield return new StreamEventBatch(outputs);
outputs = new List<RowEvent>();
}

Expand All @@ -160,7 +160,7 @@ protected override async IAsyncEnumerable<StreamEventBatch> OnWatermark(Watermar

if (outputs.Count > 100)
{
yield return new StreamEventBatch(null, outputs);
yield return new StreamEventBatch(outputs);
outputs = new List<RowEvent>();
}

Expand Down Expand Up @@ -228,7 +228,7 @@ protected override async IAsyncEnumerable<StreamEventBatch> OnWatermark(Watermar

if (outputs.Count > 100)
{
yield return new StreamEventBatch(null, outputs);
yield return new StreamEventBatch(outputs);
outputs = new List<RowEvent>();
}

Expand All @@ -241,7 +241,7 @@ protected override async IAsyncEnumerable<StreamEventBatch> OnWatermark(Watermar
// Output only 100 rows per batch to reduce memory consumption
if (outputs.Count > 100)
{
yield return new StreamEventBatch(null, outputs);
yield return new StreamEventBatch(outputs);
outputs = new List<RowEvent>();
}

Expand All @@ -250,7 +250,7 @@ protected override async IAsyncEnumerable<StreamEventBatch> OnWatermark(Watermar

if (outputs.Count > 0)
{
yield return new StreamEventBatch(null, outputs);
yield return new StreamEventBatch(outputs);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/FlowtideDotNet.Core/Operators/Buffer/BufferOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ protected override async IAsyncEnumerable<StreamEventBatch> OnWatermark(Watermar
if (output.Count > 100)
{
_eventsCounter.Add(output.Count);
yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
output = new List<RowEvent>();
}
}
if (output.Count > 0)
{
_eventsCounter.Add(output.Count);
yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
}
await _tree.Clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public async IAsyncEnumerable<StreamEventBatch> OnRecieve(StreamEventBatch msg,

if (output.Count > 0)
{
yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ protected override async IAsyncEnumerable<KeyValuePair<int, StreamMessage<Stream
egressOutput.Add(new RowEvent(streamEvent.Weight, 0, streamEvent.RowData));
}

yield return new KeyValuePair<int, StreamMessage<StreamEventBatch>>(0, new StreamMessage<StreamEventBatch>(new StreamEventBatch(null, egressOutput), time));
yield return new KeyValuePair<int, StreamMessage<StreamEventBatch>>(1, new StreamMessage<StreamEventBatch>(new StreamEventBatch(null, loopOutput), time));
yield return new KeyValuePair<int, StreamMessage<StreamEventBatch>>(0, new StreamMessage<StreamEventBatch>(new StreamEventBatch(egressOutput), time));
yield return new KeyValuePair<int, StreamMessage<StreamEventBatch>>(1, new StreamMessage<StreamEventBatch>(new StreamEventBatch(loopOutput), time));
}

protected override async IAsyncEnumerable<KeyValuePair<int, StreamMessage<StreamEventBatch>>> OnIngressRecieve(StreamEventBatch data, long time)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using FlowtideDotNet.Substrait.Expressions;
using FlowtideDotNet.Substrait.Relations;
using Google.Protobuf.WellKnownTypes;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Reflection;

Expand All @@ -25,7 +26,8 @@ internal static class MergeJoinExpressionCompiler
{
internal static System.Linq.Expressions.MethodCallExpression CompareRef(System.Linq.Expressions.Expression a, System.Linq.Expressions.Expression b)
{
MethodInfo compareMethod = typeof(FlxValueRefComparer).GetMethod("CompareTo", BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Static);
MethodInfo? compareMethod = typeof(FlxValueRefComparer).GetMethod("CompareTo", BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Static);
Debug.Assert(compareMethod != null);
return System.Linq.Expressions.Expression.Call(compareMethod, a, b);
}

Expand All @@ -46,7 +48,10 @@ private static System.Linq.Expressions.Expression GetAccessFieldExpression(Syste
throw new NotSupportedException("Only direct field references are supported in merge join keys");
}

// Used in reflection
#pragma warning disable IDE0051 // Remove unused private members
private static bool EqualImplementation(in FlxValueRef x, in FlxValueRef y)
#pragma warning restore IDE0051 // Remove unused private members
{
// If either is null, return null
if (x.IsNull || y.IsNull)
Expand All @@ -65,7 +70,8 @@ private static bool EqualImplementation(in FlxValueRef x, in FlxValueRef y)

internal static System.Linq.Expressions.MethodCallExpression EqualRef(System.Linq.Expressions.Expression a, System.Linq.Expressions.Expression b)
{
MethodInfo compareMethod = typeof(MergeJoinExpressionCompiler).GetMethod("EqualImplementation", BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Static);
MethodInfo? compareMethod = typeof(MergeJoinExpressionCompiler).GetMethod("EqualImplementation", BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Static);
Debug.Assert(compareMethod != null);
return System.Linq.Expressions.Expression.Call(compareMethod, a, b);
}

Expand Down Expand Up @@ -127,7 +133,7 @@ public static MergeCompileResult Compile(MergeJoinRelation mergeJoinRelation)
// Create each index compare function that returns if a value is lesser or greater than the other value
// These functions are used during insertion
var tmpVar = System.Linq.Expressions.Expression.Variable(typeof(int));
var leftCompare = leftIndexExpressions.Last();
var leftCompare = leftIndexExpressions[leftIndexExpressions.Count - 1];
for (int i = leftIndexExpressions.Count - 2; i >= 0; i--)
{
var res = leftIndexExpressions[i];
Expand Down Expand Up @@ -174,7 +180,7 @@ public static MergeCompileResult Compile(MergeJoinRelation mergeJoinRelation)

System.Linq.Expressions.Expression? keyEqualsExpression;

var firstEqual = fieldEqualExpressions.First();
var firstEqual = fieldEqualExpressions[0];
keyEqualsExpression = firstEqual;
for (int i = 1; i < fieldEqualExpressions.Count; i++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,14 @@ internal class MergeJoinOperatorBase : MultipleInputVertex<StreamEventBatch, Joi
protected readonly MergeJoinRelation mergeJoinRelation;
protected IBPlusTree<JoinStreamEvent, JoinStorageValue>? _leftTree;
protected IBPlusTree<JoinStreamEvent, JoinStorageValue>? _rightTree;
private readonly int _leftSize;
private readonly Dictionary<JoinStreamEvent, int> leftJoinWeight = new Dictionary<JoinStreamEvent, int>();
private ICounter<long>? _eventsCounter;

protected readonly Func<JoinStreamEvent, JoinStreamEvent, bool> _keyCondition;
protected readonly Func<JoinStreamEvent, JoinStreamEvent, bool> _postCondition;

private FlexBuffers.FlexBuffer _flexBuffer;
private List<int> mappedEmit;
private IRowData _rightNullData;
private readonly FlexBuffers.FlexBuffer _flexBuffer;
private readonly IRowData _rightNullData;

#if DEBUG_WRITE
// TODO: Tmp remove
Expand All @@ -69,7 +67,6 @@ public MergeJoinOperatorBase(MergeJoinRelation mergeJoinRelation, FunctionsRegis
{
_postCondition = (left, right) => true;
}
_leftSize = mergeJoinRelation.Left.OutputLength;
_flexBuffer = new FlexBuffers.FlexBuffer(ArrayPool<byte>.Shared);

_rightNullData = RowEvent.Create(0, 0, v =>
Expand All @@ -83,18 +80,14 @@ public MergeJoinOperatorBase(MergeJoinRelation mergeJoinRelation, FunctionsRegis

public override string DisplayName => "Merge Join";

public override ValueTask DisposeAsync()
{
return base.DisposeAsync();
}

public override Task Compact()
{
return Task.CompletedTask;
}

public override async Task DeleteAsync()
public override Task DeleteAsync()
{
return Task.CompletedTask;
}

public override async Task<JoinState?> OnCheckpoint()
Expand Down Expand Up @@ -129,7 +122,6 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveLeft(StreamEventBatc

List<RowEvent> output = new List<RowEvent>();
var it = _rightTree.CreateIterator();
//using var it = _rightTree.CreateIterator();

foreach (var e in msg.Events)
{
Expand Down Expand Up @@ -160,7 +152,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveLeft(StreamEventBatc
if (output.Count > 100)
{
_eventsCounter.Add(output.Count);
yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
output = new List<RowEvent>();
}

Expand All @@ -186,7 +178,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveLeft(StreamEventBatc
if (output.Count > 100)
{
_eventsCounter.Add(output.Count);
yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
output = new List<RowEvent>();
}

Expand Down Expand Up @@ -233,7 +225,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveLeft(StreamEventBatc
outputWriter.WriteLine($"{o.Weight} {o.Vector.ToJson}");
}
#endif
yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
}
#if DEBUG_WRITE
await leftInput.FlushAsync();
Expand Down Expand Up @@ -288,7 +280,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveRight(StreamEventBat
if (output.Count > 100)
{
_eventsCounter.Add(output.Count);
yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
output = new List<RowEvent>();
}

Expand Down Expand Up @@ -354,7 +346,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveRight(StreamEventBat
if (output.Count > 100)
{
_eventsCounter.Add(output.Count);
yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
output = new List<RowEvent>();
}

Expand All @@ -371,7 +363,7 @@ protected async IAsyncEnumerable<StreamEventBatch> OnRecieveRight(StreamEventBat
outputWriter.WriteLine($"{o.Weight} {o.Vector.ToJson}");
}
#endif
yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
}
#if DEBUG_WRITE
await rightInput.FlushAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ await _rightTree.RMW(kv.Key, kv.Value, (input, current, exist) =>

await _leftTemporary.Clear();

yield return new StreamEventBatch(null, output);
yield return new StreamEventBatch(output);
}

public override async IAsyncEnumerable<StreamEventBatch> OnRecieve(int targetId, StreamEventBatch msg, long time)
Expand Down
Loading

0 comments on commit 3e26d96

Please sign in to comment.