diff --git a/README.md b/README.md index fe317d4..9d342ec 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ The full list of configuration options for `kafka connector for SAP Systems` is * `batch.size` - This setting can be used to specify the number of records that can be pushed into SAP DB table in a single flush. Should be an `Integer`. Default is `3000`. - * `max.retries` - This setting can be used to specify the maximum no. of retries that can be made to re-establish the connection to SAP DB in case the connection is lost. Should be an `Integer`. Default is `10`. + * `max.retries` - (deprecated) This setting can be used to specify the maximum no. of retries that can be made to re-establish the connection to SAP DB in case the connection is lost. Should be an `Integer`. Default is `10`. This property is currently ignored as the task will automatically retry when a connection error results in a RetriableException for both source and sink tasks. * `{topic}.table.name` - This setting allows specifying the SAP DBs table name where the data needs to be written to. Should be a `String`. Must be compatible to SAP DB Table name like `"SCHEMA"."TABLE"`. diff --git a/src/main/scala/com/sap/kafka/client/hana/HANAJdbcClient.scala b/src/main/scala/com/sap/kafka/client/hana/HANAJdbcClient.scala index 8a75db1..577d434 100644 --- a/src/main/scala/com/sap/kafka/client/hana/HANAJdbcClient.scala +++ b/src/main/scala/com/sap/kafka/client/hana/HANAJdbcClient.scala @@ -53,7 +53,7 @@ case class HANAJdbcClient(hanaConfiguration: HANAConfig) { * @return The created JDBC [[Connection]] object */ def getConnection: Connection = { - ExecuteWithExceptions[Connection, HANAConnectorException, HANAJdbcConnectionException] ( + ExecuteWithExceptions[Connection, HANAConnectorRetriableException, HANAJdbcConnectionException] ( new HANAJdbcConnectionException("Cannot acquire a connection")) { () => val connectionUser: String = hanaConfiguration.connectionUser val connectionPassword: String = hanaConfiguration.connectionPassword diff --git a/src/main/scala/com/sap/kafka/client/hana/exceptions.scala b/src/main/scala/com/sap/kafka/client/hana/exceptions.scala index 53cc57f..6ca992b 100644 --- a/src/main/scala/com/sap/kafka/client/hana/exceptions.scala +++ b/src/main/scala/com/sap/kafka/client/hana/exceptions.scala @@ -1,15 +1,17 @@ package com.sap.kafka.client.hana -import com.sap.kafka.utils.ConnectorException +import org.apache.kafka.connect.errors.{ConnectException, RetriableException} -class HANAConnectorException(msg: String) extends ConnectorException(msg) +class HANAConnectorException(msg: String) extends ConnectException(msg) + +class HANAConnectorRetriableException(msg: String) extends RetriableException(msg) class HANAConfigInvalidInputException(msg: String) extends HANAConnectorException(msg) class HANAConfigMissingException(msg: String) extends HANAConnectorException(msg) -class HANAJdbcException(msg: String) extends HANAConnectorException(msg) +class HANAJdbcException(msg: String) extends HANAConnectorRetriableException(msg) -class HANAJdbcConnectionException(msg: String) extends HANAConnectorException(msg) +class HANAJdbcConnectionException(msg: String) extends HANAConnectorRetriableException(msg) -class HANAJdbcBadStateException(msg: String) extends HANAJdbcException(msg) +class HANAJdbcBadStateException(msg: String) extends HANAConnectorRetriableException(msg) diff --git a/src/main/scala/com/sap/kafka/connect/config/BaseConfig.scala b/src/main/scala/com/sap/kafka/connect/config/BaseConfig.scala index 9f6dccc..0ed3e3e 100644 --- a/src/main/scala/com/sap/kafka/connect/config/BaseConfig.scala +++ b/src/main/scala/com/sap/kafka/connect/config/BaseConfig.scala @@ -31,6 +31,7 @@ abstract class BaseConfig(props: Map[String, String]) { * Max retries for sink. Should be an integer. * Default is 10. */ + @deprecated("Retries are handled by the task","20-04-2023") def maxRetries = props.getOrElse[String]("max.retries", "1").toInt /** diff --git a/src/main/scala/com/sap/kafka/connect/sink/GenericSinkTask.scala b/src/main/scala/com/sap/kafka/connect/sink/GenericSinkTask.scala index dce90dd..a5e75b0 100644 --- a/src/main/scala/com/sap/kafka/connect/sink/GenericSinkTask.scala +++ b/src/main/scala/com/sap/kafka/connect/sink/GenericSinkTask.scala @@ -1,11 +1,10 @@ package com.sap.kafka.connect.sink import java.util - import com.sap.kafka.connect.config.BaseConfig -import com.sap.kafka.utils.ConnectorException import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition +import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.sink.{SinkRecord, SinkTask} import org.slf4j.Logger @@ -17,7 +16,6 @@ abstract class GenericSinkTask extends SinkTask with SinkWriter { var log: Logger = _ var config:BaseConfig = null var writer:BaseWriter = null - private var retriesLeft:Int = 0 /** * Pass the SinkRecords to the writer for Writing @@ -26,7 +24,6 @@ abstract class GenericSinkTask extends SinkTask with SinkWriter { log.info(s"PHASE - 1 - get records from kafka, Started for task with assigned " + s"partitions ${this.context.assignment().toString} ") log.info(s"Number of Records read for Sink: ${records.size}") - retriesLeft = config.maxRetries if (records.isEmpty) { return } @@ -35,23 +32,11 @@ abstract class GenericSinkTask extends SinkTask with SinkWriter { try { writer.write(records) } catch { - case exception : ConnectorException => - log.error("Write of {} records failed, remainingRetries={}", records.size(), retriesLeft) - while (retriesLeft > 0) { - try { - retriesLeft = retriesLeft - 1 - writer.close() - writer = initWriter(config) - writer.write(records) - retriesLeft = -1 - } catch { - case exception: ConnectorException => - // ignore - } - } - - if (retriesLeft == 0) - throw exception + case exception : ConnectException => + log.error(s"Write of ${records.size} records failed, may retry later ...") + writer.close() + writer = initWriter(config) + throw exception } finally { log.info(s"PHASE - 1 ended for task, with assigned partitions ${this.context.assignment().toString}") } diff --git a/src/main/scala/com/sap/kafka/connect/sink/hana/HANASinkRecordsCollector.scala b/src/main/scala/com/sap/kafka/connect/sink/hana/HANASinkRecordsCollector.scala index 42ff5d7..51b23d2 100644 --- a/src/main/scala/com/sap/kafka/connect/sink/hana/HANASinkRecordsCollector.scala +++ b/src/main/scala/com/sap/kafka/connect/sink/hana/HANASinkRecordsCollector.scala @@ -18,13 +18,14 @@ import scala.collection.mutable class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient, - connection: Connection, config: HANAConfig) { + config: HANAConfig) { private val log: Logger = LoggerFactory.getLogger(classOf[HANASinkTask]) private var records: Seq[SinkRecord] = Seq[SinkRecord]() private var tableMetaData:Seq[metaAttr] = Seq[metaAttr]() private var metaSchema: MetaSchema = null var tableConfigInitialized = false + private def initTableConfig(nameSpace: Option[String], tableName: String, topic: String) : Boolean = { tableConfigInitialized match { @@ -175,7 +176,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient, this.records = records } - private[sink] def flush(): Seq[SinkRecord] = { + private[sink] def flush(connection: Connection): Seq[SinkRecord] = { val flushedRecords = records if (!records.isEmpty) { val insertMode = config.topicProperties(records.head.topic())("insert.mode") diff --git a/src/main/scala/com/sap/kafka/connect/sink/hana/HANAWriter.scala b/src/main/scala/com/sap/kafka/connect/sink/hana/HANAWriter.scala index 8b69df0..d64af6f 100644 --- a/src/main/scala/com/sap/kafka/connect/sink/hana/HANAWriter.scala +++ b/src/main/scala/com/sap/kafka/connect/sink/hana/HANAWriter.scala @@ -23,9 +23,11 @@ class HANAWriter(config: HANAConfig, hanaClient: HANAJdbcClient, override def initializeConnection(): Unit = { if (connection != null && !connection.isValid(120)) { + log.debug("initializeConnection the current connection is invalid, closing it") connection.close() } if(connection == null || connection.isClosed ) { + log.debug("initializeConnection creating a connection") connection = hanaClient.getConnection } } @@ -33,8 +35,6 @@ class HANAWriter(config: HANAConfig, hanaClient: HANAJdbcClient, override def write(records: util.Collection[SinkRecord]): Unit = { log.info("write records to HANA") - log.info("initialize connection to HANA") - initializeConnection() val topicMap = Multimaps.index(records, new Function[SinkRecord, String] { @@ -51,7 +51,7 @@ class HANAWriter(config: HANAConfig, hanaClient: HANAJdbcClient, recordsCollector match { case None => - val tableRecordsCollector = new HANASinkRecordsCollector(table, hanaClient, connection, config) + val tableRecordsCollector = new HANASinkRecordsCollector(table, hanaClient, config) tableCache.put(table, tableRecordsCollector) tableRecordsCollector.add(collectionAsScalaIterableConverter(recordsPerTopic).asScala.toSeq) case Some(tableRecordsCollector) => @@ -70,7 +70,7 @@ class HANAWriter(config: HANAConfig, hanaClient: HANAJdbcClient, private def flush(tableCache: Map[String, HANASinkRecordsCollector]): Unit = { log.info("flush records into HANA") for ((table, recordsCollector) <- tableCache) { - recordsCollector.flush() + recordsCollector.flush(connection) } hanaClient.commit(connection) } diff --git a/src/main/scala/com/sap/kafka/utils/ExecuteWithExceptions.scala b/src/main/scala/com/sap/kafka/utils/ExecuteWithExceptions.scala index 3fe3f68..9cba5d2 100644 --- a/src/main/scala/com/sap/kafka/utils/ExecuteWithExceptions.scala +++ b/src/main/scala/com/sap/kafka/utils/ExecuteWithExceptions.scala @@ -1,5 +1,6 @@ package com.sap.kafka.utils +import org.apache.kafka.connect.errors.ConnectException import org.slf4j.LoggerFactory import scala.reflect.ClassTag @@ -19,7 +20,7 @@ object ExecuteWithExceptions { * @tparam TE Technical Exception type * @tparam BE Connector Exception type */ - def defaultThrowException[TE <: Exception, BE <: ConnectorException](exception: TE, + def defaultThrowException[TE <: Exception, BE <: ConnectException](exception: TE, connectorException: BE): BE = { log.error(exception.getMessage) connectorException.initCause(exception) @@ -34,7 +35,7 @@ object ExecuteWithExceptions { * @tparam BE Connector Exception type * @return The block execution result */ - def apply[O, TE <: Exception: ClassTag, BE <: ConnectorException](connectorException: BE)(block: () => O, + def apply[O, TE <: Exception: ClassTag, BE <: ConnectException](connectorException: BE)(block: () => O, doCatch: (TE, BE) => BE = defaultThrowException[TE, BE] _): O = { try { block() diff --git a/src/main/scala/com/sap/kafka/utils/exceptions.scala b/src/main/scala/com/sap/kafka/utils/exceptions.scala index ae02227..0c95724 100644 --- a/src/main/scala/com/sap/kafka/utils/exceptions.scala +++ b/src/main/scala/com/sap/kafka/utils/exceptions.scala @@ -1,5 +1,3 @@ package com.sap.kafka.utils -class ConnectorException(msg: String) extends Exception(msg) - -class SchemaNotMatchedException(msg: String) extends ConnectorException(msg) \ No newline at end of file +class SchemaNotMatchedException(msg: String) extends org.apache.kafka.connect.errors.ConnectException(msg) \ No newline at end of file