Skip to content

Commit

Permalink
Exist connector process when encounter exception (#1140)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1a6a0cd)
  • Loading branch information
shibd committed Dec 16, 2024
1 parent f784c83 commit cb7a9b3
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,14 @@ private void unsafeFlush() {
schema = getPulsarSchema(firstRecord);
} catch (Exception e) {
log.error("Failed to retrieve message schema", e);
bulkHandleFailedRecords(singleTopicRecordsToInsert);
bulkHandleFailedRecords(e, singleTopicRecordsToInsert);
return;
}

if (!format.doSupportPulsarSchemaType(schema.getSchemaInfo().getType())) {
log.warn("sink does not support schema type {}", schema.getSchemaInfo().getType());
bulkHandleFailedRecords(singleTopicRecordsToInsert);
String errorMsg = "Sink does not support schema type of pulsar: " + schema.getSchemaInfo().getType();
log.error(errorMsg);
bulkHandleFailedRecords(new UnsupportedOperationException(errorMsg), singleTopicRecordsToInsert);
return;
}

Expand Down Expand Up @@ -345,15 +346,16 @@ private void unsafeFlush() {
} else {
log.error("Encountered unknown error writing to blob {}", filepath, e);
}
bulkHandleFailedRecords(singleTopicRecordsToInsert);
bulkHandleFailedRecords(e, singleTopicRecordsToInsert);
}
}
}

private void bulkHandleFailedRecords(List<Record<GenericRecord>> failedRecords) {
private void bulkHandleFailedRecords(Throwable t, List<Record<GenericRecord>> failedRecords) {
if (sinkConfig.isSkipFailedMessages()) {
failedRecords.forEach(Record::ack);
} else {
sinkContext.fatal(t);
failedRecords.forEach(Record::fail);
}
if (sinkContext != null) {
Expand Down

0 comments on commit cb7a9b3

Please sign in to comment.