Skip to content

Commit

Permalink
Revert "Switch to suspending functions all the way to the top (#532)" (
Browse files Browse the repository at this point in the history
…#542)

This reverts commit 4b332b5.
  • Loading branch information
temaEmelyan authored May 14, 2024
1 parent 3f80323 commit da58091
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 83 deletions.
136 changes: 55 additions & 81 deletions lib/src/main/java/graphql/nadel/Nadel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,14 @@ import graphql.schema.idl.TypeDefinitionRegistry
import graphql.schema.idl.WiringFactory
import graphql.validation.ValidationError
import graphql.validation.Validator
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.launch
import org.slf4j.Logger
import java.io.Reader
import java.io.StringReader
import java.util.Locale
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Function

class Nadel private constructor(
private val engine: NextgenEngine,
Expand All @@ -55,24 +48,7 @@ class Nadel private constructor(
private val instrumentation: NadelInstrumentation,
private val preparsedDocumentProvider: PreparsedDocumentProvider,
) {
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

@Deprecated(message = "Use executeAsync", replaceWith = ReplaceWith("this.executeAsync(executionInput)"))
@JvmName("execute") // For binary compat
fun executeJava(executionInput: NadelExecutionInput): CompletableFuture<ExecutionResult> {
return coroutineScope.future {
execute(executionInput)
}
}

fun executeAsync(executionInput: NadelExecutionInput): CompletableFuture<ExecutionResult> {
return coroutineScope.future {
execute(executionInput)
}
}

@JvmName("executeSuspending")
suspend fun execute(nadelExecutionInput: NadelExecutionInput): ExecutionResult {
fun execute(nadelExecutionInput: NadelExecutionInput): CompletableFuture<ExecutionResult> {
val executionInput: ExecutionInput = newExecutionInput()
.query(nadelExecutionInput.query)
.operationName(nadelExecutionInput.operationName)
Expand All @@ -88,7 +64,7 @@ class Nadel private constructor(
.executionId(nadelExecutionInput.executionId)
.build()

val hints = nadelExecutionInput.nadelExecutionHints
val nadelExecutionParams = NadelExecutionParams(nadelExecutionInput.nadelExecutionHints)
val instrumentationState = instrumentation.createState(
NadelInstrumentationCreateStateParameters(querySchema, executionInput),
)
Expand All @@ -99,79 +75,75 @@ class Nadel private constructor(
return try {
val executionInstrumentation = instrumentation.beginQueryExecution(instrumentationParameters)

val result = try {
parseValidateAndExecute(executionInput, instrumentationState, hints)
.also {
executionInstrumentation.onCompleted(it, null)
parseValidateAndExecute(executionInput, querySchema, instrumentationState, nadelExecutionParams)
// finish up instrumentation
.whenComplete { result: ExecutionResult?, t: Throwable? ->
executionInstrumentation.onCompleted(result, t)
}
.exceptionally { throwable: Throwable ->
if (throwable is AbortExecutionException) {
throwable.toExecutionResult()
} else if (throwable is CompletionException && throwable.cause is AbortExecutionException) {
val abortException = throwable.cause as AbortExecutionException
abortException.toExecutionResult()
} else if (throwable is RuntimeException) {
throw throwable
} else {
throw RuntimeException(throwable)
}
} catch (e: Exception) {
executionInstrumentation.onCompleted(null, e)

val cause = e.cause
if (e is AbortExecutionException) {
e.toExecutionResult()
} else if (e is CompletionException && cause is AbortExecutionException) {
cause.toExecutionResult()
} else {
throw e
} //
// allow instrumentation to tweak the result
.thenCompose { result: ExecutionResult ->
instrumentation.instrumentExecutionResult(result, instrumentationParameters)
}
}

instrumentation
.instrumentExecutionResult(result, instrumentationParameters)
.await()
} catch (e: AbortExecutionException) {
instrumentation
.instrumentExecutionResult(e.toExecutionResult(), instrumentationParameters)
.await()
} catch (abortException: AbortExecutionException) {
instrumentation.instrumentExecutionResult(abortException.toExecutionResult(), instrumentationParameters)
}
}

fun close() {
// Closes the scope after letting in flight requests go through
coroutineScope.launch {
delay(60_000) // Wait a minute
coroutineScope.cancel()
}
engine.close()
}

private suspend fun parseValidateAndExecute(
private fun parseValidateAndExecute(
executionInput: ExecutionInput,
graphQLSchema: GraphQLSchema,
instrumentationState: InstrumentationState?,
hints: NadelExecutionHints,
): ExecutionResult {
// todo: I'm pretty sure this is never changed, but let's circle back to that in another PR to reduce changelog here
nadelExecutionParams: NadelExecutionParams,
): CompletableFuture<ExecutionResult> {
val executionInputRef = AtomicReference(executionInput)

val result = preparsedDocumentProvider
.getDocumentAsync(executionInput) { transformedInput ->
// If they change the original query in the pre-parser, then we want to see it downstream from then on
executionInputRef.set(transformedInput)
parseAndValidate(executionInputRef, instrumentationState)
val computeFunction = Function { transformedInput: ExecutionInput ->
// if they change the original query in the pre-parser, then we want to see it downstream from then on
executionInputRef.set(transformedInput)
parseAndValidate(executionInputRef, graphQLSchema, instrumentationState)
}

return preparsedDocumentProvider.getDocumentAsync(executionInput, computeFunction)
.thenCompose { result ->
if (result.hasErrors()) {
CompletableFuture.completedFuture(
ExecutionResultImpl(result.errors),
)
} else engine.execute(
executionInputRef.get()!!,
result.document,
instrumentationState,
nadelExecutionParams,
)
}
.await()

return if (result.hasErrors()) {
ExecutionResultImpl(result.errors)
} else {
engine.execute(
executionInput = executionInputRef.get()!!,
queryDocument = result.document,
instrumentationState = instrumentationState,
executionHints = hints,
)
}
}

private fun parseAndValidate(
executionInputRef: AtomicReference<ExecutionInput>,
graphQLSchema: GraphQLSchema,
instrumentationState: InstrumentationState?,
): PreparsedDocumentEntry {
var executionInput = executionInputRef.get()!!

val query = executionInput.query
logNotSafe.debug("Parsing query: '{}'...", query)
val parseResult = parse(executionInput, instrumentationState)
val parseResult = parse(executionInput, graphQLSchema, instrumentationState)

return if (parseResult.isFailure) {
logNotSafe.warn("Query failed to parse : '{}'", executionInput.query)
Expand All @@ -186,7 +158,7 @@ class Nadel private constructor(
executionInputRef.set(executionInput)

logNotSafe.debug("Validating query: '{}'", query)
val errors = validate(executionInput, document, instrumentationState)
val errors = validate(executionInput, document, graphQLSchema, instrumentationState)

if (errors.isNotEmpty()) {
logNotSafe.warn("Query failed to validate : '{}' because of {} ", query, errors)
Expand All @@ -199,11 +171,12 @@ class Nadel private constructor(

private fun parse(
executionInput: ExecutionInput,
graphQLSchema: GraphQLSchema,
instrumentationState: InstrumentationState?,
): ParseAndValidateResult {
val parameters = NadelInstrumentationQueryExecutionParameters(
executionInput,
querySchema,
graphQLSchema,
instrumentationState
)

Expand Down Expand Up @@ -234,19 +207,20 @@ class Nadel private constructor(
private fun validate(
executionInput: ExecutionInput,
document: Document,
graphQLSchema: GraphQLSchema,
instrumentationState: InstrumentationState?,
): MutableList<ValidationError> {
val validationCtx = instrumentation.beginValidation(
NadelInstrumentationQueryValidationParameters(
executionInput = executionInput,
document = document,
schema = querySchema,
schema = graphQLSchema,
instrumentationState = instrumentationState,
context = executionInput.context,
),
)
val validator = Validator()
val validationErrors = validator.validateDocument(querySchema, document, Locale.getDefault())
val validationErrors = validator.validateDocument(graphQLSchema, document, Locale.getDefault())
validationCtx.onCompleted(validationErrors, null)
return validationErrors
}
Expand Down
3 changes: 3 additions & 0 deletions lib/src/main/java/graphql/nadel/NadelExecutionParams.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package graphql.nadel

class NadelExecutionParams internal constructor(val nadelExecutionHints: NadelExecutionHints)
35 changes: 34 additions & 1 deletion lib/src/main/java/graphql/nadel/NextgenEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ import graphql.normalized.ExecutableNormalizedField
import graphql.normalized.ExecutableNormalizedOperationFactory.createExecutableNormalizedOperationWithRawVariables
import graphql.normalized.VariablePredicate
import graphql.schema.GraphQLSchema
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import java.util.concurrent.CompletableFuture
import graphql.normalized.ExecutableNormalizedOperationFactory.Options.defaultOptions as executableNormalizedOperationFactoryOptions

internal class NextgenEngine(
Expand All @@ -63,6 +71,7 @@ internal class NextgenEngine(
transforms: List<NadelTransform<out Any>> = emptyList(),
introspectionRunnerFactory: NadelIntrospectionRunnerFactory = NadelIntrospectionRunnerFactory(::NadelDefaultIntrospectionRunner),
) {
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val services: Map<String, Service> = services.strictAssociateBy { it.name }
private val overallExecutionBlueprint = NadelExecutionBlueprintFactory.create(
engineSchema = engineSchema,
Expand Down Expand Up @@ -90,7 +99,31 @@ internal class NextgenEngine(
.maxChildrenDepth(maxQueryDepth)
.maxFieldsCount(maxFieldCount)

suspend fun execute(
fun execute(
executionInput: ExecutionInput,
queryDocument: Document,
instrumentationState: InstrumentationState?,
nadelExecutionParams: NadelExecutionParams,
): CompletableFuture<ExecutionResult> {
return coroutineScope.async {
executeCoroutine(
executionInput,
queryDocument,
instrumentationState,
nadelExecutionParams.nadelExecutionHints,
)
}.asCompletableFuture()
}

fun close() {
// Closes the scope after letting in flight requests go through
coroutineScope.launch {
delay(60_000) // Wait a minute
coroutineScope.cancel()
}
}

private suspend fun executeCoroutine(
executionInput: ExecutionInput,
queryDocument: Document,
instrumentationState: InstrumentationState?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ suspend fun main() {
.query(query)
.build(),
)
.asDeferred()
.await()
.also {
println(it)
}
Expand Down
2 changes: 1 addition & 1 deletion test/src/test/kotlin/graphql/nadel/tests/EngineTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private suspend fun execute(
)
}
.build(),
)
).await()

if (fixture.exception != null) {
fail("Expected exception did not occur")
Expand Down

0 comments on commit da58091

Please sign in to comment.