From 4267561c8d883c62ab2bc560f467b27d0e2064d1 Mon Sep 17 00:00:00 2001
From: Emilio
Date: Thu, 23 Jan 2025 14:56:28 -0500
Subject: [PATCH] GEOMESA-3438 Fix Kafka layer readiness when initial load is interrupted (#3263)

---
 .../geomesa/kafka/data/KafkaCacheLoader.scala | 137 +++++++++++-------
 .../geomesa/kafka/data/KafkaDataStore.scala   |   9 +-
 .../kafka/data/KafkaDataStoreTest.scala       |  29 ++++
 3 files changed, 125 insertions(+), 50 deletions(-)

diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaCacheLoader.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaCacheLoader.scala
index a8f61326062..f5854b16c08 100644
--- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaCacheLoader.scala
+++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaCacheLoader.scala
@@ -23,8 +23,10 @@ import org.locationtech.geomesa.utils.io.CloseWithLogging
 
 import java.io.Closeable
 import java.time.Duration
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
+import java.util.Collections
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Future}
+import scala.util.control.NonFatal
 
 /**
  * Reads from Kafka and populates a `KafkaFeatureCache`.
@@ -37,22 +39,28 @@ trait KafkaCacheLoader extends Closeable with LazyLogging {
 
 object KafkaCacheLoader extends LazyLogging {
 
   object LoaderStatus {
-    private val count = new AtomicInteger(0)
+
+    private val loading = Collections.newSetFromMap(new ConcurrentHashMap[AnyRef, java.lang.Boolean]())
     private val firstLoadStartTime = new AtomicLong(0L)
 
-    def startLoad(): Boolean = synchronized {
-      count.incrementAndGet()
+    def startLoad(loader: AnyRef): Boolean = synchronized {
+      if (!loading.add(loader)) {
+        logger.warn(s"Called startLoad for a loader that was already registered and has not yet completed: $loader")
+      }
       firstLoadStartTime.compareAndSet(0L, System.currentTimeMillis())
     }
-    def completedLoad(): Unit = synchronized {
-      if (count.decrementAndGet() == 0) {
+
+    def completedLoad(loader: AnyRef): Unit = synchronized {
+      if (!loading.remove(loader)) {
+        logger.warn(s"Called completedLoad for a loader that was not registered or already deregistered: $loader")
+      } else if (loading.isEmpty) {
         logger.info(s"Last active initial load completed. " +
-            s"Initial loads took ${System.currentTimeMillis()-firstLoadStartTime.get} milliseconds.")
+            s"Initial loads took ${System.currentTimeMillis() - firstLoadStartTime.get} milliseconds.")
         firstLoadStartTime.set(0L)
       }
     }
-    def allLoaded(): Boolean = count.get() == 0
+
+    def allLoaded(): Boolean = loading.isEmpty
   }
 
   object NoOpLoader extends KafkaCacheLoader {
@@ -70,25 +78,27 @@ object KafkaCacheLoader extends LazyLogging {
       doInitialLoad: Boolean,
       initialLoadConfig: ExpiryTimeConfig
     ) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency)) with KafkaCacheLoader {
-
     try { classOf[ConsumerRecord[Any, Any]].getMethod("timestamp") } catch {
       case _: NoSuchMethodException => logger.warn("This version of Kafka doesn't support timestamps, using system time")
     }
 
-    private val initialLoader = if (doInitialLoad) {
-      // for the initial load, don't bother spatially indexing until we have the final state
-      val loader = new InitialLoader(sft, consumers, topic, frequency, serializer, initialLoadConfig, this)
-      CachedThreadPool.execute(loader)
-      Some(loader)
-    } else {
-      startConsumers()
-      None
+    private val initialLoader =
+      if (doInitialLoad) {
+        // for the initial load, don't bother spatially indexing until we have the final state
+        Some(new InitialLoader(sft, consumers, topic, frequency, serializer, initialLoadConfig, this))
+      } else {
+        None
+      }
+
+    def start(): Unit = {
+      initialLoader match {
+        case None => startConsumers()
+        case Some(loader) => loader.start()
+      }
     }
 
     override def close(): Unit = {
-      try {
-        super.close()
-      } finally {
+      try { super.close() } finally {
         CloseWithLogging(initialLoader)
         cache.close()
       }
@@ -128,12 +138,16 @@
       toLoad: KafkaCacheLoaderImpl
     ) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), false) with Runnable {
 
+    import scala.collection.JavaConverters._
+
     private val cache = KafkaFeatureCache.nonIndexing(sft, ordering)
 
     // track the offsets that we want to read to
    private val offsets = new ConcurrentHashMap[Int, Long]()
-    private var latch: CountDownLatch = _
     private val done = new AtomicBoolean(false)
+    private var latch: CountDownLatch = _
+    @volatile
+    private var submission: Future[_] = _
 
     override protected def consume(record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
       if (done.get) { toLoad.consume(record) } else {
@@ -160,38 +174,51 @@
       }
     }
-    override def run(): Unit = {
-      LoaderStatus.startLoad()
-
-      import scala.collection.JavaConverters._
-
-      val partitions = consumers.head.partitionsFor(topic).asScala.map(_.partition)
+    def start(): Unit = {
+      LoaderStatus.startLoad(this)
       try {
-        // note: these methods are not available in kafka 0.9, which will cause it to fall back to normal loading
-        val beginningOffsets = KafkaConsumerVersions.beginningOffsets(consumers.head, topic, partitions.toSeq)
-        val endOffsets = KafkaConsumerVersions.endOffsets(consumers.head, topic, partitions.toSeq)
-        partitions.foreach { p =>
-          // end offsets are the *next* offset that will be returned, so subtract one to track the last offset
-          // we will actually consume
-          val endOffset = endOffsets.getOrElse(p, 0L) - 1L
-          // note: not sure if start offsets are also off by one, but at the worst we would skip bulk loading
-          // for the last message per topic
-          val beginningOffset = beginningOffsets.getOrElse(p, 0L)
-          if (beginningOffset < endOffset) {
-            offsets.put(p, endOffset)
+        val partitions = consumers.head.partitionsFor(topic).asScala.map(_.partition)
+        try {
+          // note: these methods are not available in kafka 0.9, which will cause it to fall back to normal loading
+          val beginningOffsets = KafkaConsumerVersions.beginningOffsets(consumers.head, topic, partitions.toSeq)
+          val endOffsets = KafkaConsumerVersions.endOffsets(consumers.head, topic, partitions.toSeq)
+          partitions.foreach { p =>
+            // end offsets are the *next* offset that will be returned, so subtract one to track the last offset
+            // we will actually consume
+            val endOffset = endOffsets.getOrElse(p, 0L) - 1L
+            // note: not sure if start offsets are also off by one, but at the worst we would skip bulk loading
+            // for the last message per topic
+            val beginningOffset = beginningOffsets.getOrElse(p, 0L)
+            if (beginningOffset < endOffset) {
+              offsets.put(p, endOffset)
+            }
          }
+        } catch {
+          case e: NoSuchMethodException => logger.warn(s"Can't support initial bulk loading for current Kafka version: $e")
+        }
+        if (offsets.isEmpty) {
+          // don't bother spinning up the consumer threads if we don't need to actually bulk load anything
+          startNormalLoad()
+          LoaderStatus.completedLoad(this)
+        } else {
+          logger.info(s"Starting initial load for [$topic] with ${offsets.size} partitions")
+          latch = new CountDownLatch(offsets.size)
+          startConsumers() // kick off the asynchronous consumer threads
+          submission = CachedThreadPool.submit(this)
+        }
       } catch {
-        case e: NoSuchMethodException => logger.warn(s"Can't support initial bulk loading for current Kafka version: $e")
+        case NonFatal(e) =>
+          LoaderStatus.completedLoad(this)
+          throw e
      }
-      // don't bother spinning up the consumer threads if we don't need to actually bulk load anything
-      if (!offsets.isEmpty) {
-        logger.info(s"Starting initial load for [$topic] with ${offsets.size} partitions")
-        latch = new CountDownLatch(offsets.size)
-        startConsumers() // kick off the asynchronous consumer threads
+    }
+
+    override def run(): Unit = {
+      try {
         try {
           latch.await()
         } finally {
           // stop the consumer threads, but won't close the consumers due to `closeConsumers`
-          close()
+          // note: don't call this.close() as it would interrupt this thread
+          super.close()
         }
         // set a flag just in case the consumer threads haven't finished spinning down, so that we will
         // pass any additional messages back to the main loader
@@ -199,11 +226,23 @@
         logger.info(s"Finished initial load, transferring to indexed cache for [$topic]")
         cache.query(Filter.INCLUDE).foreach(toLoad.cache.put)
         logger.info(s"Finished transfer for [$topic]")
+        startNormalLoad()
+      } finally {
+        LoaderStatus.completedLoad(this)
       }
+    }
+
+    override def close(): Unit = {
+      try { super.close() } finally {
+        if (submission != null && !submission.isDone) {
+          submission.cancel(true)
+        }
+      }
+    }
+
+    // start the normal loading
+    private def startNormalLoad(): Unit = {
       logger.info(s"Starting normal load for [$topic]")
-      // start the normal loading
       toLoad.startConsumers()
-      LoaderStatus.completedLoad()
     }
   }
 }
diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala
index 20519bee866..2daaf2c35a0 100644
--- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala
+++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala
@@ -53,6 +53,7 @@ import java.io.{Closeable, IOException, StringReader}
 import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService}
 import java.util.{Collections, Properties, UUID}
 import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
 
 class KafkaDataStore(
@@ -99,7 +100,13 @@
         val serializer = serialization.apply(sft)
         val initialLoad = config.consumers.readBack.isDefined
         val expiry = config.indices.expiry
-        new KafkaCacheLoaderImpl(sft, cache, consumers, topic, frequency, serializer, initialLoad, expiry)
+        val loader = new KafkaCacheLoaderImpl(sft, cache, consumers, topic, frequency, serializer, initialLoad, expiry)
+        try { loader.start() } catch {
+          case NonFatal(e) =>
+            CloseWithLogging(loader)
+            throw e
+        }
+        loader
       }
     }
   })
diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala b/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala
index 7ef2970c6ea..ae930e7fe10 100644
--- a/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala
+++ b/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala
@@ -265,6 +265,35 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
       }
     }
 
+    "support topic read-back" >> {
+      foreach(Seq(true, false)) { cqEngine =>
+        val params = if (cqEngine) {
+          Map("kafka.index.cqengine" -> "geom:default,name:unique", "kafka.zookeepers" -> zookeepers)
+        } else {
+          Map.empty[String, String]
+        }
+        val (producer, consumer, sft) = createStorePair(params ++ Map("kafka.consumer.read-back" -> "Inf"))
+        try {
+          producer.createSchema(sft)
+
+          val f0 = ScalaSimpleFeature.create(sft, "sm", "smith", 30, "2017-01-01T00:00:00.000Z", "POINT (0 0)")
+          val f1 = ScalaSimpleFeature.create(sft, "jo", "jones", 20, "2017-01-02T00:00:00.000Z", "POINT (-10 -10)")
+
+          // initial write
+          WithClose(producer.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)) { writer =>
+            Seq(f0, f1).foreach(FeatureUtils.write(writer, _, useProvidedFid = true))
+          }
+
+          consumer.metadata.resetCache()
+          val store = consumer.getFeatureSource(sft.getTypeName) // start the consumer polling
+          eventually(40, 100.millis)(SelfClosingIterator(store.getFeatures.features).toSeq must containTheSameElementsAs(Seq(f0, f1)))
+        } finally {
+          consumer.dispose()
+          producer.dispose()
+        }
+      }
+    }
+
     "write/read with visibilities" >> {
       import org.locationtech.geomesa.security.AuthProviderParam