Skip to content

Commit

Permalink
Create Config Class for BatchSizeWait (#518)
Browse files Browse the repository at this point in the history
* Create Config Class for BatchSizeWait

* Create Config Class for BatchSizeWait
  • Loading branch information
ismailsimsek authored Feb 23, 2025
1 parent 160ab71 commit 70ef56c
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.debezium.engine.format.Json;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.batchsizewait.BatchSizeWait;
import io.debezium.server.iceberg.tableoperator.IcebergTableOperator;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
Expand Down Expand Up @@ -68,8 +68,8 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;
Instance<BatchSizeWait> batchSizeWaitInstances;
BatchSizeWait batchSizeWait;
Catalog icebergCatalog;
@Inject
IcebergTableOperator icebergTableOperator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.debezium.engine.format.Json;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.batchsizewait.BatchSizeWait;
import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.Dependent;
Expand All @@ -25,7 +25,14 @@
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -38,8 +45,6 @@
import org.apache.iceberg.types.Types;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,7 +54,10 @@
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.iceberg.types.Types.NestedField.optional;
Expand Down Expand Up @@ -97,8 +105,8 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;
Instance<BatchSizeWait> batchSizeWaitInstances;
BatchSizeWait batchSizeWait;
Catalog icebergCatalog;
Table eventTable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*
* @author Ismail Simsek
*/
public interface InterfaceBatchSizeWait {
public interface BatchSizeWait {

default void initizalize() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.debezium.server.iceberg.batchsizewait;

import io.debezium.config.CommonConnectorConfig;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;

/**
 * Typed configuration mapping for the batch-size-wait feature.
 *
 * <p>Groups the four properties that were previously read through individual
 * {@code @ConfigProperty} fields, so that a wait strategy can obtain all of
 * them by injecting this single interface. Each accessor is bound to its
 * full property name via {@code @WithName} and carries a fallback value via
 * {@code @WithDefault}.
 */
@ConfigRoot
@ConfigMapping
public interface BatchSizeWaitConfig {
/**
 * Value of {@code debezium.source.max.queue.size}.
 *
 * <p>Falls back to {@code CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE}; the
 * {@code + ""} concatenation turns that constant into the string form the
 * annotation requires.
 */
@WithName("debezium.source.max.queue.size")
@WithDefault(CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "")
int sourceMaxQueueSize();

/**
 * Value of {@code debezium.source.max.batch.size}.
 *
 * <p>Falls back to {@code CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE},
 * converted to a string the same way as the queue-size default.
 */
@WithName("debezium.source.max.batch.size")
@WithDefault(CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE + "")
int sourceMaxBatchSize();

/**
 * Value of {@code debezium.sink.batch.batch-size-wait.max-wait-ms}.
 *
 * <p>Falls back to {@code 300000}. Presumably an upper bound, in
 * milliseconds, on the total time spent waiting (per the property name).
 */
@WithName("debezium.sink.batch.batch-size-wait.max-wait-ms")
@WithDefault("300000")
int batchSizeWaitMaxWaitMs();

/**
 * Value of {@code debezium.sink.batch.batch-size-wait.wait-interval-ms}.
 *
 * <p>Falls back to {@code 10000}. Presumably the length, in milliseconds, of
 * a single wait step (per the property name).
 */
@WithName("debezium.sink.batch.batch-size-wait.wait-interval-ms")
@WithDefault("10000")
int batchSizeWaitWaitIntervalMs();

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@
package io.debezium.server.iceberg.batchsizewait;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.server.DebeziumMetrics;

import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,24 +23,17 @@
*/
@Dependent
@Named("MaxBatchSizeWait")
public class MaxBatchSizeWait implements InterfaceBatchSizeWait {
public class MaxBatchSizeWait implements BatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(MaxBatchSizeWait.class);

@ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "")
int maxQueueSize;
@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE + "")
int maxBatchSize;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000")
int maxWaitMs;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait.wait-interval-ms", defaultValue = "10000")
int waitIntervalMs;

@Inject
BatchSizeWaitConfig config;
@Inject
DebeziumMetrics dbzMetrics;

@Override
public void initizalize() throws DebeziumException {
assert waitIntervalMs < maxWaitMs : "`wait-interval-ms` cannot be bigger than `max-wait-ms`";
assert config.batchSizeWaitWaitIntervalMs() < config.batchSizeWaitMaxWaitMs() : "`wait-interval-ms` cannot be bigger than `max-wait-ms`";
}

@Override
Expand All @@ -57,22 +47,22 @@ public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws
LOGGER.debug("Processed {}, QueueCurrentSize:{}, QueueTotalCapacity:{}, SecondsBehindSource:{}, SnapshotCompleted:{}",
numRecordsProcessed,
dbzMetrics.streamingQueueCurrentSize(),
maxQueueSize,
config.sourceMaxQueueSize(),
(int) (dbzMetrics.streamingMilliSecondsBehindSource() / 1000),
dbzMetrics.snapshotCompleted()
);

int totalWaitMs = 0;
while (totalWaitMs < maxWaitMs && dbzMetrics.streamingQueueCurrentSize() < maxBatchSize) {
totalWaitMs += waitIntervalMs;
while (totalWaitMs < config.batchSizeWaitMaxWaitMs() && dbzMetrics.streamingQueueCurrentSize() < config.sourceMaxBatchSize()) {
totalWaitMs += config.batchSizeWaitWaitIntervalMs();
LOGGER.debug("Sleeping {} Milliseconds, QueueCurrentSize:{} < maxBatchSize:{}",
waitIntervalMs, dbzMetrics.streamingQueueCurrentSize(), maxBatchSize);
config.batchSizeWaitWaitIntervalMs(), dbzMetrics.streamingQueueCurrentSize(), config.sourceMaxBatchSize());

Thread.sleep(waitIntervalMs);
Thread.sleep(config.batchSizeWaitWaitIntervalMs());
}

LOGGER.debug("Total wait {} Milliseconds, QueueCurrentSize:{} < maxBatchSize:{}",
totalWaitMs, dbzMetrics.streamingQueueCurrentSize(), maxBatchSize);
totalWaitMs, dbzMetrics.streamingQueueCurrentSize(), config.sourceMaxBatchSize());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
*/
@Dependent
@Named("NoBatchSizeWait")
public class NoBatchSizeWait implements InterfaceBatchSizeWait {
public class NoBatchSizeWait implements BatchSizeWait {
}

0 comments on commit 70ef56c

Please sign in to comment.