Skip to content

Commit 40f0c64

Browse files
feat(s3stream): change append callback thread number (AutoMQ#1950)
1 parent 352bf01 commit 40f0c64

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
package kafka.log.streamaspect
1313

1414
import com.automq.stream.api.{Client, CreateStreamOptions, KeyValue, OpenStreamOptions}
15-
import com.automq.stream.utils.FutureUtil
15+
import com.automq.stream.utils.{FutureUtil, Systems}
1616
import io.netty.buffer.Unpooled
1717
import kafka.log.LocalLog.CleanedFileSuffix
1818
import kafka.log._
@@ -588,8 +588,8 @@ object ElasticLog extends Logging {
588588
private val APPEND_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendTimeNanos")
589589
private val APPEND_CALLBACK_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendCallbackTimeNanos")
590590
private val APPEND_ACK_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendAckTimeNanos")
591-
private val APPEND_CALLBACK_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](8)
592-
private val READ_ASYNC_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](8)
591+
private val APPEND_CALLBACK_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](Systems.CPU_CORES * 2)
592+
private val READ_ASYNC_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](Systems.CPU_CORES * 4)
593593

594594
for (i <- APPEND_CALLBACK_EXECUTOR.indices) {
595595
APPEND_CALLBACK_EXECUTOR(i) = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("log-append-callback-executor-" + i, true))

0 commit comments

Comments
 (0)