
Use config class to centralize config access and management (#510)
* Add root config class to centralize config access
ismailsimsek authored Feb 22, 2025
1 parent 99baab5 commit d10e616
Showing 12 changed files with 221 additions and 119 deletions.
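In short: each consumer previously declared its own MicroProfile @ConfigProperty fields; this commit moves them behind a single SmallRye @ConfigMapping interface that beans inject as a whole. Below is a minimal sketch of that pattern, assuming nothing beyond what the diff shows (SinkConfigSketch is a hypothetical stand-in for the real IcebergConsumerConfig):

import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;

// Before: each consumer declared its own field, e.g.
//   @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
//   boolean upsert;
//
// After: one mapping interface owns the property, and callers read config.upsert().
@ConfigMapping
public interface SinkConfigSketch {
  @WithName("debezium.sink.iceberg.upsert")
  @WithDefault("true")
  boolean upsert();
}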
IcebergChangeConsumer.java
@@ -27,16 +27,13 @@
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -45,13 +42,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

/**
* Implementation of the consumer that delivers the messages to iceberg tables.
*
@@ -65,70 +58,39 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
public static final String PROP_PREFIX = "debezium.sink.iceberg.";
static Deserializer<JsonNode> valDeserializer;
static Deserializer<JsonNode> keyDeserializer;
protected final Clock clock = Clock.system();
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
protected long consumerStart = clock.currentTimeInMillis();
protected long numConsumedEvents = 0;
protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL);
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
@ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp", defaultValue = "")
protected Optional<String> destinationRegexp;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "")
protected Optional<String> destinationRegexpReplace;
@ConfigProperty(name = "debezium.sink.iceberg.destination-uppercase-table-names", defaultValue = "false")
protected boolean destinationUppercaseTableNames;
@ConfigProperty(name = "debezium.sink.iceberg.destination-lowercase-table-names", defaultValue = "false")
protected boolean destinationLowercaseTableNames;
@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
Optional<String> tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
boolean createIdentifierFields;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;
@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
boolean eventSchemaEnabled;
@ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;
Catalog icebergCatalog;
@Inject
IcebergTableOperator icebergTableOperator;
@Inject
IcebergConsumerConfig config;


@PostConstruct
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
if (!config.valueFormat().equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.value={" + config.valueFormat() + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
}
if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
if (!config.keyFormat().equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.key={" + config.valueFormat() + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
}

// pass iceberg properties to iceberg and hadoop
Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(this.hadoopConf::set);
this.icebergProperties.putAll(conf);

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
config.icebergConfigs().forEach(this.hadoopConf::set);

batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
icebergCatalog = CatalogUtil.buildIcebergCatalog(config.catalogName(), config.icebergConfigs(), hadoopConf);
batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, config.batchSizeWaitName());
batchSizeWait.initizalize();

// configure and set
@@ -177,11 +139,11 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
*/
public Table loadIcebergTable(TableIdentifier tableId, RecordConverter sampleEvent) {
return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
if (!eventSchemaEnabled) {
if (!config.eventSchemaEnabled()) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
try {
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(createIdentifierFields), writeFormat);
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(config.createIdentifierFields()), config.writeFormat());
} catch (Exception e) {
throw new DebeziumException("Failed to create table from debezium event schema:" + tableId + " Error:" + e.getMessage(), e);
}
@@ -205,15 +167,15 @@ protected void logConsumerProgress(long numUploadedEvents) {

public TableIdentifier mapDestination(String destination) {
final String tableName = destination
.replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse(""))
.replaceAll(config.destinationRegexp().orElse(""), config.destinationRegexpReplace().orElse(""))
.replace(".", "_");

if (destinationUppercaseTableNames) {
return TableIdentifier.of(Namespace.of(namespace), (tablePrefix.orElse("") + tableName).toUpperCase());
} else if (destinationLowercaseTableNames) {
return TableIdentifier.of(Namespace.of(namespace), (tablePrefix.orElse("") + tableName).toLowerCase());
if (config.destinationUppercaseTableNames()) {
return TableIdentifier.of(Namespace.of(config.namespace()), (config.tablePrefix().orElse("") + tableName).toUpperCase());
} else if (config.destinationLowercaseTableNames()) {
return TableIdentifier.of(Namespace.of(config.namespace()), (config.tablePrefix().orElse("") + tableName).toLowerCase());
} else {
return TableIdentifier.of(Namespace.of(namespace), tablePrefix.orElse("") + tableName);
return TableIdentifier.of(Namespace.of(config.namespace()), config.tablePrefix().orElse("") + tableName);
}
}
}
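For reference, a self-contained sketch of the naming rules mapDestination applies: optional regexp rewrite, dots replaced with underscores, optional prefix, then the configured namespace. The class, main method, and sample topic names below are hypothetical, and case folding is omitted for brevity:

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

public class MapDestinationSketch {
  // Mirrors the method above: regexp rewrite first, then dots to underscores,
  // then prefix + namespace.
  static TableIdentifier map(String destination, String regexp, String replace,
                             String prefix, String namespace) {
    String tableName = destination.replaceAll(regexp, replace).replace(".", "_");
    return TableIdentifier.of(Namespace.of(namespace), prefix + tableName);
  }

  public static void main(String[] args) {
    // Defaults: "testc.inventory.customers" -> default.testc_inventory_customers
    System.out.println(map("testc.inventory.customers", "", "", "", "default"));
    // With destination-regexp stripping the topic prefix: -> default.customers
    System.out.println(map("testc.inventory.customers", "testc\\.inventory\\.", "", "", "default"));
  }
}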
IcebergConsumerConfig.java (new file)
@@ -0,0 +1,80 @@
package io.debezium.server.iceberg;

import io.debezium.jdbc.TemporalPrecisionMode;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;
import org.apache.iceberg.CatalogProperties;

import java.util.Map;
import java.util.Optional;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

@ConfigRoot
@ConfigMapping
public interface IcebergConsumerConfig {
public static final String PROP_PREFIX = "debezium.sink.iceberg";
@WithName(value = PROP_PREFIX)
public Map<String, String> icebergConfigs();

@WithName(value = "debezium.sink.iceberg.upsert-op-field")
@WithDefault(value = "__op")
public String cdcOpField();
@WithName(value = "debezium.sink.iceberg.upsert-dedup-column")
@WithDefault(value = "__source_ts_ms")
public String cdcSourceTsMsField();

@WithName(value = "debezium.source.time.precision.mode")
public TemporalPrecisionMode temporalPrecisionMode();

@WithName(value = "debezium.format.value")
@WithDefault(value = "json")
String valueFormat();
@WithName(value = "debezium.format.key")
@WithDefault(value = "json")
String keyFormat();
@WithName(value = "debezium.sink.iceberg.upsert")
@WithDefault(value = "true")
public boolean upsert();
@WithName(value = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation();
@WithName(value = "debezium.sink.iceberg.destination-regexp")
// @WithDefault(value = "")
public Optional<String> destinationRegexp();
@WithName(value = "debezium.sink.iceberg.destination-regexp-replace")
// @WithDefault(value = "")
public Optional<String> destinationRegexpReplace();
@WithName(value = "debezium.sink.iceberg.destination-uppercase-table-names")
@WithDefault(value = "false")
public boolean destinationUppercaseTableNames();
@WithName(value = "debezium.sink.iceberg.destination-lowercase-table-names")
@WithDefault(value = "false")
public boolean destinationLowercaseTableNames();
@WithName(value = "debezium.sink.iceberg.table-prefix")
// @WithDefault(value = "")
Optional<String> tablePrefix();
@WithName(value = "debezium.sink.iceberg.table-namespace")
@WithDefault(value = "default")
public String namespace();
@WithName(value = "debezium.sink.iceberg.catalog-name")
@WithDefault(value = "default")
String catalogName();
@WithName(value = "debezium.sink.iceberg.create-identifier-fields")
@WithDefault(value = "true")
public boolean createIdentifierFields();
@WithName(value = "debezium.sink.batch.batch-size-wait")
@WithDefault(value = "NoBatchSizeWait")
String batchSizeWaitName();
@WithName(value = "debezium.format.value.schemas.enable")
@WithDefault(value = "false")
boolean eventSchemaEnabled();
@WithName(value = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT)
@WithDefault(value = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat();
@WithName(value = "debezium.sink.iceberg.allow-field-addition")
@WithDefault(value = "true")
public boolean allowFieldAddition();
}
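A minimal usage sketch for the new mapping, assuming a Quarkus CDI context (ExampleBean is hypothetical): the interface is injected like any other bean, @WithDefault values apply when a property is unset, and the Optional accessors stay empty rather than defaulting to an empty string.

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class ExampleBean {
  @Inject
  IcebergConsumerConfig config;

  void report() {
    System.out.println("upsert=" + config.upsert());       // true unless overridden
    System.out.println("namespace=" + config.namespace()); // "default" unless overridden
    // Optional accessors have no default; they stay empty when the property is unset.
    config.tablePrefix().ifPresent(p -> System.out.println("prefix=" + p));
  }
}

Note that icebergConfigs() binds the bare debezium.sink.iceberg prefix to a Map<String, String>, collecting every property under that prefix in one call; this is what replaces the old IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX) calls in both consumers.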
IcebergEventsChangeConsumer.java
@@ -88,22 +88,13 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
.asc("event_sink_timestamptz", NullOrder.NULLS_LAST)
.build();
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergEventsChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
static Deserializer<JsonNode> valDeserializer;
static Deserializer<JsonNode> keyDeserializer;
final Configuration hadoopConf = new Configuration();
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
@ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
String catalogName;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
IcebergConsumerConfig config;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
@@ -113,20 +104,18 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D

@PostConstruct
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported, " +
"Supported (debezium.format.value=*) formats are {json,}!");
if (!config.valueFormat().equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.value={" + config.valueFormat() + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
}
if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported, " +
"Supported (debezium.format.key=*) formats are {json,}!");
if (!config.keyFormat().equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.key={" + config.valueFormat() + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
}

Map<String, String> icebergProperties = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
icebergProperties.forEach(this.hadoopConf::set);
// pass iceberg properties to iceberg and hadoop
config.icebergConfigs().forEach(this.hadoopConf::set);

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), TABLE_NAME);
icebergCatalog = CatalogUtil.buildIcebergCatalog(config.catalogName(), config.icebergConfigs(), hadoopConf);
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(config.namespace()), TABLE_NAME);

// create table if not exists
if (!icebergCatalog.tableExists(tableIdentifier)) {
@@ -139,7 +128,7 @@ void connect() {
// load table
eventTable = icebergCatalog.loadTable(tableIdentifier);

batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, config.batchSizeWaitName());
batchSizeWait.initizalize();

// configure and set
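Both consumers now share the same catalog wiring. A sketch of that flow, assuming props holds what config.icebergConfigs() returns (CatalogWiringSketch is hypothetical):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

public class CatalogWiringSketch {
  static Catalog build(String catalogName, Map<String, String> props) {
    Configuration hadoopConf = new Configuration();
    // Mirror every sink property into the Hadoop conf so file IO can pick it up...
    props.forEach(hadoopConf::set);
    // ...then let Iceberg select and configure the catalog from the same map.
    return CatalogUtil.buildIcebergCatalog(catalogName, props, hadoopConf);
  }
}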
IcebergTableOperator.java
@@ -11,6 +11,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.debezium.DebeziumException;
import io.debezium.server.iceberg.IcebergConsumerConfig;
import io.debezium.server.iceberg.RecordConverter;
import io.debezium.server.iceberg.SchemaConverter;
import jakarta.enterprise.context.Dependent;
@@ -45,19 +46,10 @@ public class IcebergTableOperator {

static final ImmutableMap<Operation, Integer> CDC_OPERATION_PRIORITY = ImmutableMap.of(Operation.INSERT, 1, Operation.READ, 2, Operation.UPDATE, 3, Operation.DELETE, 4);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String cdcSourceTsMsField;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-field", defaultValue = "__op")
String cdcOpField;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
boolean allowFieldAddition;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
boolean createIdentifierFields;
@Inject
IcebergTableWriterFactory writerFactory;

@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@Inject
IcebergConsumerConfig config;

protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {

Expand Down Expand Up @@ -101,13 +93,13 @@ protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {
*/
private int compareByTsThenOp(RecordConverter lhs, RecordConverter rhs) {

int result = Long.compare(lhs.cdcSourceTsMsValue(cdcSourceTsMsField), rhs.cdcSourceTsMsValue(cdcSourceTsMsField));
int result = Long.compare(lhs.cdcSourceTsMsValue(config.cdcSourceTsMsField()), rhs.cdcSourceTsMsValue(config.cdcSourceTsMsField()));

if (result == 0) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(cdcOpField), -1)
result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(config.cdcOpField()), -1)
.compareTo(
CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(cdcOpField), -1)
CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(config.cdcOpField()), -1)
);
}

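To illustrate the ordering compareByTsThenOp defines (larger source timestamp wins, ties broken by operation priority per CDC_OPERATION_PRIORITY: insert < read < update < delete), here is a hypothetical, self-contained sketch. It assumes deduplication keeps the event that compares greatest for each key, which is what the priority map suggests:

import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class DedupOrderSketch {
  // (ts, op) pairs standing in for RecordConverter events (hypothetical type).
  record Evt(long sourceTsMs, String op) {}

  // Mirrors CDC_OPERATION_PRIORITY: c=insert, r=read, u=update, d=delete.
  static final Map<String, Integer> OP_PRIORITY = Map.of("c", 1, "r", 2, "u", 3, "d", 4);

  public static void main(String[] args) {
    Comparator<Evt> byTsThenOp = Comparator
        .comparingLong(Evt::sourceTsMs)
        .thenComparing(e -> OP_PRIORITY.getOrDefault(e.op(), -1));
    // Same key, three versions: the update and delete share a timestamp,
    // so the delete's higher priority makes it the survivor.
    Evt winner = List.of(new Evt(10L, "c"), new Evt(20L, "u"), new Evt(20L, "d"))
        .stream().max(byTsThenOp).orElseThrow();
    System.out.println(winner); // Evt[sourceTsMs=20, op=d]
  }
}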
@@ -152,11 +144,11 @@ private void applyFieldAddition(Table icebergTable, Schema newSchema) {
public void addToTable(Table icebergTable, List<RecordConverter> events) {

// when operation mode is upsert, deduplicate the batch to avoid writing duplicate rows
if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
if (config.upsert() && !icebergTable.schema().identifierFieldIds().isEmpty()) {
events = deduplicateBatch(events);
}

if (!allowFieldAddition) {
if (!config.allowFieldAddition()) {
// if field addition is not enabled, add the events to the table as-is
addToTablePerSchema(icebergTable, events);
} else {
@@ -167,7 +159,7 @@ public void addToTable(Table icebergTable, List<RecordConverter> events) {

for (Map.Entry<SchemaConverter, List<RecordConverter>> schemaEvents : eventsGroupedBySchema.entrySet()) {
// extend table schema if new fields found
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema(createIdentifierFields));
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema(config.createIdentifierFields()));
// add set of events to table
addToTablePerSchema(icebergTable, schemaEvents.getValue());
}
Expand All @@ -187,7 +179,7 @@ private void addToTablePerSchema(Table icebergTable, List<RecordConverter> event
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
try (writer) {
for (RecordConverter e : events) {
final RecordWrapper record = (upsert && !tableSchema.identifierFieldIds().isEmpty()) ? e.convert(tableSchema, cdcOpField) : e.convertAsAppend(tableSchema);
final RecordWrapper record = (config.upsert() && !tableSchema.identifierFieldIds().isEmpty()) ? e.convert(tableSchema, config.cdcOpField()) : e.convertAsAppend(tableSchema);
writer.write(record);
}

