From e5c0bcfb438bb7243df8967417208f9dd58ea67e Mon Sep 17 00:00:00 2001 From: Ulimo Date: Tue, 29 Oct 2024 13:03:05 +0100 Subject: [PATCH] Do not ignore task cancelled exceptions from sharepoint sdk (#555) --- .../Internal/SharepointSource.cs | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/FlowtideDotNet.Connector.Sharepoint/Internal/SharepointSource.cs b/src/FlowtideDotNet.Connector.Sharepoint/Internal/SharepointSource.cs index cc7bef4dc..113523205 100644 --- a/src/FlowtideDotNet.Connector.Sharepoint/Internal/SharepointSource.cs +++ b/src/FlowtideDotNet.Connector.Sharepoint/Internal/SharepointSource.cs @@ -88,15 +88,29 @@ private async Task FetchDelta(IngressOutput output, object? st Logger.BeforeCheckpointInDelta(StreamName, Name); await output.EnterCheckpointLock(); - Logger.FetchingDelta(StreamName, Name); - if (await HandleDataRows(iterator, output)) + try { - _state.WatermarkVersion++; - await output.SendWatermark(new Base.Watermark(readRelation.NamedTable.DotSeperated, _state.WatermarkVersion)); - - ScheduleCheckpoint(TimeSpan.FromMilliseconds(1)); + Logger.FetchingDelta(StreamName, Name); + if (await HandleDataRows(iterator, output)) + { + _state.WatermarkVersion++; + await output.SendWatermark(new Base.Watermark(readRelation.NamedTable.DotSeperated, _state.WatermarkVersion)); + + ScheduleCheckpoint(TimeSpan.FromMilliseconds(1)); + } + } + catch(Exception e) + { + Logger.LogError(e, "Error fetching delta"); + if (e is TaskCanceledException || e is OperationCanceledException) + { + throw new InvalidOperationException("Error fetching delta, task was cancelled.", e); + } + } + finally + { + output.ExitCheckpointLock(); } - output.ExitCheckpointLock(); } protected override Task> GetWatermarkNames()