Skip to content

Commit

Permalink
Organize Config And Set recommended default values (#517)
Browse files Browse the repository at this point in the history
* Set recommended default values

* Create Config Class for BatchSizeWait

* Create Config Class for BatchSizeWait

* Create Config Class for BatchSizeWait

* Create Config Class for BatchSizeWait
  • Loading branch information
ismailsimsek authored Feb 23, 2025
1 parent 70ef56c commit ae76cfb
Show file tree
Hide file tree
Showing 23 changed files with 349 additions and 207 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.debezium.server.iceberg;

import io.debezium.jdbc.TemporalPrecisionMode;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;
import io.smallrye.config.WithParentName;

@ConfigRoot
@ConfigMapping
public interface DebeziumConfig {

@WithParentName
IcebergConfig sinkConfig();

@WithName("debezium.source.time.precision.mode")
@WithDefault("isostring")
TemporalPrecisionMode temporalPrecisionMode();

// Event format
@WithName("debezium.format.value")
@WithDefault("json")
String valueFormat();

@WithName("debezium.format.key")
@WithDefault("json")
String keyFormat();

@WithName("debezium.format.value.schemas.enable")
@WithDefault("true")
boolean eventSchemaEnabled();

@WithName("debezium.format.key.schemas.enable")
@WithDefault("true")
boolean eventKeySchemaEnabled();

// SET RECOMMENDED DEFAULT VALUES FOR DEBEZIUM CONFIGS
//# Save debezium offset state to destination, iceberg table
@WithName("debezium.source.offset.storage")
@WithDefault("io.debezium.server.iceberg.offset.IcebergOffsetBackingStore")
String offsetStorage();

@WithName("debezium.source.offset.storage.iceberg.table-name")
@WithDefault("_debezium_offset_storage")
String offsetStorageTable();

// Save schema history to iceberg table
@WithName("debezium.source.schema.history.internal")
@WithDefault("io.debezium.server.iceberg.history.IcebergSchemaHistory")
String schemaHistoryStorage();

@WithName("debezium.source.schema.history.internal.iceberg.table-name")
@WithDefault("_debezium_database_history_storage")
String schemaHistoryStorageTable();

// Event flattening. unwrap message!
@WithName("debezium.transforms")
@WithDefault("unwrap")
String transforms();

@WithName("debezium.transforms.unwrap.type")
@WithDefault("io.debezium.transforms.ExtractNewRecordState")
String unwrapType();

@WithName("debezium.transforms.unwrap.add.fields")
@WithDefault("op,table,source.ts_ms,db,ts_ms")
String unwrapAddFields();

@WithName("debezium.transforms.unwrap.delete.handling.mode")
@WithDefault("rewrite")
String unwrapDeleteHandlingMode();

@WithName("debezium.transforms.unwrap.drop.tombstones")
@WithDefault("true")
String unwrapDeleteTombstoneHandlingMode();

default boolean isIsoStringTemporalMode() {
return temporalPrecisionMode() == TemporalPrecisionMode.ISOSTRING;
}

default boolean isAdaptiveTemporalMode() {
return temporalPrecisionMode() == TemporalPrecisionMode.ADAPTIVE ||
temporalPrecisionMode() == TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.debezium.server.iceberg;

import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithParentName;

@ConfigRoot
@ConfigMapping
public interface GlobalConfig {

@WithParentName
IcebergConfig iceberg();

@WithParentName
DebeziumConfig debezium();

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,23 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
@Inject
IcebergTableOperator icebergTableOperator;
@Inject
IcebergConsumerConfig config;
GlobalConfig config;


@PostConstruct
void connect() {
if (!config.valueFormat().equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.value={" + config.valueFormat() + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
if (!config.debezium().valueFormat().equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.value={" + config.debezium().valueFormat() + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
}
if (!config.keyFormat().equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.key={" + config.valueFormat() + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
if (!config.debezium().keyFormat().equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.key={" + config.debezium().valueFormat() + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
}

// pass iceberg properties to iceberg and hadoop
config.icebergConfigs().forEach(this.hadoopConf::set);
config.iceberg().icebergConfigs().forEach(this.hadoopConf::set);

icebergCatalog = CatalogUtil.buildIcebergCatalog(config.catalogName(), config.icebergConfigs(), hadoopConf);
batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, config.batchSizeWaitName());
icebergCatalog = CatalogUtil.buildIcebergCatalog(config.iceberg().catalogName(), config.iceberg().icebergConfigs(), hadoopConf);
batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, config.iceberg().batchSizeWaitName());
batchSizeWait.initizalize();

// configure and set
Expand Down Expand Up @@ -142,7 +142,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
*/
public Table loadIcebergTable(TableIdentifier tableId, RecordConverter sampleEvent) {
return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
if (!config.eventSchemaEnabled()) {
if (!config.debezium().eventSchemaEnabled()) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
try {
Expand All @@ -152,10 +152,10 @@ public Table loadIcebergTable(TableIdentifier tableId, RecordConverter sampleEve
// "schema change topic" https://debezium.io/documentation/reference/3.0/connectors/mysql.html#mysql-schema-change-topic
if (sampleEvent.isSchemaChangeEvent()) {
LOGGER.warn("Schema change topic detected. Creating Iceberg schema without identifier fields for append-only mode.");
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, new Schema(schema.columns()), config.writeFormat());
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, new Schema(schema.columns()), config.iceberg().writeFormat());
}

return IcebergUtil.createIcebergTable(icebergCatalog, tableId, schema, config.writeFormat());
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, schema, config.iceberg().writeFormat());
} catch (Exception e) {
throw new DebeziumException("Failed to create table from debezium event schema:" + tableId + " Error:" + e.getMessage(), e);
}
Expand All @@ -179,15 +179,15 @@ protected void logConsumerProgress(long numUploadedEvents) {

public TableIdentifier mapDestination(String destination) {
final String tableName = destination
.replaceAll(config.destinationRegexp().orElse(""), config.destinationRegexpReplace().orElse(""))
.replaceAll(config.iceberg().destinationRegexp().orElse(""), config.iceberg().destinationRegexpReplace().orElse(""))
.replace(".", "_");

if (config.destinationUppercaseTableNames()) {
return TableIdentifier.of(Namespace.of(config.namespace()), (config.tablePrefix().orElse("") + tableName).toUpperCase());
} else if (config.destinationLowercaseTableNames()) {
return TableIdentifier.of(Namespace.of(config.namespace()), (config.tablePrefix().orElse("") + tableName).toLowerCase());
if (config.iceberg().destinationUppercaseTableNames()) {
return TableIdentifier.of(Namespace.of(config.iceberg().namespace()), (config.iceberg().tablePrefix().orElse("") + tableName).toUpperCase());
} else if (config.iceberg().destinationLowercaseTableNames()) {
return TableIdentifier.of(Namespace.of(config.iceberg().namespace()), (config.iceberg().tablePrefix().orElse("") + tableName).toLowerCase());
} else {
return TableIdentifier.of(Namespace.of(config.namespace()), config.tablePrefix().orElse("") + tableName);
return TableIdentifier.of(Namespace.of(config.iceberg().namespace()), config.iceberg().tablePrefix().orElse("") + tableName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.debezium.server.iceberg;

import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;
import org.apache.iceberg.CatalogProperties;

import java.util.Map;
import java.util.Optional;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

@ConfigRoot
@ConfigMapping
public interface IcebergConfig {
String PROP_PREFIX = "debezium.sink.iceberg";

@WithName(PROP_PREFIX)
Map<String, String> icebergConfigs();

@WithName("debezium.sink.iceberg.upsert-op-field")
@WithDefault("__op")
String cdcOpField();

@WithName("debezium.sink.iceberg.upsert-dedup-column")
@WithDefault("__source_ts_ms")
String cdcSourceTsMsField();

//
@WithName("debezium.sink.iceberg.upsert")
@WithDefault("true")
boolean upsert();

@WithName("debezium.sink.iceberg.upsert-keep-deletes")
@WithDefault("true")
boolean keepDeletes();

@WithName("debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation();

@WithName("debezium.sink.iceberg.destination-regexp")
// @WithDefault("")
Optional<String> destinationRegexp();

@WithName("debezium.sink.iceberg.destination-regexp-replace")
// @WithDefault("")
Optional<String> destinationRegexpReplace();

@WithName("debezium.sink.iceberg.destination-uppercase-table-names")
@WithDefault("false")
boolean destinationUppercaseTableNames();

@WithName("debezium.sink.iceberg.destination-lowercase-table-names")
@WithDefault("false")
boolean destinationLowercaseTableNames();

@WithName("debezium.sink.iceberg.table-prefix")
// @WithDefault("")
Optional<String> tablePrefix();

@WithName("debezium.sink.iceberg.table-namespace")
@WithDefault("default")
String namespace();

@WithName("debezium.sink.iceberg.catalog-name")
@WithDefault("default")
String catalogName();

@WithName("debezium.sink.iceberg.create-identifier-fields")
@WithDefault("true")
boolean createIdentifierFields();

@WithName("debezium.sink.batch.batch-size-wait")
@WithDefault("NoBatchSizeWait")
String batchSizeWaitName();

@WithName("debezium.sink.iceberg." + DEFAULT_FILE_FORMAT)
@WithDefault(DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat();

@WithName("debezium.sink.iceberg.allow-field-addition")
@WithDefault("true")
boolean allowFieldAddition();

}

This file was deleted.

Loading

0 comments on commit ae76cfb

Please sign in to comment.