Skip to content

Commit 5f335a5

Browse files
authored
Handle failed subscription payloads (#7866)
Currently, the error handling for subscription events is not well-defined in the GraphQL spec, but that doesn't mean we shouldn't handle them! The existing behavior is that an error thrown from a subscription's generator will go uncaught and crash the whole server. For a transient failure, it may be preferable for consumers that we simply return an error response and continue waiting for more data from the iterator, in case the producer recovers and resumes producing valid data. However, Node's AsyncGenerator terminates once an error is thrown, even if you manually loop calling `iterator.next()`. This change wraps the iterator consumption in a `try/catch` and closes the subscription when an error is encountered. Propagating the error up to the subscriber will allow them to decide if they need to resubscribe or not, in the case of a transient error.
1 parent 5cf2928 commit 5f335a5

File tree

3 files changed

+114
-30
lines changed

3 files changed

+114
-30
lines changed

.changeset/large-ladybugs-breathe.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@apollo/server": patch
3+
---
4+
5+
Catch errors thrown by subscription generators, and gracefully clean up the subscription instead of crashing.

packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts

+70
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,62 @@ describe('SubscriptionCallbackPlugin', () => {
11021102
`);
11031103
});
11041104

1105+
it('sends a `complete` with errors when a subscription throws an error', async () => {
1106+
const server = await startSubscriptionServer({ logger });
1107+
1108+
mockRouterCheckResponse();
1109+
mockRouterCheckResponse();
1110+
mockRouterCompleteResponse({
1111+
errors: [{ message: "The subscription generator didn't catch this!" }],
1112+
});
1113+
1114+
const result = await server.executeHTTPGraphQLRequest(
1115+
buildHTTPGraphQLRequest({
1116+
body: {
1117+
query: `#graphql
1118+
subscription {
1119+
throwsError
1120+
}
1121+
`,
1122+
extensions: {
1123+
subscription: {
1124+
callbackUrl: 'http://mock-router-url.com',
1125+
subscriptionId: '1234-cats',
1126+
verifier: 'my-verifier-token',
1127+
},
1128+
},
1129+
},
1130+
}),
1131+
);
1132+
expect(result.status).toEqual(200);
1133+
1134+
jest.advanceTimersByTime(5000);
1135+
await server.stop();
1136+
1137+
expect(logger.orderOfOperations).toMatchInlineSnapshot(`
1138+
[
1139+
"SubscriptionCallback[1234-cats]: Received new subscription request",
1140+
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
1141+
"SubscriptionManager[1234-cats]: \`check\` request successful",
1142+
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
1143+
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
1144+
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
1145+
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
1146+
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
1147+
"ERROR: SubscriptionManager[1234-cats]: Generator threw an error, terminating subscription: The subscription generator didn't catch this!",
1148+
"SubscriptionManager[1234-cats]: Sending \`complete\` request to router with errors",
1149+
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
1150+
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
1151+
"SubscriptionManager[1234-cats]: \`complete\` request successful",
1152+
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
1153+
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
1154+
"SubscriptionManager: Heartbeat received response for ID: 1234-cats",
1155+
"SubscriptionManager: Heartbeat request successful, ID: 1234-cats",
1156+
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
1157+
]
1158+
`);
1159+
});
1160+
11051161
(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? describe.skip : describe)(
11061162
'error handling',
11071163
() => {
@@ -1979,6 +2035,7 @@ async function startSubscriptionServer(
19792035
type Subscription {
19802036
count: Int
19812037
terminatesSuccessfully: Boolean
2038+
throwsError: Int
19822039
}
19832040
`,
19842041
resolvers: {
@@ -2011,6 +2068,19 @@ async function startSubscriptionServer(
20112068
},
20122069
}),
20132070
},
2071+
throwsError: {
2072+
subscribe: () => ({
2073+
[Symbol.asyncIterator]() {
2074+
return {
2075+
next: () => {
2076+
throw new Error(
2077+
"The subscription generator didn't catch this!",
2078+
);
2079+
},
2080+
};
2081+
},
2082+
}),
2083+
},
20142084
},
20152085
},
20162086
...opts,

packages/server/src/plugin/subscriptionCallback/index.ts

+39-30
Original file line numberDiff line numberDiff line change
@@ -557,39 +557,48 @@ class SubscriptionManager {
557557
cancelled: false,
558558
async startConsumingSubscription() {
559559
self.logger?.debug(`Listening to graphql-js subscription`, id);
560-
for await (const payload of subscription) {
561-
if (this.cancelled) {
562-
self.logger?.debug(
563-
`Subscription already cancelled, ignoring current and future payloads`,
564-
id,
565-
);
566-
// It's already been cancelled - something else has already handled
567-
// sending the `complete` request so we don't want to `break` here
568-
// and send it again after the loop.
569-
return;
570-
}
560+
try {
561+
for await (const payload of subscription) {
562+
if (this.cancelled) {
563+
self.logger?.debug(
564+
`Subscription already cancelled, ignoring current and future payloads`,
565+
id,
566+
);
567+
// It's already been cancelled - something else has already handled
568+
// sending the `complete` request so we don't want to `break` here
569+
// and send it again after the loop.
570+
return;
571+
}
571572

572-
try {
573-
await self.retryFetch({
574-
url: callbackUrl,
575-
action: 'next',
576-
id,
577-
verifier,
578-
payload,
579-
});
580-
} catch (e) {
581-
const originalError = ensureError(e);
582-
self.logger?.error(
583-
`\`next\` request failed, terminating subscription: ${originalError.message}`,
584-
id,
585-
);
586-
self.terminateSubscription(id, callbackUrl);
573+
try {
574+
await self.retryFetch({
575+
url: callbackUrl,
576+
action: 'next',
577+
id,
578+
verifier,
579+
payload,
580+
});
581+
} catch (e) {
582+
const originalError = ensureError(e);
583+
self.logger?.error(
584+
`\`next\` request failed, terminating subscription: ${originalError.message}`,
585+
id,
586+
);
587+
self.terminateSubscription(id, callbackUrl);
588+
}
587589
}
590+
// The subscription ended without errors, send the `complete` request to
591+
// the router
592+
self.logger?.debug(`Subscription completed without errors`, id);
593+
await this.completeSubscription();
594+
} catch (e) {
595+
const error = ensureGraphQLError(e);
596+
self.logger?.error(
597+
`Generator threw an error, terminating subscription: ${error.message}`,
598+
id,
599+
);
600+
this.completeSubscription([error]);
588601
}
589-
// The subscription ended without errors, send the `complete` request to
590-
// the router
591-
self.logger?.debug(`Subscription completed without errors`, id);
592-
await this.completeSubscription();
593602
},
594603
async completeSubscription(errors?: readonly GraphQLError[]) {
595604
if (this.cancelled) return;

0 commit comments

Comments
 (0)