Skip to content

Commit

Permalink
Add source operator id to watermark to know which source sent it. (#304)
Browse files Browse the repository at this point in the history
* Add source operator id to watermark to know which source sent it.

* Fix code smell
  • Loading branch information
Ulimo authored Jan 14, 2024
1 parent 9f7d7f3 commit 9e82acf
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/FlowtideDotNet.Base/Engine/Internal/VertexHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public VertexHandler(

public ILoggerFactory LoggerFactory { get; }

public string OperatorId => operatorName;

public Task RegisterTrigger(string name, TimeSpan? scheduledInterval = null)
{
return registerTrigger(operatorName, name, scheduledInterval);
Expand Down
2 changes: 2 additions & 0 deletions src/FlowtideDotNet.Base/IVertexHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace FlowtideDotNet.Base
{
public interface IVertexHandler
{
string OperatorId { get; }

string StreamName { get; }

IMeter Metrics { get; }
Expand Down
24 changes: 21 additions & 3 deletions src/FlowtideDotNet.Base/Vertices/Egress/EgressVertex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using FlowtideDotNet.Base.Vertices.Egress.Internal;
using FlowtideDotNet.Storage.StateManager;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Metrics;
Expand All @@ -30,7 +31,7 @@ public abstract class EgressVertex<T, TState> : ITargetBlock<IStreamEvent>, IStr
private IEgressImplementation? _targetBlock;
private bool _isHealthy = true;
private CancellationTokenSource? _cancellationTokenSource;
private float _lastLatency = 0;
private readonly ConcurrentDictionary<string, float> _lastLatency = new ConcurrentDictionary<string, float>();

public string Name { get; private set; }

Expand Down Expand Up @@ -65,7 +66,16 @@ private void InitializeBlocks()
private Task HandleWatermark(Watermark watermark)
{
var span = DateTimeOffset.UtcNow.Subtract(watermark.StartTime);
_lastLatency = (float)span.TotalMilliseconds;
var latency = (float)span.TotalMilliseconds;
if (watermark.SourceOperatorId != null)
{
_lastLatency[watermark.SourceOperatorId] = latency;
}
else
{
Logger.LogWarning("Recieved watermark without source operator id");
}

return OnWatermark(watermark);
}

Expand Down Expand Up @@ -166,7 +176,15 @@ public Task Initialize(string name, long restoreTime, long newTime, JsonElement?
});
Metrics.CreateObservableGauge("latency", () =>
{
return _lastLatency;
List<Measurement<float>> output = new List<Measurement<float>>();
foreach(var kv in _lastLatency)
{
output.Add(new Measurement<float>(kv.Value, new TagList
{
{ "source", kv.Key }
}));
}
return output;
});

return InitializeOrRestore(restoreTime, dState, vertexHandler.StateClient);
Expand Down
2 changes: 2 additions & 0 deletions src/FlowtideDotNet.Base/Vertices/Ingress/IngressOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ internal Task SendLockingEvent(ILockingEvent lockingEvent)

public Task SendWatermark(Watermark watermark)
{
Debug.Assert(_ingressState._vertexHandler != null, nameof(_ingressState._vertexHandler));
watermark.SourceOperatorId = _ingressState._vertexHandler.OperatorId;
if (_inLock)
{
return _targetBlock.SendAsync(watermark, CancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ private async IAsyncEnumerable<IStreamEvent> HandleWatermark(int targetId, Water

if (_currentWatermark == null)
{
_currentWatermark = new Watermark(ImmutableDictionary<string, long>.Empty, watermark.StartTime);
_currentWatermark = new Watermark(ImmutableDictionary<string, long>.Empty, watermark.StartTime)
{
SourceOperatorId = watermark.SourceOperatorId
};
}
_targetWatermarks[targetId] = watermark;
var currentDict = _currentWatermark.Watermarks;
Expand Down Expand Up @@ -240,7 +243,10 @@ private async IAsyncEnumerable<IStreamEvent> HandleWatermark(int targetId, Water
currentDict = currentDict.SetItem(kv.Key, watermarkValue);
}
}
var newWatermark = new Watermark(currentDict, watermark.StartTime);
var newWatermark = new Watermark(currentDict, watermark.StartTime)
{
SourceOperatorId = watermark.SourceOperatorId
};

// only output watermark if there is a difference in the numbers
if (!newWatermark.Equals(_currentWatermark))
Expand Down
2 changes: 2 additions & 0 deletions src/FlowtideDotNet.Base/Watermark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public Watermark(IImmutableDictionary<string, long> watermarks, DateTimeOffset s

public DateTimeOffset StartTime { get; }

public string? SourceOperatorId { get; internal set; }

public override bool Equals(object? obj)
{
return obj is Watermark watermark &&
Expand Down

0 comments on commit 9e82acf

Please sign in to comment.