From bc4deb1378446affaba9029fd7105c602726e9b5 Mon Sep 17 00:00:00 2001 From: Denny Pradipta Date: Wed, 15 Nov 2023 14:26:31 +0700 Subject: [PATCH 1/4] POC --- package-lock.json | 9 ++++ package.json | 1 + src/looper/index.ts | 19 +++++-- src/workers/probing.ts | 120 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 144 insertions(+), 5 deletions(-) create mode 100644 src/workers/probing.ts diff --git a/package-lock.json b/package-lock.json index 671a0f3a2..90c19cba4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -56,6 +56,7 @@ "pino-pretty": "10.0.1", "piscina": "^4.1.0", "prom-client": "13.1.0", + "queue": "^6.0.2", "redis": "^4.3.0", "smtp-server": "^3.8.0", "sqlite": "^4.0.21", @@ -12766,6 +12767,14 @@ "version": "2.2.0", "license": "MIT" }, + "node_modules/queue": { + "version": "6.0.2", + "resolved": "https://registry.npmjs.org/queue/-/queue-6.0.2.tgz", + "integrity": "sha512-iHZWu+q3IdFZFX36ro/lKBkSvfkztY5Y7HMiPlOUjhupPcG2JMfst2KKEpu5XndviX/3UhFbRngUPNKtgvtZiA==", + "dependencies": { + "inherits": "~2.0.3" + } + }, "node_modules/queue-microtask": { "version": "1.2.3", "funding": [ diff --git a/package.json b/package.json index 194ef221a..28c1a3c79 100644 --- a/package.json +++ b/package.json @@ -101,6 +101,7 @@ "pino-pretty": "10.0.1", "piscina": "^4.1.0", "prom-client": "13.1.0", + "queue": "^6.0.2", "redis": "^4.3.0", "smtp-server": "^3.8.0", "sqlite": "^4.0.21", diff --git a/src/looper/index.ts b/src/looper/index.ts index ad9d88473..b1ece47bd 100644 --- a/src/looper/index.ts +++ b/src/looper/index.ts @@ -29,7 +29,6 @@ import { v4 as uuid } from 'uuid' import type { Probe, ProbeAlert } from '../interfaces/probe' -import { doProbe } from '../components/probe' import { getContext } from '../context' import { log } from '../utils/pino' import { @@ -42,9 +41,17 @@ import { DEFAULT_INCIDENT_THRESHOLD, DEFAULT_RECOVERY_THRESHOLD, } from '../components/config/validation/validator/default-values' +import Queue from 'queue' +import { doProbe } from '../components/probe' let checkSTUNinterval: NodeJS.Timeout +const queue = new Queue({ + concurrency: 1, + autostart: true, + timeout: 10_000, +}) + const DISABLE_STUN = -1 // -1 is disable stun checking export function sanitizeProbe(isSymonMode: boolean, probe: Probe): Probe { @@ -134,10 +141,12 @@ export function startProbing({ } for (const probe of probes) { - doProbe({ - notifications, - probe, - }) + queue.push(() => + doProbe({ + notifications, + probe, + }) + ) } }, 1000) } diff --git a/src/workers/probing.ts b/src/workers/probing.ts new file mode 100644 index 000000000..3ca13188e --- /dev/null +++ b/src/workers/probing.ts @@ -0,0 +1,120 @@ +/********************************************************************************** + * MIT License * + * * + * Copyright (c) 2021 Hyperjump Technology * + * * + * Permission is hereby granted, free of charge, to any person obtaining a copy * + * of this software and associated documentation files (the "Software"), to deal * + * in the Software without restriction, including without limitation the rights * + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * + * copies of the Software, and to permit persons to whom the Software is * + * furnished to do so, subject to the following conditions: * + * * + * The above copyright notice and this permission notice shall be included in all * + * copies or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * + * SOFTWARE. * + **********************************************************************************/ + +import { ExponentialBackoff, retry, handleAll } from 'cockatiel' +import { differenceInSeconds } from 'date-fns' +import { getContext } from '../context' +import type { Notification } from '@hyperjumptech/monika-notification' +import type { Probe } from '../interfaces/probe' +import { + getProbeContext, + getProbeState, + setProbeFinish, + setProbeRunning, +} from '../utils/probe-state' +import { createProbers } from '../components/probe/prober/factory' +import { + DEFAULT_INCIDENT_THRESHOLD, + DEFAULT_RECOVERY_THRESHOLD, +} from '../components/config/validation/validator/default-values' + +type doProbeParams = { + probe: Probe // probe contains all the probes + notifications: Notification[] // notifications contains all the notifications +} +/** + * doProbe sends out the http request + * @param {string} stringifiedData stringified parameter + * @returns {Promise} void + */ +export default async (stringifiedData: string) => { + const parsedData = JSON.parse(stringifiedData) + const { probe, notifications }: doProbeParams = parsedData + + if (!isTimeToProbe(probe) || isCycleEnd(probe.id)) { + return + } + + const randomTimeoutMilliseconds = getRandomTimeoutMilliseconds() + setProbeRunning(probe.id) + + setTimeout(async () => { + const probeCtx = getProbeContext(probe.id) + if (!probeCtx) { + return + } + + const probers = createProbers({ + counter: probeCtx.cycle, + notifications, + probeConfig: probe, + }) + + const maxAttempts = Math.max( + // since we will retry for both incident and recovery, let's just get the biggest threshold + probe.incidentThreshold || DEFAULT_INCIDENT_THRESHOLD, + probe.recoveryThreshold || DEFAULT_RECOVERY_THRESHOLD + ) + + await retry(handleAll, { + maxAttempts, + backoff: new ExponentialBackoff({ + initialDelay: getContext().flags.retryInitialDelayMs, + maxDelay: getContext().flags.retryMaxDelayMs, + }), + }).execute(({ attempt }) => + Promise.all(probers.map(async (prober) => prober.probe(attempt))) + ) + + setProbeFinish(probe.id) + }, randomTimeoutMilliseconds) +} + +function isTimeToProbe({ id, interval }: Probe) { + const probeCtx = getProbeContext(id) + if (!probeCtx) { + return false + } + + const isIdle = getProbeState(id) === 'idle' + const isInTime = + differenceInSeconds(new Date(), probeCtx.lastFinish) >= interval + + return isIdle && isInTime +} + +function isCycleEnd(probeID: string) { + const probeCtx = getProbeContext(probeID) + if (!probeCtx) { + return true + } + + return ( + getContext().flags.repeat && getContext().flags.repeat === probeCtx.cycle + ) +} + +function getRandomTimeoutMilliseconds(): number { + return [1000, 2000, 3000].sort(() => Math.random() - 0.5)[0] +} From e89395777ad6b432fb465e4f2b55ef173d2c36dd Mon Sep 17 00:00:00 2001 From: Denny Pradipta Date: Fri, 17 Nov 2023 17:28:05 +0700 Subject: [PATCH 2/4] Init --- src/looper/index.ts | 48 ++++++++++++++++++++++++++++++------------ src/workers/probing.ts | 36 ++++++++++++++++++++++--------- 2 files changed, 60 insertions(+), 24 deletions(-) diff --git a/src/looper/index.ts b/src/looper/index.ts index b1ece47bd..e809dafef 100644 --- a/src/looper/index.ts +++ b/src/looper/index.ts @@ -26,6 +26,9 @@ import type { Notification } from '@hyperjumptech/monika-notification' import { AbortSignal } from 'node-abort-controller' import { v4 as uuid } from 'uuid' +import path from 'path' +// import Queue from 'queue' +import Piscina from 'piscina' import type { Probe, ProbeAlert } from '../interfaces/probe' @@ -41,15 +44,23 @@ import { DEFAULT_INCIDENT_THRESHOLD, DEFAULT_RECOVERY_THRESHOLD, } from '../components/config/validation/validator/default-values' -import Queue from 'queue' -import { doProbe } from '../components/probe' +// import { doProbe } from '../components/probe' let checkSTUNinterval: NodeJS.Timeout -const queue = new Queue({ - concurrency: 1, - autostart: true, - timeout: 10_000, +// const queue = new Queue({ +// concurrency: 1, +// autostart: true, +// timeout: 10_000, +// }) + +const worker = new Piscina.Piscina({ + concurrentTasksPerWorker: 1, + // eslint-disable-next-line unicorn/prefer-module + filename: path.join(__dirname, '../../lib/workers/probing.js'), + idleTimeout: 1000, + maxQueue: 1, + maxThreads: 1, }) const DISABLE_STUN = -1 // -1 is disable stun checking @@ -140,14 +151,23 @@ export function startProbing({ return } - for (const probe of probes) { - queue.push(() => - doProbe({ - notifications, - probe, - }) - ) - } + // This uses node queue + // This is working + // for (const probe of probes) { + // queue.push(() => + // doProbe({ + // notifications, + // probe, + // }) + // ) + // } + + // This uses Piscina + // Does not working yet + worker.run({ + notifications, + probes, + }) }, 1000) } diff --git a/src/workers/probing.ts b/src/workers/probing.ts index 3ca13188e..84784bca1 100644 --- a/src/workers/probing.ts +++ b/src/workers/probing.ts @@ -38,20 +38,25 @@ import { DEFAULT_INCIDENT_THRESHOLD, DEFAULT_RECOVERY_THRESHOLD, } from '../components/config/validation/validator/default-values' +import Queue from 'queue' -type doProbeParams = { - probe: Probe // probe contains all the probes +type WorkerProps = { + probes: Probe[] // probe contains all the probes notifications: Notification[] // notifications contains all the notifications } -/** - * doProbe sends out the http request - * @param {string} stringifiedData stringified parameter - * @returns {Promise} void - */ -export default async (stringifiedData: string) => { - const parsedData = JSON.parse(stringifiedData) - const { probe, notifications }: doProbeParams = parsedData +type DoProbeParams = { + probe: Probe + notifications: Notification[] +} + +const queue = new Queue({ + concurrency: 1, + autostart: true, + timeout: 10_000, +}) + +const doProbe = ({ notifications, probe }: DoProbeParams) => { if (!isTimeToProbe(probe) || isCycleEnd(probe.id)) { return } @@ -118,3 +123,14 @@ function isCycleEnd(probeID: string) { function getRandomTimeoutMilliseconds(): number { return [1000, 2000, 3000].sort(() => Math.random() - 0.5)[0] } + +export default ({ probes, notifications }: WorkerProps) => { + for (const probe of probes) { + queue.push(() => + doProbe({ + notifications, + probe, + }) + ) + } +} From 70934aa21f9f20008b4a8b5eb7d3d2588aa420fe Mon Sep 17 00:00:00 2001 From: Denny Pradipta Date: Mon, 11 Dec 2023 16:27:09 +0700 Subject: [PATCH 3/4] Use queue --- src/components/probe/index.ts | 33 +++++++++++++++++---------- src/looper/index.ts | 42 ++++++----------------------------- 2 files changed, 28 insertions(+), 47 deletions(-) diff --git a/src/components/probe/index.ts b/src/components/probe/index.ts index 8aa222fd1..d7fbf5419 100644 --- a/src/components/probe/index.ts +++ b/src/components/probe/index.ts @@ -27,6 +27,7 @@ import { differenceInSeconds } from 'date-fns' import { getContext } from '../../context' import type { Notification } from '@hyperjumptech/monika-notification' import type { Probe } from '../../interfaces/probe' +import Queue from 'queue' import { getProbeContext, getProbeState, @@ -48,6 +49,13 @@ type doProbeParams = { * @param {object} doProbeParams doProbe parameter * @returns {Promise} void */ + +const queue = new Queue({ + concurrency: 1, + autostart: true, + timeout: 10_000, +}) + export async function doProbe({ probe, notifications, @@ -77,15 +85,18 @@ export async function doProbe({ probe.recoveryThreshold || DEFAULT_RECOVERY_THRESHOLD ) - await retry(handleAll, { - maxAttempts, - backoff: new ExponentialBackoff({ - initialDelay: getContext().flags.retryInitialDelayMs, - maxDelay: getContext().flags.retryMaxDelayMs, - }), - }).execute(({ attempt }) => - Promise.all(probers.map(async (prober) => prober.probe(attempt))) - ) + // Enqueue each prober into the prober queue with retry backoff + queue.push(async () => { + await retry(handleAll, { + maxAttempts, + backoff: new ExponentialBackoff({ + initialDelay: getContext().flags.retryInitialDelayMs, + maxDelay: getContext().flags.retryMaxDelayMs, + }), + }).execute(({ attempt }) => + Promise.all(probers.map(async (prober) => prober.probe(attempt))) + ) + }) setProbeFinish(probe.id) }, randomTimeoutMilliseconds) @@ -116,7 +127,5 @@ function isCycleEnd(probeID: string) { } function getRandomTimeoutMilliseconds(): number { - return [1000, 2000, 3000].sort(() => { - return Math.random() - 0.5 - })[0] + return [1000, 2000, 3000].sort(() => Math.random() - 0.5)[0] } diff --git a/src/looper/index.ts b/src/looper/index.ts index e809dafef..a0ea879cf 100644 --- a/src/looper/index.ts +++ b/src/looper/index.ts @@ -26,10 +26,6 @@ import type { Notification } from '@hyperjumptech/monika-notification' import { AbortSignal } from 'node-abort-controller' import { v4 as uuid } from 'uuid' -import path from 'path' -// import Queue from 'queue' -import Piscina from 'piscina' - import type { Probe, ProbeAlert } from '../interfaces/probe' import { getContext } from '../context' @@ -44,25 +40,10 @@ import { DEFAULT_INCIDENT_THRESHOLD, DEFAULT_RECOVERY_THRESHOLD, } from '../components/config/validation/validator/default-values' -// import { doProbe } from '../components/probe' +import { doProbe } from '../components/probe' let checkSTUNinterval: NodeJS.Timeout -// const queue = new Queue({ -// concurrency: 1, -// autostart: true, -// timeout: 10_000, -// }) - -const worker = new Piscina.Piscina({ - concurrentTasksPerWorker: 1, - // eslint-disable-next-line unicorn/prefer-module - filename: path.join(__dirname, '../../lib/workers/probing.js'), - idleTimeout: 1000, - maxQueue: 1, - maxThreads: 1, -}) - const DISABLE_STUN = -1 // -1 is disable stun checking export function sanitizeProbe(isSymonMode: boolean, probe: Probe): Probe { @@ -153,21 +134,12 @@ export function startProbing({ // This uses node queue // This is working - // for (const probe of probes) { - // queue.push(() => - // doProbe({ - // notifications, - // probe, - // }) - // ) - // } - - // This uses Piscina - // Does not working yet - worker.run({ - notifications, - probes, - }) + for (const probe of probes) { + doProbe({ + notifications, + probe, + }) + } }, 1000) } From 1436487221be36f34552aefe942367d3aa47fb79 Mon Sep 17 00:00:00 2001 From: Denny Pradipta Date: Fri, 15 Dec 2023 15:02:26 +0700 Subject: [PATCH 4/4] Feat: Use Queue to probe --- src/looper/index.ts | 2 - src/workers/probing.ts | 136 ----------------------------------------- 2 files changed, 138 deletions(-) delete mode 100644 src/workers/probing.ts diff --git a/src/looper/index.ts b/src/looper/index.ts index a0ea879cf..b0b674895 100644 --- a/src/looper/index.ts +++ b/src/looper/index.ts @@ -132,8 +132,6 @@ export function startProbing({ return } - // This uses node queue - // This is working for (const probe of probes) { doProbe({ notifications, diff --git a/src/workers/probing.ts b/src/workers/probing.ts deleted file mode 100644 index 84784bca1..000000000 --- a/src/workers/probing.ts +++ /dev/null @@ -1,136 +0,0 @@ -/********************************************************************************** - * MIT License * - * * - * Copyright (c) 2021 Hyperjump Technology * - * * - * Permission is hereby granted, free of charge, to any person obtaining a copy * - * of this software and associated documentation files (the "Software"), to deal * - * in the Software without restriction, including without limitation the rights * - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * - * copies of the Software, and to permit persons to whom the Software is * - * furnished to do so, subject to the following conditions: * - * * - * The above copyright notice and this permission notice shall be included in all * - * copies or substantial portions of the Software. * - * * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * - * SOFTWARE. * - **********************************************************************************/ - -import { ExponentialBackoff, retry, handleAll } from 'cockatiel' -import { differenceInSeconds } from 'date-fns' -import { getContext } from '../context' -import type { Notification } from '@hyperjumptech/monika-notification' -import type { Probe } from '../interfaces/probe' -import { - getProbeContext, - getProbeState, - setProbeFinish, - setProbeRunning, -} from '../utils/probe-state' -import { createProbers } from '../components/probe/prober/factory' -import { - DEFAULT_INCIDENT_THRESHOLD, - DEFAULT_RECOVERY_THRESHOLD, -} from '../components/config/validation/validator/default-values' -import Queue from 'queue' - -type WorkerProps = { - probes: Probe[] // probe contains all the probes - notifications: Notification[] // notifications contains all the notifications -} - -type DoProbeParams = { - probe: Probe - notifications: Notification[] -} - -const queue = new Queue({ - concurrency: 1, - autostart: true, - timeout: 10_000, -}) - -const doProbe = ({ notifications, probe }: DoProbeParams) => { - if (!isTimeToProbe(probe) || isCycleEnd(probe.id)) { - return - } - - const randomTimeoutMilliseconds = getRandomTimeoutMilliseconds() - setProbeRunning(probe.id) - - setTimeout(async () => { - const probeCtx = getProbeContext(probe.id) - if (!probeCtx) { - return - } - - const probers = createProbers({ - counter: probeCtx.cycle, - notifications, - probeConfig: probe, - }) - - const maxAttempts = Math.max( - // since we will retry for both incident and recovery, let's just get the biggest threshold - probe.incidentThreshold || DEFAULT_INCIDENT_THRESHOLD, - probe.recoveryThreshold || DEFAULT_RECOVERY_THRESHOLD - ) - - await retry(handleAll, { - maxAttempts, - backoff: new ExponentialBackoff({ - initialDelay: getContext().flags.retryInitialDelayMs, - maxDelay: getContext().flags.retryMaxDelayMs, - }), - }).execute(({ attempt }) => - Promise.all(probers.map(async (prober) => prober.probe(attempt))) - ) - - setProbeFinish(probe.id) - }, randomTimeoutMilliseconds) -} - -function isTimeToProbe({ id, interval }: Probe) { - const probeCtx = getProbeContext(id) - if (!probeCtx) { - return false - } - - const isIdle = getProbeState(id) === 'idle' - const isInTime = - differenceInSeconds(new Date(), probeCtx.lastFinish) >= interval - - return isIdle && isInTime -} - -function isCycleEnd(probeID: string) { - const probeCtx = getProbeContext(probeID) - if (!probeCtx) { - return true - } - - return ( - getContext().flags.repeat && getContext().flags.repeat === probeCtx.cycle - ) -} - -function getRandomTimeoutMilliseconds(): number { - return [1000, 2000, 3000].sort(() => Math.random() - 0.5)[0] -} - -export default ({ probes, notifications }: WorkerProps) => { - for (const probe of probes) { - queue.push(() => - doProbe({ - notifications, - probe, - }) - ) - } -}