diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java index 51ea38ad30..0f753f20a0 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java @@ -167,6 +167,7 @@ import org.eclipse.hawkbit.utils.TenantConfigHelper; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -992,8 +993,8 @@ AutoCleanupScheduler autoCleanupScheduler(final SystemManagement systemManagemen @Profile("!test") @ConditionalOnProperty(prefix = "hawkbit.rollout.scheduler", name = "enabled", matchIfMissing = true) RolloutScheduler rolloutScheduler(final SystemManagement systemManagement, - final RolloutHandler rolloutHandler, final SystemSecurityContext systemSecurityContext) { - return new RolloutScheduler(rolloutHandler, systemManagement, systemSecurityContext); + final RolloutHandler rolloutHandler, final SystemSecurityContext systemSecurityContext, @Value("${hawkbit.rollout.executor.thread-pool.size:1}") int threadPoolSize) { + return new RolloutScheduler(rolloutHandler, systemManagement, systemSecurityContext, threadPoolSize); } /** diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/BlockWhenFullPolicy.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/BlockWhenFullPolicy.java new file mode 100644 index 0000000000..348df11a64 --- /dev/null +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/BlockWhenFullPolicy.java @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ + +package org.eclipse.hawkbit.repository.jpa.rollout; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +public class BlockWhenFullPolicy implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // Because queueCapacity=0 => SynchronousQueue + // This put(...) call blocks if both threads are busy, + // until a thread is free + executor.getQueue().put(r); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Interrupted while waiting to queue task", e); + } + } +} \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/RolloutScheduler.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/RolloutScheduler.java index 7b69dcf549..57c9bd0208 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/RolloutScheduler.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/RolloutScheduler.java @@ -14,6 +14,7 @@ import org.eclipse.hawkbit.repository.SystemManagement; import org.eclipse.hawkbit.security.SystemSecurityContext; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * Scheduler to schedule the {@link RolloutHandler#handleAll()}. The @@ -28,12 +29,16 @@ public class RolloutScheduler { private final SystemManagement systemManagement; private final RolloutHandler rolloutHandler; private final SystemSecurityContext systemSecurityContext; + private final ThreadPoolTaskExecutor rolloutTaskExecutor; public RolloutScheduler( - final RolloutHandler rolloutHandler, final SystemManagement systemManagement, final SystemSecurityContext systemSecurityContext) { + final RolloutHandler rolloutHandler, final SystemManagement systemManagement, final SystemSecurityContext systemSecurityContext, + final int threadPoolSize) { this.systemManagement = systemManagement; this.rolloutHandler = rolloutHandler; this.systemSecurityContext = systemSecurityContext; + this.rolloutTaskExecutor = threadPoolTaskExecutor(threadPoolSize); + } /** @@ -43,14 +48,58 @@ public RolloutScheduler( @Scheduled(initialDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER, fixedDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER) public void runningRolloutScheduler() { log.debug("rollout schedule checker has been triggered."); - - // run this code in system code privileged to have the necessary permission to query and create entities. + // run this code in system code privileged to have the necessary + // permission to query and create entities. systemSecurityContext.runAsSystem(() -> { - // workaround eclipselink that is currently not possible to execute a query without multi-tenancy if MultiTenant - // annotation is used. https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So, iterate through all tenants and execute the rollout - // check for each tenant separately. - systemManagement.forEachTenant(tenant -> rolloutHandler.handleAll()); + // workaround eclipselink that is currently not possible to + // execute a query without multi-tenancy if MultiTenant + // annotation is used. + // https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So + // iterate through all tenants and execute the rollout check for + // each tenant seperately. + + systemManagement.forEachTenant(tenant -> { + if (rolloutTaskExecutor == null) { + handleAll(tenant); + } else { + handleAllAsync(tenant); + } + }); return null; }); } + + private void handleAll(String tenant) { + log.trace("Handling rollout for tenant: {}", tenant); + try { + rolloutHandler.handleAll(); + } catch (Exception e) { + log.error("Error processing rollout for tenant {}", tenant, e); + } + } + + private void handleAllAsync(String tenant) { + rolloutTaskExecutor.submit(() -> systemSecurityContext.runAsSystemAsTenant(() -> { + handleAll(tenant); + return null; + }, tenant)); + + } + + private ThreadPoolTaskExecutor threadPoolTaskExecutor (int threadPoolSize) { + if (threadPoolSize <= 1) { + return null; + } + + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(threadPoolSize); + executor.setMaxPoolSize(threadPoolSize); + executor.setQueueCapacity(0); // forces a Synchronous Queue + // This policy will block the submitter until a worker thread is free + executor.setRejectedExecutionHandler(new BlockWhenFullPolicy()); + executor.setThreadNamePrefix("rollout-exec-"); + executor.initialize(); + return executor; + } + } \ No newline at end of file