diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index e45f95e5..90e3833c 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; + /** * Wrapper to perform operations on iceberg tables * @@ -58,19 +59,19 @@ protected List deduplicateBatch(List events) { ConcurrentHashMap deduplicatedEvents = new ConcurrentHashMap<>(); events.forEach(e -> { - if (e.key() == null || e.key().isNull()) { - throw new DebeziumException("Cannot deduplicate data with null key! destination:'" + e.destination() + "' event: '" + e.value().toString() + "'"); - } - - // deduplicate using key(PK) - deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> { - if (this.compareByTsThenOp(oldValue, newValue) <= 0) { - return newValue; - } else { - return oldValue; + if (e.key() == null || e.key().isNull()) { + throw new DebeziumException("Cannot deduplicate data with null key! destination:'" + e.destination() + "' event: '" + e.value().toString() + "'"); } - }); - } + + // deduplicate using key(PK) + deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> { + if (this.compareByTsThenOp(oldValue, newValue) <= 0) { + return newValue; + } else { + return oldValue; + } + }); + } ); return new ArrayList<>(deduplicatedEvents.values()); @@ -174,13 +175,13 @@ public void addToTable(Table icebergTable, List events) { private void addToTablePerSchema(Table icebergTable, List events) { // Initialize a task writer to write both INSERT and equality DELETE. final Schema tableSchema = icebergTable.schema(); - try (BaseTaskWriter writer = writerFactory.create(icebergTable)) { + BaseTaskWriter writer = writerFactory.create(icebergTable); + try (writer) { for (RecordConverter e : events) { final GenericRecord record = e.convert(tableSchema); writer.write(record); } - writer.close(); WriteResult files = writer.complete(); if (files.deleteFiles().length > 0) { RowDelta newRowDelta = icebergTable.newRowDelta(); @@ -193,6 +194,11 @@ private void addToTablePerSchema(Table icebergTable, List event appendFiles.commit(); } } catch (IOException ex) { + try { + writer.abort(); + } catch (IOException e) { + // pass + } throw new DebeziumException("Failed to write data to table:`" + icebergTable.name() + "`", ex); }