Skip to content

Commit

Permalink
Return from the poll after waking up to allow the task status to be c…
Browse files Browse the repository at this point in the history
…hecked (#93)
  • Loading branch information
elakito authored Oct 6, 2021
1 parent 010acdb commit f3d2805
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 30 deletions.
52 changes: 23 additions & 29 deletions src/main/scala/com/sap/kafka/connect/source/GenericSourceTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.sap.kafka.connect.source

import java.util
import java.util.concurrent.atomic.AtomicBoolean

import com.sap.kafka.client.hana.{HANAConfigMissingException, HANAJdbcClient}
import com.sap.kafka.connect.config.hana.HANAParameters
import com.sap.kafka.connect.config.{BaseConfig, BaseConfigConstants}
Expand Down Expand Up @@ -135,12 +134,11 @@ abstract class GenericSourceTask extends SourceTask {
log.info("Start polling records from HANA")
val topic = config.topics.head

var now = time.milliseconds()

while (!stopFlag.get()) {
var now = time.milliseconds()
val querier = tableQueue.head

var waitFlag = false
if (!querier.querying()) {
if (querier.getMaxRowsOffset() == 0) {
val nextUpdate = querier.getLastUpdate() +
Expand All @@ -149,41 +147,37 @@ abstract class GenericSourceTask extends SourceTask {

if (untilNext > 0) {
log.info(s"Waiting $untilNext ms to poll from ${querier.toString}")
waitFlag = true
time.sleep(untilNext)
now = time.milliseconds()
// return from the poll to check the task status
return null
}
}
}
var results = List[SourceRecord]()

if (!waitFlag) {
var results = List[SourceRecord]()

log.info(s"Checking for the next block of results from ${querier.toString}")
querier.maybeStartQuery()
log.info(s"Checking for the next block of results from ${querier.toString}")
querier.maybeStartQuery()

results ++= querier.extractRecords()
results ++= querier.extractRecords()

// dequeue the queierer only if the records are fully polled
if (querier.getMaxRowsOffset() == 0) {
val removedQuerier = tableQueue.dequeue()
assert(removedQuerier == querier)
}
log.info(s"Closing this query for ${querier.toString}")
now = time.milliseconds()
querier.close(now)
if (querier.getMaxRowsOffset() == 0) {
tableQueue.enqueue(querier)
}

if (results.isEmpty) {
log.info(s"No updates for ${querier.toString}")
return null
}
// dequeue the queierer only if the records are fully polled
if (querier.getMaxRowsOffset() == 0) {
val removedQuerier = tableQueue.dequeue()
assert(removedQuerier == querier)
}
log.info(s"Closing this query for ${querier.toString}")
querier.close(time.milliseconds())
if (querier.getMaxRowsOffset() == 0) {
tableQueue.enqueue(querier)
}

log.info(s"Returning ${results.size} records for ${querier.toString}")
return results.asJava
if (results.isEmpty) {
log.info(s"No updates for ${querier.toString}")
return null
}

log.info(s"Returning ${results.size} records for ${querier.toString}")
return results.asJava
}
null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ class HANASourceTaskConversionTest extends HANASourceTaskTestBase {
connection.setAutoCommit(true)
val stmt = connection.createStatement()
stmt.execute("insert into \"TEST\".\"EMPLOYEES_SOURCE\" values(" + sqlValue.toString + ")")
val records = task.poll()
var records = task.poll()
if (records == null) {
records = task.poll()
}
validateRecords(records.asScala.toList, convertedSchema, convertedValue)
stmt.execute("drop table \"TEST\".\"EMPLOYEES_SOURCE\"")
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ class HANASourceTaskUpdateTest extends HANASourceTaskTestBase

stmt.execute("insert into " + SINGLE_TABLE_NAME_FOR_BULK_LOAD + "values(2)")
records = task.poll()
//return after waking up to allow the status to be checked before actuall polling
assert(records == null)
records = task.poll()
//because this reads everything
assert(records.size() === 2)

Expand Down Expand Up @@ -214,6 +217,8 @@ class HANASourceTaskUpdateTest extends HANASourceTaskTestBase

stmt.execute("insert into " + SINGLE_TABLE_NAME_FOR_BULK_QUERY_LOAD + "values(2)")
records = queryLoadTask.poll()
assert(records == null)
records = queryLoadTask.poll()
//because this reads everything
assert(records.size() === 2)

Expand Down Expand Up @@ -260,6 +265,8 @@ class HANASourceTaskUpdateTest extends HANASourceTaskTestBase

stmt.execute("insert into " + SINGLE_TABLE_NAME_FOR_INCR_LOAD + "values(2, 'Lukas')")
records = incrLoadTask.poll()
assert(records == null)
records = incrLoadTask.poll()
// because this only takes the delta
assert(records.size() === 1)

Expand Down Expand Up @@ -306,6 +313,8 @@ class HANASourceTaskUpdateTest extends HANASourceTaskTestBase

stmt.execute("insert into " + SINGLE_TABLE_NAME_FOR_INCR2_LOAD + "values('2', 'Lukas')")
records = incr2LoadTask.poll()
assert(records == null)
records = incr2LoadTask.poll()
// because this only takes the delta
assert(records.size() === 1)

Expand Down Expand Up @@ -352,6 +361,8 @@ class HANASourceTaskUpdateTest extends HANASourceTaskTestBase

stmt.execute("insert into " + SINGLE_TABLE_NAME_FOR_INCR_QUERY_LOAD + "values(2, 'Lukas')")
records = incrQueryLoadTask.poll()
assert(records == null)
records = incrQueryLoadTask.poll()
// because this only takes the delta
assert(records.size() === 1)

Expand Down Expand Up @@ -399,6 +410,11 @@ class HANASourceTaskUpdateTest extends HANASourceTaskTestBase
stmt.execute(sqlstr.format(SINGLE_TABLE_NAME_FOR_BULK_MAXROWS_LOAD, 6))
for(i <- 1 to 3) {
var records = maxrowsLoadTask.poll()
if (i == 1) {
//needs to reset the time for the initial poll
assert(records == null)
records = maxrowsLoadTask.poll()
}
assert(records.size() === 2)
verifyRecords(i-1, 2, records, expectedSchema)
}
Expand Down Expand Up @@ -436,6 +452,8 @@ class HANASourceTaskUpdateTest extends HANASourceTaskTestBase

stmt.execute(sqlstr.format(SINGLE_TABLE_NAME_FOR_INCR_MAXROWS_LOAD, 6))
var records = maxrowsIncrLoadTask.poll()
assert(records == null)
records = maxrowsIncrLoadTask.poll()
assert(records.size() === 1)
assert(maxrowsIncrLoadTask.poll() === null)

Expand Down

0 comments on commit f3d2805

Please sign in to comment.