Skip to content

Commit

Permalink
fix: unblokker konservativt blokkerte tasks jevnlig i stdf. lås (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
frode-carlsen authored Jun 9, 2020
1 parent 5e87e90 commit 6a078ec
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
</modules>

<properties>
<revision>2.4.1</revision>
<revision>2.4.2</revision>
<sha1></sha1>
<changelist>-SNAPSHOT</changelist>
<java.version>11</java.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public RunTask(TaskManagerRepositoryImpl taskManagerRepo,
this.eventPubliserer = eventPubliserer;
this.taskManagerRepository = taskManagerRepo;
this.feilhåndteringalgoritmer = feilhåndteringsalgoritmer;
this.vetoHåndterer = new RunTaskVetoHåndterer(eventPubliserer, taskManagerRepo, taskManagerRepo.getEntityManager());
this.vetoHåndterer = new RunTaskVetoHåndterer(eventPubliserer, taskManagerRepo.getEntityManager());
}

public void doRun(RunTaskInfo taskInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ public class RunTaskVetoHåndterer {
private ProsessTaskEventPubliserer eventPubliserer;
private EntityManager em;

private TaskManagerRepositoryImpl taskManagerRepo;

public RunTaskVetoHåndterer(ProsessTaskEventPubliserer eventPubliserer, TaskManagerRepositoryImpl taskManagerRepo, EntityManager entityManager) {
public RunTaskVetoHåndterer(ProsessTaskEventPubliserer eventPubliserer, EntityManager entityManager) {
this.eventPubliserer = eventPubliserer;
this.taskManagerRepo = taskManagerRepo;
this.em = entityManager;
}

Expand Down Expand Up @@ -75,8 +72,6 @@ boolean vetoRunTask(ProsessTaskEntitet pte) throws IOException {
vetoed = true;
Long blokkerId = veto.getBlokkertAvProsessTaskId();

sjekkKanSetteBlokker(blokkerId);

Feil feil = TaskManagerFeil.FACTORY.kanIkkeKjøreFikkVeto(pte.getId(), pte.getTaskName(), blokkerId, veto.getBegrunnelse());
ProsessTaskFeil taskFeil = new ProsessTaskFeil(pte.tilProsessTask(), feil);
taskFeil.setBlokkerendeProsessTaskId(blokkerId);
Expand All @@ -93,20 +88,4 @@ boolean vetoRunTask(ProsessTaskEntitet pte) throws IOException {

return vetoed;
}

private void sjekkKanSetteBlokker(Long blokkerId) {
var blokker = taskManagerRepo.finnOgLåsBlokker(blokkerId);
if (blokker.isPresent()) {
// Vi er på riktig vei, kan sette som blokker
return;
} else {
// skal fortsatt finne her (uten lås), (hvis ikke kastes NoResultException, eks hvis noen har slettet)
var blokkerTask = taskManagerRepo.finn(blokkerId);

// fikk ikke tak i lås, eller så er task er ferdigkjørt.
// Kan ikke se forskjell herfra, så kaster en midlertidig exception som gjør at må prøve på nytt senere.
throw TaskManagerFeil.FACTORY
.kunneIkkeProsessereTaskVetoForsøkerIgjen(blokkerId, blokkerTask.getTaskName(), blokkerTask.getStatus(), blokkerTask.getGruppe()).toException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ synchronized void startPollerThread() {
}
this.pollingServiceScheduledFutures = List.of(
pollingService.scheduleWithFixedDelay(new PollAvailableTasks(), delayBetweenPollingMillis / 2, delayBetweenPollingMillis, TimeUnit.MILLISECONDS),
pollingService.scheduleWithFixedDelay(new MoveToDonePartition(), 30 * 1000L, 5 * 60 * 1000L, TimeUnit.MILLISECONDS));
pollingService.scheduleWithFixedDelay(new MoveToDonePartition(), 30 * 1000L, 5 * 60 * 1000L, TimeUnit.MILLISECONDS),
pollingService.scheduleWithFixedDelay(new FreeBlockedTasks(), 750L, 7500L, TimeUnit.MILLISECONDS));
}

synchronized void startTaskThreads() {
Expand Down Expand Up @@ -461,6 +462,46 @@ public void run() {

}

/** Unblokkerer tasks som har blitt konservativt blokkert (der den som kjører ikke ser nye veto påga Read Committed tx isolation level. */
class FreeBlockedTasks implements Runnable {

/** splittet fra for å kjøre i {@link TransactionHandler}. */
private final class DoInNewTransaction extends TransactionHandler<Integer> {

Integer doWork() throws Exception {
EntityManager entityManager = getTransactionManagerRepository().getEntityManager();
try {
return super.apply(entityManager);
} finally {
CDI.current().destroy(entityManager);
}
}

@Override
protected Integer doWork(EntityManager entityManager) throws Exception {
getTransactionManagerRepository().unblockTasks();
return 0;
}
}

/** splittet fra {@link #run()} for å kjøre med ActivateRequestContext. */
public Integer doWithContext() {
try {
return new DoInNewTransaction().doWork();
} catch (Throwable t) { // NOSONAR
// logg, ikke rethrow feil her da det dreper trådene
log.error("Kunne ikke unblokkerer tasks som kan frigis", t);
}
return 1;
}

@Override
public void run() {
RequestContextHandler.doWithRequestContext(this::doWithContext);
}

}

/** Internal executor that also tracks ids of currently queue or running tasks. */
class IdentExecutorService {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ List<ProsessTaskData> tilProsessTask(List<ProsessTaskEntitet> resultList) {
@Transactional
void verifyStartup() {
logDatabaseDetaljer();
unblokkerTasks();
}

private void logDatabaseDetaljer() {
Expand Down Expand Up @@ -366,19 +365,26 @@ private void logDatabaseDetaljer() {
result.dbtz, result.dbtid, userTz, hibernateTz, result.inputtid, result.inputtid2, result.drift);
}

private void unblokkerTasks() {
var kjørtStatusKoder = KJØRT_STATUSER.stream().map(ProsessTaskStatus::getDbKode).collect(Collectors.toList());
var ikkeKjørtStatuser = IKKE_KJØRT_STATUSER.stream().map(ProsessTaskStatus::getDbKode).collect(Collectors.toList());

// midlertidig fiks dersom noe er veto som ikke bør være lenger
String sqlUnveto = "update prosess_task a set blokkert_av=NULL"
+ " WHERE status IN (:statuserIkkeKjoert)"
/**
* Når vi setter veto/blokkert på en task er vi ikke garantert at ikke den som blokkerer ikke er i ferd med å kjøres/snart er ferdig (fordi
* vi opererer med read committed / ikke serialzable rea)
* istdf å ta lås på blokkerende tasks før hvert veto, ettergår vi bare de som har blitt blokkert og opphever veto dersom det ikke er
* nødvendig lenger.
*/
void unblockTasks() {
String sqlUnveto = "update prosess_task a set "
+ " status='KLAR'"
+ ", blokkert_av=NULL"
+ ", feilede_forsoek=0"
+ ", siste_kjoering_feil_kode=NULL"
+ ", siste_kjoering_feil_tekst=NULL"
+ ", neste_kjoering_etter=NULL"
+ ", versjon = versjon +1"
+ " WHERE status = 'VETO'"
+ " AND blokkert_av IS NOT NULL"
+ " AND EXISTS (select 1 from prosess_task b where b.id=a.blokkert_av AND b.status IN (:statuserKjoert))";
+ " AND EXISTS (select 1 from prosess_task b where b.id=a.blokkert_av AND b.status IN ('KJOERT', 'FERDIG'))";

int unvetoed = entityManager.createNativeQuery(sqlUnveto)
.setParameter("statuserKjoert", kjørtStatusKoder)
.setParameter("statuserIkkeKjoert", ikkeKjørtStatuser)
.executeUpdate();
if (unvetoed > 0) {
log.warn("Fjernet veto fra {} tasks som var blokkert av andre tasks som allerede er ferdig");
Expand Down

0 comments on commit 6a078ec

Please sign in to comment.