From ea7ca452feca5f4edda1b4e2bb81420c54440c8c Mon Sep 17 00:00:00 2001 From: Hari Nugraha <15191978+haricnugraha@users.noreply.github.com> Date: Tue, 19 Dec 2023 11:55:11 +0700 Subject: [PATCH] Refactor: Update Probe Data Independently (#1203) --- src/components/config/index.ts | 2 +- src/symon/index.test.ts | 162 ++++++++++++-------------- src/symon/index.ts | 203 +++++++++++++++++++++++++-------- 3 files changed, 229 insertions(+), 138 deletions(-) diff --git a/src/components/config/index.ts b/src/components/config/index.ts index 657c7ede0..928eb4e7b 100644 --- a/src/components/config/index.ts +++ b/src/components/config/index.ts @@ -99,7 +99,7 @@ export const updateConfig = async (config: Config): Promise => { setContext({ config: newConfig }) setProbes(newConfig.probes) - emitter.emit(events.config.updated, newConfig) + emitter.emit(events.config.updated) log.info('Config file update detected') } catch (error: unknown) { const message = getErrorMessage(error) diff --git a/src/symon/index.test.ts b/src/symon/index.test.ts index e243d0bb4..ac329904f 100644 --- a/src/symon/index.test.ts +++ b/src/symon/index.test.ts @@ -32,10 +32,9 @@ import type { Probe } from '../interfaces/probe' import SymonClient from '.' import { getContext, resetContext, setContext } from '../context' -import { deleteProbe, getProbes } from '../components/config/probe' +import { deleteProbe, findProbe, getProbes } from '../components/config/probe' import { validateProbes } from '../components/config/validation' import events from '../events' -import { md5Hash } from '../utils/hash' import { getEventEmitter } from '../utils/events' import { getErrorMessage } from '../utils/catch-error-handler' @@ -290,6 +289,25 @@ describe('Symon initiate', () => { it('should add a new probe', async () => { // arrange + const newProbe: Probe = { + id: '3', + name: 'New Probe', + interval: 2, + requests: [{ url: 'https://example.com', body: '', timeout: 2000 }], + alerts: [], + } + server.use( + rest.get( + 'http://localhost:4000/api/v1/monika/1234/probe-changes', + (_, res, ctx) => + res( + ctx.json({ + message: 'Successfully get probe changes', + data: [{ type: 'add', probe: newProbe }], + }) + ) + ) + ) const symonGetProbesIntervalMs = 100 setContext({ flags: { @@ -312,59 +330,40 @@ describe('Symon initiate', () => { // assert // 3. Check the probe data after connected to Symon - expect(getProbes()).deep.eq(await validateProbes(config.probes)) - - // arrange - // 4. Simulate adding a probe - const newProbe: Probe = { - id: '3', - name: 'New Probe', - interval: 2, - requests: [{ url: 'https://example.com', body: '', timeout: 2000 }], - alerts: [], - } - server.use( - rest.get( - 'http://localhost:4000/api/v1/monika/1234/probes', - (_, res, ctx) => { - const newProbes: Probe[] = [...config.probes, newProbe] - - return res( - ctx.set('etag', md5Hash(newProbes)), - ctx.json({ - statusCode: 'ok', - message: 'Successfully get probes configuration', - data: newProbes, - }) - ) - } - ) - ) + expect(getProbes().length).eq(2) // act - // 5. Wait for the probe fetch to run + // 4. Wait for the probe fetch to run await sleep(symonGetProbesIntervalMs) // assert - // 6. Check the updated probe cache - expect(getProbes()).deep.eq( - await validateProbes([...config.probes, newProbe]) - ) + // 5. Check the updated probe cache + expect(getProbes().length).eq(3) - // act - // 7. Wait for probe fetch to run - await sleep(symonGetProbesIntervalMs) - - // assert - // 8. Should not update the probe cache - expect(getProbes()).deep.eq( - await validateProbes([...config.probes, newProbe]) - ) await symon.stop() }).timeout(15_000) it('should update a probe', async () => { // arrange + server.use( + rest.get( + 'http://localhost:4000/api/v1/monika/1234/probe-changes', + (_, res, ctx) => + res( + ctx.json({ + message: 'Successfully get probe changes', + data: [ + { + type: 'update', + // eslint-disable-next-line camelcase + probe_id: '1', + probe: { ...findProbe('1'), interval: 2 }, + }, + ], + }) + ) + ) + ) const symonGetProbesIntervalMs = 100 setContext({ flags: { @@ -387,42 +386,41 @@ describe('Symon initiate', () => { // assert // 3. Check the probe data after connected to Symon - expect(getProbes()).deep.eq(await validateProbes(config.probes)) - - // arrange - // 4. Simulate updating a probe - const updatedProbes: Probe[] = [{ ...config.probes[0], interval: 5 }] - server.use( - rest.get( - 'http://localhost:4000/api/v1/monika/1234/probes', - (_, res, ctx) => { - const newProbes: Probe[] = updatedProbes - - return res( - ctx.set('etag', md5Hash(newProbes)), - ctx.json({ - statusCode: 'ok', - message: 'Successfully get probes configuration', - data: newProbes, - }) - ) - } - ) - ) + expect(getProbes().length).eq(2) // act - // 5. Wait for the probe fetch to run + // 4. Wait for the probe fetch to run await sleep(symonGetProbesIntervalMs) // assert - // 6. Check the updated probe cache - expect(getProbes()).deep.eq(await validateProbes(updatedProbes)) + // 5. Check the updated probe cache + expect(getProbes().length).eq(2) + expect(findProbe('1')?.interval).eq(2) await symon.stop() }).timeout(15_000) it('should delete a probe', async () => { // arrange + server.use( + rest.get( + 'http://localhost:4000/api/v1/monika/1234/probe-changes', + (_, res, ctx) => + res( + ctx.json({ + message: 'Successfully get probe changes', + data: [ + { + type: 'delete', + // eslint-disable-next-line camelcase + probe_id: '1', + probe: {}, + }, + ], + }) + ) + ) + ) const symonGetProbesIntervalMs = 100 setContext({ flags: { @@ -445,33 +443,15 @@ describe('Symon initiate', () => { // assert // 3. Check the probe data after connected to Symon - expect(getProbes()).deep.eq(await validateProbes(config.probes)) - - // arrange - // 4. Simulate deleting a probe - const updatedProbes: Probe[] = config.probes.filter(({ id }) => id === '1') - server.use( - rest.get( - 'http://localhost:4000/api/v1/monika/1234/probes', - (_, res, ctx) => - res( - ctx.set('etag', md5Hash(updatedProbes)), - ctx.json({ - statusCode: 'ok', - message: 'Successfully get probes configuration', - data: updatedProbes, - }) - ) - ) - ) + expect(getProbes().length).eq(2) // act - // 5. Wait for the probe fetch to run + // 4. Wait for the probe fetch to run await sleep(symonGetProbesIntervalMs) // assert - // 6. Check the updated probe cache - expect(getProbes()).deep.eq(await validateProbes(updatedProbes)) + // 5. Check the updated probe cache + expect(getProbes().length).eq(1) await symon.stop() }).timeout(15_000) diff --git a/src/symon/index.ts b/src/symon/index.ts index 5a158592d..db94d558a 100644 --- a/src/symon/index.ts +++ b/src/symon/index.ts @@ -30,15 +30,22 @@ import { hostname } from 'os' import path from 'path' import Piscina from 'piscina' -import { getProbes } from '../components/config/probe' +import type { Config } from '../interfaces/config' +import type { Probe } from '../interfaces/probe' +import type { ValidatedResponse } from '../plugins/validate-response' + +import { + addProbe, + deleteProbe, + getProbes, + updateProbe, +} from '../components/config/probe' import { updateConfig } from '../components/config' +import { validateProbes } from '../components/config/validation/validator/probe' import { getOSName } from '../components/notification/alert-message' import { getContext } from '../context' import events from '../events' import { SYMON_API_VERSION, type MonikaFlags } from '../flag' -import { Config } from '../interfaces/config' -import { Probe } from '../interfaces/probe' -import { ValidatedResponse } from '../plugins/validate-response' import { getEventEmitter } from '../utils/events' import { DEFAULT_TIMEOUT } from '../utils/http' import getIp from '../utils/ip' @@ -84,6 +91,21 @@ type SymonClientParams = Pick< | 'symonUrl' > +type LastEvent = { + id: string + alertId: string + locationId: string + recoveredAt: Date | null +} + +type ProbeChange = { + probe_id: string + type: 'add' | 'delete' | 'update' + created_at: Date + probe: Probe + lastEvent: LastEvent +} + const getHandshakeData = async (): Promise => { await retry(handleAll, { backoff: new ExponentialBackoff(), @@ -129,11 +151,12 @@ export default class SymonClient { private reportProbesInterval: number private worker private apiKey: string - private getProbesInterval: NodeJS.Timeout | undefined + private probeChangesInterval: NodeJS.Timeout | undefined private hasConnectionToSymon: boolean = false private httpClient: AxiosInstance private locationId: string private monikaId: string + private probeChangesCheckedAt: Date | undefined private reportProbesLimit: number private reportTimeout: NodeJS.Timeout | undefined private url: string @@ -170,15 +193,58 @@ export default class SymonClient { } async initiate(): Promise { - log.info('[Symon] Handshake starts') this.monikaId = await this.handshake() - log.info('[Symon] Handshake succeed') + log.info('[Symon] Handshake') + + this.sendStatus({ isOnline: true }) + .then(() => { + log.info('[Symon] Send status succeed') + }) + .catch((error) => { + log.error(`[Symon] Send status failed. ${(error as Error).message}`) + }) + + const probeChangesCheckedAt = new Date() await this.fetchProbesAndUpdateConfig() - this.getProbesInterval = setInterval( - this.fetchProbesAndUpdateConfig.bind(this), - getContext().flags.symonGetProbesIntervalMs - ) + this.setProbeChangesCheckedAt(probeChangesCheckedAt) + + this.probeChangesInterval = setInterval(async () => { + const probeChangesCheckedAt = new Date() + try { + const probeChanges = await this.probeChanges.bind(this)() + this.setProbeChangesCheckedAt(probeChangesCheckedAt) + + const hasProbeChanges = probeChanges.length > 0 + if (!hasProbeChanges) { + log.info( + `[Symon] No probe changes since ${this.probeChangesCheckedAt}` + ) + return + } + + const probeChangesApplyResults = await applyProbeChanges(probeChanges) + for (const result of probeChangesApplyResults) { + if (result.status === 'rejected') { + log.error( + `[Symon] Get probe changes since ${this.probeChangesCheckedAt}. ${result.reason}` + ) + } + } + + this.eventEmitter.emit(events.config.updated) + + log.info( + `[Symon] Get probe changes (${probeChanges.length}) since ${this.probeChangesCheckedAt}` + ) + } catch (error) { + log.error( + `[Symon] Get probe changes since ${ + this.probeChangesCheckedAt + } failed. ${(error as Error).message}` + ) + } + }, getContext().flags.symonGetProbesIntervalMs) this.report().catch((error) => { this.hasConnectionToSymon = false @@ -191,29 +257,41 @@ export default class SymonClient { ) } + async sendStatus({ isOnline }: { isOnline: boolean }): Promise { + const { status } = await this.httpClient({ + data: { + monikaId: this.monikaId, + status: isOnline, + }, + method: 'POST', + url: '/status', + }) + + if (status === 200) { + this.hasConnectionToSymon = true + } + } + async stop(): Promise { if (this.eventEmitter) { this.eventEmitter.removeAllListeners() } - clearInterval(this.getProbesInterval) - + clearInterval(this.probeChangesInterval) clearTimeout(this.reportTimeout) await this.worker.destroy() } + private setProbeChangesCheckedAt(probeChangesCheckedAt: Date) { + this.probeChangesCheckedAt = probeChangesCheckedAt + } + private willSendEventListener({ probeState, validation, alertId, }: NotificationEvent) { - log.info( - `[Symon] Send ${ - probeState === 'DOWN' ? 'incident' : 'recovery' - } event for Alert ID: ${alertId}` - ) - const { data, headers, responseTime, status } = validation.response this.httpClient .post('/events', { @@ -228,6 +306,13 @@ export default class SymonClient { time: responseTime, }, }) + .then(() => { + log.info( + `[Symon] Send ${ + probeState === 'DOWN' ? 'incident' : 'recovery' + } event for Alert ID: ${alertId} succeed` + ) + }) .catch((error) => { log.error( `[Symon] Send ${ @@ -238,7 +323,6 @@ export default class SymonClient { } private async report(): Promise { - log.info('[Symon] Report') // Create a task data object const taskData = { apiKey: this.apiKey, @@ -253,6 +337,7 @@ export default class SymonClient { try { // Submit the task to Piscina await this.worker.run(JSON.stringify(taskData)) + log.info('[Symon] Report succeed') } finally { this.reportTimeout = setTimeout(() => { this.report @@ -265,19 +350,15 @@ export default class SymonClient { } } - async sendStatus({ isOnline }: { isOnline: boolean }): Promise { - const { status } = await this.httpClient({ - data: { - monikaId: this.monikaId, - status: isOnline, - }, - method: 'POST', - url: '/status', - }) + private async probeChanges(): Promise { + const response = await this.httpClient.get<{ data: ProbeChange[] }>( + `/${this.monikaId}/probe-changes`, + { + params: { since: this.probeChangesCheckedAt }, + } + ) - if (status === 200) { - this.hasConnectionToSymon = true - } + return response.data.data } private async fetchProbes() { @@ -297,8 +378,6 @@ export default class SymonClient { }) .then(async (res) => { if (!res.data.data) { - log.info('[Symon] No config changes') - return { probes: getProbes(), hash: res.headers.etag, @@ -323,11 +402,10 @@ export default class SymonClient { } private async fetchProbesAndUpdateConfig() { - log.info('[Symon] Get probes') // Fetch the probes const { hash, probes } = await this.fetchProbes() const newConfig: Config = { probes, version: hash } - await this.setConfig(newConfig) + await setConfig(newConfig) // Set connection to symon as true, because it could fetch the probes this.hasConnectionToSymon = true @@ -359,17 +437,50 @@ export default class SymonClient { .post('/client-handshake', handshakeData) .then((res) => res.data?.data.monikaId) } +} - private async setConfig(newConfig: Config) { - if ( - !newConfig.version || - getContext().config?.version === newConfig.version - ) { - log.info('[Symon] No config change') - return - } +async function applyProbeChanges(probeChanges: ProbeChange[]) { + return Promise.allSettled( + probeChanges.map(async ({ lastEvent, probe, probe_id: probeId, type }) => { + switch (type) { + case 'delete': { + deleteProbe(probeId) + return + } - log.info('[Symon] Config changes. Reloading Monika') - await updateConfig(newConfig) + case 'update': { + const probes = await validateProbes([ + lastEvent ? { ...probe, lastEvent } : probe, + ]) + updateProbe(probeId, probes[0]) + + return + } + + case 'add': { + const probes = await validateProbes([ + lastEvent ? { ...probe, lastEvent } : probe, + ]) + addProbe(probes[0]) + return + } + + default: { + throw new Error(`Unknown probe changes type (${type}).`) + } + } + }) + ) +} + +async function setConfig(newConfig: Config) { + if ( + !newConfig.version || + getContext().config?.version === newConfig.version + ) { + return } + + log.info('[Symon] Config changes. Reloading Monika') + await updateConfig(newConfig) }