Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SSEEdgeTransport #178

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
- [Running Your Server](#running-your-server)
- [stdio](#stdio)
- [HTTP with SSE](#http-with-sse)
- [HTTP on Edge](#http-on-edge)
- [Cloudflare Worker](#cloudflare-worker-with-durable-object)
- [Testing and Debugging](#testing-and-debugging)
- [Examples](#examples)
- [Echo Server](#echo-server)
Expand Down Expand Up @@ -239,6 +241,97 @@ app.post("/messages", async (req, res) => {
app.listen(3001);
```

### HTTP on Edge

For Edge based environments (Vercel, Cloudflare Workers) you can use the SSEEdgeTransport
which returns a SSE response and accepts normal HTTP requests.

For example using Hono inside of a Cloudflare Worker:

```typescript
import { Hono } from 'hono';
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { SSEEdgeTransport } from "@modelcontextprotocol/sdk/server/sseEdge.js";

const server = new McpServer({
name: "example-server",
version: "1.0.0"
});

// ... set up server resources, tools, and prompts ...

const app = new Hono();
app.get('/sse', async (c) => {
const uuid = crypto.randomUUID();
const transport = new SSEEdgeTransport('/messages', uuid);
return transport.sseResponse
});

app.post('/messages', async (c) => {
// Note: to support multiple simultaneous connections, these messages will
// need to be routed to a specific matching transport. (This logic isn't
// implemented here, for simplicity.)
await transport.handlePostMessage(req, res);
})

export default {
fetch: app.fetch,
} satisfies ExportedHandler;
```

### Cloudflare Worker with Durable Object

This makes it possible to keep track of MCP Servers with Durable Objects relatively easily.

```ts
import { McpServer, ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js';
import { DurableObject } from 'cloudflare:workers';
import { SSEEdgeTransport } from "@modelcontextprotocol/sdk/server/sseEdge.js";
import { Hono } from 'hono';

export class McpObject extends DurableObject {
private transport?: SSEEdgeTransport;
private server: McpServer;

constructor(ctx: DurableObjectState, env: any) {
super(ctx, env);
this.server = createMcpServer();
}

override async fetch(request: Request) {
const url = new URL(request.url);
// Create the transport if it doesn't exist
if (!this.transport) {
this.transport = new SSEEdgeTransport('/message', this.ctx.id.toString());
}

if (request.method === 'GET' && url.pathname.endsWith('/sse')) {
await this.server.connect(this.transport);
return this.transport.sseResponse;
}

if (request.method === 'POST' && url.pathname.endsWith('/message')) {
return this.transport.handlePostMessage(request);
}

return new Response('Not found', { status: 404 });
}
}

//---------- Define worker
const app = new Hono();
app.all('*', async (c) => {
const sessionId = c.req.query('sessionId');
const object = c.env.MCP_OBJECT.get(
sessionId ? c.env.MCP_OBJECT.idFromString(sessionId) : c.env.MCP_OBJECT.newUniqueId(),
);
return object.fetch(c.req.raw);
});

export default {
fetch: app.fetch,
} satisfies ExportedHandler;

### Testing and Debugging

To test your server, you can use the [MCP Inspector](https://github.com/modelcontextprotocol/inspector). See its README for more information.
Expand Down
133 changes: 133 additions & 0 deletions src/server/sseEdge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { Transport } from '../shared/transport.js';
import { JSONRPCMessage, JSONRPCMessageSchema } from '../types.js';

const MAXIMUM_MESSAGE_SIZE = 4 * 1024 * 1024; // 4MB

/**
* This transport is compatible with Cloudflare Workers and other edge environments
*/
export class SSEEdgeTransport implements Transport {
private controller: ReadableStreamDefaultController<Uint8Array> | null = null;
readonly stream: ReadableStream<Uint8Array>;
private closed = false;

onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

/**
* Creates a new EdgeSSETransport, which will direct the MPC client to POST messages to messageUrl
*/
constructor(
private messageUrl: string,
readonly sessionId: string,
) {
// Create a readable stream for SSE
this.stream = new ReadableStream({
start: (controller) => {
this.controller = controller;
},
cancel: () => {
this.closed = true;
this.onclose?.();
},
});
}

async start(): Promise<void> {
if (this.closed) {
throw new Error(
'SSE transport already closed! If using Server class, note that connect() calls start() automatically.',
);
}

// Make sure the controller exists
if (!this.controller) {
throw new Error('Stream controller not initialized');
}

// Send the endpoint event
const endpointMessage = `event: endpoint\ndata: ${encodeURI(this.messageUrl)}?sessionId=${this.sessionId}\n\n`;
this.controller.enqueue(new TextEncoder().encode(endpointMessage));
}

get sseResponse(): Response {
// Ensure the stream is properly initialized
if (!this.stream) {
throw new Error('Stream not initialized');
}

// Return a response with the SSE stream
return new Response(this.stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});
}

/**
* Handles incoming Requests
*/
async handlePostMessage(req: Request): Promise<Response> {
if (this.closed || !this.controller) {
const message = 'SSE connection not established';
return new Response(message, { status: 500 });
}

try {
const contentType = req.headers.get('content-type') || '';
if (!contentType.includes('application/json')) {
throw new Error(`Unsupported content-type: ${contentType}`);
}

// Check if the request body is too large
const contentLength = parseInt(req.headers.get('content-length') || '0', 10);
if (contentLength > MAXIMUM_MESSAGE_SIZE) {
throw new Error(`Request body too large: ${contentLength} bytes`);
}

// Clone the request before reading the body to avoid stream issues
const body = await req.json();
await this.handleMessage(body);
return new Response('Accepted', { status: 202 });
} catch (error) {
this.onerror?.(error as Error);
return new Response(String(error), { status: 400 });
}
}

/**
* Handle a client message, regardless of how it arrived. This can be used to inform the server of messages that arrive via a means different than HTTP POST.
*/
async handleMessage(message: unknown): Promise<void> {
let parsedMessage: JSONRPCMessage;
try {
parsedMessage = JSONRPCMessageSchema.parse(message);
} catch (error) {
this.onerror?.(error as Error);
throw error;
}

this.onmessage?.(parsedMessage);
}

async close(): Promise<void> {
if (!this.closed && this.controller) {
this.controller.close();
this.stream.cancel();
this.closed = true;
this.onclose?.();
}
}

async send(message: JSONRPCMessage): Promise<void> {
if (this.closed || !this.controller) {
throw new Error('Not connected');
}

const messageText = `event: message\ndata: ${JSON.stringify(message)}\n\n`;
this.controller.enqueue(new TextEncoder().encode(messageText));
}
}