Skip to content

Commit

Permalink
Align job result API with Bootique "CommandOutcome" #124
Browse files Browse the repository at this point in the history
  • Loading branch information
andrus committed May 4, 2024
1 parent e585aac commit e5498f5
Show file tree
Hide file tree
Showing 60 changed files with 451 additions and 457 deletions.
1 change: 1 addition & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 3.0-M5

* #123 Jobs as lambdas
* #124 Align job result API with Bootique "CommandOutcome"

## 3.0-M4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import io.bootique.job.Job;
import io.bootique.job.lock.LockHandler;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;

import java.util.Map;

Expand All @@ -35,7 +35,7 @@ public CompositeConsulLockHandler(LockHandler localLockHandler, LockHandler cons
}

@Override
public JobResult run(Job delegate, Map<String, Object> params) {
public JobOutcome run(Job delegate, Map<String, Object> params) {
return localLockHandler.run(consulLockHandler.decorate(delegate, null, params), params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.bootique.job.Job;
import io.bootique.job.JobMetadata;
import io.bootique.job.lock.LockHandler;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +45,7 @@ public ConsulLockHandler(KeyValueClient kvClient, Supplier<String> consulSession
}

@Override
public JobResult run(Job delegate, Map<String, Object> params) {
public JobOutcome run(Job delegate, Map<String, Object> params) {

JobMetadata metadata = delegate.getMetadata();

Expand All @@ -57,7 +57,7 @@ public JobResult run(Job delegate, Map<String, Object> params) {
boolean acquired = kvClient.acquireLock(lockName, sessionId);
if (!acquired) {
LOGGER.info("** Another job instance owns the lock. Skipping execution of '{}'", lockName);
return JobResult.skipped("Another job instance owns the lock. Skipping execution");
return JobOutcome.skipped("Another job instance owns the lock. Skipping execution");
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io.bootique.job.JobMetadata;
import io.bootique.job.SerialJob;
import io.bootique.job.consul.it.ConsulJobLockIT;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;

import java.util.Map;

Expand All @@ -18,14 +18,14 @@ public LockJob() {
}

@Override
public JobResult run(Map<String, Object> params) {
public JobOutcome run(Map<String, Object> params) {
Integer callsCount = (Integer) params.get(ConsulJobLockIT.CALLS_COUNT);
params.put(ConsulJobLockIT.CALLS_COUNT, callsCount + 1);
try {
Thread.sleep(DELAY);
} catch (InterruptedException e) {
return JobResult.failed();
return JobOutcome.failed();
}
return JobResult.succeeded();
return JobOutcome.succeeded();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package io.bootique.job.instrumented;

import io.bootique.job.Job;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;
import io.bootique.job.runtime.GraphExecutor;

import java.util.Map;
Expand All @@ -36,7 +36,7 @@ public InstrumentedGraphExecutor(ExecutorService pool) {
}

@Override
public Future<JobResult> submit(Job job, Map<String, Object> params) {
public Future<JobOutcome> submit(Job job, Map<String, Object> params) {
Job decorated = decorateWithGroupTxId(job);
return super.submit(decorated, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import io.bootique.job.Job;
import io.bootique.job.JobMetadata;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;
import io.bootique.job.runtime.JobLogger;

import java.util.Map;
Expand All @@ -40,15 +40,15 @@ class InstrumentedJobLogger extends JobLogger {
}

@Override
public JobResult run(Job delegate, Map<String, Object> params) {
public JobOutcome run(Job delegate, Map<String, Object> params) {
JobMetadata metadata = delegate.getMetadata();
JobMeter meter = onMeteredJobStarted(metadata, params);

try {
JobResult result = delegate.run(params);
JobOutcome result = delegate.run(params);
return onMeteredJobFinished(metadata, result, meter);
} catch (Throwable th) {
return onMeteredJobFinished(metadata, JobResult.failure(metadata, th), meter);
return onMeteredJobFinished(metadata, JobOutcome.failed(th), meter);
}
}

Expand All @@ -62,38 +62,38 @@ protected JobMeter onMeteredJobStarted(JobMetadata metadata, Map<String, Object>
return meter;
}

private JobResult onMeteredJobFinished(JobMetadata metadata, JobResult result, JobMeter meter) {
private JobOutcome onMeteredJobFinished(JobMetadata metadata, JobOutcome result, JobMeter meter) {
long timeMs = meter.stop(result);
logJobFinished(metadata, result, timeMs);
mdcManager.onJobFinished();
return result;
}

private void logJobFinished(JobMetadata metadata, JobResult result, long timeMs) {
private void logJobFinished(JobMetadata metadata, JobOutcome result, long timeMs) {

String label = metadata.isGroup() ? "group" : "job";
String name = metadata.getName();

switch (result.getOutcome()) {
switch (result.getStatus()) {
case SUCCESS:
LOGGER.info("{} '{}' finished in {} ms", label, name, timeMs);
return;

default:
String message = result.getMessage();
if (message == null && result.getThrowable() != null) {
message = result.getThrowable().getMessage();
if (message == null && result.getException() != null) {
message = result.getException().getMessage();
}

if (message == null) {
message = "";
}

if (result.getThrowable() != null) {
LOGGER.info("job exception", result.getThrowable());
if (result.getException() != null) {
LOGGER.info("job exception", result.getException());
}

LOGGER.warn("{} '{}' finished in {} ms: {} - {} ", label, name, timeMs, result.getOutcome(), message);
LOGGER.warn("{} '{}' finished in {} ms: {} - {} ", label, name, timeMs, result.getStatus(), message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package io.bootique.job.instrumented;

import com.codahale.metrics.Timer;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;

import java.util.Objects;

Expand All @@ -40,14 +40,14 @@ public void start() {
this.runTimer = metrics.getTimer().time();
}

public long stop(JobResult result) {
public long stop(JobOutcome result) {
metrics.getActiveCounter().dec();
metrics.getCompletedCounter().inc();

// Timer.Context#stop also updates aggregate running time of all instances of <jobName>
long timeNanos = runTimer.stop();

switch (result.getOutcome()) {
switch (result.getStatus()) {
case SUCCESS: {
metrics.getSuccessCounter().inc();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import io.bootique.job.Job;
import io.bootique.job.JobMetadata;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;
import io.bootique.metrics.mdc.TransactionIdMDC;

import java.util.Map;
Expand Down Expand Up @@ -48,7 +48,7 @@ public JobMetadata getMetadata() {
}

@Override
public JobResult run(Map<String, Object> parameters) {
public JobOutcome run(Map<String, Object> parameters) {

TransactionIdMDC.setId(groupMDC);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ public Job1() {
}

@Override
public JobResult run(Map<String, Object> params) {
public JobOutcome run(Map<String, Object> params) {

LOGGER.info("in job1");

int next = counter.getAndIncrement();
String id = MDC.get(TransactionIdMDC.MDC_KEY);
tx.put(next, id != null ? id : NULL_PLACEHOLDER);
return JobResult.succeeded();
return JobOutcome.succeeded();
}
}

Expand All @@ -147,12 +147,12 @@ public Job2() {
}

@Override
public JobResult run(Map<String, Object> params) {
public JobOutcome run(Map<String, Object> params) {
LOGGER.info("in job2");
int next = counter.getAndIncrement();
String id = MDC.get(TransactionIdMDC.MDC_KEY);
tx.put(next, id != null ? id : NULL_PLACEHOLDER);
return JobResult.succeeded();
return JobOutcome.succeeded();
}
}

Expand All @@ -163,8 +163,8 @@ public Job3() {
}

@Override
public JobResult run(Map<String, Object> params) {
return JobResult.succeeded();
public JobOutcome run(Map<String, Object> params) {
return JobOutcome.succeeded();
}
}

Expand All @@ -175,8 +175,8 @@ public Job4() {
}

@Override
public JobResult run(Map<String, Object> params) {
return JobResult.succeeded();
public JobOutcome run(Map<String, Object> params) {
return JobOutcome.succeeded();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.bootique.BQRuntime;
import io.bootique.job.Job;
import io.bootique.job.JobMetadata;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;
import io.bootique.job.JobsModule;
import io.bootique.junit5.BQTest;
import io.bootique.junit5.BQTestFactory;
Expand Down Expand Up @@ -77,8 +77,8 @@ public JobMetadata getMetadata() {
}

@Override
public JobResult run(Map<String, Object> parameters) {
return JobResult.succeeded();
public JobOutcome run(Map<String, Object> parameters) {
return JobOutcome.succeeded();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -48,10 +48,10 @@ public void jobsInstrumentation_ActiveCount_SuccessAndFailureResults() {
JobMeter m2 = manager.onJobStarted("j1");
assertHasMetrics("j1", metricRegistry, 2, 0, 0, 0);

m1.stop(JobResult.succeeded());
m1.stop(JobOutcome.succeeded());
assertHasMetrics("j1", metricRegistry, 1, 1, 1, 0);

m2.stop(JobResult.failed());
m2.stop(JobOutcome.failed());
assertHasMetrics("j1", metricRegistry, 0, 2, 1, 1);
}

Expand All @@ -62,7 +62,7 @@ public void jobsInstrumentation_UnknownResult() {
JobMeter m1 = manager.onJobStarted("j1");
assertHasMetrics("j1", metricRegistry, 1, 0, 0, 0);

m1.stop(JobResult.unknown());
m1.stop(JobOutcome.unknown());
assertHasMetrics("j1", metricRegistry, 0, 1, 0, 0);
}

Expand All @@ -71,7 +71,7 @@ public void jobsInstrumentation_SuccessResult() {

JobMetricsManager manager = new JobMetricsManager(metricRegistry);
JobMeter m1 = manager.onJobStarted("j1");
m1.stop(JobResult.unknown());
m1.stop(JobOutcome.unknown());

assertHasMetrics("j1", metricRegistry, 0, 1, 0, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.bootique.job.Job;
import io.bootique.job.JobMetadata;
import io.bootique.job.lock.LockHandler;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,7 +46,7 @@ public ZkClusterLockHandler(Provider<CuratorFramework> curator) {
}

@Override
public JobResult run(Job delegate, Map<String, Object> params) {
public JobOutcome run(Job delegate, Map<String, Object> params) {

JobMetadata metadata = delegate.getMetadata();
String lockName = getLockName(metadata);
Expand All @@ -56,7 +56,7 @@ public JobResult run(Job delegate, Map<String, Object> params) {
ZkMutex lock = ZkMutex.acquire(curator.get(), lockName);
if (lock == null) {
LOGGER.info("** Another job instance owns the lock. Skipping execution of '{}'", lockName);
return JobResult.skipped("Another job instance owns the lock. Skipping execution");
return JobOutcome.skipped("Another job instance owns the lock. Skipping execution");
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.bootique.job.BaseJob;
import io.bootique.job.JobMetadata;
import io.bootique.job.SerialJob;
import io.bootique.job.JobResult;
import io.bootique.job.JobOutcome;
import io.bootique.job.zookeeper.it.ZkJobLockIT;

import java.util.Map;
Expand All @@ -36,15 +36,15 @@ public LockJob() {
}

@Override
public JobResult run(Map<String, Object> params) {
public JobOutcome run(Map<String, Object> params) {
Integer callsCount = (Integer) params.get(ZkJobLockIT.CALLS_COUNT);
params.put(ZkJobLockIT.CALLS_COUNT, callsCount + 1);
try {
Thread.sleep(DELAY);
} catch (InterruptedException e) {
return JobResult.failed();
return JobOutcome.failed();
}
return JobResult.succeeded();
return JobOutcome.succeeded();
}
}

2 changes: 1 addition & 1 deletion bootique-job/src/main/java/io/bootique/job/BaseJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ public JobMetadata getMetadata() {
}

@Override
public abstract JobResult run(Map<String, Object> params);
public abstract JobOutcome run(Map<String, Object> params);
}
Loading

0 comments on commit e5498f5

Please sign in to comment.