From cb7a9b3e0d454341c338cd5ff51045bc49ddc70c Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 16 Dec 2024 15:42:47 +0800 Subject: [PATCH] Exist connector process when encounter exception (#1140) (cherry picked from commit 1a6a0cdcb6ad36776e431c59b1c58935d163c2dc) --- .../pulsar/io/jcloud/sink/BlobStoreAbstractSink.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java index 663e535b..3223e0bf 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java @@ -285,13 +285,14 @@ private void unsafeFlush() { schema = getPulsarSchema(firstRecord); } catch (Exception e) { log.error("Failed to retrieve message schema", e); - bulkHandleFailedRecords(singleTopicRecordsToInsert); + bulkHandleFailedRecords(e, singleTopicRecordsToInsert); return; } if (!format.doSupportPulsarSchemaType(schema.getSchemaInfo().getType())) { - log.warn("sink does not support schema type {}", schema.getSchemaInfo().getType()); - bulkHandleFailedRecords(singleTopicRecordsToInsert); + String errorMsg = "Sink does not support schema type of pulsar: " + schema.getSchemaInfo().getType(); + log.error(errorMsg); + bulkHandleFailedRecords(new UnsupportedOperationException(errorMsg), singleTopicRecordsToInsert); return; } @@ -345,15 +346,16 @@ private void unsafeFlush() { } else { log.error("Encountered unknown error writing to blob {}", filepath, e); } - bulkHandleFailedRecords(singleTopicRecordsToInsert); + bulkHandleFailedRecords(e, singleTopicRecordsToInsert); } } } - private void bulkHandleFailedRecords(List> failedRecords) { + private void bulkHandleFailedRecords(Throwable t, List> failedRecords) { if (sinkConfig.isSkipFailedMessages()) { failedRecords.forEach(Record::ack); } else { + sinkContext.fatal(t); failedRecords.forEach(Record::fail); } if (sinkContext != null) {