Skip to content

Commit

Permalink
use RetriableException to handle retriable connection errors (#148)
Browse files Browse the repository at this point in the history
* use RetriableException for retriable connection errors
* align the sink's retry with the source for now
  • Loading branch information
elakito authored Apr 21, 2023
1 parent 6eaab1f commit 178c28b
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 39 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ The full list of configuration options for `kafka connector for SAP Systems` is

* `batch.size` - This setting specifies the number of records that can be pushed into an SAP DB table in a single flush. Should be an `Integer`. Default is `3000`.

* `max.retries` - This setting can be used to specify the maximum no. of retries that can be made to re-establish the connection to SAP DB in case the connection is lost. Should be an `Integer`. Default is `10`.
* `max.retries` - (deprecated) This setting specifies the maximum number of retries that can be made to re-establish the connection to SAP DB in case the connection is lost. Should be an `Integer`. Default is `10`. This property is currently ignored: when a connection error results in a `RetriableException`, both source and sink tasks are retried automatically.

* `{topic}.table.name` - This setting specifies the name of the SAP DB table that the data is written to. Should be a `String`. Must be a valid SAP DB table name, such as `"SCHEMA"."TABLE"`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ case class HANAJdbcClient(hanaConfiguration: HANAConfig) {
* @return The created JDBC [[Connection]] object
*/
def getConnection: Connection = {
ExecuteWithExceptions[Connection, HANAConnectorException, HANAJdbcConnectionException] (
ExecuteWithExceptions[Connection, HANAConnectorRetriableException, HANAJdbcConnectionException] (
new HANAJdbcConnectionException("Cannot acquire a connection")) { () =>
val connectionUser: String = hanaConfiguration.connectionUser
val connectionPassword: String = hanaConfiguration.connectionPassword
Expand Down
12 changes: 7 additions & 5 deletions src/main/scala/com/sap/kafka/client/hana/exceptions.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package com.sap.kafka.client.hana

import com.sap.kafka.utils.ConnectorException
import org.apache.kafka.connect.errors.{ConnectException, RetriableException}

class HANAConnectorException(msg: String) extends ConnectorException(msg)
class HANAConnectorException(msg: String) extends ConnectException(msg)

/**
 * HANA connector exception that extends Kafka Connect's `RetriableException`.
 *
 * NOTE(review): presumably used to mark failures that may succeed when the
 * operation is attempted again - confirm against the Kafka Connect documentation.
 *
 * @param msg the detail message passed to the `RetriableException` constructor
 */
class HANAConnectorRetriableException(msg: String) extends RetriableException(msg)

class HANAConfigInvalidInputException(msg: String) extends HANAConnectorException(msg)

class HANAConfigMissingException(msg: String) extends HANAConnectorException(msg)

class HANAJdbcException(msg: String) extends HANAConnectorException(msg)
class HANAJdbcException(msg: String) extends HANAConnectorRetriableException(msg)

class HANAJdbcConnectionException(msg: String) extends HANAConnectorException(msg)
class HANAJdbcConnectionException(msg: String) extends HANAConnectorRetriableException(msg)

class HANAJdbcBadStateException(msg: String) extends HANAJdbcException(msg)
class HANAJdbcBadStateException(msg: String) extends HANAConnectorRetriableException(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ abstract class BaseConfig(props: Map[String, String]) {
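* Max retries for sink. Should be an integer.
* Documented default is 10. NOTE(review): the fallback passed to getOrElse below is "1" - confirm which value is intended.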
*/
@deprecated("Retries are handled by the task","20-04-2023")
def maxRetries = {
  // Use the configured "max.retries" value when present, otherwise the literal "1".
  val configured: String = props.get("max.retries") match {
    case Some(value) => value
    case None        => "1"
  }
  // Parsing happens last, so a non-numeric value still fails with the same exception.
  configured.toInt
}

/**
Expand Down
27 changes: 6 additions & 21 deletions src/main/scala/com/sap/kafka/connect/sink/GenericSinkTask.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.sap.kafka.connect.sink

import java.util

import com.sap.kafka.connect.config.BaseConfig
import com.sap.kafka.utils.ConnectorException
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}
import org.slf4j.Logger

Expand All @@ -17,7 +16,6 @@ abstract class GenericSinkTask extends SinkTask with SinkWriter {
var log: Logger = _
var config:BaseConfig = null
var writer:BaseWriter = null
private var retriesLeft:Int = 0

/**
* Pass the SinkRecords to the writer for Writing
Expand All @@ -26,7 +24,6 @@ abstract class GenericSinkTask extends SinkTask with SinkWriter {
log.info(s"PHASE - 1 - get records from kafka, Started for task with assigned " +
s"partitions ${this.context.assignment().toString} ")
log.info(s"Number of Records read for Sink: ${records.size}")
retriesLeft = config.maxRetries
if (records.isEmpty) {
return
}
Expand All @@ -35,23 +32,11 @@ abstract class GenericSinkTask extends SinkTask with SinkWriter {
try {
writer.write(records)
} catch {
case exception : ConnectorException =>
log.error("Write of {} records failed, remainingRetries={}", records.size(), retriesLeft)
while (retriesLeft > 0) {
try {
retriesLeft = retriesLeft - 1
writer.close()
writer = initWriter(config)
writer.write(records)
retriesLeft = -1
} catch {
case exception: ConnectorException =>
// ignore
}
}

if (retriesLeft == 0)
throw exception
case exception : ConnectException =>
log.error(s"Write of ${records.size} records failed, may retry later ...")
writer.close()
writer = initWriter(config)
throw exception
} finally {
log.info(s"PHASE - 1 ended for task, with assigned partitions ${this.context.assignment().toString}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import scala.collection.mutable


class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient,
connection: Connection, config: HANAConfig) {
config: HANAConfig) {
private val log: Logger = LoggerFactory.getLogger(classOf[HANASinkTask])
private var records: Seq[SinkRecord] = Seq[SinkRecord]()
private var tableMetaData:Seq[metaAttr] = Seq[metaAttr]()
private var metaSchema: MetaSchema = null
var tableConfigInitialized = false


private def initTableConfig(nameSpace: Option[String], tableName: String, topic: String) : Boolean = {

tableConfigInitialized match {
Expand Down Expand Up @@ -175,7 +176,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient,
this.records = records
}

private[sink] def flush(): Seq[SinkRecord] = {
private[sink] def flush(connection: Connection): Seq[SinkRecord] = {
val flushedRecords = records
if (!records.isEmpty) {
val insertMode = config.topicProperties(records.head.topic())("insert.mode")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ class HANAWriter(config: HANAConfig, hanaClient: HANAJdbcClient,

/**
 * Makes sure `connection` refers to an open JDBC connection.
 *
 * A non-null connection that fails `isValid(120)` is closed first. Afterwards, if
 * there is no connection or the connection is closed, a new one is obtained from
 * `hanaClient.getConnection`.
 */
override def initializeConnection(): Unit = {
  // Close a connection that exists but no longer passes the validity check.
  val isStale = connection != null && !connection.isValid(120)
  if (isStale) {
    log.debug("initializeConnection the current connection is invalid, closing it")
    connection.close()
  }

  // Evaluated after the possible close above, so a just-closed connection is replaced.
  val needsNewConnection = connection == null || connection.isClosed
  if (needsNewConnection) {
    log.debug("initializeConnection creating a connection")
    connection = hanaClient.getConnection
  }
}


override def write(records: util.Collection[SinkRecord]): Unit = {
log.info("write records to HANA")
log.info("initialize connection to HANA")

initializeConnection()

val topicMap = Multimaps.index(records, new Function[SinkRecord, String] {
Expand All @@ -51,7 +51,7 @@ class HANAWriter(config: HANAConfig, hanaClient: HANAJdbcClient,

recordsCollector match {
case None =>
val tableRecordsCollector = new HANASinkRecordsCollector(table, hanaClient, connection, config)
val tableRecordsCollector = new HANASinkRecordsCollector(table, hanaClient, config)
tableCache.put(table, tableRecordsCollector)
tableRecordsCollector.add(collectionAsScalaIterableConverter(recordsPerTopic).asScala.toSeq)
case Some(tableRecordsCollector) =>
Expand All @@ -70,7 +70,7 @@ class HANAWriter(config: HANAConfig, hanaClient: HANAJdbcClient,
private def flush(tableCache: Map[String, HANASinkRecordsCollector]): Unit = {
log.info("flush records into HANA")
for ((table, recordsCollector) <- tableCache) {
recordsCollector.flush()
recordsCollector.flush(connection)
}
hanaClient.commit(connection)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.sap.kafka.utils

import org.apache.kafka.connect.errors.ConnectException
import org.slf4j.LoggerFactory

import scala.reflect.ClassTag
Expand All @@ -19,7 +20,7 @@ object ExecuteWithExceptions {
* @tparam TE Technical Exception type
* @tparam BE Connector Exception type
*/
def defaultThrowException[TE <: Exception, BE <: ConnectorException](exception: TE,
def defaultThrowException[TE <: Exception, BE <: ConnectException](exception: TE,
connectorException: BE): BE = {
log.error(exception.getMessage)
connectorException.initCause(exception)
Expand All @@ -34,7 +35,7 @@ object ExecuteWithExceptions {
* @tparam BE Connector Exception type
* @return The block execution result
*/
def apply[O, TE <: Exception: ClassTag, BE <: ConnectorException](connectorException: BE)(block: () => O,
def apply[O, TE <: Exception: ClassTag, BE <: ConnectException](connectorException: BE)(block: () => O,
doCatch: (TE, BE) => BE = defaultThrowException[TE, BE] _): O = {
try {
block()
Expand Down
4 changes: 1 addition & 3 deletions src/main/scala/com/sap/kafka/utils/exceptions.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
package com.sap.kafka.utils

class ConnectorException(msg: String) extends Exception(msg)

class SchemaNotMatchedException(msg: String) extends ConnectorException(msg)
class SchemaNotMatchedException(msg: String) extends org.apache.kafka.connect.errors.ConnectException(msg)

0 comments on commit 178c28b

Please sign in to comment.