Skip to content

Commit

Permalink
Elastic schedulers (#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Nov 24, 2023
1 parent 4fd262f commit d5b4833
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/Starter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ fun main(args: Array<String>) {

HeapDumpCreator.init()

val cores = Runtime.getRuntime().availableProcessors()
val maxMemory: Long = Runtime.getRuntime().maxMemory() / (1024 * 1024).toLong()
log.info("Max heap size: {} MB", maxMemory)
log.info("Max heap size: {} MB, number of cores: {}", maxMemory, cores)

val app = SpringApplication(Starter::class.java)
app.setDefaultProperties(ResourcePropertySource("version.properties").source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,33 @@ import io.emeraldpay.dshackle.config.MonitoringConfig
import io.micrometer.core.instrument.Metrics
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.concurrent.CustomizableThreadFactory
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

@Configuration
open class SchedulersConfig {
@Bean
open fun rpcScheduler(monitoringConfig: MonitoringConfig): Scheduler {
return makeScheduler("blockchain-rpc-scheduler", 30, monitoringConfig)
private val log = LoggerFactory.getLogger(SchedulersConfig::class.java)
private val threadsMultiplier: Int

init {
val cores = Runtime.getRuntime().availableProcessors()
threadsMultiplier = if (cores < 3) {
1
} else {
cores / 2
}
log.info("Creating schedulers with multiplier: {}...", threadsMultiplier)
}

@Bean
open fun trackTxScheduler(monitoringConfig: MonitoringConfig): Scheduler {
return makeScheduler("tracktx-scheduler", 5, monitoringConfig)
open fun rpcScheduler(monitoringConfig: MonitoringConfig): Scheduler {
return makeScheduler("blockchain-rpc-scheduler", 20, monitoringConfig)
}

@Bean
Expand Down Expand Up @@ -55,18 +63,13 @@ open class SchedulersConfig {
return makeScheduler("head-liveness-scheduler", 4, monitoringConfig)
}

@Bean
open fun grpcChannelExecutor(monitoringConfig: MonitoringConfig): Executor {
return makePool("grpc-client-channel", 10, monitoringConfig)
}

@Bean
open fun authScheduler(monitoringConfig: MonitoringConfig): Scheduler {
return makeScheduler("auth-scheduler", 4, monitoringConfig)
}

private fun makeScheduler(name: String, size: Int, monitoringConfig: MonitoringConfig): Scheduler {
return Schedulers.fromExecutorService(makePool(name, size, monitoringConfig))
return Schedulers.fromExecutorService(makePool(name, size * threadsMultiplier, monitoringConfig))
}

private fun makePool(name: String, size: Int, monitoringConfig: MonitoringConfig): ExecutorService {
Expand Down

0 comments on commit d5b4833

Please sign in to comment.