Skip to content

Commit

Permalink
Call writer.abort() to cleanup, when exception happens (#417)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek authored Sep 8, 2024
1 parent 1b22b22 commit d5f5dc2
Showing 1 changed file with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* Wrapper to perform operations on iceberg tables
*
Expand Down Expand Up @@ -58,19 +59,19 @@ protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {
ConcurrentHashMap<JsonNode, RecordConverter> deduplicatedEvents = new ConcurrentHashMap<>();

events.forEach(e -> {
if (e.key() == null || e.key().isNull()) {
throw new DebeziumException("Cannot deduplicate data with null key! destination:'" + e.destination() + "' event: '" + e.value().toString() + "'");
}

// deduplicate using key(PK)
deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> {
if (this.compareByTsThenOp(oldValue, newValue) <= 0) {
return newValue;
} else {
return oldValue;
if (e.key() == null || e.key().isNull()) {
throw new DebeziumException("Cannot deduplicate data with null key! destination:'" + e.destination() + "' event: '" + e.value().toString() + "'");
}
});
}

// deduplicate using key(PK)
deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> {
if (this.compareByTsThenOp(oldValue, newValue) <= 0) {
return newValue;
} else {
return oldValue;
}
});
}
);

return new ArrayList<>(deduplicatedEvents.values());
Expand Down Expand Up @@ -174,13 +175,13 @@ public void addToTable(Table icebergTable, List<RecordConverter> events) {
private void addToTablePerSchema(Table icebergTable, List<RecordConverter> events) {
// Initialize a task writer to write both INSERT and equality DELETE.
final Schema tableSchema = icebergTable.schema();
try (BaseTaskWriter<Record> writer = writerFactory.create(icebergTable)) {
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
try (writer) {
for (RecordConverter e : events) {
final GenericRecord record = e.convert(tableSchema);
writer.write(record);
}

writer.close();
WriteResult files = writer.complete();
if (files.deleteFiles().length > 0) {
RowDelta newRowDelta = icebergTable.newRowDelta();
Expand All @@ -193,6 +194,11 @@ private void addToTablePerSchema(Table icebergTable, List<RecordConverter> event
appendFiles.commit();
}
} catch (IOException ex) {
try {
writer.abort();
} catch (IOException e) {
// pass
}
throw new DebeziumException("Failed to write data to table:`" + icebergTable.name() + "`", ex);
}

Expand Down

0 comments on commit d5f5dc2

Please sign in to comment.