From ceddafa06348cd74010471a755b03f96f2e7a9a3 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Thu, 2 Nov 2023 18:54:40 -0700 Subject: [PATCH] Implement support for suspend/resume (#195) Also: Fix minor issue with DateTimeKind --- CHANGELOG.md | 11 ++++ src/DurableTask.SqlServer/Scripts/logic.sql | 1 - .../SqlOrchestrationService.cs | 15 +---- src/DurableTask.SqlServer/SqlUtils.cs | 29 +++++++--- src/common.props | 2 +- .../Integration/DatabaseManagement.cs | 2 +- .../Integration/Orchestrations.cs | 57 +++++++++++++++++++ .../Utils/TestInstance.cs | 19 ++++++- 8 files changed, 109 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 013c6f6..c752556 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ (Add new notes here) +## v1.2.1 + +### New + +* Support suspend/resume of orchestrations + +### Updates + +* SqlOrchestrationService.WaitForInstanceAsync no longer throws `TimeoutException` - only `OperationCanceledException` (previously could be either, depending on timing) +* Fix default DateTime values to have DateTimeKind of UTC (instead of Unspecified) + ## v1.2.0 ### New diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 2c4eb8b..9e6fef2 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -630,7 +630,6 @@ BEGIN E.[InstanceID] = I.[InstanceID] WHERE I.TaskHub = @TaskHub AND - I.[RuntimeStatus] NOT IN ('Suspended') AND (I.[LockExpiration] IS NULL OR I.[LockExpiration] < @now) AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now) diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index debcef4..47f56a2 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -205,6 +205,7 @@ public override Task DeleteAsync(bool deleteInstanceStore) currentStatus = SqlUtils.GetRuntimeStatus(reader); isRunning = currentStatus == OrchestrationStatus.Running || + currentStatus == OrchestrationStatus.Suspended || currentStatus == OrchestrationStatus.Pending; } else @@ -570,19 +571,7 @@ public override async Task WaitForOrchestrationAsync( return state; } - try - { - await Task.Delay(TimeSpan.FromSeconds(1), combinedCts.Token); - } - catch (TaskCanceledException) - { - if (timeoutCts.Token.IsCancellationRequested) - { - throw new TimeoutException($"A caller-specified timeout of {timeout} has expired, but instance '{instanceId}' is still in an {state?.OrchestrationStatus.ToString() ?? "unknown"} state."); - } - - throw; - } + await Task.Delay(TimeSpan.FromSeconds(1), combinedCts.Token); } } diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 1647488..591047e 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -207,6 +207,12 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch TimerId = GetTaskId(reader), }; break; + case EventType.ExecutionSuspended: + historyEvent = new ExecutionSuspendedEvent(eventId, GetPayloadText(reader)); + break; + case EventType.ExecutionResumed: + historyEvent = new ExecutionResumedEvent(eventId, GetPayloadText(reader)); + break; default: throw new InvalidOperationException($"Don't know how to interpret '{eventType}'."); } @@ -247,10 +253,10 @@ public static OrchestrationState GetOrchestrationState(this DbDataReader reader) var state = new OrchestrationState { - CompletedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("CompletedTime")) ?? default, - CreatedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("CreatedTime")) ?? default, + CompletedTime = GetUtcDateTime(reader, "CompletedTime"), + CreatedTime = GetUtcDateTime(reader, "CreatedTime"), Input = reader.GetStringOrNull(reader.GetOrdinal("InputText")), - LastUpdatedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("LastUpdatedTime")) ?? default, + LastUpdatedTime = GetUtcDateTime(reader, "LastUpdatedTime"), Name = GetName(reader), Version = GetVersion(reader), OrchestrationInstance = new OrchestrationInstance @@ -411,23 +417,28 @@ internal static string GetInstanceId(DbDataReader reader) static DateTime GetVisibleTime(DbDataReader reader) { - int ordinal = reader.GetOrdinal("VisibleTime"); - return GetUtcDateTime(reader, ordinal); + return GetUtcDateTime(reader, "VisibleTime"); } static DateTime GetTimestamp(DbDataReader reader) { - int ordinal = reader.GetOrdinal("Timestamp"); - return GetUtcDateTime(reader, ordinal); + return GetUtcDateTime(reader, "Timestamp"); } - static DateTime? GetUtcDateTimeOrNull(this DbDataReader reader, int columnIndex) + static DateTime GetUtcDateTime(DbDataReader reader, string columnName) { - return reader.IsDBNull(columnIndex) ? (DateTime?)null : GetUtcDateTime(reader, columnIndex); + int ordinal = reader.GetOrdinal(columnName); + return GetUtcDateTime(reader, ordinal); } static DateTime GetUtcDateTime(DbDataReader reader, int ordinal) { + if (reader.IsDBNull(ordinal)) + { + // Note that some serializers (like protobuf) won't accept non-UTC DateTime objects. + return DateTime.SpecifyKind(default, DateTimeKind.Utc); + } + // The SQL client always assumes DateTimeKind.Unspecified. We need to modify the result so that it knows it is UTC. return DateTime.SpecifyKind(reader.GetDateTime(ordinal), DateTimeKind.Utc); } diff --git a/src/common.props b/src/common.props index cdf1b87..1a3f425 100644 --- a/src/common.props +++ b/src/common.props @@ -17,7 +17,7 @@ 1 2 - 0 + 1 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).$(MinorVersion).0.0 diff --git a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs index ab5e1e4..110230e 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs @@ -504,7 +504,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName schemaName); Assert.Equal(1, currentSchemaVersion.Major); Assert.Equal(2, currentSchemaVersion.Minor); - Assert.Equal(0, currentSchemaVersion.Patch); + Assert.Equal(1, currentSchemaVersion.Patch); } sealed class TestDatabase : IDisposable diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index 41340f7..bc208bd 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -768,5 +768,62 @@ public async Task TraceContextFlowCorrectly() Assert.True(activitySpan.Duration > delay); Assert.True(activitySpan.Duration < delay * 2); } + + [Fact] + public async Task SuspendAndResumeInstance() + { + TaskCompletionSource tcs = null; + + const int EventCount = 5; + string orchestrationName = "SuspendResumeOrchestration"; + + TestInstance instance = await this.testService.RunOrchestration( + null, + orchestrationName, + implementation: async (ctx, _) => + { + tcs = new TaskCompletionSource(); + + int i; + for (i = 0; i < EventCount; i++) + { + await tcs.Task; + tcs = new TaskCompletionSource(); + } + + return i; + }, + onEvent: (ctx, name, value) => + { + Assert.Equal("Event" + value, name); + tcs.TrySetResult(int.Parse(value)); + }); + + // Wait for the orchestration to finish starting + await instance.WaitForStart(); + + // Suspend the orchestration so that it won't process any new events + await instance.SuspendAsync(); + + // Raise the events, which should get buffered but not consumed + for (int i = 0; i < EventCount; i++) + { + await instance.RaiseEventAsync($"Event{i}", i); + } + + // Make sure that the orchestration *doesn't* complete + await Assert.ThrowsAnyAsync( + () => instance.WaitForCompletion(TimeSpan.FromSeconds(3), doNotAdjustTimeout: true)); + + // Confirm that the orchestration is in a suspended state + OrchestrationState state = await instance.GetStateAsync(); + Assert.Equal(OrchestrationStatus.Suspended, state.OrchestrationStatus); + + // Resume the orchestration + await instance.ResumeAsync(); + + // Now the orchestration should complete immediately + await instance.WaitForCompletion(timeout: TimeSpan.FromSeconds(3), expectedOutput: EventCount); + } } } diff --git a/test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs b/test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs index 0483ca5..637b1aa 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs @@ -71,9 +71,13 @@ public async Task WaitForCompletion( OrchestrationStatus expectedStatus = OrchestrationStatus.Completed, object expectedOutput = null, string expectedOutputRegex = null, - bool continuedAsNew = false) + bool continuedAsNew = false, + bool doNotAdjustTimeout = false) { - AdjustTimeout(ref timeout); + if (!doNotAdjustTimeout) + { + AdjustTimeout(ref timeout); + } OrchestrationState state = await this.client.WaitForOrchestrationAsync(this.GetInstanceForAnyExecution(), timeout); Assert.NotNull(state); @@ -158,6 +162,17 @@ internal async Task RestartAsync(TInput newInput, OrchestrationStatus[] dedupeSt this.instance.ExecutionId = newInstance.ExecutionId; } + + internal Task SuspendAsync(string reason = null) + { + return this.client.SuspendInstanceAsync(this.instance, reason); + } + + internal Task ResumeAsync(string reason = null) + { + return this.client.ResumeInstanceAsync(this.instance, reason); + } + static void AdjustTimeout(ref TimeSpan timeout) { timeout = timeout.AdjustForDebugging();