From a66231c8303d8d16ed23d493cf12b9be21835905 Mon Sep 17 00:00:00 2001 From: Ulimo Date: Thu, 14 Dec 2023 15:13:04 +0100 Subject: [PATCH] Add a block to send out more events if the output count is equals or exceed the limit (#194) --- .../Utils/AsyncEnumerableWithWait.cs | 80 +++++++++++++++++++ .../Vertices/Unary/UnaryVertex.cs | 7 +- 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 src/FlowtideDotNet.Base/Utils/AsyncEnumerableWithWait.cs diff --git a/src/FlowtideDotNet.Base/Utils/AsyncEnumerableWithWait.cs b/src/FlowtideDotNet.Base/Utils/AsyncEnumerableWithWait.cs new file mode 100644 index 000000000..ecc260cd0 --- /dev/null +++ b/src/FlowtideDotNet.Base/Utils/AsyncEnumerableWithWait.cs @@ -0,0 +1,80 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace FlowtideDotNet.Base.Utils +{ + internal class AsyncEnumerableWithWait : IAsyncEnumerable + { + private readonly IAsyncEnumerable source; + private readonly Func func; + private readonly Func shouldWaitFunc; + + public AsyncEnumerableWithWait(IAsyncEnumerable source, Func func, Func shouldWaitFunc) + { + this.source = source; + this.func = func; + this.shouldWaitFunc = shouldWaitFunc; + } + + internal class Enumerator : IAsyncEnumerator + { + private readonly IAsyncEnumerator enumerator; + private readonly Func func; + private readonly Func shouldWaitFunc; + + public Enumerator(IAsyncEnumerator enumerator, Func func, Func shouldWaitFunc) + { + this.enumerator = enumerator; + this.func = func; + this.shouldWaitFunc = shouldWaitFunc; + } + + public TDest Current => func(enumerator.Current); + + public ValueTask DisposeAsync() + { + + return enumerator.DisposeAsync(); + } + + public ValueTask MoveNextAsync() + { + if (shouldWaitFunc()) + { + return MoveNextSlow(); + } + + return enumerator.MoveNextAsync(); + } + + private async ValueTask MoveNextSlow() + { + while (shouldWaitFunc()) + { + await Task.Delay(10); + } + return await enumerator.MoveNextAsync(); + } + } + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return new Enumerator(source.GetAsyncEnumerator(cancellationToken), func, shouldWaitFunc); + } + } +} diff --git a/src/FlowtideDotNet.Base/Vertices/Unary/UnaryVertex.cs b/src/FlowtideDotNet.Base/Vertices/Unary/UnaryVertex.cs index 73746f177..f4b77502e 100644 --- a/src/FlowtideDotNet.Base/Vertices/Unary/UnaryVertex.cs +++ b/src/FlowtideDotNet.Base/Vertices/Unary/UnaryVertex.cs @@ -57,6 +57,11 @@ protected UnaryVertex(ExecutionDataflowBlockOptions executionDataflowBlockOption this.executionDataflowBlockOptions = executionDataflowBlockOptions; } + private bool ShouldWait() + { + return _transformBlock?.OutputCount >= executionDataflowBlockOptions.BoundedCapacity; + } + [MemberNotNull(nameof(_transformBlock), nameof(_targetBlock), nameof(_sourceBlock))] private void InitializeBlocks() { @@ -93,7 +98,7 @@ private void InitializeBlocks() } if (streamEvent is Watermark watermark) { - return HandleWatermark(watermark); + return new AsyncEnumerableWithWait(HandleWatermark(watermark), (s) => s, ShouldWait); } throw new NotSupportedException();