Skip to content

Commit

Permalink
Add a block to send out more events if the output count is equals or …
Browse files Browse the repository at this point in the history
…exceed the limit (#194)
  • Loading branch information
Ulimo authored Dec 14, 2023
1 parent cb0b5a6 commit a66231c
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
80 changes: 80 additions & 0 deletions src/FlowtideDotNet.Base/Utils/AsyncEnumerableWithWait.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FlowtideDotNet.Base.Utils
{
internal class AsyncEnumerableWithWait<TSource, TDest> : IAsyncEnumerable<TDest>
{
private readonly IAsyncEnumerable<TSource> source;
private readonly Func<TSource, TDest> func;
private readonly Func<bool> shouldWaitFunc;

public AsyncEnumerableWithWait(IAsyncEnumerable<TSource> source, Func<TSource, TDest> func, Func<bool> shouldWaitFunc)
{
this.source = source;
this.func = func;
this.shouldWaitFunc = shouldWaitFunc;
}

internal class Enumerator : IAsyncEnumerator<TDest>
{
private readonly IAsyncEnumerator<TSource> enumerator;
private readonly Func<TSource, TDest> func;
private readonly Func<bool> shouldWaitFunc;

public Enumerator(IAsyncEnumerator<TSource> enumerator, Func<TSource, TDest> func, Func<bool> shouldWaitFunc)
{
this.enumerator = enumerator;
this.func = func;
this.shouldWaitFunc = shouldWaitFunc;
}

public TDest Current => func(enumerator.Current);

public ValueTask DisposeAsync()
{

return enumerator.DisposeAsync();
}

public ValueTask<bool> MoveNextAsync()
{
if (shouldWaitFunc())
{
return MoveNextSlow();
}

return enumerator.MoveNextAsync();
}

private async ValueTask<bool> MoveNextSlow()
{
while (shouldWaitFunc())
{
await Task.Delay(10);
}
return await enumerator.MoveNextAsync();
}
}

public IAsyncEnumerator<TDest> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new Enumerator(source.GetAsyncEnumerator(cancellationToken), func, shouldWaitFunc);
}
}
}
7 changes: 6 additions & 1 deletion src/FlowtideDotNet.Base/Vertices/Unary/UnaryVertex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ protected UnaryVertex(ExecutionDataflowBlockOptions executionDataflowBlockOption
this.executionDataflowBlockOptions = executionDataflowBlockOptions;
}

private bool ShouldWait()
{
return _transformBlock?.OutputCount >= executionDataflowBlockOptions.BoundedCapacity;
}

[MemberNotNull(nameof(_transformBlock), nameof(_targetBlock), nameof(_sourceBlock))]
private void InitializeBlocks()
{
Expand Down Expand Up @@ -93,7 +98,7 @@ private void InitializeBlocks()
}
if (streamEvent is Watermark watermark)
{
return HandleWatermark(watermark);
return new AsyncEnumerableWithWait<IStreamEvent, IStreamEvent>(HandleWatermark(watermark), (s) => s, ShouldWait);
}

throw new NotSupportedException();
Expand Down

0 comments on commit a66231c

Please sign in to comment.