Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better outstanding external calls system #655

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package graphql.nadel.time

import java.io.Closeable
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Supplier

Expand All @@ -17,53 +19,67 @@ open class NadelInternalLatencyTrackerImpl(
return internalLatency.elapsed()
}

fun onExternalRun(code: Runnable) {
onExternalCallStart()
fun newExternalCall(): Closeable {
return ExternalCall()
}

try {
fun onExternalRun(code: Runnable) {
newExternalCall().use {
code.run()
} finally {
onExternalCallEnd()
}
}

fun <T : Any> onExternalGet(code: Supplier<T>): T {
onExternalCallStart()

try {
return code.get()
} finally {
onExternalCallEnd()
return newExternalCall().use {
code.get()
}
}

fun <T : Any> onExternalFuture(future: CompletableFuture<T>): CompletableFuture<T> {
onExternalCallStart()
val call = newExternalCall()

return future
.whenComplete { _, _ ->
onExternalCallEnd()
call.close()
}
}

fun <T : Any> onExternalFuture(future: Supplier<CompletableFuture<T>>): CompletableFuture<T> {
onExternalCallStart()
val call = newExternalCall()

return future.get()
.whenComplete { _, _ ->
onExternalCallEnd()
call.close()
}
}

protected fun onExternalCallStart() {
if (outstandingExternalLatencyCount.getAndIncrement() == 0) {
internalLatency.stop()
}
/**
* Used to ensure that at the end of a request, there are no outstanding external calls.
*
* @return true if all external calls were closed
*/
fun noOutstandingCalls(): Boolean {
return outstandingExternalLatencyCount.get() == 0
}

protected fun onExternalCallEnd() {
if (outstandingExternalLatencyCount.decrementAndGet() == 0) {
internalLatency.start()
private inner class ExternalCall : Closeable {
/**
* Used to ensure the call does not decrement the counter more than once.
*/
private val closed = AtomicBoolean(false)

init {
if (outstandingExternalLatencyCount.getAndIncrement() == 0) {
internalLatency.stop()
}
}

override fun close() {
if (!closed.getAndSet(true)) {
if (outstandingExternalLatencyCount.decrementAndGet() == 0) {
internalLatency.start()
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package graphql.nadel.time

import graphql.Assert.assertFalse
import graphql.nadel.test.mock
import io.mockk.confirmVerified
import io.mockk.every
Expand Down Expand Up @@ -51,6 +52,8 @@ class NadelInternalLatencyTrackerImplTest {
internalLatency.start()
}
confirmVerified(internalLatency)

assertTrue(tracker.noOutstandingCalls())
}

@Test
Expand Down Expand Up @@ -94,6 +97,8 @@ class NadelInternalLatencyTrackerImplTest {
internalLatency.start()
}
confirmVerified(internalLatency)

assertTrue(tracker.noOutstandingCalls())
}

@Test
Expand Down Expand Up @@ -124,6 +129,8 @@ class NadelInternalLatencyTrackerImplTest {
internalLatency.start()
}
confirmVerified(internalLatency)

assertTrue(tracker.noOutstandingCalls())
}

@Test
Expand Down Expand Up @@ -158,6 +165,8 @@ class NadelInternalLatencyTrackerImplTest {
internalLatency.start()
}
confirmVerified(internalLatency)

assertTrue(tracker.noOutstandingCalls())
}

@Test
Expand Down Expand Up @@ -208,6 +217,8 @@ class NadelInternalLatencyTrackerImplTest {
internalLatency.start()
}
confirmVerified(internalLatency)

assertTrue(tracker.noOutstandingCalls())
}

@Test
Expand Down Expand Up @@ -265,9 +276,63 @@ class NadelInternalLatencyTrackerImplTest {
thread2.join()

// Then: internal latency is started again as all external work completes
verify(exactly=1){
verify(exactly = 1) {
internalLatency.start()
}
confirmVerified(internalLatency)

assertTrue(tracker.noOutstandingCalls())
}

@Test
fun `cannot close external call more than once`() {
val tracker = NadelInternalLatencyTrackerImpl(internalLatency)

every { internalLatency.start() } returns Unit
every { internalLatency.stop() } returns Unit

val call1 = tracker.newExternalCall()
val call2 = tracker.newExternalCall()

// When: close call 1 multiple times
call1.close()
call1.close()
call1.close()

// Then: does not resume internal latency
assertFalse(tracker.noOutstandingCalls())
verify(exactly = 1) {
internalLatency.stop()
}
confirmVerified(internalLatency)

// When: close call 2
call2.close()

// Then: it resumes internal latency as there are more no outstanding calls
assertTrue(tracker.noOutstandingCalls())
verify(exactly = 1) {
internalLatency.start()
}
confirmVerified(internalLatency)

// When: open another external call
val call3 = tracker.newExternalCall()

// Then: has outstanding call
assertFalse(tracker.noOutstandingCalls())

// When: closing already closed calls
call1.close()
call2.close()

// Then: still has outstanding call 3
assertFalse(tracker.noOutstandingCalls())

// When: close call 3
call3.close()

// Then: all calls are closed
assertTrue(tracker.noOutstandingCalls())
}
}
Loading