From 867ff426ba46a2f1e07a8673febd8fae4213fd27 Mon Sep 17 00:00:00 2001 From: saltyaom Date: Fri, 8 Nov 2024 21:01:55 +0700 Subject: [PATCH] :tada: feat: update stuff --- example/a.ts | 32 +- package.json | 2 +- src/adapter/bun/index.ts | 290 +++++++++++++++- src/adapter/types.ts | 6 + src/adapter/web-standard/handler.ts | 29 +- src/cookies.ts | 30 +- src/index.ts | 178 +++------- src/types.ts | 12 +- src/ws/bun.ts | 495 ++++++++++++++++++++++++++++ src/ws/index.ts | 258 ++++++++------- src/ws/types.ts | 344 +++++++++++++------ 11 files changed, 1275 insertions(+), 401 deletions(-) create mode 100644 src/ws/bun.ts diff --git a/example/a.ts b/example/a.ts index c360ccc0..0df1781a 100644 --- a/example/a.ts +++ b/example/a.ts @@ -1,15 +1,25 @@ -import { Elysia, file } from '../src' +import { Elysia, t } from '../src' -const app = new Elysia() +new Elysia() + .decorate('a', 'a') + .state('b', 'b') .ws('/', { - message(ws) { - ws.send('hello') - } - }) - .post('/json', ({ body }) => body) - .get('/', () => file('./test/kyuukurarin.mp4')) - .get('/teapot', ({ set }) => { - set.status = 418 - return file('./example/teapot.webp') + parse({ body }) { + if (typeof body === 'number') return { id: body } + }, + resolve: () => ({ + requestId: ~~(Math.random() * 1000000) + }), + message: function* ({ body: { id }, data: { requestId }, send }) { + yield { id } + + send({ id: requestId }, true) + }, + body: t.Object({ + id: t.Number() + }), + response: t.Object({ + id: t.Number() + }) }) .listen(3000) diff --git a/package.json b/package.json index 322d45b5..b603a6b0 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elysia", "description": "Ergonomic Framework for Human", - "version": "1.2.0-exp.25", + "version": "1.2.0-exp.27", "author": { "name": "saltyAom", "url": "https://github.com/SaltyAom", diff --git a/src/adapter/bun/index.ts b/src/adapter/bun/index.ts index 161abf93..596edd02 100644 --- a/src/adapter/bun/index.ts +++ b/src/adapter/bun/index.ts @@ -1,14 +1,23 @@ +/* eslint-disable sonarjs/no-duplicate-string */ import type { Serve } from 'bun' +import type { TSchema } from '@sinclair/typebox' -import { createNativeStaticHandler } from './handler' - -import { isProduction } from '../../error' -import { hasHeaderShorthand, isNumericString } from '../../utils' -import { websocket } from '../../ws/index' - +import { WebStandardAdapter } from '../web-standard/index' +import { parseSetCookies } from '../web-standard/handler' import type { ElysiaAdapter } from '../types' -import { WebStandardAdapter } from '../web-standard/index' +import { createNativeStaticHandler } from './handler' +import { serializeCookie } from '../../cookies' +import { isProduction, ValidationError } from '../../error' +import { + getSchemaValidator, + hasHeaderShorthand, + isNotEmpty, + isNumericString +} from '../../utils' + +import { ElysiaWebSocket, websocket } from '../../ws/index' +import type { ServerWebSocket } from '../../ws/bun' export const BunAdapter: ElysiaAdapter = { ...WebStandardAdapter, @@ -98,5 +107,272 @@ export const BunAdapter: ElysiaAdapter = { Bun?.gc(false) }) } + }, + ws(app, path, options) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { parse, body, response, ...rest } = options + + const parsers = typeof parse === 'function' ? [parse] : parse + + const validateMessage = getSchemaValidator(body, { + // @ts-expect-error private property + models: app.definitions.type as Record, + normalize: app.config.normalize + }) + + const validateResponse = getSchemaValidator(response as any, { + // @ts-expect-error private property + models: app.definitions.type as Record, + normalize: app.config.normalize + }) + + const parseMessage = async (ws: ServerWebSocket, message: any) => { + if (typeof message === 'string') { + const start = message?.charCodeAt(0) + + if (start === 47 || start === 123) + try { + message = JSON.parse(message) + } catch { + // Not empty + } + else if (isNumericString(message)) message = +message + } + + if (parsers) + for (let i = 0; i < parsers.length; i++) { + let temp = parsers[i](ws, message) + if (temp instanceof Promise) temp = await temp + + if (temp !== undefined) return temp + } + + return message + } + + app.route( + '$INTERNALWS', + path as any, + async (context) => { + // ! Enable static code analysis just in case resolveUnknownFunction doesn't work, do not remove + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { set, path, qi, headers, query, params } = context + + // @ts-expect-error private property + const server = app.getServer() + + // @ts-ignore + context.validator = validateResponse + + if (options.upgrade) { + if (typeof options.upgrade === 'function') { + const temp = options.upgrade(context as any) + if (temp instanceof Promise) await temp + } else if (options.upgrade) + Object.assign( + set.headers, + options.upgrade as Record + ) + } + + if (set.cookie && isNotEmpty(set.cookie)) { + const cookie = serializeCookie(set.cookie) + + if (cookie) set.headers['set-cookie'] = cookie + } + + if ( + set.headers['set-cookie'] && + Array.isArray(set.headers['set-cookie']) + ) + set.headers = parseSetCookies( + new Headers(set.headers as any) as Headers, + set.headers['set-cookie'] + ) as any + + const handleResponse = ( + ws: ServerWebSocket, + data: unknown + ): unknown => { + if (data instanceof Promise) + return data.then((data) => handleResponse(ws, data)) + + if (Buffer.isBuffer(data)) return ws.send(data) + + if (data === undefined) return + + // @ts-ignore Generator function + if (typeof data?.next === 'function') { + // @ts-ignore Generator function + const init = data.next() + if (init instanceof Promise) + return (async () => { + const data = await init + + if (validateResponse?.Check(data) === false) + return ws.send( + new ValidationError( + 'message', + validateResponse, + data + ).message + ) + + // @ts-ignore Generator function + if (typeof data.data === 'object') + // @ts-ignore Generator function + ws.send(JSON.stringify(data.data)) as any + // @ts-ignore Generator function + else ws.send(data.data as any) + + // @ts-ignore Generator function + if (!data.done) + // @ts-ignore + for await (const datum of data) { + if ( + validateResponse?.Check(datum) === + false + ) + return ws.send( + new ValidationError( + 'message', + validateResponse, + datum + ).message + ) + + if (typeof datum === 'object') + return ws.send( + JSON.stringify(datum) + ) as any + + ws.send(datum as any) + } + })() + + if (typeof init.value === 'object') + ws.send(JSON.stringify(init.value)) as any + else ws.send(init.value) + + if (!init.done) + // @ts-ignore + for (const datum of data) { + if (validateResponse?.Check(datum) === false) + return ws.send( + new ValidationError( + 'message', + validateResponse, + datum + ).message + ) + + if (typeof datum === 'object') + return ws.send(JSON.stringify(datum)) as any + + ws.send(datum as any) + } + + return + } + + if (validateResponse?.Check(data) === false) + return ws.send( + new ValidationError( + 'message', + validateResponse, + data + ).message + ) + + if (typeof data === 'object') + return ws.send(JSON.stringify(data)) as any + + if (response !== undefined) return ws.send(response) + } + + if ( + server?.upgrade(context.request, { + headers: isNotEmpty(set.headers) + ? (set.headers as Record) + : undefined, + data: { + ...context, + validator: validateResponse, + ping(data?: unknown) { + options.ping?.(data) + }, + pong(data?: unknown) { + options.pong?.(data) + }, + open(ws: ServerWebSocket) { + handleResponse( + ws, + options.open?.( + new ElysiaWebSocket(ws, context as any) + ) + ) + }, + message: async ( + ws: ServerWebSocket, + _message: any + ) => { + const message = await parseMessage(ws, _message) + + if (validateMessage?.Check(message) === false) + return void ws.send( + new ValidationError( + 'message', + validateMessage, + message + ).message as string + ) + + handleResponse( + ws, + options.message?.( + new ElysiaWebSocket( + ws, + context as any, + message + ), + message as any + ) + ) + }, + drain(ws: ServerWebSocket) { + handleResponse( + ws, + options.drain?.( + new ElysiaWebSocket(ws, context as any) + ) + ) + }, + close( + ws: ServerWebSocket, + code: number, + reason: string + ) { + handleResponse( + ws, + options.close?.( + new ElysiaWebSocket(ws, context as any), + code, + reason + ) + ) + } + } + }) + ) + return + + set.status = 400 + return 'Expected a websocket connection' + }, + { + ...rest, + websocket: options + } as any + ) } } diff --git a/src/adapter/types.ts b/src/adapter/types.ts index 0a3d4481..3d9d658c 100644 --- a/src/adapter/types.ts +++ b/src/adapter/types.ts @@ -3,6 +3,7 @@ import type { AnyElysia, ListenCallback } from '..' import type { Context } from '../context' import type { Prettify, LocalHook } from '../types' import type { Sucrose } from '../sucrose' +import { WSLocalHook } from '../ws/types' export interface ElysiaAdapter { name: string @@ -128,4 +129,9 @@ export interface ElysiaAdapter { validationError: string unknownError: string } + ws?( + app: AnyElysia, + path: string, + handler: WSLocalHook + ): unknown } diff --git a/src/adapter/web-standard/handler.ts b/src/adapter/web-standard/handler.ts index 12918c6c..23f7968b 100644 --- a/src/adapter/web-standard/handler.ts +++ b/src/adapter/web-standard/handler.ts @@ -1,9 +1,8 @@ /* eslint-disable sonarjs/no-nested-switch */ /* eslint-disable sonarjs/no-duplicate-string */ -import { serialize } from 'cookie' import { isNotEmpty, hasHeaderShorthand, StatusMap } from '../../utils' -import { Cookie } from '../../cookies' +import { Cookie, serializeCookie } from '../../cookies' import type { Context } from '../../context' import type { LocalHook } from '../../types' @@ -93,32 +92,6 @@ export const parseSetCookies = (headers: Headers, setCookie: string[]) => { return headers } -export const serializeCookie = (cookies: Context['set']['cookie']) => { - if (!cookies || !isNotEmpty(cookies)) return undefined - - const set: string[] = [] - - for (const [key, property] of Object.entries(cookies)) { - if (!key || !property) continue - - const value = property.value - if (value === undefined || value === null) continue - - set.push( - serialize( - key, - typeof value === 'object' ? JSON.stringify(value) : value + '', - property - ) - ) - } - - if (set.length === 0) return undefined - if (set.length === 1) return set[0] - - return set -} - const handleStream = async ( generator: Generator | AsyncGenerator, set?: Context['set'], diff --git a/src/cookies.ts b/src/cookies.ts index 18275cfd..befb08a7 100644 --- a/src/cookies.ts +++ b/src/cookies.ts @@ -1,9 +1,9 @@ -import { parse } from 'cookie' +import { parse, serialize } from 'cookie' // @ts-ignore import decodeURIComponent from 'fast-decode-uri-component' -import { unsignCookie } from './utils' +import { isNotEmpty, unsignCookie } from './utils' import { InvalidCookieSignature } from './error' import type { Context } from './context' @@ -359,3 +359,29 @@ export const parseCookie = async ( return createCookieJar(set, jar, initial) } + +export const serializeCookie = (cookies: Context['set']['cookie']) => { + if (!cookies || !isNotEmpty(cookies)) return undefined + + const set: string[] = [] + + for (const [key, property] of Object.entries(cookies)) { + if (!key || !property) continue + + const value = property.value + if (value === undefined || value === null) continue + + set.push( + serialize( + key, + typeof value === 'object' ? JSON.stringify(value) : value + '', + property + ) + ) + } + + if (set.length === 0) return undefined + if (set.length === 1) return set[0] + + return set +} diff --git a/src/index.ts b/src/index.ts index d9882360..e7d77067 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ -import type { Serve, Server, ServerWebSocket } from 'bun' +import type { Serve, Server } from 'bun' import { Memoirist } from 'memoirist' import { type TObject, type Static, type TSchema } from '@sinclair/typebox' @@ -8,8 +8,7 @@ import type { Context } from './context' import { t, TypeCheck } from './type-system' import { sucrose, type Sucrose } from './sucrose' -import { ElysiaWS } from './ws/index' -import type { WS } from './ws/types' +import type { WSLocalHook } from './ws/types' import { BunAdapter } from './adapter/bun/index' import { WebStandardAdapter } from './adapter/web-standard/index' @@ -78,6 +77,7 @@ import type { ComposedHandler, InputSchema, LocalHook, + AnyLocalHook, MergeSchema, RouteSchema, UnwrapRoute, @@ -373,9 +373,7 @@ export default class Elysia< return this } - private applyMacro( - localHook: LocalHook - ) { + private applyMacro(localHook: AnyLocalHook) { if (this.extender.macros.length) { const manage = createMacroManager({ globalHook: this.event, @@ -452,7 +450,7 @@ export default class Elysia< method: HTTPMethod, path: string, handle: Handler | any, - localHook?: LocalHook, + localHook?: AnyLocalHook, { allowMeta = false, skipPrefix = false } = { allowMeta: false as boolean | undefined, skipPrefix: false as boolean | undefined @@ -751,13 +749,16 @@ export default class Elysia< ctx ) + const isWebSocket = method === '$INTERNALWS' + this.router.history.push({ method, path, composed: mainHandler, handler: handle, hooks: hooks as any, - compile: () => compile() + compile: () => compile(), + websocket: localHook.websocket as any }) const staticRouter = this.router.static.http @@ -767,7 +768,7 @@ export default class Elysia< compile } - if (method === '$INTERNALWS') { + if (isWebSocket) { const loose = getLoosePath(path) if (path.indexOf(':') === -1 && path.indexOf('*') === -1) { @@ -2635,9 +2636,7 @@ export default class Elysia< */ group( prefix: string, - schemaOrRun: - | LocalHook - | ((group: AnyElysia) => AnyElysia), + schemaOrRun: AnyLocalHook | ((group: AnyElysia) => AnyElysia), run?: (group: AnyElysia) => AnyElysia ): AnyElysia { const instance = new Elysia({ @@ -3031,7 +3030,7 @@ export default class Elysia< */ guard( hook: - | (LocalHook & { + | (AnyLocalHook & { as: LifeCycleType }) | ((group: AnyElysia) => AnyElysia), @@ -3494,12 +3493,9 @@ export default class Elysia< method, path, handler, - mergeHook( - hooks as LocalHook, - { - error: plugin.event.error - } - ) + mergeHook(hooks as AnyLocalHook, { + error: plugin.event.error + }) ) } @@ -4753,19 +4749,31 @@ export default class Elysia< >, const Schema extends MergeSchema< UnwrapRoute, - Metadata['schema'] + MergeSchema< + Volatile['schema'], + MergeSchema + > + >, + const Macro extends Metadata['macro'], + const Resolutions extends MaybeArray< + ResolveHandler< + Schema, + { + decorator: Singleton['decorator'] + store: Singleton['store'] + derive: Ephemeral['derive'] & Volatile['derive'] + resolve: Ephemeral['resolve'] & Volatile['resolve'] + } + > > >( path: Path, - options: WS.LocalHook< + options: WSLocalHook< LocalSchema, Schema, - Singleton & { - derive: Ephemeral['derive'] & Volatile['derive'] - resolve: Ephemeral['resolve'] & Volatile['resolve'] - }, - Definitions['error'], - Metadata['macro'], + Singleton, + Macro, + Resolutions, JoinPath > ): Elysia< @@ -4795,118 +4803,8 @@ export default class Elysia< Ephemeral, Volatile > { - const transform = options.transformMessage - ? Array.isArray(options.transformMessage) - ? options.transformMessage - : [options.transformMessage] - : undefined - - let server: Server | null = null - - const validateMessage = getSchemaValidator(options?.body, { - models: this.definitions.type as Record, - normalize: this.config.normalize - }) - - const validateResponse = getSchemaValidator(options?.response as any, { - models: this.definitions.type as Record, - normalize: this.config.normalize - }) - - const parseMessage = (message: any) => { - if (typeof message === 'string') { - const start = message?.charCodeAt(0) - - if (start === 47 || start === 123) - try { - message = JSON.parse(message) - } catch { - // Not empty - } - else if (isNumericString(message)) message = +message - } - - if (transform?.length) - for (let i = 0; i < transform.length; i++) { - const temp = transform[i](message) - - if (temp !== undefined) message = temp - } - - return message - } - - this.route( - '$INTERNALWS', - path as any, - // @ts-expect-error - (context) => { - // ! Enable static code analysis just in case resolveUnknownFunction doesn't work, do not remove - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { set, path, qi, headers, query, params } = context - - if (server === null) server = this.getServer() - - if ( - server?.upgrade(context.request, { - headers: (typeof options.upgrade === 'function' - ? options.upgrade(context as any as Context) - : options.upgrade) as Bun.HeadersInit, - data: { - validator: validateResponse, - open(ws: ServerWebSocket) { - options.open?.(new ElysiaWS(ws, context as any)) - }, - message: (ws: ServerWebSocket, msg: any) => { - const message = parseMessage(msg) - - if (validateMessage?.Check(message) === false) - return void ws.send( - new ValidationError( - 'message', - validateMessage, - message - ).message as string - ) - - options.message?.( - new ElysiaWS(ws, context as any), - message as any - ) - }, - drain(ws: ServerWebSocket) { - options.drain?.( - new ElysiaWS(ws, context as any) - ) - }, - close( - ws: ServerWebSocket, - code: number, - reason: string - ) { - options.close?.( - new ElysiaWS(ws, context as any), - code, - reason - ) - } - } - }) - ) - return - - set.status = 400 - - return 'Expected a websocket connection' - }, - { - beforeHandle: options.beforeHandle, - transform: options.transform, - headers: options.headers, - params: options.params, - query: options.query - } as any - ) + if (this['~adapter'].ws) this['~adapter'].ws(this, path, options as any) + else console.warn(`Current adapter doesn't support WebSocket`) return this as any } @@ -6093,7 +5991,7 @@ export default class Elysia< export { Elysia } export { t } from './type-system' -export { Cookie, type CookieOptions } from './cookies' +export { serializeCookie, Cookie, type CookieOptions } from './cookies' export type { Context, PreContext, ErrorContext } from './context' export { ELYSIA_TRACE, diff --git a/src/types.ts b/src/types.ts index d3dba004..fd2c8093 100644 --- a/src/types.ts +++ b/src/types.ts @@ -30,6 +30,7 @@ import type { import type { ComposerGeneralHandlerOptions } from './compose' import type { ElysiaAdapter } from './adapter' +import type { WSLocalHook } from './ws/types' type PartialServe = Partial @@ -38,7 +39,7 @@ export type IsNever = [T] extends [never] ? true : false export type ElysiaConfig = { /** * @default Bun Adapter - */ + */ adapter?: ElysiaAdapter /** * Path prefix of the instance @@ -1029,6 +1030,8 @@ export type ResolveResolutionsArray< : ResolveResolutionsArray : Prettify +export type AnyLocalHook = LocalHook + export type LocalHook< LocalSchema extends InputSchema, Schema extends RouteSchema, @@ -1145,6 +1148,7 @@ export type LocalHook< */ error?: MaybeArray> tags?: DocumentDecoration['tags'] + websocket?: WSLocalHook } export type ComposedHandler = (context: Context) => MaybePromise @@ -1156,6 +1160,7 @@ export interface InternalRoute { handler: Handler compile(): Function hooks: LocalHook + websocket?: WSLocalHook } export type SchemaValidator = { @@ -1429,10 +1434,7 @@ export type ComposeElysiaResponse = Handle extends ( type _ComposeElysiaResponse = Prettify< Prettify< { - 200: Exclude< - Handle, - ElysiaCustomStatusResponse - > + 200: Exclude> } & ExtractErrorFromHandle & ({} extends Response ? {} : Omit) > diff --git a/src/ws/bun.ts b/src/ws/bun.ts new file mode 100644 index 00000000..2885c03a --- /dev/null +++ b/src/ws/bun.ts @@ -0,0 +1,495 @@ +// ? Copy from bun.d.ts for universal runtime support + +type TypedArray = + | Uint8Array + | Uint8ClampedArray + | Uint16Array + | Uint32Array + | Int8Array + | Int16Array + | Int32Array + | BigUint64Array + | BigInt64Array + | Float32Array + | Float64Array + +export type BufferSource = TypedArray | DataView | ArrayBufferLike + +/** + * A status that represents the outcome of a sent message. + * + * - if **0**, the message was **dropped**. + * - if **-1**, there is **backpressure** of messages. + * - if **>0**, it represents the **number of bytes sent**. + * + * @example + * ```js + * const status = ws.send("Hello!"); + * if (status === 0) { + * console.log("Message was dropped"); + * } else if (status === -1) { + * console.log("Backpressure was applied"); + * } else { + * console.log(`Success! Sent ${status} bytes`); + * } + * ``` + */ +export type ServerWebSocketSendStatus = number + +/** + * A state that represents if a WebSocket is connected. + * + * - `WebSocket.CONNECTING` is `0`, the connection is pending. + * - `WebSocket.OPEN` is `1`, the connection is established and `send()` is possible. + * - `WebSocket.CLOSING` is `2`, the connection is closing. + * - `WebSocket.CLOSED` is `3`, the connection is closed or couldn't be opened. + * + * @link https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState + */ +export type WebSocketReadyState = 0 | 1 | 2 | 3 + +/** + * A fast WebSocket designed for servers. + * + * Features: + * - **Message compression** - Messages can be compressed + * - **Backpressure** - If the client is not ready to receive data, the server will tell you. + * - **Dropped messages** - If the client cannot receive data, the server will tell you. + * - **Topics** - Messages can be {@link ServerWebSocket.publish}ed to a specific topic and the client can {@link ServerWebSocket.subscribe} to topics + * + * This is slightly different than the browser {@link WebSocket} which Bun supports for clients. + * + * Powered by [uWebSockets](https://github.com/uNetworking/uWebSockets). + * + * @example + * import { serve } from "bun"; + * + * serve({ + * websocket: { + * open(ws) { + * console.log("Connected", ws.remoteAddress); + * }, + * message(ws, data) { + * console.log("Received", data); + * ws.send(data); + * }, + * close(ws, code, reason) { + * console.log("Disconnected", code, reason); + * }, + * } + * }); + */ +export interface ServerWebSocket { + /** + * Sends a message to the client. + * + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. + * @example + * ws.send("Hello!"); + * ws.send("Compress this.", true); + * ws.send(new Uint8Array([1, 2, 3, 4])); + */ + send( + data: string | BufferSource, + compress?: boolean + ): ServerWebSocketSendStatus + + /** + * Sends a text message to the client. + * + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. + * @example + * ws.send("Hello!"); + * ws.send("Compress this.", true); + */ + sendText(data: string, compress?: boolean): ServerWebSocketSendStatus + + /** + * Sends a binary message to the client. + * + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. + * @example + * ws.send(new TextEncoder().encode("Hello!")); + * ws.send(new Uint8Array([1, 2, 3, 4]), true); + */ + sendBinary( + data: BufferSource, + compress?: boolean + ): ServerWebSocketSendStatus + + /** + * Closes the connection. + * + * Here is a list of close codes: + * - `1000` means "normal closure" **(default)** + * - `1009` means a message was too big and was rejected + * - `1011` means the server encountered an error + * - `1012` means the server is restarting + * - `1013` means the server is too busy or the client is rate-limited + * - `4000` through `4999` are reserved for applications (you can use it!) + * + * To close the connection abruptly, use `terminate()`. + * + * @param code The close code to send + * @param reason The close reason to send + */ + close(code?: number, reason?: string): void + + /** + * Abruptly close the connection. + * + * To gracefully close the connection, use `close()`. + */ + terminate(): void + + /** + * Sends a ping. + * + * @param data The data to send + */ + ping(data?: string | BufferSource): ServerWebSocketSendStatus + + /** + * Sends a pong. + * + * @param data The data to send + */ + pong(data?: string | BufferSource): ServerWebSocketSendStatus + + /** + * Sends a message to subscribers of the topic. + * + * @param topic The topic name. + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. + * @example + * ws.publish("chat", "Hello!"); + * ws.publish("chat", "Compress this.", true); + * ws.publish("chat", new Uint8Array([1, 2, 3, 4])); + */ + publish( + topic: string, + data: string | BufferSource, + compress?: boolean + ): ServerWebSocketSendStatus + + /** + * Sends a text message to subscribers of the topic. + * + * @param topic The topic name. + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. + * @example + * ws.publish("chat", "Hello!"); + * ws.publish("chat", "Compress this.", true); + */ + publishText( + topic: string, + data: string, + compress?: boolean + ): ServerWebSocketSendStatus + + /** + * Sends a binary message to subscribers of the topic. + * + * @param topic The topic name. + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. + * @example + * ws.publish("chat", new TextEncoder().encode("Hello!")); + * ws.publish("chat", new Uint8Array([1, 2, 3, 4]), true); + */ + publishBinary( + topic: string, + data: BufferSource, + compress?: boolean + ): ServerWebSocketSendStatus + + /** + * Subscribes a client to the topic. + * + * @param topic The topic name. + * @example + * ws.subscribe("chat"); + */ + subscribe(topic: string): void + + /** + * Unsubscribes a client to the topic. + * + * @param topic The topic name. + * @example + * ws.unsubscribe("chat"); + */ + unsubscribe(topic: string): void + + /** + * Is the client subscribed to a topic? + * + * @param topic The topic name. + * @example + * ws.subscribe("chat"); + * console.log(ws.isSubscribed("chat")); // true + */ + isSubscribed(topic: string): boolean + + /** + * Batches `send()` and `publish()` operations, which makes it faster to send data. + * + * The `message`, `open`, and `drain` callbacks are automatically corked, so + * you only need to call this if you are sending messages outside of those + * callbacks or in async functions. + * + * @param callback The callback to run. + * @example + * ws.cork((ctx) => { + * ctx.send("These messages"); + * ctx.sendText("are sent"); + * ctx.sendBinary(new TextEncoder().encode("together!")); + * }); + */ + cork(callback: (ws: ServerWebSocket) => T): T + + /** + * The IP address of the client. + * + * @example + * console.log(socket.remoteAddress); // "127.0.0.1" + */ + readonly remoteAddress: string + + /** + * The ready state of the client. + * + * - if `0`, the client is connecting. + * - if `1`, the client is connected. + * - if `2`, the client is closing. + * - if `3`, the client is closed. + * + * @example + * console.log(socket.readyState); // 1 + */ + readonly readyState: WebSocketReadyState + + /** + * Sets how binary data is returned in events. + * + * - if `nodebuffer`, binary data is returned as `Buffer` objects. **(default)** + * - if `arraybuffer`, binary data is returned as `ArrayBuffer` objects. + * - if `uint8array`, binary data is returned as `Uint8Array` objects. + * + * @example + * let ws: WebSocket; + * ws.binaryType = "uint8array"; + * ws.addEventListener("message", ({ data }) => { + * console.log(data instanceof Uint8Array); // true + * }); + */ + binaryType?: 'nodebuffer' | 'arraybuffer' | 'uint8array' + + /** + * Custom data that you can assign to a client, can be read and written at any time. + * + * @example + * import { serve } from "bun"; + * + * serve({ + * fetch(request, server) { + * const data = { + * accessToken: request.headers.get("Authorization"), + * }; + * if (server.upgrade(request, { data })) { + * return; + * } + * return new Response(); + * }, + * websocket: { + * open(ws) { + * console.log(ws.data.accessToken); + * } + * } + * }); + */ + data: T +} + +/** + * Compression options for WebSocket messages. + */ +export type WebSocketCompressor = + | 'disable' + | 'shared' + | 'dedicated' + | '3KB' + | '4KB' + | '8KB' + | '16KB' + | '32KB' + | '64KB' + | '128KB' + | '256KB' + +/** + * Create a server-side {@link ServerWebSocket} handler for use with {@link Bun.serve} + * + * @example + * ```ts + * import { websocket, serve } from "bun"; + * + * serve<{name: string}>({ + * port: 3000, + * websocket: { + * open: (ws) => { + * console.log("Client connected"); + * }, + * message: (ws, message) => { + * console.log(`${ws.data.name}: ${message}`); + * }, + * close: (ws) => { + * console.log("Client disconnected"); + * }, + * }, + * + * fetch(req, server) { + * const url = new URL(req.url); + * if (url.pathname === "/chat") { + * const upgraded = server.upgrade(req, { + * data: { + * name: new URL(req.url).searchParams.get("name"), + * }, + * }); + * if (!upgraded) { + * return new Response("Upgrade failed", { status: 400 }); + * } + * return; + * } + * return new Response("Hello World"); + * }, + * }); + * ``` + */ +export interface WebSocketHandler { + /** + * Called when the server receives an incoming message. + * + * If the message is not a `string`, its type is based on the value of `binaryType`. + * - if `nodebuffer`, then the message is a `Buffer`. + * - if `arraybuffer`, then the message is an `ArrayBuffer`. + * - if `uint8array`, then the message is a `Uint8Array`. + * + * @param ws The websocket that sent the message + * @param message The message received + */ + message( + ws: ServerWebSocket, + message: string | Buffer + ): void | Promise + + /** + * Called when a connection is opened. + * + * @param ws The websocket that was opened + */ + open?(ws: ServerWebSocket): void | Promise + + /** + * Called when a connection was previously under backpressure, + * meaning it had too many queued messages, but is now ready to receive more data. + * + * @param ws The websocket that is ready for more data + */ + drain?(ws: ServerWebSocket): void | Promise + + /** + * Called when a connection is closed. + * + * @param ws The websocket that was closed + * @param code The close code + * @param message The close message + */ + close?( + ws: ServerWebSocket, + code: number, + reason: string + ): void | Promise + + /** + * Called when a ping is sent. + * + * @param ws The websocket that received the ping + * @param data The data sent with the ping + */ + ping?(ws: ServerWebSocket, data: Buffer): void | Promise + + /** + * Called when a pong is received. + * + * @param ws The websocket that received the ping + * @param data The data sent with the ping + */ + pong?(ws: ServerWebSocket, data: Buffer): void | Promise + + /** + * Sets the maximum size of messages in bytes. + * + * Default is 16 MB, or `1024 * 1024 * 16` in bytes. + */ + maxPayloadLength?: number + + /** + * Sets the maximum number of bytes that can be buffered on a single connection. + * + * Default is 16 MB, or `1024 * 1024 * 16` in bytes. + */ + backpressureLimit?: number + + /** + * Sets if the connection should be closed if `backpressureLimit` is reached. + * + * Default is `false`. + */ + closeOnBackpressureLimit?: boolean + + /** + * Sets the the number of seconds to wait before timing out a connection + * due to no messages or pings. + * + * Default is 2 minutes, or `120` in seconds. + */ + idleTimeout?: number + + /** + * Should `ws.publish()` also send a message to `ws` (itself), if it is subscribed? + * + * Default is `false`. + */ + publishToSelf?: boolean + + /** + * Should the server automatically send and respond to pings to clients? + * + * Default is `true`. + */ + sendPings?: boolean + + /** + * Sets the compression level for messages, for clients that supports it. By default, compression is disabled. + * + * Default is `false`. + */ + perMessageDeflate?: + | boolean + | { + /** + * Sets the compression level. + */ + compress?: WebSocketCompressor | boolean + /** + * Sets the decompression level. + */ + decompress?: WebSocketCompressor | boolean + } +} diff --git a/src/ws/index.ts b/src/ws/index.ts index 0e29b788..57f24261 100644 --- a/src/ws/index.ts +++ b/src/ws/index.ts @@ -1,13 +1,18 @@ -import type { ServerWebSocket, WebSocketHandler } from 'bun' +import { randomId } from '../utils' + +import type { + ServerWebSocket, + ServerWebSocketSendStatus, + BufferSource, + WebSocketHandler +} from './bun' import type { TSchema } from '@sinclair/typebox' -import type { TypeCheck } from '@sinclair/typebox/compiler' +import type { TypeCheck } from '../type-system' +import type { Prettify, RouteSchema } from '../types' import { ValidationError } from '../error' -import type { Context } from '../context' - -import type { SingletonBase, RouteSchema, Prettify } from '../types' -import { randomId } from '../utils' +import { FlattenResponse } from './types' export const websocket: WebSocketHandler = { open(ws) { @@ -24,129 +29,162 @@ export const websocket: WebSocketHandler = { } } -export class ElysiaWS< - WS extends ServerWebSocket<{ - id?: string - validator?: TypeCheck - }>, - Route extends RouteSchema = RouteSchema, - Singleton extends SingletonBase = { - decorator: {} - store: {} - derive: {} - resolve: {} +type ElysiaServerWebSocket = Omit< + ServerWebSocket, + 'send' | 'ping' | 'pong' | 'publish' +> + +export class ElysiaWebSocket + implements ElysiaServerWebSocket +{ + validator?: TypeCheck; + ['~types']?: { + validator: Prettify } -> { - validator?: TypeCheck - _validator?: Prettify + + id: string constructor( - public raw: WS, - public data: Context + public raw: ServerWebSocket<{ + id?: string + validator?: TypeCheck + }>, + public data: Context, + public body: Route['body'] = undefined ) { - this.validator = raw.data.validator - if (raw.data.id) { - this.id = raw.data.id - } else { - this.id = randomId().toString() - } - } - - get id() { - return this.raw.data.id! - } - - set id(newID: string) { - this.raw.data.id = newID - } - - get publish() { - return ( - topic: string, - data: {} extends Route['response'] - ? unknown - : Route['response'] extends Record<200, unknown> - ? Route['response'][200] - : unknown = undefined, - compress?: boolean - ) => { - if (this.validator?.Check(data) === false) - throw new ValidationError('message', this.validator, data) - - if (typeof data === 'object') data = JSON.stringify(data) - - this.raw.publish(topic, data as unknown as string, compress) - - return this - } + this.validator = raw.data?.validator + + if (raw.data.id) this.id = raw.data.id + else this.id = randomId().toString() + + this.sendText = raw.sendText.bind(this) + this.sendBinary = raw.sendBinary.bind(this) + this.close = raw.close.bind(this) + this.terminate = raw.terminate.bind(this) + this.publishText = raw.publishText.bind(this) + this.publishBinary = raw.publishBinary.bind(this) + this.subscribe = raw.subscribe.bind(this) + this.unsubscribe = raw.unsubscribe.bind(this) + this.isSubscribed = raw.isSubscribed.bind(this) + this.cork = raw.cork.bind(this) + this.remoteAddress = raw.remoteAddress + this.binaryType = raw.binaryType + this.data = raw.data as any + + this.send = this.send.bind(this) + this.ping = this.ping.bind(this) + this.pong = this.pong.bind(this) + this.publish = this.publish.bind(this) } - get send() { - return ( - data: {} extends Route['response'] - ? unknown - : Route['response'] extends Record<200, unknown> - ? Route['response'][200] - : unknown - ) => { - if (this.validator?.Check(data) === false) - throw new ValidationError('message', this.validator, data) - - if (Buffer.isBuffer(data)) { - this.raw.send(data as unknown as Buffer) - - return this - } - - if (typeof data === 'object') data = JSON.stringify(data) - - this.raw.send(data as unknown as string) - - return this - } + /** + * Sends a message to the client. + * + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. + * @example + * ws.send("Hello!"); + * ws.send("Compress this.", true); + * ws.send(new Uint8Array([1, 2, 3, 4])); + */ + send( + data: FlattenResponse | BufferSource, + compress?: boolean + ): ServerWebSocketSendStatus { + if (Buffer.isBuffer(data)) + return this.raw.send(data as unknown as BufferSource, compress) + + if (this.validator?.Check(data) === false) + throw new ValidationError('message', this.validator, data) + + if (typeof data === 'object') data = JSON.stringify(data) as any + + return this.raw.send(data as unknown as string, compress) } - get subscribe() { - return (room: string) => { - this.raw.subscribe(room) + /** + * Sends a ping. + * + * @param data The data to send + */ + ping( + data?: FlattenResponse | BufferSource + ): ServerWebSocketSendStatus { + if (Buffer.isBuffer(data)) + return this.raw.ping(data as unknown as BufferSource) - return this - } - } + if (this.validator?.Check(data) === false) + throw new ValidationError('message', this.validator, data) - get unsubscribe() { - return (room: string) => { - this.raw.unsubscribe(room) + if (typeof data === 'object') data = JSON.stringify(data) as any - return this - } + return this.raw.ping(data as string) } - get cork() { - return (callback: () => this) => { - this.raw.cork(callback as any) + /** + * Sends a pong. + * + * @param data The data to send + */ + pong( + data?: FlattenResponse | BufferSource + ): ServerWebSocketSendStatus { + if (Buffer.isBuffer(data)) + return this.raw.pong(data as unknown as BufferSource) - return this - } - } + if (this.validator?.Check(data) === false) + throw new ValidationError('message', this.validator, data) - get close() { - return () => { - this.raw.close() - - return this - } - } + if (typeof data === 'object') data = JSON.stringify(data) as any - get terminate() { - return this.raw.terminate.bind(this.raw) + return this.raw.pong(data as string) } - get isSubscribed() { - return this.raw.isSubscribed.bind(this.raw) + /** + * Sends a message to subscribers of the topic. + * + * @param topic The topic name. + * @param data The data to send. + * @param compress Should the data be compressed? If the client does not support compression, this is ignored. + * @example + * ws.publish("chat", "Hello!"); + * ws.publish("chat", "Compress this.", true); + * ws.publish("chat", new Uint8Array([1, 2, 3, 4])); + */ + publish( + topic: string, + data: FlattenResponse | BufferSource, + compress?: boolean + ): ServerWebSocketSendStatus { + if (Buffer.isBuffer(data)) + return this.raw.publish( + topic, + data as unknown as BufferSource, + compress + ) + + if (this.validator?.Check(data) === false) + throw new ValidationError('message', this.validator, data) + + if (typeof data === 'object') data = JSON.stringify(data) as any + + return this.raw.publish(topic, data as unknown as string, compress) } - get remoteAddress() { - return this.raw.remoteAddress + sendText: ServerWebSocket['sendText'] + sendBinary: ServerWebSocket['sendBinary'] + close: ServerWebSocket['close'] + terminate: ServerWebSocket['terminate'] + publishText: ServerWebSocket['publishText'] + publishBinary: ServerWebSocket['publishBinary'] + subscribe: ServerWebSocket['subscribe'] + unsubscribe: ServerWebSocket['unsubscribe'] + isSubscribed: ServerWebSocket['isSubscribed'] + cork: ServerWebSocket['cork'] + remoteAddress: ServerWebSocket['remoteAddress'] + binaryType: ServerWebSocket['binaryType'] + + get readyState() { + return this.raw.readyState } } diff --git a/src/ws/types.ts b/src/ws/types.ts index 3a2c7da5..0131ed7b 100644 --- a/src/ws/types.ts +++ b/src/ws/types.ts @@ -1,110 +1,260 @@ -import type { ServerWebSocket, WebSocketHandler } from 'bun' +import type { ElysiaWebSocket } from './index' +import { WebSocketHandler } from './bun' -import type { TSchema } from '@sinclair/typebox' -import type { TypeCheck } from '@sinclair/typebox/compiler' - -import type { ElysiaWS } from '.' import type { Context } from '../context' - -import type { - SingletonBase, - Handler, +import { + AfterHandler, + AfterResponseHandler, + BaseMacro, + ContentType, + DocumentDecoration, ErrorHandler, InputSchema, - RouteSchema, Isolate, - GetPathParameter, + LocalHook, + MapResponse, MaybeArray, - BaseMacro, + MaybePromise, + OptionalHandler, + Prettify, + ResolveHandler, + ResolvePath, + ResolveResolutions, + RouteSchema, + SingletonBase, TransformHandler } from '../types' -export namespace WS { - export type Config = Omit< - WebSocketHandler, - 'open' | 'message' | 'close' | 'drain' - > - - export type LocalHook< - LocalSchema extends InputSchema, - Route extends RouteSchema, - Singleton extends SingletonBase, - Errors extends Record, - Extension extends BaseMacro, - Path extends string = '', - TypedRoute extends RouteSchema = keyof Route['params'] extends never - ? Route & { - params: Record, string> - } - : Route - > = (LocalSchema extends {} ? LocalSchema : Isolate) & - Extension & - Omit< - Partial>, - 'open' | 'message' | 'close' | 'drain' | 'publish' | 'publishToSelf' - > & - (ElysiaWS< - ServerWebSocket<{ - validator?: TypeCheck - }>, - TypedRoute, - Singleton - > extends infer WS - ? { - transform?: MaybeArray< - TransformHandler - > - transformMessage?: MaybeArray< - TransformHandler - > - beforeHandle?: MaybeArray> - /** - * Catch error - */ - error?: MaybeArray> - - /** - * Headers to register to websocket before `upgrade` - */ - upgrade?: - | Bun.HeadersInit - | ((context: Context) => Bun.HeadersInit) +type TypedWebSocketMethod = + | 'open' + | 'message' + | 'drain' + | 'close' + | 'ping' + | 'pong' - /** - * The {@link ServerWebSocket} has been opened - * - * @param ws The {@link ServerWebSocket} that was opened - */ - open?: (ws: WS) => void | Promise +export type FlattenResponse = + {} extends Response ? unknown : Response[keyof Response] - /** - * Handle an incoming message to a {@link ServerWebSocket} - * - * @param ws The {@link ServerWebSocket} that received the message - * @param message The message received - * - * To change `message` to be an `ArrayBuffer` instead of a `Uint8Array`, set `ws.binaryType = "arraybuffer"` - */ - message?: (ws: WS, message: Route['body']) => any +type TypedWebSocketHandler = Prettify< + Omit, TypedWebSocketMethod> & { + open?( + ws: ElysiaWebSocket & { body: never }> + ): MaybePromise | void> + message?( + ws: ElysiaWebSocket, + message: Route['body'] + ): MaybePromise< + | FlattenResponse + | void + | Generator< + FlattenResponse, + void | FlattenResponse + > + | AsyncGenerator< + FlattenResponse, + void | FlattenResponse + > + > + drain?( + ws: ElysiaWebSocket & { body: never }> + ): MaybePromise< + | FlattenResponse + | void + | Generator< + FlattenResponse, + void | FlattenResponse + > + | AsyncGenerator< + FlattenResponse, + void | FlattenResponse + > + > + close?( + ws: ElysiaWebSocket & { body: never }>, + code: number, + reason: string + ): MaybePromise< + | FlattenResponse + | void + | Generator< + FlattenResponse, + void | FlattenResponse + > + | AsyncGenerator< + FlattenResponse, + void | FlattenResponse + > + > + ping?( + message: Route['body'] + ): MaybePromise< + | FlattenResponse + | void + | Generator< + FlattenResponse, + void | FlattenResponse + > + | AsyncGenerator< + FlattenResponse, + void | FlattenResponse + > + > + pong?( + message: Route['body'] + ): MaybePromise< + | FlattenResponse + | void + | Generator< + FlattenResponse, + void | FlattenResponse + > + | AsyncGenerator< + FlattenResponse, + void | FlattenResponse + > + > + } +> - /** - * The {@link ServerWebSocket} is being closed - * @param ws The {@link ServerWebSocket} that was closed - * @param code The close code - * @param message The close message - */ - close?: ( - ws: WS, - code: number, - message: string - ) => void | Promise +export type WSLocalHook< + LocalSchema extends InputSchema, + Schema extends RouteSchema, + Singleton extends SingletonBase, + Extension extends BaseMacro, + Resolutions extends MaybeArray>, + Path extends string = '', + TypedRoute extends RouteSchema = Prettify< + Schema extends { + params: Record + } + ? Schema + : Schema & { + params: undefined extends Schema['params'] + ? ResolvePath + : Schema['params'] + } + > +> = (LocalSchema extends {} ? LocalSchema : Isolate) & + Extension & { + /** + * Short for 'Content-Type' + * + * Available: + * - 'none': do not parse body + * - 'text' / 'text/plain': parse body as string + * - 'json' / 'application/json': parse body as json + * - 'formdata' / 'multipart/form-data': parse body as form-data + * - 'urlencoded' / 'application/x-www-form-urlencoded: parse body as urlencoded + * - 'arraybuffer': parse body as readable stream + */ + type?: ContentType + detail?: DocumentDecoration + /** + * Headers to register to websocket before `upgrade` + */ + upgrade?: Record | ((context: Context) => unknown) + parse?: MaybeArray< + ( + ws: ElysiaWebSocket< + Context, + Omit & { body: unknown } + >, + message: unknown + ) => MaybePromise + > - /** - * The {@link ServerWebSocket} is ready for more data - * - * @param ws The {@link ServerWebSocket} that is ready - */ - drain?: (ws: WS) => void | Promise - } - : {}) -} + /** + * Transform context's value + */ + transform?: MaybeArray< + TransformHandler< + TypedRoute, + { + decorator: Singleton['decorator'] + store: Singleton['decorator'] + derive: Singleton['derive'] + resolve: {} + } + > + > + resolve?: Resolutions + /** + * Execute before main handler + */ + beforeHandle?: MaybeArray< + OptionalHandler< + TypedRoute, + { + decorator: Singleton['decorator'] + store: Singleton['decorator'] + derive: Singleton['derive'] + resolve: Singleton['resolve'] & + ResolveResolutions + } + > + > + /** + * Execute after main handler + */ + afterHandle?: MaybeArray< + AfterHandler< + TypedRoute, + { + decorator: Singleton['decorator'] + store: Singleton['decorator'] + derive: Singleton['derive'] + resolve: Singleton['resolve'] & + ResolveResolutions + } + > + > + /** + * Execute after main handler + */ + mapResponse?: MaybeArray< + MapResponse< + TypedRoute, + { + decorator: Singleton['decorator'] + store: Singleton['decorator'] + derive: Singleton['derive'] + resolve: Singleton['resolve'] & + ResolveResolutions + } + > + > + /** + * Execute after response is sent + */ + afterResponse?: MaybeArray< + AfterResponseHandler< + TypedRoute, + { + decorator: Singleton['decorator'] + store: Singleton['decorator'] + derive: Singleton['derive'] + resolve: Singleton['resolve'] & + ResolveResolutions + } + > + > + /** + * Catch error + */ + error?: MaybeArray> + tags?: DocumentDecoration['tags'] + } & TypedWebSocketHandler< + Omit< + Context< + TypedRoute, + Singleton & { + resolve: ResolveResolutions + } + >, + 'body' + > & { + body: never + }, + TypedRoute + >