
Commit

GEOMESA-3438 Fix Kafka layer readiness when initial load is interrupted (#3263)
elahrvivaz authored Jan 23, 2025
1 parent 9d91d5c commit 4267561
Showing 3 changed files with 125 additions and 50 deletions.
KafkaCacheLoader.scala
@@ -23,8 +23,10 @@ import org.locationtech.geomesa.utils.io.CloseWithLogging

import java.io.Closeable
import java.time.Duration
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.Collections
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Future}
import scala.util.control.NonFatal

/**
* Reads from Kafka and populates a `KafkaFeatureCache`.
@@ -37,22 +39,28 @@ trait KafkaCacheLoader extends Closeable with LazyLogging {
object KafkaCacheLoader extends LazyLogging {

object LoaderStatus {
private val count = new AtomicInteger(0)

private val loading = Collections.newSetFromMap(new ConcurrentHashMap[AnyRef, java.lang.Boolean]())
private val firstLoadStartTime = new AtomicLong(0L)

def startLoad(): Boolean = synchronized {
count.incrementAndGet()
def startLoad(loader: AnyRef): Boolean = synchronized {
if (!loading.add(loader)) {
logger.warn(s"Called startLoad for a loader that was already registered and has not yet completed: $loader")
}
firstLoadStartTime.compareAndSet(0L, System.currentTimeMillis())
}
def completedLoad(): Unit = synchronized {
if (count.decrementAndGet() == 0) {

def completedLoad(loader: AnyRef): Unit = synchronized {
if (!loading.remove(loader)) {
logger.warn(s"Called completedLoad for a loader that was not registered or already deregistered: $loader")
} else if (loading.isEmpty) {
logger.info(s"Last active initial load completed. " +
s"Initial loads took ${System.currentTimeMillis()-firstLoadStartTime.get} milliseconds.")
s"Initial loads took ${System.currentTimeMillis() - firstLoadStartTime.get} milliseconds.")
firstLoadStartTime.set(0L)
}
}

def allLoaded(): Boolean = count.get() == 0
def allLoaded(): Boolean = loading.isEmpty
}
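
For context, a minimal self-contained sketch of the pattern this hunk adopts: readiness is derived from the set of in-flight loaders rather than a raw counter, so a load that fails or is interrupted can always be deregistered in a `finally` block. The names here (`LoadTracker`, `runLoad`) are illustrative, not the GeoMesa API.

```scala
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

// Illustrative only (not the GeoMesa API): readiness derived from the set of in-flight loaders
object LoadTracker {
  private val loading = Collections.newSetFromMap(new ConcurrentHashMap[AnyRef, java.lang.Boolean]())

  def startLoad(loader: AnyRef): Unit = loading.add(loader)
  def completedLoad(loader: AnyRef): Unit = loading.remove(loader)
  def allLoaded(): Boolean = loading.isEmpty

  // register before loading and always deregister, even if the load fails or is interrupted
  def runLoad(loader: AnyRef)(body: => Unit): Unit = {
    startLoad(loader)
    try { body } finally { completedLoad(loader) }
  }
}
```
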

object NoOpLoader extends KafkaCacheLoader {
@@ -70,25 +78,27 @@ object KafkaCacheLoader extends LazyLogging {
doInitialLoad: Boolean,
initialLoadConfig: ExpiryTimeConfig
) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency)) with KafkaCacheLoader {

try { classOf[ConsumerRecord[Any, Any]].getMethod("timestamp") } catch {
case _: NoSuchMethodException => logger.warn("This version of Kafka doesn't support timestamps, using system time")
}

private val initialLoader = if (doInitialLoad) {
// for the initial load, don't bother spatially indexing until we have the final state
val loader = new InitialLoader(sft, consumers, topic, frequency, serializer, initialLoadConfig, this)
CachedThreadPool.execute(loader)
Some(loader)
} else {
startConsumers()
None
private val initialLoader =
if (doInitialLoad) {
// for the initial load, don't bother spatially indexing until we have the final state
Some(new InitialLoader(sft, consumers, topic, frequency, serializer, initialLoadConfig, this))
} else {
None
}

def start(): Unit = {
initialLoader match {
case None => startConsumers()
case Some(loader) => loader.start()
}
}

override def close(): Unit = {
try {
super.close()
} finally {
try { super.close() } finally {
CloseWithLogging(initialLoader)
cache.close()
}
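
The hunk above moves the initial-load kickoff out of the constructor and into an explicit `start()`. A rough sketch of that two-phase shape, under assumed names (`TwoPhaseLoader`, `startConsumers`, `bulkLoad`), so that constructing the loader has no side effects and a failed start can be cleaned up by the caller:

```scala
// Illustrative only: the constructor just wires state; start() performs the side effects
class TwoPhaseLoader(doInitialLoad: Boolean) {

  private val initialLoader: Option[Runnable] =
    if (doInitialLoad) { Some(new Runnable { override def run(): Unit = bulkLoad() }) } else { None }

  def start(): Unit = initialLoader match {
    case None         => startConsumers() // nothing to replay, go straight to live consumption
    case Some(loader) => loader.run()     // in the real code this is submitted to a thread pool
  }

  private def startConsumers(): Unit = println("starting live consumers")
  private def bulkLoad(): Unit = println("bulk loading existing records, then starting live consumers")
}
```
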
@@ -128,12 +138,16 @@ object KafkaCacheLoader extends LazyLogging {
toLoad: KafkaCacheLoaderImpl
) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), false) with Runnable {

import scala.collection.JavaConverters._

private val cache = KafkaFeatureCache.nonIndexing(sft, ordering)

// track the offsets that we want to read to
private val offsets = new ConcurrentHashMap[Int, Long]()
private var latch: CountDownLatch = _
private val done = new AtomicBoolean(false)
private var latch: CountDownLatch = _
@volatile
private var submission: Future[_] = _

override protected def consume(record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
if (done.get) { toLoad.consume(record) } else {
@@ -160,50 +174,75 @@
}
}

override def run(): Unit = {
LoaderStatus.startLoad()

import scala.collection.JavaConverters._

val partitions = consumers.head.partitionsFor(topic).asScala.map(_.partition)
def start(): Unit = {
LoaderStatus.startLoad(this)
try {
// note: these methods are not available in kafka 0.9, which will cause it to fall back to normal loading
val beginningOffsets = KafkaConsumerVersions.beginningOffsets(consumers.head, topic, partitions.toSeq)
val endOffsets = KafkaConsumerVersions.endOffsets(consumers.head, topic, partitions.toSeq)
partitions.foreach { p =>
// end offsets are the *next* offset that will be returned, so subtract one to track the last offset
// we will actually consume
val endOffset = endOffsets.getOrElse(p, 0L) - 1L
// note: not sure if start offsets are also off by one, but at the worst we would skip bulk loading
// for the last message per topic
val beginningOffset = beginningOffsets.getOrElse(p, 0L)
if (beginningOffset < endOffset) {
offsets.put(p, endOffset)
val partitions = consumers.head.partitionsFor(topic).asScala.map(_.partition)
try {
// note: these methods are not available in kafka 0.9, which will cause it to fall back to normal loading
val beginningOffsets = KafkaConsumerVersions.beginningOffsets(consumers.head, topic, partitions.toSeq)
val endOffsets = KafkaConsumerVersions.endOffsets(consumers.head, topic, partitions.toSeq)
partitions.foreach { p =>
// end offsets are the *next* offset that will be returned, so subtract one to track the last offset
// we will actually consume
val endOffset = endOffsets.getOrElse(p, 0L) - 1L
// note: not sure if start offsets are also off by one, but at the worst we would skip bulk loading
// for the last message per topic
val beginningOffset = beginningOffsets.getOrElse(p, 0L)
if (beginningOffset < endOffset) {
offsets.put(p, endOffset)
}
}
} catch {
case e: NoSuchMethodException => logger.warn(s"Can't support initial bulk loading for current Kafka version: $e")
}
if (offsets.isEmpty) {
// don't bother spinning up the consumer threads if we don't need to actually bulk load anything
startNormalLoad()
LoaderStatus.completedLoad(this)
} else {
logger.info(s"Starting initial load for [$topic] with ${offsets.size} partitions")
latch = new CountDownLatch(offsets.size)
startConsumers() // kick off the asynchronous consumer threads
submission = CachedThreadPool.submit(this)
}
} catch {
case e: NoSuchMethodException => logger.warn(s"Can't support initial bulk loading for current Kafka version: $e")
case NonFatal(e) =>
LoaderStatus.completedLoad(this)
throw e
}
// don't bother spinning up the consumer threads if we don't need to actually bulk load anything
if (!offsets.isEmpty) {
logger.info(s"Starting initial load for [$topic] with ${offsets.size} partitions")
latch = new CountDownLatch(offsets.size)
startConsumers() // kick off the asynchronous consumer threads
}

override def run(): Unit = {
try {
try { latch.await() } finally {
// stop the consumer threads, but won't close the consumers due to `closeConsumers`
close()
// note: don't call this.close() as it would interrupt this thread
super.close()
}
// set a flag just in case the consumer threads haven't finished spinning down, so that we will
// pass any additional messages back to the main loader
done.set(true)
logger.info(s"Finished initial load, transferring to indexed cache for [$topic]")
cache.query(Filter.INCLUDE).foreach(toLoad.cache.put)
logger.info(s"Finished transfer for [$topic]")
startNormalLoad()
} finally {
LoaderStatus.completedLoad(this)
}
}

override def close(): Unit = {
try { super.close() } finally {
if (submission != null && !submission.isDone) {
submission.cancel(true)
}
}
}
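
This is the core of the readiness fix: the bulk load now runs as a tracked `Future`, and `close()` cancels it with interruption so the `finally` in `run()` still deregisters the load. A small sketch of that shape, with hypothetical names (`InterruptibleLoad`, the `println` standing in for deregistration):

```scala
import java.util.concurrent.{ExecutorService, Future}

// Illustrative only: close() cancels an in-flight background load so its cleanup always runs
class InterruptibleLoad(pool: ExecutorService) extends AutoCloseable {

  @volatile private var submission: Future[_] = _

  def start(): Unit = {
    submission = pool.submit(new Runnable {
      override def run(): Unit = {
        try {
          Thread.sleep(60000) // stand-in for awaiting the per-partition end offsets
        } catch {
          case _: InterruptedException => Thread.currentThread().interrupt() // unblocked by cancel(true)
        } finally {
          println("load deregistered") // runs even when the load is interrupted mid-way
        }
      }
    })
  }

  override def close(): Unit = {
    // interrupt rather than abandon the initial load, so readiness tracking is not left stuck
    if (submission != null && !submission.isDone) { submission.cancel(true) }
  }
}
```
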
// start the normal loading
private def startNormalLoad(): Unit = {
logger.info(s"Starting normal load for [$topic]")
// start the normal loading
toLoad.startConsumers()
LoaderStatus.completedLoad()
}
}
}
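
For reference, a hedged sketch of how the bulk-load boundary can be computed with the plain Kafka consumer API; the diff itself goes through GeoMesa's `KafkaConsumerVersions` shim so older clients fall back to normal loading, and `targetOffsets` is a made-up helper name.

```scala
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object OffsetBounds {
  // last offset to consume per partition before handing off to the live (indexed) loader
  def targetOffsets(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topic: String): Map[Int, Long] = {
    val partitions = consumer.partitionsFor(topic).asScala.map(i => new TopicPartition(topic, i.partition)).asJava
    val begins = consumer.beginningOffsets(partitions).asScala
    val ends = consumer.endOffsets(partitions).asScala
    partitions.asScala.flatMap { tp =>
      // end offsets are the *next* offset to be written, so subtract one for the last existing record
      val end = ends.get(tp).map(_.longValue - 1L).getOrElse(-1L)
      val begin = begins.get(tp).map(_.longValue).getOrElse(0L)
      if (begin < end) { Some(tp.partition -> end) } else { None } // nothing to bulk load otherwise
    }.toMap
  }
}
```
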
KafkaDataStore.scala
@@ -53,6 +53,7 @@ import java.io.{Closeable, IOException, StringReader}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService}
import java.util.{Collections, Properties, UUID}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

class KafkaDataStore(
@@ -99,7 +100,13 @@ class KafkaDataStore(
val serializer = serialization.apply(sft)
val initialLoad = config.consumers.readBack.isDefined
val expiry = config.indices.expiry
new KafkaCacheLoaderImpl(sft, cache, consumers, topic, frequency, serializer, initialLoad, expiry)
val loader = new KafkaCacheLoaderImpl(sft, cache, consumers, topic, frequency, serializer, initialLoad, expiry)
try { loader.start() } catch {
case NonFatal(e) =>
CloseWithLogging(loader)
throw e
}
loader
}
}
})
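
The data store now starts the loader after constructing it and tears it down again if startup fails, so a failed initial load doesn't leak consumers. A generic sketch of that pattern (the `StartSafely`/`buildAndStart` names are placeholders, not GeoMesa code); it mirrors the `try { loader.start() } catch { ... CloseWithLogging(loader) ... }` block above.

```scala
import scala.util.control.NonFatal

// Illustrative only: start a closeable resource after building it, closing it again if startup fails
object StartSafely {
  def buildAndStart[T <: AutoCloseable](build: => T)(start: T => Unit): T = {
    val resource = build
    try { start(resource) } catch {
      case NonFatal(e) =>
        try { resource.close() } catch { case NonFatal(suppressed) => e.addSuppressed(suppressed) }
        throw e
    }
    resource
  }
}
```
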
KafkaDataStoreTest.scala
@@ -265,6 +265,35 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
}
}

"support topic read-back" >> {
foreach(Seq(true, false)) { cqEngine =>
val params = if (cqEngine) {
Map("kafka.index.cqengine" -> "geom:default,name:unique", "kafka.zookeepers" -> zookeepers)
} else {
Map.empty[String, String]
}
val (producer, consumer, sft) = createStorePair(params ++ Map("kafka.consumer.read-back" -> "Inf"))
try {
producer.createSchema(sft)

val f0 = ScalaSimpleFeature.create(sft, "sm", "smith", 30, "2017-01-01T00:00:00.000Z", "POINT (0 0)")
val f1 = ScalaSimpleFeature.create(sft, "jo", "jones", 20, "2017-01-02T00:00:00.000Z", "POINT (-10 -10)")

// initial write
WithClose(producer.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)) { writer =>
Seq(f0, f1).foreach(FeatureUtils.write(writer, _, useProvidedFid = true))
}

consumer.metadata.resetCache()
val store = consumer.getFeatureSource(sft.getTypeName) // start the consumer polling
eventually(40, 100.millis)(SelfClosingIterator(store.getFeatures.features).toSeq must containTheSameElementsAs(Seq(f0, f1)))
} finally {
consumer.dispose()
producer.dispose()
}
}
}
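
The new test exercises the `kafka.consumer.read-back` parameter end to end. As a usage sketch only: the broker/zookeeper values and the `kafka.brokers` key are assumed rather than taken from the test, and the `DataStoreFinder` import path varies with the GeoTools version.

```scala
import org.geotools.data.DataStoreFinder
import scala.collection.JavaConverters._

object ReadBackExample {
  def main(args: Array[String]): Unit = {
    val params = Map[String, java.io.Serializable](
      "kafka.brokers"            -> "localhost:9092", // assumed connection settings, not from the test
      "kafka.zookeepers"         -> "localhost:2181",
      "kafka.consumer.read-back" -> "Inf"             // replay the whole topic before the layer is ready
    )
    val store = DataStoreFinder.getDataStore(params.asJava)
    if (store != null) {
      try {
        // features only appear once the initial load has completed
        store.getTypeNames.foreach(println)
      } finally {
        store.dispose()
      }
    }
  }
}
```
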

"write/read with visibilities" >> {
import org.locationtech.geomesa.security.AuthProviderParam

