Skip to content

Commit

Permalink
Refactor tracker types
Browse files Browse the repository at this point in the history
  • Loading branch information
mrlika committed Feb 20, 2025
1 parent 2168852 commit 014cc50
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 134 deletions.
159 changes: 90 additions & 69 deletions src/fast-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,36 @@
*/

import Debug from "debug";
import {
Tracker,
SocketContext,
PeerContext,
TrackerError,
Swarm,
} from "./tracker.js";
import { Tracker, PeerContext, TrackerError, Swarm } from "./tracker.js";

const debug = Debug("wt-tracker:fast-tracker");
const debugEnabled = debug.enabled;

type UnknownObject = Record<string, unknown>;

interface Settings {
type Settings = {
maxOffers: number;
announceInterval: number;
}
};

export class FastTracker implements Tracker {
export class FastTracker<
ConnectionContext extends Record<string, PeerContext<ConnectionContext>>,
> implements Tracker<ConnectionContext>
{
public readonly settings: Settings;

readonly #swarms = new Map<string, Swarm>();
readonly #peers = new Map<string, PeerContext>();
readonly #swarms = new Map<string, Swarm<ConnectionContext>>();
readonly #peers = new Map<string, PeerContext<ConnectionContext>>();

#clearPeersInterval?: NodeJS.Timeout;

public constructor(settings?: Partial<Settings>) {
public constructor(
settings: Partial<Settings> | undefined,
private sendMessage: (
json: UnknownObject,
connection: ConnectionContext,
) => void,
) {
this.settings = {
maxOffers: 20,
announceInterval: 20,
Expand Down Expand Up @@ -77,8 +80,8 @@ export class FastTracker implements Tracker {
}

private addPeerToSwarm(
swarm: Swarm,
peer: PeerContext,
swarm: Swarm<ConnectionContext>,
peer: PeerContext<ConnectionContext>,
isPeerCompleted: boolean,
) {
swarm.peers.push(peer);
Expand All @@ -90,7 +93,10 @@ export class FastTracker implements Tracker {
}
}

private removePeerFromSwarm(swarm: Swarm, peer: PeerContext) {
private removePeerFromSwarm(
swarm: Swarm<ConnectionContext>,
peer: PeerContext<ConnectionContext>,
) {
const peerIndex = swarm.peers.indexOf(peer);

swarm.completedPeers?.delete(peer.peerId);
Expand All @@ -112,7 +118,10 @@ export class FastTracker implements Tracker {
}
}

private setPeerCompletedInSwarm(swarm: Swarm, peer: PeerContext) {
private setPeerCompletedInSwarm(
swarm: Swarm<ConnectionContext>,
peer: PeerContext<ConnectionContext>,
) {
if (swarm.completedPeers === undefined) {
swarm.completedPeers = new Set();
}
Expand Down Expand Up @@ -146,53 +155,61 @@ export class FastTracker implements Tracker {
}, this.settings.announceInterval * 1000);
}

private removePeer(peer: PeerContext) {
private removePeer(peer: PeerContext<ConnectionContext>) {
const { swarm } = peer;

this.removePeerFromSwarm(swarm, peer);
this.#peers.delete(peer.peerId);

// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete (peer.socket as unknown as UnknownObject)[peer.peerId];
delete peer.connection[peer.peerId];
}

public get swarms(): ReadonlyMap<string, { peers: readonly PeerContext[] }> {
public get swarms(): ReadonlyMap<
string,
{ peers: readonly PeerContext<ConnectionContext>[] }
> {
return this.#swarms;
}

public processMessage(jsonObject: object, socket: SocketContext): void {
const json = jsonObject as UnknownObject;
public processMessage(
json: UnknownObject,
connection: ConnectionContext,
): void {
const { action } = json;

if (action === "announce") {
const { event } = json;
if (event === undefined) {
if (json.answer === undefined) {
this.processAnnounce(json, socket);
this.processAnnounce(json, connection);
} else {
this.processAnswer(json);
}
} else if (event === "started") {
this.processAnnounce(json, socket);
this.processAnnounce(json, connection);
} else if (event === "stopped") {
this.processStop(json);
} else if (event === "completed") {
this.processAnnounce(json, socket, true);
this.processAnnounce(json, connection, true);
} else {
throw new TrackerError("unknown announce event");
}
} else if (action === "scrape") {
this.processScrape(json, socket);
this.processScrape(json, connection);
} else {
throw new TrackerError("unknown action");
}
}

public disconnectPeersFromSocket(socket: SocketContext): void {
for (const peerId in socket) {
const peer = (socket as unknown as UnknownObject)[peerId] as PeerContext;
public disconnectPeers(connection: ConnectionContext): void {
for (const peerId in connection) {
const peer = connection[peerId] as
| PeerContext<ConnectionContext>
| undefined;

if (!peer?.peerId) continue; // Not a peer property

if (peer.peerId !== peerId) continue; // Not a peer property
if (debugEnabled) {
debug(
"disconnect peer:",
Expand All @@ -207,17 +224,15 @@ export class FastTracker implements Tracker {

private processAnnounce(
json: UnknownObject,
socket: SocketContext,
connection: ConnectionContext,
completed = false,
): void {
const infoHash = json.info_hash as string;
const peerId = json.peer_id as string;
let swarm: Swarm | undefined;
let swarm: Swarm<ConnectionContext> | undefined;
const isPeerCompleted = completed || json.left === 0;

let peer = (socket as unknown as UnknownObject)[peerId] as
| PeerContext
| undefined;
let peer = connection[peerId] as PeerContext<ConnectionContext> | undefined;

if (peer === undefined) {
const existingPeer = this.#peers.get(peerId);
Expand All @@ -240,15 +255,14 @@ export class FastTracker implements Tracker {

peer = {
peerId,
sendMessage: socket.sendMessage,
socket,
connection,
lastAccessed: performance.now(),
swarm,
};

this.addPeerToSwarm(swarm, peer, isPeerCompleted);

(socket as unknown as UnknownObject)[peerId] = peer;
(connection as unknown as UnknownObject)[peerId] = peer;
this.#peers.set(peerId, peer);
} else if (peer.peerId === peerId) {
peer.lastAccessed = performance.now();
Expand All @@ -273,24 +287,24 @@ export class FastTracker implements Tracker {

const complete = swarm.completedPeers?.size ?? 0;

socket.sendMessage(
this.sendMessage(
{
action: "announce",
interval: this.settings.announceInterval,
info_hash: infoHash,
complete,
incomplete: swarm.peers.length - complete,
},
socket,
connection,
);

this.sendOffersToPeers(json, swarm.peers, peer, infoHash);
}

private sendOffersToPeers(
json: UnknownObject,
peers: readonly PeerContext[],
peer: PeerContext,
peers: readonly PeerContext<ConnectionContext>[],
peer: PeerContext<ConnectionContext>,
infoHash: string,
): void {
if (peers.length <= 1) {
Expand Down Expand Up @@ -319,14 +333,16 @@ export class FastTracker implements Tracker {

if (countOffersToSend === countPeersToSend) {
// we have offers for all the peers from the swarm - send offers to all
const offersIterator = (offers as unknown[]).values();
const offersIterator = offers.values();
for (const toPeer of peers) {
if (toPeer !== peer) {
sendOffer(
offersIterator.next().value,
peer.peerId,
toPeer.socket,
infoHash,
this.sendMessage(
getSendOfferJson(
offersIterator.next().value,
peer.peerId,
infoHash,
),
toPeer.connection,
);
}
}
Expand All @@ -340,7 +356,10 @@ export class FastTracker implements Tracker {
if (toPeer === peer) {
i--; // do one more iteration
} else {
sendOffer(offers[i], peer.peerId, toPeer.socket, infoHash);
this.sendMessage(
getSendOfferJson(offers[i], peer.peerId, infoHash),
toPeer.connection,
);
}

peerIndex++;
Expand All @@ -364,7 +383,7 @@ export class FastTracker implements Tracker {
}

delete json.to_peer_id;
toPeer.sendMessage(json, toPeer.socket);
this.sendMessage(json, toPeer.connection);

if (debugEnabled) {
debug(
Expand Down Expand Up @@ -393,13 +412,19 @@ export class FastTracker implements Tracker {
}
}

private processScrape(json: UnknownObject, socket: SocketContext): void {
private processScrape(
json: UnknownObject,
connection: ConnectionContext,
): void {
const infoHash = json.info_hash;
const files: Record<string, {
const files: Record<
string,
{
complete: number;
incomplete: number;
downloaded: number;
}> = {};
}
> = {};

if (infoHash === undefined) {
for (const swarm of this.#swarms.values()) {
Expand Down Expand Up @@ -446,16 +471,15 @@ export class FastTracker implements Tracker {
}
}

socket.sendMessage({ action: "scrape", files }, socket);
this.sendMessage({ action: "scrape", files }, connection);
}
}

function sendOffer(
function getSendOfferJson(
offerItem: unknown,
fromPeerId: string,
socket: SocketContext,
infoHash: string,
): void {
) {
if (!(offerItem instanceof Object)) {
throw new TrackerError("announce: wrong offer item format");
}
Expand All @@ -467,17 +491,14 @@ function sendOffer(
throw new TrackerError("announce: wrong offer item field format");
}

socket.sendMessage(
{
action: "announce",
info_hash: infoHash,
offer_id: offerId, // offerId is not validated to be a string
peer_id: fromPeerId,
offer: {
type: "offer",
sdp: (offer as UnknownObject).sdp, // offer.sdp is not validated to be a string
},
return {
action: "announce",
info_hash: infoHash,
offer_id: offerId, // offerId is not validated to be a string
peer_id: fromPeerId,
offer: {
type: "offer",
sdp: (offer as UnknownObject).sdp, // offer.sdp is not validated to be a string
},
socket,
);
};
}
6 changes: 1 addition & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,4 @@

export { UWebSocketsTracker } from "./uws-tracker.js";
export { FastTracker } from "./fast-tracker.js";
export {
Tracker,
SocketContext as PeerContext,
TrackerError,
} from "./tracker.js";
export { Tracker, TrackerError } from "./tracker.js";
15 changes: 11 additions & 4 deletions src/run-uws-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
import { readFileSync } from "fs";
import { HttpResponse, HttpRequest } from "uWebSockets.js";
import Debug from "debug";
import { UWebSocketsTracker } from "./uws-tracker.js";
import {
sendMessage,
UWebSocketsTracker,
UwsConnectionContext,
} from "./uws-tracker.js";
import { FastTracker } from "./fast-tracker.js";
import { Tracker } from "./tracker.js";

const debugRequests = Debug("wt-tracker:uws-tracker-requests");
const debugRequestsEnabled = debugRequests.enabled;

interface BuildServerParams {
tracker: Tracker;
tracker: Tracker<UwsConnectionContext>;
serverSettings: ServerItemSettings;
websocketsAccess: Partial<WebSocketsAccessSettings> | undefined;
indexHtml: Buffer | undefined;
Expand Down Expand Up @@ -109,7 +113,7 @@ async function main(): Promise<void> {
return;
}

const tracker = new FastTracker(settings.tracker);
const tracker = new FastTracker(settings.tracker, sendMessage);

try {
await runServers(tracker, settings);
Expand Down Expand Up @@ -173,7 +177,10 @@ function validateSettings(jsonSettings: UnknownObject): Settings | undefined {
};
}

async function runServers(tracker: Tracker, settings: Settings): Promise<void> {
async function runServers(
tracker: Tracker<UwsConnectionContext>,
settings: Settings,
): Promise<void> {
let indexHtml: Buffer | undefined = undefined;

try {
Expand Down
Loading

0 comments on commit 014cc50

Please sign in to comment.