Skip to content

Commit

Permalink
feat(server, openapi)!: event-source support (#147)
Browse files Browse the repository at this point in the history
* serialize/deserialize

* serialize/deserialize

* improve mapEventSourceIterator

* retry / last-event-id

* sync

* validation schema

* openapi specs

* fix

* event-source state

* update server-standard

* improve naming

* docs

* improve options
  • Loading branch information
unnoq authored Feb 16, 2025
1 parent 9125edb commit 3f40711
Show file tree
Hide file tree
Showing 49 changed files with 2,116 additions and 239 deletions.
6 changes: 3 additions & 3 deletions apps/content/content/docs/client/vanilla.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ type ClientContext = { cache?: RequestCache }
const rpcLink = new RPCLink<ClientContext>({
url: 'http://localhost:3000/rpc',
// headers: provide additional headers
fetch: (input, init, context) => globalThis.fetch(input, {
fetch: (url, init, { context }) => globalThis.fetch(url, {
...init,
cache: context?.cache,
}),
method: (path, input, context) => {
method: ({ context }, path, input) => {
// if input contain file, and you return GET, oRPC will change it to POST automatically

if (context?.cache) {
Expand Down Expand Up @@ -98,7 +98,7 @@ const rpcLink2 = new RPCLink({
// headers: provide additional headers
})

const dynamicLink = new DynamicLink((path, input, context) => { // can be async
const dynamicLink = new DynamicLink(({ context }, path, input) => { // can be async
if (path.includes('post')) {
return rpcLink1
}
Expand Down
73 changes: 73 additions & 0 deletions apps/content/content/docs/server/event-iterator.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
title: Event Iterator
description: Server-Sent Events (SSE) with out-of-the-box support in oRPC
---

The event iterator in oRPC provides built-in support for streaming responses using Server-Sent Events (SSE) without any extra configuration. This makes it easy to build real-time, event-driven APIs.

### Example Usage

The following example demonstrates how to create a streaming endpoint that continuously sends events to the client:

```ts twoslash
import { createORPCClient } from '@orpc/client'
import { RPCLink } from '@orpc/client/fetch'
import { eventIterator, os, withEventMeta } from '@orpc/server'
import { z } from 'zod'

// Define a streaming endpoint using the event iterator
const streaming = os
// Validate the input with Zod
.input(z.object({ prompt: z.string() }))
// Use eventIterator to specify the streaming output (optional, but recommended)
.output(eventIterator(z.object({ message: z.string() })))
.handler(async function* ({ input, lastEventId }) {
// The lastEventId (if provided) can be used to resume streaming on reconnects
while (true) {
// withEventMeta attaches metadata (e.g., event id, retry interval) to each event
yield withEventMeta(
{ message: 'Hello, world!' },
{ id: 'some-id', retry: 1000 }
)
// Wait for 1 second before sending the next event
await new Promise(resolve => setTimeout(resolve, 1000))
}
})

// Create a router with the streaming endpoint
const router = { streaming }

// Create an ORPC client and configure the SSE behavior
const client = createORPCClient<typeof router>(
new RPCLink({
url: 'http://localhost:3000/rpc',
eventSourceMaxNumberOfRetries: 0, // Set to 0 to disable automatic retries on connection failure,
// Optionally, you can configure:
// - eventSourceRetryDelay: Delay between retry attempts
// - eventSourceRetry: Custom retry behavior
})
)

// Invoke the streaming endpoint with an initial prompt and optional lastEventId
const result = await client.streaming(
{ prompt: 'Hello' },
{ lastEventId: 'you also can pass initial last event id here' }
)

// Process the incoming stream of events using async iteration
for await (const event of result) {
console.log(event)
}

// To close the connection manually, call result.return()
```

### Key Concepts

- **`lastEventId`**:
This parameter represents the ID of the last event received by the client. When reconnecting, the client can send this ID so that the server can resume streaming from where it left off.

- **`withEventMeta`**:
This helper function attaches additional metadata (such as `id` and `retry`) to an event. This metadata is useful for controlling the behavior of SSE, for example:
- **`id`**: A unique identifier for the event. Last event id is used when reconnecting.
- **`retry`**: The suggested reconnection delay (in milliseconds) if the connection is lost.
1 change: 1 addition & 0 deletions apps/content/content/docs/server/meta.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"context",
"contract",
"file-upload",
"event-iterator",
"lazy",
"server-action",
"client",
Expand Down
6 changes: 3 additions & 3 deletions packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@
"dist"
],
"scripts": {
"build": "tsup --clean --sourcemap --entry.index=src/index.ts --entry.fetch=src/adapters/fetch/index.ts --format=esm --onSuccess='tsc -b --noCheck'",
"build": "tsup --onSuccess='tsc -b --noCheck'",
"build:watch": "pnpm run build --watch",
"type:check": "tsc -b"
},
"dependencies": {
"@orpc/contract": "workspace:*",
"@orpc/server": "workspace:*",
"@orpc/server-standard": "^0.0.0",
"@orpc/server-standard-fetch": "^0.0.0",
"@orpc/server-standard": "^0.4.0",
"@orpc/server-standard-fetch": "^0.4.0",
"@orpc/shared": "workspace:*"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit 3f40711

Please sign in to comment.