Skip to content

Commit

Permalink
Fix an issue where a periodic job will not be re-scheduled (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucas34 authored Jan 22, 2018
1 parent 3a2d507 commit 9748def
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 22 deletions.
4 changes: 2 additions & 2 deletions Sources/SwiftQueue/Job.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public final class JobBuilder {

public func periodic(limit: Limit = .unlimited, interval: TimeInterval = 0) -> Self {
assert(interval >= 0)
info.maxRun = limit.intValue
info.maxRun = limit
info.interval = interval
return self
}
Expand All @@ -82,7 +82,7 @@ public final class JobBuilder {
/// Limit number of retry. Overall for the lifecycle of the SwiftQueueManager.
/// For a periodic job, the retry count will not be reset at each period.
public func retry(limit: Limit) -> Self {
info.retries = limit.intValue
info.retries = limit
return self
}

Expand Down
12 changes: 6 additions & 6 deletions Sources/SwiftQueue/JobInfo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ struct JobInfo {
var createTime: Date = Date()

var interval: TimeInterval = -1.0
var maxRun: Int = 0
var maxRun: Limit = .limited(0)

var retries: Int = 0
var retries: Limit = .limited(0)

var runCount: Int = 0
var currentRepetition: Int = 0 // Do not serialize
Expand Down Expand Up @@ -65,9 +65,9 @@ struct JobInfo {
dictionary.assign(&self.createTime, key: "createTime", dateFormatter.date)

dictionary.assign(&self.interval, key: "interval")
dictionary.assign(&self.maxRun, key: "maxRun")
dictionary.assign(&self.maxRun, key: "maxRun", Limit.fromIntValue)

dictionary.assign(&self.retries, key: "retries")
dictionary.assign(&self.retries, key: "retries", Limit.fromIntValue)

dictionary.assign(&self.runCount, key: "runCount")
}
Expand All @@ -86,8 +86,8 @@ struct JobInfo {
dict["params"] = self.params
dict["createTime"] = dateFormatter.string(from: self.createTime)
dict["runCount"] = self.runCount
dict["maxRun"] = self.maxRun
dict["retries"] = self.retries
dict["maxRun"] = self.maxRun.intValue
dict["retries"] = self.retries.intValue
dict["interval"] = self.interval
return dict
}
Expand Down
26 changes: 17 additions & 9 deletions Sources/SwiftQueue/SwiftQueueJob.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,16 @@ internal final class SwiftQueueJob: Operation, JobResult {
private func completionFail(error: Swift.Error) {
lastError = error

if info.retries > 0 {
switch info.retries {
case .limited(let value):
if value > 0 {
retryJob(retry: handler.onRetry(error: error))
} else {
onTerminate()
}
break
case .unlimited:
retryJob(retry: handler.onRetry(error: error))
} else {
onTerminate()
}
}

Expand All @@ -135,21 +141,21 @@ internal final class SwiftQueueJob: Operation, JobResult {
case .retry(let after):
guard after > 0 else {
// Retry immediately
info.retries -= 1
info.retries.decreaseValue(by: 1)
self.run()
return
}

// Retry after time in parameter
runInBackgroundAfter(after) { [weak self] in
self?.info.retries -= 1
self?.info.retries.decreaseValue(by: 1)
self?.run()
}
case .exponential(let initial):
info.currentRepetition += 1
let delay = info.currentRepetition == 1 ? initial : initial * pow(2, Double(info.currentRepetition - 1))
runInBackgroundAfter(delay) { [weak self] in
self?.info.retries -= 1
self?.info.retries.decreaseValue(by: 1)
self?.run()
}
}
Expand All @@ -159,10 +165,12 @@ internal final class SwiftQueueJob: Operation, JobResult {
lastError = nil
info.currentRepetition = 0

guard info.runCount + 1 < info.maxRun else {
if case .limited(let limit) = info.maxRun {
// Reached run limit
onTerminate()
return
guard info.runCount + 1 < limit else {
onTerminate()
return
}
}

guard info.interval > 0 else {
Expand Down
33 changes: 32 additions & 1 deletion Sources/SwiftQueue/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ func assertNotEmptyString(_ string: @autoclosure () -> String, file: StaticStrin

internal extension Limit {

internal var intValue: Int {
static func fromIntValue(value: Int) -> Limit {
if value < 0 {
return Limit.unlimited
} else {
return Limit.limited(value)
}
}

var intValue: Int {
switch self {
case .unlimited:
return -1
Expand All @@ -43,5 +51,28 @@ internal extension Limit {
}

}

mutating func decreaseValue(by: Int) {
if case .limited(let limit) = self {
let value = limit - by
assert(value >= 0)
self = Limit.limited(value)
}
}

}


extension Limit: Equatable {

public static func ==(lhs: Limit, rhs: Limit) -> Bool {
switch (lhs, rhs) {
case let (.limited(a), .limited(b)):
return a == b
case (.unlimited, .unlimited):
return true
default:
return false
}
}
}
43 changes: 43 additions & 0 deletions Tests/SwiftQueueTests/ConstraintTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,27 @@ class ConstraintTests: XCTestCase {
XCTAssertEqual(job.onCancelCalled, 0)
}

func testPeriodicJobUnlimited() {
let job = TestJob()
let type = UUID().uuidString

let creator = TestCreator([type: job])

let manager = SwiftQueueManager(creators: [creator])
JobBuilder(type: type)
.periodic(limit: .unlimited)
.schedule(manager: manager)

// Should run at least 100 times
job.awaitRun(value: 1000)

// Semaphore is async so the value is un-predicable
XCTAssertTrue(job.onRunJobCalled > 50)
XCTAssertEqual(job.onCompleteCalled, 0)
XCTAssertEqual(job.onRetryCalled, 0)
XCTAssertEqual(job.onCancelCalled, 0)
}

func testRetryFailJobWithRetryConstraint() {
let job = TestJob()
let type = UUID().uuidString
Expand Down Expand Up @@ -71,6 +92,28 @@ class ConstraintTests: XCTestCase {
XCTAssertEqual(job.onCancelCalled, 1)
}

func testRetryUnlimitedShouldRetryManyTimes() {
let job = TestJob()
let type = UUID().uuidString

let creator = TestCreator([type: job])

job.result = JobError()
job.retryConstraint = .retry(delay: 0)

let manager = SwiftQueueManager(creators: [creator])
JobBuilder(type: type)
.retry(limit: .unlimited)
.schedule(manager: manager)

job.awaitRun(value: 10000)

XCTAssertTrue(job.onRunJobCalled > 50)
XCTAssertEqual(job.onCompleteCalled, 0)
XCTAssertTrue(job.onRetryCalled > 50)
XCTAssertEqual(job.onCancelCalled, 0)
}

func testRetryFailJobWithCancelConstraint() {
let job = TestJob()
let type = UUID().uuidString
Expand Down
8 changes: 4 additions & 4 deletions Tests/SwiftQueueTests/SwiftQueueBuilderTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class SwiftQueueBuilderTests: XCTestCase {
let interval: Double = 12341

let jobInfo = toJobInfo(type: type, JobBuilder(type: type).periodic(limit: .unlimited, interval: interval))
XCTAssertEqual(jobInfo?.maxRun, -1)
XCTAssertEqual(jobInfo?.maxRun, Limit.unlimited)
XCTAssertEqual(jobInfo?.interval, interval)
}

Expand All @@ -74,7 +74,7 @@ class SwiftQueueBuilderTests: XCTestCase {
let interval: Double = 12342

let jobInfo = toJobInfo(type: type, JobBuilder(type: type).periodic(limit: .limited(limited), interval: interval))
XCTAssertEqual(jobInfo?.maxRun, limited)
XCTAssertEqual(jobInfo?.maxRun, Limit.limited(limited))
XCTAssertEqual(jobInfo?.interval, interval)
}

Expand Down Expand Up @@ -106,15 +106,15 @@ class SwiftQueueBuilderTests: XCTestCase {
let type = UUID().uuidString

let jobInfo = toJobInfo(type: type, JobBuilder(type: type).retry(limit: .unlimited))
XCTAssertEqual(jobInfo?.retries, -1)
XCTAssertEqual(jobInfo?.retries, Limit.unlimited)
}

public func testBuilderRetryLimited() {
let type = UUID().uuidString
let limited = 123

let jobInfo = toJobInfo(type: type, JobBuilder(type: type).retry(limit: .limited(limited)))
XCTAssertEqual(jobInfo?.retries, limited)
XCTAssertEqual(jobInfo?.retries, Limit.limited(limited))
}

public func testBuilderAddTag() {
Expand Down
12 changes: 12 additions & 0 deletions Tests/SwiftQueueTests/TestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ class TestJob: Job {

public let completionTimeout: TimeInterval

var runSemaphoreValue = 0
let runSemaphore = DispatchSemaphore(value: 0)

init(_ completionTimeout: TimeInterval = 0) {
self.completionTimeout = completionTimeout
}

func onRun(callback: JobResult) {
onRunJobCalled += 1
if runSemaphoreValue == onRunJobCalled {
runSemaphore.signal()
}
runInBackgroundAfter(completionTimeout) {
if let error = self.result {
callback.done(.fail(error))
Expand Down Expand Up @@ -63,6 +69,12 @@ class TestJob: Job {
let delta = DispatchTime.now() + Double(Int64(seconds) * Int64(NSEC_PER_SEC)) / Double(NSEC_PER_SEC)
_ = semaphore.wait(timeout: delta)
}

func awaitRun(value: Int, _ seconds: TimeInterval = TimeInterval(5)) {
let delta = DispatchTime.now() + Double(Int64(seconds) * Int64(NSEC_PER_SEC)) / Double(NSEC_PER_SEC)
runSemaphoreValue = value
_ = runSemaphore.wait(timeout: delta)
}
}

class TestCreator: JobCreator {
Expand Down

0 comments on commit 9748def

Please sign in to comment.