Skip to content

Commit

Permalink
Sliding timeout and fix jobs not enqueued when requeued while process…
Browse files Browse the repository at this point in the history
…ing (#380) (#381)

* Fix job not enqueued when requeued while processing #380

- Change strategy of how to determine whether a background job that a worker dequeued is still alive and being processed
- Using same strategy as Hangfire.SqlServer using SlidingInvisibilityTimeout
- Update to Hangfire v1.8.9

* user server time for distributedlock heartbeats

add unit tests

* update version and changelog

* minor visual update

* update comment

* update comment yet again
  • Loading branch information
gottscj authored Feb 7, 2024
1 parent 3a5c2b9 commit 5698137
Show file tree
Hide file tree
Showing 19 changed files with 250 additions and 77 deletions.
7 changes: 7 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@

## Change log

### v1.10.0
- Update to Hangfire 1.8.9
- Use SlidingInvisibilityTimeout to determine whether a background job is still alive
- BREAKING: Remove InvisibilityTimeout
- Use server time for distributed lock instead of Datetime.UtcNow
- Fixed Job not requeued when requeued while in processing state (#380)

### v1.9.16
- Update to Hangfire 1.8.7
- Update to MongoDB 2.23.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public ActionResult Delayed(int id)
public ActionResult Recurring()
{
RecurringJob.AddOrUpdate("recurring-job",
() => PrintToDebug($@"Hangfire recurring task started - {Guid.NewGuid()}"), Cron.Minutely);
() => Recurring($@"Hangfire recurring task started - {Guid.NewGuid()}"), Cron.Minutely);

return RedirectToAction("Index");
}
Expand All @@ -99,5 +99,11 @@ public static void PrintToDebug(string message)
{
Debug.WriteLine(message);
}

public static void Recurring(string message)
{
Thread.Sleep(15000);
Debug.WriteLine(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Description />
<Version>1.9.16</Version>
<Version>1.10.0</Version>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<Copyright>Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Copyright>
<Authors>Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Authors>
Expand All @@ -21,8 +21,8 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="bootstrap" Version="5.3.2" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.7" />
<PackageReference Include="Hangfire.Core" Version="1.8.7" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.9" />
<PackageReference Include="Hangfire.Core" Version="1.8.9" />
<PackageReference Include="jquery" Version="3.7.1" />
<PackageReference Include="Mongo2Go" Version="3.1.3" />
<PackageReference Include="MongoDB.Driver" Version="2.23.1" />
Expand Down
2 changes: 0 additions & 2 deletions src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using Hangfire.Mongo.Migration.Strategies;
using Hangfire.Mongo.Migration.Strategies.Backup;
using Microsoft.AspNetCore.Builder;
Expand Down Expand Up @@ -51,7 +50,6 @@ public void ConfigureServices(IServiceCollection services)
BackupStrategy = new CollectionMongoBackupStrategy()
},
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.Watch,
InvisibilityTimeout = TimeSpan.FromMinutes(5)
};

//config.UseLogProvider(new FileLogProvider());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Description />
<Version>1.9.16</Version>
<Version>1.10.0</Version>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<Copyright>Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Copyright>
<Authors>Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Authors>
Expand All @@ -21,8 +21,8 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="bootstrap" Version="5.3.2" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.7" />
<PackageReference Include="Hangfire.Core" Version="1.8.7" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.9" />
<PackageReference Include="Hangfire.Core" Version="1.8.9" />
<PackageReference Include="jquery" Version="3.7.1" />
<PackageReference Include="MongoDB.Driver" Version="2.23.1" />
<PackageReference Include="Microsoft.VisualStudio.Web.BrowserLink" Version="2.2.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<AssemblyName>Hangfire.Mongo.Sample.NETCore</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Hangfire.Mongo.Sample.NETCore</PackageId>
<Version>1.9.16</Version>
<Version>1.10.0</Version>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<Copyright>Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Copyright>
<Authors>Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Authors>
Expand All @@ -19,7 +19,7 @@
<ProjectReference Include="..\Hangfire.Mongo\Hangfire.Mongo.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Hangfire.Core" Version="1.8.7" />
<PackageReference Include="Hangfire.Core" Version="1.8.9" />
<PackageReference Include="Mongo2Go" Version="3.1.3" />
<PackageReference Include="MongoDB.Driver" Version="2.23.1" />
</ItemGroup>
Expand Down
5 changes: 1 addition & 4 deletions src/Hangfire.Mongo.Tests/Hangfire.Mongo.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="xunit" Version="2.6.4" />
</ItemGroup>
<ItemGroup>
<Reference Include="System.IO.Compression" />
<PackageReference Include="xunit" Version="2.6.6" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
Expand Down
6 changes: 3 additions & 3 deletions src/Hangfire.Mongo.Tests/MongoDistributedLockFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ public void Ctor_WaitForLock_SignaledAtLockRelease()
});
t.Start();

// Wait just a bit to make sure the above lock is acuired
// Wait just a bit to make sure the above lock is acquired
Thread.Sleep(TimeSpan.FromSeconds(1));

// Record when we try to aquire the lock
// Record when we try to acquire the lock
var startTime = DateTime.UtcNow;
var lock2 = new MongoDistributedLock("resource1", TimeSpan.FromSeconds(10), _database,
new MongoStorageOptions());
Expand Down Expand Up @@ -165,7 +165,7 @@ public void Ctor_SetLockExpireAtWorks_WhenResourceIsNotLocked()
};
using (lock1.AcquireLock())
{
DateTime initialExpireAt = DateTime.UtcNow;
var initialExpireAt = DateTime.UtcNow;
Thread.Sleep(TimeSpan.FromSeconds(5));

var lockEntry = _database.DistributedLock.Find(filter).FirstOrDefault();
Expand Down
22 changes: 22 additions & 0 deletions src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading;
using Hangfire.Mongo.Database;
using Hangfire.Mongo.Dto;
using Hangfire.Mongo.Tests.Utils;
Expand Down Expand Up @@ -132,6 +133,27 @@ public void Dispose_SetsFetchedAtValueToNull_IfThereWereNoCallsToComplete()
_dbContext.JobGraph.Find(new BsonDocument("_t", nameof(JobDto))).ToList().Single());
Assert.Null(record.FetchedAt);
}

[Fact]
public void Heartbeat_LonRunningJob_UpdatesFetchedAt()
{
// Arrange
// time out job after 1s
var options = new MongoStorageOptions() {SlidingInvisibilityTimeout = TimeSpan.FromSeconds(1)};
var queue = "default";
var jobId = ObjectId.GenerateNewId();
var id = CreateJobQueueRecord(_dbContext, jobId, queue, _fetchedAt);
var initialFetchedAt = DateTime.UtcNow;

// Act
var job = new MongoFetchedJob(_dbContext, options, initialFetchedAt, id, jobId, queue);
// job runs for 2s, Heartbeat updates job
Thread.Sleep(TimeSpan.FromSeconds(2));
job.Dispose();

// Assert
Assert.True(job.FetchedAt > initialFetchedAt, "Expected job FetchedAt field to be updated");
}

private ObjectId CreateJobQueueRecord(HangfireDbContext connection, ObjectId jobId, string queue, DateTime? fetchedAt)
{
Expand Down
7 changes: 5 additions & 2 deletions src/Hangfire.Mongo.Tests/MongoJobQueueFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void Dequeue_ShouldFetchATimedOutJobs_FromTheSpecifiedQueue()

var options = new MongoStorageOptions
{
InvisibilityTimeout = TimeSpan.FromMinutes(30)
SlidingInvisibilityTimeout = TimeSpan.FromMinutes(30)
};

_jobQueueSemaphoreMock.WaitNonBlock("default").Returns(true);
Expand Down Expand Up @@ -191,7 +191,10 @@ public void Dequeue_NoInvisibilityTimeout_WaitsForever()
};
_hangfireDbContext.JobGraph.InsertOne(job.Serialize());

var options = new MongoStorageOptions();
var options = new MongoStorageOptions
{
SlidingInvisibilityTimeout = null
};

_jobQueueSemaphoreMock.WaitNonBlock("default").Returns(true);
var queue =new MongoJobFetcher(_hangfireDbContext, options, _jobQueueSemaphoreMock);
Expand Down
2 changes: 1 addition & 1 deletion src/Hangfire.Mongo.Tests/MongoStorageOptionsFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public void Ctor_SetsTheDefaultOptions()
MongoStorageOptions storageOptions = new MongoStorageOptions();

Assert.Equal("hangfire", storageOptions.Prefix);
Assert.Null(storageOptions.InvisibilityTimeout);
Assert.NotNull(storageOptions.SlidingInvisibilityTimeout);
}

[Fact]
Expand Down
3 changes: 1 addition & 2 deletions src/Hangfire.Mongo/Database/HangfireDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public sealed class HangfireDbContext
/// Database instance used for this db context instance
/// </summary>
public IMongoDatabase Database { get; }

internal HangfireDbContext(string connectionString, string databaseName, string prefix = "hangfire")
:this(new MongoClient(connectionString), databaseName, prefix)
{
Expand All @@ -41,7 +41,6 @@ public HangfireDbContext(IMongoClient mongoClient, string databaseName, string p
ConnectionId = Guid.NewGuid().ToString();
}


/// <summary>
/// Mongo database connection identifier
/// </summary>
Expand Down
52 changes: 42 additions & 10 deletions src/Hangfire.Mongo/DistributedLock/MongoDistributedLock.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using Hangfire.Logging;
using Hangfire.Mongo.Database;
Expand Down Expand Up @@ -112,11 +114,8 @@ public virtual void Dispose()
{
AcquiredLocks.Value.Remove(_resource);

if (_heartbeatTimer != null)
{
_heartbeatTimer.Dispose();
_heartbeatTimer = null;
}
_heartbeatTimer?.Dispose();
_heartbeatTimer = null;

Release();

Expand Down Expand Up @@ -263,13 +262,15 @@ protected virtual void Cleanup()
/// </summary>
protected virtual void StartHeartBeat()
{
TimeSpan timerInterval = TimeSpan.FromMilliseconds(_storageOptions.DistributedLockLifetime.TotalMilliseconds / 5);
_heartbeatTimer = new Timer(state =>
var timerInterval = TimeSpan.FromMilliseconds(_storageOptions.DistributedLockLifetime.TotalMilliseconds / 5);
_heartbeatTimer = new Timer(_ =>
{
// Timer callback may be invoked after the Dispose method call,
// so we are using lock to avoid un synchronized calls.
lock (_lockObject)
{
if (_completed) return;

try
{
var filter = new BsonDocument
Expand All @@ -278,12 +279,43 @@ protected virtual void StartHeartBeat()
};
var update = new BsonDocument
{
["$set"] = new BsonDocument
[nameof(DistributedLockDto.ExpireAt)] = new BsonDocument
{
[nameof(DistributedLockDto.ExpireAt)] = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime)
["$add"] = new BsonArray
{
"$$NOW",
(int) _storageOptions.DistributedLockLifetime.TotalMilliseconds,
}
}
};
_dbContext.DistributedLock.FindOneAndUpdate(filter, update);

var pipeline = new BsonDocument[]
{
new BsonDocument("$match", filter),
new BsonDocument("$set", update)
};
Stopwatch sw = null;
if (Logger.IsTraceEnabled())
{
sw = Stopwatch.StartNew();
}

_dbContext.DistributedLock.Aggregate<BsonDocument>(pipeline).FirstOrDefault();

if (Logger.IsTraceEnabled() && sw != null)
{
var serializedModel = new Dictionary<string, BsonDocument>
{
["Filter"] = filter,
["Update"] = update
};
sw.Stop();
var builder = new StringBuilder();
builder.AppendLine($"Lock heartbeat");
builder.AppendLine($"{serializedModel.ToJson()}");
builder.AppendLine($"Executed in {sw.ElapsedMilliseconds} ms");
Logger.Trace($"{builder}");
}
}
catch (Exception ex)
{
Expand Down
13 changes: 8 additions & 5 deletions src/Hangfire.Mongo/Hangfire.Mongo.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<VersionPrefix>1.9.16</VersionPrefix>
<VersionPrefix>1.10.0</VersionPrefix>
<TargetFramework>netstandard2.0</TargetFramework>
<NoWarn>$(NoWarn);CS0618</NoWarn>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
Expand All @@ -21,9 +21,12 @@
<owners>Sergey Zwezdin, Jonas Gottschau</owners>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<PackageTags>Hangfire AspNet OWIN MongoDB CosmosDB Long-Running Background Fire-And-Forget Delayed Recurring Tasks Jobs Scheduler Threading Queues</PackageTags>
<PackageReleaseNotes>1.9.16
- Update to Hangfire 1.8.7
- Update to MongoDB 2.23.1
<PackageReleaseNotes>1.10.0
- Update to Hangfire 1.8.9
- Use SlidingInvisibilityTimeout to determine whether a background job is still alive
- BREAKING: Remove InvisibilityTimeout
- Use server time for distributed lock instead of Datetime.UtcNow
- Fixed Job not requeued when requeued while in processing state (#380)
</PackageReleaseNotes>
<PackageReadmeFile>README.md</PackageReadmeFile>
<!--<PackageLicenseUrl>https://raw.githubusercontent.com/sergun/Hangfire.Mongo/master/LICENSE</PackageLicenseUrl>-->
Expand All @@ -35,7 +38,7 @@
<None Include="../../README.md" pack="true" PackagePath="." />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Hangfire.Core" Version="1.8.7" />
<PackageReference Include="Hangfire.Core" Version="1.8.9" />
<PackageReference Include="MongoDB.Driver" Version="2.23.1" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Hangfire.Mongo/MongoConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ public override DateTime GetUtcDateTime()

if (Logger.IsTraceEnabled())
{
Logger.Trace($"GetUtcDateTime() => {now}");
Logger.Trace($"GetUtcDateTime() => {now:O}");
}

return now;
Expand Down
Loading

0 comments on commit 5698137

Please sign in to comment.