Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce parallel rollout processing #2248

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import org.eclipse.hawkbit.utils.TenantConfigHelper;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand All @@ -189,6 +190,7 @@
import org.springframework.lang.NonNull;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
avgustinmm marked this conversation as resolved.
Show resolved Hide resolved
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.validation.beanvalidation.MethodValidationPostProcessor;
Expand Down Expand Up @@ -992,8 +994,8 @@ AutoCleanupScheduler autoCleanupScheduler(final SystemManagement systemManagemen
@Profile("!test")
@ConditionalOnProperty(prefix = "hawkbit.rollout.scheduler", name = "enabled", matchIfMissing = true)
RolloutScheduler rolloutScheduler(final SystemManagement systemManagement,
final RolloutHandler rolloutHandler, final SystemSecurityContext systemSecurityContext) {
return new RolloutScheduler(rolloutHandler, systemManagement, systemSecurityContext);
final RolloutHandler rolloutHandler, final SystemSecurityContext systemSecurityContext, @Value("${hawkbit.rollout.executor.thread-pool.size:1}") int threadPoolSize) {
return new RolloutScheduler(rolloutHandler, systemManagement, systemSecurityContext, threadPoolSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it final

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.hawkbit.repository.jpa.rollout;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class BlockWhenFullPolicy implements RejectedExecutionHandler {
@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 spaces indents, please use hawkbit formatter

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// Because queueCapacity=0 => SynchronousQueue
// This put(...) call blocks if both threads are busy,
// until a thread is free
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Interrupted while waiting to queue task", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.eclipse.hawkbit.repository.SystemManagement;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Scheduler to schedule the {@link RolloutHandler#handleAll()}. The
Expand All @@ -28,12 +29,16 @@ public class RolloutScheduler {
private final SystemManagement systemManagement;
private final RolloutHandler rolloutHandler;
private final SystemSecurityContext systemSecurityContext;
private final ThreadPoolTaskExecutor rolloutTaskExecutor;

public RolloutScheduler(
final RolloutHandler rolloutHandler, final SystemManagement systemManagement, final SystemSecurityContext systemSecurityContext) {
final RolloutHandler rolloutHandler, final SystemManagement systemManagement, final SystemSecurityContext systemSecurityContext,
final int threadPoolSize) {
this.systemManagement = systemManagement;
this.rolloutHandler = rolloutHandler;
this.systemSecurityContext = systemSecurityContext;
this.rolloutTaskExecutor = threadPoolTaskExecutor(threadPoolSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this. is not needed


}

/**
Expand All @@ -43,14 +48,56 @@ public RolloutScheduler(
@Scheduled(initialDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER, fixedDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER)
public void runningRolloutScheduler() {
log.debug("rollout schedule checker has been triggered.");

// run this code in system code privileged to have the necessary permission to query and create entities.
// run this code in system code privileged to have the necessary
// permission to query and create entities.
systemSecurityContext.runAsSystem(() -> {
// workaround eclipselink that is currently not possible to execute a query without multi-tenancy if MultiTenant
// annotation is used. https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So, iterate through all tenants and execute the rollout
// check for each tenant separately.
systemManagement.forEachTenant(tenant -> rolloutHandler.handleAll());
// workaround eclipselink that is currently not possible to
// execute a query without multi-tenancy if MultiTenant
// annotation is used.
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So
// iterate through all tenants and execute the rollout check for
// each tenant seperately.

systemManagement.forEachTenant(this::handle);
return null;
});
}

private void handle(String tenant) {
log.trace("Handling rollout for tenant: {}", tenant);
if (rolloutTaskExecutor == null) {
rolloutHandler.handleAll();
} else {
handleParallel(tenant);
}
}

private void handleParallel(String tenant) {
rolloutTaskExecutor.submit(() -> systemSecurityContext.runAsSystemAsTenant(() -> {
try {
rolloutHandler.handleAll();
} catch (Exception e) {
log.error("Error processing rollout for tenant {}", tenant, e);
avgustinmm marked this conversation as resolved.
Show resolved Hide resolved
}
return null;
}, tenant));

}

private ThreadPoolTaskExecutor threadPoolTaskExecutor (int threadPoolSize) {
if (threadPoolSize <= 1) {
return null;
}

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadPoolSize);
executor.setMaxPoolSize(threadPoolSize);
executor.setQueueCapacity(0); // forces a Synchronous Queue
// This policy will block the submitter until a worker thread is free
executor.setRejectedExecutionHandler(new BlockWhenFullPolicy());
executor.setThreadNamePrefix("rollout-exec-");
executor.initialize();
return executor;
}

}