From 398884c76d21f314ead9e6f6e805e2924f7fd078 Mon Sep 17 00:00:00 2001 From: Franklin Wang Date: Thu, 12 Dec 2024 10:57:52 +1100 Subject: [PATCH] Better outstanding external calls system Ensures calls are always opened/closed Ensures calls cannot be closed more than once --- .../time/NadelInternalLatencyTrackerImpl.kt | 60 +++++++++++------ .../NadelInternalLatencyTrackerImplTest.kt | 67 ++++++++++++++++++- 2 files changed, 104 insertions(+), 23 deletions(-) diff --git a/lib/src/main/java/graphql/nadel/time/NadelInternalLatencyTrackerImpl.kt b/lib/src/main/java/graphql/nadel/time/NadelInternalLatencyTrackerImpl.kt index 23e4463ee..182c080ed 100644 --- a/lib/src/main/java/graphql/nadel/time/NadelInternalLatencyTrackerImpl.kt +++ b/lib/src/main/java/graphql/nadel/time/NadelInternalLatencyTrackerImpl.kt @@ -1,7 +1,9 @@ package graphql.nadel.time +import java.io.Closeable import java.time.Duration import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import java.util.function.Supplier @@ -17,53 +19,67 @@ open class NadelInternalLatencyTrackerImpl( return internalLatency.elapsed() } - fun onExternalRun(code: Runnable) { - onExternalCallStart() + fun newExternalCall(): Closeable { + return ExternalCall() + } - try { + fun onExternalRun(code: Runnable) { + newExternalCall().use { code.run() - } finally { - onExternalCallEnd() } } fun onExternalGet(code: Supplier): T { - onExternalCallStart() - - try { - return code.get() - } finally { - onExternalCallEnd() + return newExternalCall().use { + code.get() } } fun onExternalFuture(future: CompletableFuture): CompletableFuture { - onExternalCallStart() + val call = newExternalCall() return future .whenComplete { _, _ -> - onExternalCallEnd() + call.close() } } fun onExternalFuture(future: Supplier>): CompletableFuture { - onExternalCallStart() + val call = newExternalCall() return future.get() .whenComplete { _, _ -> - onExternalCallEnd() + call.close() } } - protected fun onExternalCallStart() { - if (outstandingExternalLatencyCount.getAndIncrement() == 0) { - internalLatency.stop() - } + /** + * Used to ensure that at the end of a request, there are no outstanding external calls. + * + * @return true if all external calls were closed + */ + fun noOutstandingCalls(): Boolean { + return outstandingExternalLatencyCount.get() == 0 } - protected fun onExternalCallEnd() { - if (outstandingExternalLatencyCount.decrementAndGet() == 0) { - internalLatency.start() + private inner class ExternalCall : Closeable { + /** + * Used to ensure the call does not decrement the counter more than once. + */ + private val closed = AtomicBoolean(false) + + init { + if (outstandingExternalLatencyCount.getAndIncrement() == 0) { + internalLatency.stop() + } + } + + override fun close() { + if (!closed.getAndSet(true)) { + if (outstandingExternalLatencyCount.decrementAndGet() == 0) { + internalLatency.start() + } + } } } } diff --git a/lib/src/test/kotlin/graphql/nadel/time/NadelInternalLatencyTrackerImplTest.kt b/lib/src/test/kotlin/graphql/nadel/time/NadelInternalLatencyTrackerImplTest.kt index e591396c2..0b43c41b4 100644 --- a/lib/src/test/kotlin/graphql/nadel/time/NadelInternalLatencyTrackerImplTest.kt +++ b/lib/src/test/kotlin/graphql/nadel/time/NadelInternalLatencyTrackerImplTest.kt @@ -1,5 +1,6 @@ package graphql.nadel.time +import graphql.Assert.assertFalse import graphql.nadel.test.mock import io.mockk.confirmVerified import io.mockk.every @@ -51,6 +52,8 @@ class NadelInternalLatencyTrackerImplTest { internalLatency.start() } confirmVerified(internalLatency) + + assertTrue(tracker.noOutstandingCalls()) } @Test @@ -94,6 +97,8 @@ class NadelInternalLatencyTrackerImplTest { internalLatency.start() } confirmVerified(internalLatency) + + assertTrue(tracker.noOutstandingCalls()) } @Test @@ -124,6 +129,8 @@ class NadelInternalLatencyTrackerImplTest { internalLatency.start() } confirmVerified(internalLatency) + + assertTrue(tracker.noOutstandingCalls()) } @Test @@ -158,6 +165,8 @@ class NadelInternalLatencyTrackerImplTest { internalLatency.start() } confirmVerified(internalLatency) + + assertTrue(tracker.noOutstandingCalls()) } @Test @@ -208,6 +217,8 @@ class NadelInternalLatencyTrackerImplTest { internalLatency.start() } confirmVerified(internalLatency) + + assertTrue(tracker.noOutstandingCalls()) } @Test @@ -265,9 +276,63 @@ class NadelInternalLatencyTrackerImplTest { thread2.join() // Then: internal latency is started again as all external work completes - verify(exactly=1){ + verify(exactly = 1) { internalLatency.start() } confirmVerified(internalLatency) + + assertTrue(tracker.noOutstandingCalls()) + } + + @Test + fun `cannot close external call more than once`() { + val tracker = NadelInternalLatencyTrackerImpl(internalLatency) + + every { internalLatency.start() } returns Unit + every { internalLatency.stop() } returns Unit + + val call1 = tracker.newExternalCall() + val call2 = tracker.newExternalCall() + + // When: close call 1 multiple times + call1.close() + call1.close() + call1.close() + + // Then: does not resume internal latency + assertFalse(tracker.noOutstandingCalls()) + verify(exactly = 1) { + internalLatency.stop() + } + confirmVerified(internalLatency) + + // When: close call 2 + call2.close() + + // Then: it resumes internal latency as there are more no outstanding calls + assertTrue(tracker.noOutstandingCalls()) + verify(exactly = 1) { + internalLatency.start() + } + confirmVerified(internalLatency) + + // When: open another external call + val call3 = tracker.newExternalCall() + + // Then: has outstanding call + assertFalse(tracker.noOutstandingCalls()) + + // When: closing already closed calls + call1.close() + call2.close() + + // Then: still has outstanding call 3 + assertFalse(tracker.noOutstandingCalls()) + + // When: close call 3 + call3.close() + + // Then: all calls are closed + assertTrue(tracker.noOutstandingCalls()) } }