Skip to content

Commit

Permalink
feat: reuse wss
Browse files Browse the repository at this point in the history
  • Loading branch information
Tanimodori committed Oct 28, 2022
1 parent 99cee7f commit 442019f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 12 deletions.
16 changes: 16 additions & 0 deletions src/ws/allocator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { WebSocketServer } from 'ws';

let wss: WebSocketServer | null = null;
let wssPort = -1;

export function getWss(port: number) {
if (wss === null || wssPort !== port) {
if (wss) {
wss.clients.forEach((ws) => ws.close());
wss.close();
}
wssPort = port;
wss = new WebSocketServer({ port });
}
return wss;
}
2 changes: 2 additions & 0 deletions src/ws/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './adapter';
export * from './allocator';
export * from './import';
export * from './manager';
export * from './messages';
42 changes: 30 additions & 12 deletions src/ws/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from './messages';
import { z } from 'zod';
import { logger } from '..';
import { getWss } from './allocator';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export interface PromiseHolder<T = any> {
Expand All @@ -36,12 +37,16 @@ export interface WsManagerOptions {
timeout?: number;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Fn = (...args: any[]) => any;

export class WsManager {
options: Required<WsManagerOptions>;
ws: WebSocket | undefined;
wss: WebSocketServer;
trackers: PromiseHolder[];
nextId: number;
handlers: [string, Fn][];
constructor(options: WsManagerOptions) {
this.options = {
timeout: 10000,
Expand All @@ -50,33 +55,44 @@ export class WsManager {
this.trackers = [];
this.nextId = 0;
this.ws = undefined;
this.wss = new WebSocketServer({ port: this.options.port });
this.wss.on('connection', (ws) => {
this.wss = getWss(this.options.port);
this.handlers = [];
this._registerHandler();
}
get connected() {
return this.ws?.readyState === WebSocket.OPEN;
}
_registerHandler() {
const _onConnected = (ws: WebSocket) => {
this.ws = ws;
ws.on('message', (response) => this.handleMessage(response));
});
this.wss.on('error', (e) => {
};
const _onError = (e: unknown) => {
const err = String(e);
if (err.indexOf('EADDRINUSE') !== -1) {
logger.error('ws', `fatal: port ${this.options.port} is already in use`);
process.exit(1);
} else {
logger.error('ws', `${err}`);
}
});
}
get connected() {
return this.ws?.readyState === WebSocket.OPEN;
};
this.wss.on('connection', _onConnected);
this.wss.on('error', _onError);
this.handlers.push(['connection', _onConnected]);
this.handlers.push(['error', _onError]);
}
onConnected(cb: (ws: WebSocket) => void) {
this.wss.on('connection', (ws) => {
const handler = (ws: WebSocket) => {
// ensure ws is saved before any sendMessage calls
this.ws = ws;
cb(ws);
});
};
this.wss.on('connection', handler);
this.handlers.push(['connection', handler]);
}
onDisconneted(cb: () => void) {
this.wss.on('close', cb);
this.handlers.push(['close', cb]);
}
handleMessage(response: RawData) {
const parsed = wsResponseSchema.parse(JSON.parse(response.toString()));
Expand Down Expand Up @@ -192,7 +208,9 @@ export class WsManager {
});
}
close() {
this.ws?.close();
this.wss.close();
// remove all handlers
this.handlers.forEach(([event, handler]) => {
this.wss.off(event, handler);
});
}
}

0 comments on commit 442019f

Please sign in to comment.