Skip to content

Commit

Permalink
regularly restart subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
shuesken committed Aug 11, 2024
1 parent 360ca39 commit 6bf7d0c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 18 deletions.
2 changes: 2 additions & 0 deletions common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ export const HITCHMAPS_AUTHOR_PUBLIC_KEY =
"53055ee011e96a00a705b38253b9cbc6614ccbd37df4dad42ec69bbe608c4209" as const;

export const DELAY_AFTER_PROCESSING_EVENT_MS = 10;

export const SUBSCRIPTIONS_MAX_AGE_IN_MINUTES = 60;
68 changes: 50 additions & 18 deletions validation/repost.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
DEV_RELAYS,
MAP_NOTE_KIND,
MAP_NOTE_REPOST_KIND,
SUBSCRIPTIONS_MAX_AGE_IN_MINUTES,
} from "../common/constants.ts";
import { DEV_PUBKEY } from "../common/constants.ts";
import { validateEvent } from "./validate.ts";
Expand Down Expand Up @@ -137,25 +138,56 @@ export async function repost(

const relayPool = await getRelayPool(isDev);

const filter = createFilter(isDev, maxAgeMinutes);

const controller = new AbortController();
const signal = controller.signal;
const subscription = relayPool.req(filter, { signal });

const queue = newQueue(3);
const processEventFactory = processEventFactoryFactory(relayPool, privateKey);

for await (const msg of subscription) {
if (msg[0] === "EVENT") {
const event = msg[2];
queue.add(processEventFactory(event));
} else if (msg[0] === "EOSE") {
if (isDev) {
globalThis.setTimeout(() => {
controller.abort();
}, 10e3);
let lastReceivedMessageTimestamp = 0;
let controller: AbortController;
let signal: AbortSignal;

async function _subscribe() {
console.log(
`(Re)starting subscriptions, last message received at ${lastReceivedMessageTimestamp} (${new Date(
lastReceivedMessageTimestamp * 1000
).toLocaleString()})`
);
if (lastReceivedMessageTimestamp)
maxAgeMinutes =
(Math.floor(Date.now() / 1000) - lastReceivedMessageTimestamp) / 60 + 1;

const filter = createFilter(isDev, maxAgeMinutes);

if (controller) controller.abort();
controller = new AbortController();
signal = controller.signal;
const subscription = relayPool.req(filter, { signal });

const queue = newQueue(3);
const processEventFactory = processEventFactoryFactory(
relayPool,
privateKey
);

try {
for await (const msg of subscription) {
console.log("got msg", msg);

if (msg[0] === "EVENT") {
const event = msg[2];
lastReceivedMessageTimestamp = event.created_at;
queue.add(processEventFactory(event));
} else if (msg[0] === "EOSE") {
if (isDev) {
globalThis.setTimeout(() => {
controller.abort();
}, 10e3);
}
}
}
} catch (e) {
console.log("got error");
console.log(e.reason);
console.log(e, typeof e);
}
}

_subscribe();
setInterval(_subscribe, SUBSCRIPTIONS_MAX_AGE_IN_MINUTES * 60 * 1000);
}

0 comments on commit 6bf7d0c

Please sign in to comment.