diff --git a/src/fast-tracker.ts b/src/fast-tracker.ts index c374421..83a17f8 100644 --- a/src/fast-tracker.ts +++ b/src/fast-tracker.ts @@ -15,33 +15,36 @@ */ import Debug from "debug"; -import { - Tracker, - SocketContext, - PeerContext, - TrackerError, - Swarm, -} from "./tracker.js"; +import { Tracker, PeerContext, TrackerError, Swarm } from "./tracker.js"; const debug = Debug("wt-tracker:fast-tracker"); const debugEnabled = debug.enabled; type UnknownObject = Record; -interface Settings { +type Settings = { maxOffers: number; announceInterval: number; -} +}; -export class FastTracker implements Tracker { +export class FastTracker< + ConnectionContext extends Record>, +> implements Tracker +{ public readonly settings: Settings; - readonly #swarms = new Map(); - readonly #peers = new Map(); + readonly #swarms = new Map>(); + readonly #peers = new Map>(); #clearPeersInterval?: NodeJS.Timeout; - public constructor(settings?: Partial) { + public constructor( + settings: Partial | undefined, + private sendMessage: ( + json: UnknownObject, + connection: ConnectionContext, + ) => void, + ) { this.settings = { maxOffers: 20, announceInterval: 20, @@ -77,8 +80,8 @@ export class FastTracker implements Tracker { } private addPeerToSwarm( - swarm: Swarm, - peer: PeerContext, + swarm: Swarm, + peer: PeerContext, isPeerCompleted: boolean, ) { swarm.peers.push(peer); @@ -90,7 +93,10 @@ export class FastTracker implements Tracker { } } - private removePeerFromSwarm(swarm: Swarm, peer: PeerContext) { + private removePeerFromSwarm( + swarm: Swarm, + peer: PeerContext, + ) { const peerIndex = swarm.peers.indexOf(peer); swarm.completedPeers?.delete(peer.peerId); @@ -112,7 +118,10 @@ export class FastTracker implements Tracker { } } - private setPeerCompletedInSwarm(swarm: Swarm, peer: PeerContext) { + private setPeerCompletedInSwarm( + swarm: Swarm, + peer: PeerContext, + ) { if (swarm.completedPeers === undefined) { swarm.completedPeers = new Set(); } @@ -146,53 +155,61 @@ export class FastTracker implements Tracker { }, this.settings.announceInterval * 1000); } - private removePeer(peer: PeerContext) { + private removePeer(peer: PeerContext) { const { swarm } = peer; this.removePeerFromSwarm(swarm, peer); this.#peers.delete(peer.peerId); // eslint-disable-next-line @typescript-eslint/no-dynamic-delete - delete (peer.socket as unknown as UnknownObject)[peer.peerId]; + delete peer.connection[peer.peerId]; } - public get swarms(): ReadonlyMap { + public get swarms(): ReadonlyMap< + string, + { peers: readonly PeerContext[] } + > { return this.#swarms; } - public processMessage(jsonObject: object, socket: SocketContext): void { - const json = jsonObject as UnknownObject; + public processMessage( + json: UnknownObject, + connection: ConnectionContext, + ): void { const { action } = json; if (action === "announce") { const { event } = json; if (event === undefined) { if (json.answer === undefined) { - this.processAnnounce(json, socket); + this.processAnnounce(json, connection); } else { this.processAnswer(json); } } else if (event === "started") { - this.processAnnounce(json, socket); + this.processAnnounce(json, connection); } else if (event === "stopped") { this.processStop(json); } else if (event === "completed") { - this.processAnnounce(json, socket, true); + this.processAnnounce(json, connection, true); } else { throw new TrackerError("unknown announce event"); } } else if (action === "scrape") { - this.processScrape(json, socket); + this.processScrape(json, connection); } else { throw new TrackerError("unknown action"); } } - public disconnectPeersFromSocket(socket: SocketContext): void { - for (const peerId in socket) { - const peer = (socket as unknown as UnknownObject)[peerId] as PeerContext; + public disconnectPeers(connection: ConnectionContext): void { + for (const peerId in connection) { + const peer = connection[peerId] as + | PeerContext + | undefined; + + if (!peer?.peerId) continue; // Not a peer property - if (peer.peerId !== peerId) continue; // Not a peer property if (debugEnabled) { debug( "disconnect peer:", @@ -207,17 +224,15 @@ export class FastTracker implements Tracker { private processAnnounce( json: UnknownObject, - socket: SocketContext, + connection: ConnectionContext, completed = false, ): void { const infoHash = json.info_hash as string; const peerId = json.peer_id as string; - let swarm: Swarm | undefined; + let swarm: Swarm | undefined; const isPeerCompleted = completed || json.left === 0; - let peer = (socket as unknown as UnknownObject)[peerId] as - | PeerContext - | undefined; + let peer = connection[peerId] as PeerContext | undefined; if (peer === undefined) { const existingPeer = this.#peers.get(peerId); @@ -240,15 +255,14 @@ export class FastTracker implements Tracker { peer = { peerId, - sendMessage: socket.sendMessage, - socket, + connection, lastAccessed: performance.now(), swarm, }; this.addPeerToSwarm(swarm, peer, isPeerCompleted); - (socket as unknown as UnknownObject)[peerId] = peer; + (connection as unknown as UnknownObject)[peerId] = peer; this.#peers.set(peerId, peer); } else if (peer.peerId === peerId) { peer.lastAccessed = performance.now(); @@ -273,7 +287,7 @@ export class FastTracker implements Tracker { const complete = swarm.completedPeers?.size ?? 0; - socket.sendMessage( + this.sendMessage( { action: "announce", interval: this.settings.announceInterval, @@ -281,7 +295,7 @@ export class FastTracker implements Tracker { complete, incomplete: swarm.peers.length - complete, }, - socket, + connection, ); this.sendOffersToPeers(json, swarm.peers, peer, infoHash); @@ -289,8 +303,8 @@ export class FastTracker implements Tracker { private sendOffersToPeers( json: UnknownObject, - peers: readonly PeerContext[], - peer: PeerContext, + peers: readonly PeerContext[], + peer: PeerContext, infoHash: string, ): void { if (peers.length <= 1) { @@ -319,14 +333,16 @@ export class FastTracker implements Tracker { if (countOffersToSend === countPeersToSend) { // we have offers for all the peers from the swarm - send offers to all - const offersIterator = (offers as unknown[]).values(); + const offersIterator = offers.values(); for (const toPeer of peers) { if (toPeer !== peer) { - sendOffer( - offersIterator.next().value, - peer.peerId, - toPeer.socket, - infoHash, + this.sendMessage( + getSendOfferJson( + offersIterator.next().value, + peer.peerId, + infoHash, + ), + toPeer.connection, ); } } @@ -340,7 +356,10 @@ export class FastTracker implements Tracker { if (toPeer === peer) { i--; // do one more iteration } else { - sendOffer(offers[i], peer.peerId, toPeer.socket, infoHash); + this.sendMessage( + getSendOfferJson(offers[i], peer.peerId, infoHash), + toPeer.connection, + ); } peerIndex++; @@ -364,7 +383,7 @@ export class FastTracker implements Tracker { } delete json.to_peer_id; - toPeer.sendMessage(json, toPeer.socket); + this.sendMessage(json, toPeer.connection); if (debugEnabled) { debug( @@ -393,13 +412,19 @@ export class FastTracker implements Tracker { } } - private processScrape(json: UnknownObject, socket: SocketContext): void { + private processScrape( + json: UnknownObject, + connection: ConnectionContext, + ): void { const infoHash = json.info_hash; - const files: Record = {}; + } + > = {}; if (infoHash === undefined) { for (const swarm of this.#swarms.values()) { @@ -446,16 +471,15 @@ export class FastTracker implements Tracker { } } - socket.sendMessage({ action: "scrape", files }, socket); + this.sendMessage({ action: "scrape", files }, connection); } } -function sendOffer( +function getSendOfferJson( offerItem: unknown, fromPeerId: string, - socket: SocketContext, infoHash: string, -): void { +) { if (!(offerItem instanceof Object)) { throw new TrackerError("announce: wrong offer item format"); } @@ -467,17 +491,14 @@ function sendOffer( throw new TrackerError("announce: wrong offer item field format"); } - socket.sendMessage( - { - action: "announce", - info_hash: infoHash, - offer_id: offerId, // offerId is not validated to be a string - peer_id: fromPeerId, - offer: { - type: "offer", - sdp: (offer as UnknownObject).sdp, // offer.sdp is not validated to be a string - }, + return { + action: "announce", + info_hash: infoHash, + offer_id: offerId, // offerId is not validated to be a string + peer_id: fromPeerId, + offer: { + type: "offer", + sdp: (offer as UnknownObject).sdp, // offer.sdp is not validated to be a string }, - socket, - ); + }; } diff --git a/src/index.ts b/src/index.ts index b6911ca..f048aef 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,8 +17,4 @@ export { UWebSocketsTracker } from "./uws-tracker.js"; export { FastTracker } from "./fast-tracker.js"; -export { - Tracker, - SocketContext as PeerContext, - TrackerError, -} from "./tracker.js"; +export { Tracker, TrackerError } from "./tracker.js"; diff --git a/src/run-uws-tracker.ts b/src/run-uws-tracker.ts index ec9368d..5334dc5 100644 --- a/src/run-uws-tracker.ts +++ b/src/run-uws-tracker.ts @@ -19,7 +19,11 @@ import { readFileSync } from "fs"; import { HttpResponse, HttpRequest } from "uWebSockets.js"; import Debug from "debug"; -import { UWebSocketsTracker } from "./uws-tracker.js"; +import { + sendMessage, + UWebSocketsTracker, + UwsConnectionContext, +} from "./uws-tracker.js"; import { FastTracker } from "./fast-tracker.js"; import { Tracker } from "./tracker.js"; @@ -27,7 +31,7 @@ const debugRequests = Debug("wt-tracker:uws-tracker-requests"); const debugRequestsEnabled = debugRequests.enabled; interface BuildServerParams { - tracker: Tracker; + tracker: Tracker; serverSettings: ServerItemSettings; websocketsAccess: Partial | undefined; indexHtml: Buffer | undefined; @@ -109,7 +113,7 @@ async function main(): Promise { return; } - const tracker = new FastTracker(settings.tracker); + const tracker = new FastTracker(settings.tracker, sendMessage); try { await runServers(tracker, settings); @@ -173,7 +177,10 @@ function validateSettings(jsonSettings: UnknownObject): Settings | undefined { }; } -async function runServers(tracker: Tracker, settings: Settings): Promise { +async function runServers( + tracker: Tracker, + settings: Settings, +): Promise { let indexHtml: Buffer | undefined = undefined; try { diff --git a/src/tracker.ts b/src/tracker.ts index 11bf0b3..334436e 100644 --- a/src/tracker.ts +++ b/src/tracker.ts @@ -14,29 +14,37 @@ * limitations under the License. */ -export interface SocketContext { - sendMessage: (json: object, peer: SocketContext) => void; -} - -export interface Swarm { +export interface Swarm> { infoHash: string; completedPeers?: Set; - peers: PeerContext[]; + peers: PeerContext[]; } -export interface PeerContext { +export interface PeerContext< + ConnectionContext extends Record, +> { peerId: string; - sendMessage: (json: object, peer: SocketContext) => void; - socket: SocketContext; + connection: ConnectionContext; lastAccessed: number; - swarm: Swarm; + swarm: Swarm; } -export interface Tracker { - readonly swarms: ReadonlyMap; - readonly settings: object; - processMessage: (json: object, socket: SocketContext) => void; - disconnectPeersFromSocket: (socket: SocketContext) => void; +export interface Tracker< + ConnectionContext extends Record>, +> { + readonly swarms: ReadonlyMap< + string, + { peers: readonly PeerContext[] } + >; + + readonly settings: Record; + + processMessage: ( + json: Record, + connection: ConnectionContext, + ) => void; + + disconnectPeers: (connection: ConnectionContext) => void; } export class TrackerError extends Error {} diff --git a/src/uws-tracker.ts b/src/uws-tracker.ts index 0615698..aac04c7 100644 --- a/src/uws-tracker.ts +++ b/src/uws-tracker.ts @@ -25,19 +25,13 @@ import { HttpResponse, } from "uWebSockets.js"; import Debug from "debug"; -import { Tracker, TrackerError, SocketContext } from "./tracker.js"; +import { PeerContext, Tracker, TrackerError } from "./tracker.js"; import { ServerSettings, WebSocketsSettings, WebSocketsAccessSettings, } from "./run-uws-tracker.js"; -declare module "./tracker.js" { - interface SocketContext { - ws: WebSocket; - } -} - const debugWebSockets = Debug("wt-tracker:uws-tracker"); const debugWebSocketsEnabled = debugWebSockets.enabled; @@ -49,6 +43,10 @@ const debugRequestsEnabled = debugRequests.enabled; const decoder = new StringDecoder(); +export type UwsConnectionContext = { + ws?: WebSocket; +} & Record>; + export interface UwsTrackerSettings { server: ServerSettings; websockets: WebSocketsSettings; @@ -63,7 +61,7 @@ export interface PartialUwsTrackerSettings { export class UWebSocketsTracker { public readonly settings: UwsTrackerSettings; - public readonly tracker: Readonly; + public readonly tracker: Readonly>; private webSocketsCount = 0; private validateOrigin = false; @@ -72,7 +70,7 @@ export class UWebSocketsTracker { readonly #app: TemplatedApp; public constructor( - tracker: Readonly, + tracker: Readonly>, settings: PartialUwsTrackerSettings, ) { this.tracker = tracker; @@ -186,7 +184,7 @@ export class UWebSocketsTracker { idleTimeout: this.settings.websockets.idleTimeout, open: this.onOpen, upgrade: this.onUpgrade, - drain: (ws: WebSocket) => { + drain: (ws: WebSocket) => { if (debugWebSocketsEnabled) { debugWebSockets("drain", ws.getBufferedAmount()); } @@ -276,10 +274,8 @@ export class UWebSocketsTracker { ); } - response.upgrade>( - { - sendMessage, - }, + response.upgrade( + {}, request.getHeader("sec-websocket-key"), request.getHeader("sec-websocket-protocol"), request.getHeader("sec-websocket-extensions"), @@ -288,7 +284,7 @@ export class UWebSocketsTracker { }; private readonly onMessage = ( - ws: WebSocket, + ws: WebSocket, message: ArrayBuffer, ): void => { debugWebSockets("message of size", message.byteLength); @@ -296,11 +292,11 @@ export class UWebSocketsTracker { const userData = ws.getUserData(); userData.ws = ws; - let json: object | undefined = undefined; + let json; try { json = JSON.parse( decoder.end(new Uint8Array(message) as Buffer), - ) as object; + ) as Record; } catch (e) { debugWebSockets("failed to parse JSON message", e); ws.close(); @@ -324,22 +320,26 @@ export class UWebSocketsTracker { }; private readonly onClose = ( - ws: WebSocket, + ws: WebSocket, code: number, ): void => { this.webSocketsCount--; - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (ws.getUserData().sendMessage !== undefined) { - this.tracker.disconnectPeersFromSocket(ws as unknown as SocketContext); + const userData = ws.getUserData() as UwsConnectionContext | undefined; + + // Test that user data is really a connection context + if (userData?.ws) { + this.tracker.disconnectPeers(userData); } debugWebSockets("closed with code", code); }; } -function sendMessage(json: object, peerContext: SocketContext): void { - peerContext.ws.send(JSON.stringify(json), false, false); +export function sendMessage(json: object, connection: UwsConnectionContext) { + // Connection without WebSocket is not possible here + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + connection.ws!.send(JSON.stringify(json), false, false); if (debugMessagesEnabled) { debugMessages("out", json); } diff --git a/test/announce.test.ts b/test/announce.test.ts index 8ce7dfe..9a38f53 100644 --- a/test/announce.test.ts +++ b/test/announce.test.ts @@ -19,14 +19,14 @@ import { describe, it, expect } from "vitest"; describe("announce", () => { it("should add peers to swarms on announce", () => { - const tracker = new FastTracker(); + const tracker = new FastTracker<{}>(undefined, () => undefined); const peer0 = { sendMessage: () => {}, }; let announceMessage = { action: "announce", - event: "started", + event: "started" as string | undefined, info_hash: "swarm1", peer_id: "0", offers: new Array(), @@ -49,6 +49,7 @@ describe("announce", () => { }; announceMessage = { action: "announce", + event: undefined, info_hash: "swarm1", peer_id: "1", offers: new Array(), diff --git a/test/memory/heap-usage.ts b/test/memory/heap-usage.ts index d8fea45..919eeaf 100644 --- a/test/memory/heap-usage.ts +++ b/test/memory/heap-usage.ts @@ -15,7 +15,6 @@ */ import { FastTracker } from "../../src/fast-tracker.js"; -import { SocketContext } from "../../src/tracker.js"; const peersCount = 100000; const swarmsCount = 1000000000; @@ -39,7 +38,7 @@ for (let o = 0; o < message.numwant; o++) { }); } -const tracker = new FastTracker(); +const tracker = new FastTracker<{}>(undefined, () => undefined); console.log("heap", process.memoryUsage()); console.log( @@ -47,15 +46,13 @@ console.log( ); console.log("\nadding peers to swarms"); -const sockets: SocketContext[] = []; +const sockets: {}[] = []; for (let p = 0; p < peersCount; p++) { message.peer_id = p.toPrecision(19).toString(); message.info_hash = Math.floor(swarmsCount * Math.random()) .toPrecision(19) .toString(); - const peer = { - sendMessage: () => p, - }; + const peer = {}; tracker.processMessage(message, peer); sockets.push(peer); } @@ -74,7 +71,7 @@ console.log( console.log("\nremoving peers"); for (const peer of sockets) { - tracker.disconnectPeersFromSocket(peer); + tracker.disconnectPeers(peer); } sockets.length = 0; diff --git a/test/simulation.test.ts b/test/simulation.test.ts index f3b2ed6..9f01228 100644 --- a/test/simulation.test.ts +++ b/test/simulation.test.ts @@ -15,7 +15,6 @@ */ import { FastTracker } from "../src/fast-tracker.js"; -import { SocketContext } from "../src/tracker.js"; import { describe, it, expect } from "vitest"; describe("simulation", () => { @@ -26,16 +25,13 @@ describe("simulation", () => { const offersCount = 10; const sameIdPeersRatio = 0.1; - const tracker = new FastTracker(); + const tracker = new FastTracker<{}>(undefined, () => undefined); - const sockets: SocketContext[] = []; + const sockets: {}[] = []; const peersData: Array<{ infoHash?: string; peerId: string }> = []; for (let i = 0; i < peersCount; i++) { - sockets.push({ - // eslint-disable-next-line @typescript-eslint/no-unused-vars - sendMessage: (_json: object, _socket: SocketContext) => {}, - }); + sockets.push({}); peersData.push({ peerId: (i % Math.floor(peersCount * sameIdPeersRatio)).toString(), }); @@ -80,9 +76,9 @@ describe("simulation", () => { return; } else if (random < 0.06) { // disconnect - tracker.disconnectPeersFromSocket(peer); + tracker.disconnectPeers(peer); peerData.infoHash = undefined; - sockets[peerIndex] = { sendMessage: peer.sendMessage }; + sockets[peerIndex] = {}; return; } else { // announce on the same torrent