From 02cd9126fe73da2a319637748e1ce3162967d89a Mon Sep 17 00:00:00 2001 From: PJColombo Date: Wed, 20 Dec 2023 05:44:50 +0100 Subject: [PATCH] test: implement general setup and teardown hooks for bullmq queues --- clis/blob-propagation-jobs-cli/package.json | 1 - clis/blob-propagation-jobs-cli/setup.ts | 13 ++++ .../src/QueueManager.ts | 6 +- .../test/commands/remove.test.ts | 15 +--- .../test/commands/retry.test.ts | 4 +- clis/blob-propagation-jobs-cli/tsconfig.json | 2 +- .../vitest.config.ts | 18 +++-- package.json | 3 +- packages/api/test/indexer.test.ts | 10 +-- packages/blob-propagator/setup.ts | 36 ++++++++++ .../blob-propagator/src/BlobPropagator.ts | 71 +++++++++++-------- .../blob-propagator/src/blob-file-manager.ts | 2 +- .../blob-propagator/src/blob-propagator.ts | 6 +- packages/blob-propagator/src/utils.ts | 16 ++--- .../src/worker-processors/gcs.ts | 10 ++- .../src/worker-processors/postgres.ts | 12 +++- .../src/worker-processors/swarm.ts | 10 ++- .../blob-propagator/test/swarm-worker.test.ts | 6 +- packages/blob-propagator/tsconfig.json | 2 +- packages/blob-propagator/vitest.config.ts | 11 ++- vitest.shared.ts | 4 +- 21 files changed, 161 insertions(+), 97 deletions(-) create mode 100644 clis/blob-propagation-jobs-cli/setup.ts create mode 100644 packages/blob-propagator/setup.ts diff --git a/clis/blob-propagation-jobs-cli/package.json b/clis/blob-propagation-jobs-cli/package.json index b0fee152b..8bb176acb 100644 --- a/clis/blob-propagation-jobs-cli/package.json +++ b/clis/blob-propagation-jobs-cli/package.json @@ -6,7 +6,6 @@ "main": "./src/index.ts", "types": "./src/index.ts", "scripts": { - "build": "tsc", "clean": "rm -rf .turbo node_modules", "lint": "eslint .", "lint:fix": "pnpm lint --fix", diff --git a/clis/blob-propagation-jobs-cli/setup.ts b/clis/blob-propagation-jobs-cli/setup.ts new file mode 100644 index 000000000..419c6053f --- /dev/null +++ b/clis/blob-propagation-jobs-cli/setup.ts @@ -0,0 +1,13 @@ +import { afterAll, beforeEach } from "vitest"; + +import { queueManager } from "./src/queue-manager"; + +beforeEach(async () => { + await queueManager.obliterateQueues({ force: true }); +}); +afterAll(async () => { + await queueManager + .obliterateQueues({ force: true }) + // eslint-disable-next-line @typescript-eslint/no-misused-promises + .finally(() => queueManager.close()); +}); diff --git a/clis/blob-propagation-jobs-cli/src/QueueManager.ts b/clis/blob-propagation-jobs-cli/src/QueueManager.ts index 6aa03026c..fe64fb886 100644 --- a/clis/blob-propagation-jobs-cli/src/QueueManager.ts +++ b/clis/blob-propagation-jobs-cli/src/QueueManager.ts @@ -65,9 +65,11 @@ export class QueueManager { ]); } - obliterateQueues() { + obliterateQueues({ force = false } = {}) { return Promise.all([ - ...Object.values(this.#storageQueues).map((queue) => queue.obliterate()), + ...Object.values(this.#storageQueues).map((queue) => + queue.obliterate({ force }) + ), this.#finalizerQueue.obliterate(), ]); } diff --git a/clis/blob-propagation-jobs-cli/test/commands/remove.test.ts b/clis/blob-propagation-jobs-cli/test/commands/remove.test.ts index a3f12d875..bdc648ed4 100644 --- a/clis/blob-propagation-jobs-cli/test/commands/remove.test.ts +++ b/clis/blob-propagation-jobs-cli/test/commands/remove.test.ts @@ -1,14 +1,5 @@ -/* eslint-disable @typescript-eslint/no-misused-promises */ import type { SpyInstance } from "vitest"; -import { - afterAll, - afterEach, - beforeEach, - describe, - expect, - it, - vi, -} from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { remove, removeCommandUsage } from "../../src/commands"; import { queueManager } from "../../src/queue-manager"; @@ -159,8 +150,4 @@ describe("Remove command", () => { '"Invalid queue name: invalid-queue-name"' ); }); - - afterAll(async () => { - await queueManager.obliterateQueues().finally(() => queueManager.close()); - }); }); diff --git a/clis/blob-propagation-jobs-cli/test/commands/retry.test.ts b/clis/blob-propagation-jobs-cli/test/commands/retry.test.ts index 9d469c4a0..2dfc238e3 100644 --- a/clis/blob-propagation-jobs-cli/test/commands/retry.test.ts +++ b/clis/blob-propagation-jobs-cli/test/commands/retry.test.ts @@ -105,9 +105,7 @@ describe("Retry command", () => { }); afterAll(async () => { - let teardownPromise = queueManager - .obliterateQueues() - .finally(() => queueManager.close()); + let teardownPromise = Promise.resolve(); storageWorkers.forEach((worker) => { teardownPromise = teardownPromise.finally(() => worker.close()); diff --git a/clis/blob-propagation-jobs-cli/tsconfig.json b/clis/blob-propagation-jobs-cli/tsconfig.json index 304322ee1..d4f59efdd 100644 --- a/clis/blob-propagation-jobs-cli/tsconfig.json +++ b/clis/blob-propagation-jobs-cli/tsconfig.json @@ -1,4 +1,4 @@ { "extends": "../../tsconfig.json", - "include": ["src/**/*.ts", "test/**/*.ts", "vitest.config.ts"] + "include": ["src/**/*.ts", "test/**/*.ts", "vitest.config.ts", "setup.ts"] } diff --git a/clis/blob-propagation-jobs-cli/vitest.config.ts b/clis/blob-propagation-jobs-cli/vitest.config.ts index a1847fca9..3ad620897 100644 --- a/clis/blob-propagation-jobs-cli/vitest.config.ts +++ b/clis/blob-propagation-jobs-cli/vitest.config.ts @@ -1,8 +1,12 @@ -import { defineProject } from "vitest/config"; +import { defineConfig, mergeConfig } from "vitest/config"; -export default defineProject({ - test: { - include: ["test/**/*.test.ts"], - threads: false, - }, -}); +import { sharedProjectConfig } from "../../vitest.shared"; + +export default mergeConfig( + sharedProjectConfig, + defineConfig({ + test: { + setupFiles: ["./setup.ts"], + }, + }) +); diff --git a/package.json b/package.json index 255f543b8..a4f436e28 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,8 @@ "type-check": "turbo type-check", "stats": "ts-node scripts/stats-aggregator/index.ts", "job:daily": "ts-node scripts/jobs/aggregate-yesterdays-daily-stats.ts", - "job:overall": "ts-node scripts/jobs/aggregate-overall-stats.ts" + "job:overall": "ts-node scripts/jobs/aggregate-overall-stats.ts", + "validate": "turbo lint type-check && manypkg check && pnpm test" }, "dependencies": { "@blobscan/dayjs": "^0.0.1", diff --git a/packages/api/test/indexer.test.ts b/packages/api/test/indexer.test.ts index d1b866c05..07ef59dca 100644 --- a/packages/api/test/indexer.test.ts +++ b/packages/api/test/indexer.test.ts @@ -11,7 +11,6 @@ import { } from "vitest"; import type { Blob as PropagatorBlob } from "@blobscan/blob-propagator"; -import { blobFileManager } from "@blobscan/blob-propagator/src/blob-file-manager"; import type { BlobReference } from "@blobscan/blob-storage-manager"; import { omitDBTimestampFields } from "@blobscan/test"; @@ -420,11 +419,12 @@ describe("Indexer router", async () => { }); afterAll(async () => { - await blobFileManager.removeFolder(); + const blobPropagator = ctxWithBlobPropagator.blobPropagator; - await ctxWithBlobPropagator.blobPropagator?.close({ - emptyJobs: true, - }); + if (blobPropagator) { + await blobPropagator.empty({ force: true }); + await blobPropagator.close(); + } }); it("should call blob propagator", async () => { diff --git a/packages/blob-propagator/setup.ts b/packages/blob-propagator/setup.ts new file mode 100644 index 000000000..d7430ac32 --- /dev/null +++ b/packages/blob-propagator/setup.ts @@ -0,0 +1,36 @@ +import { Queue } from "bullmq"; +import type { RedisOptions } from "ioredis"; +import { afterAll } from "vitest"; + +import { blobFileManager } from "./src/blob-file-manager"; +import { env } from "./src/env"; +import { FINALIZER_WORKER_NAME, STORAGE_WORKER_NAMES } from "./src/utils"; + +const connection: RedisOptions = { + host: env.REDIS_QUEUE_HOST, + port: env.REDIS_QUEUE_PORT, + password: env.REDIS_QUEUE_PASSWORD, + username: env.REDIS_QUEUE_USERNAME, +}; + +afterAll(async () => { + const queues = [ + STORAGE_WORKER_NAMES["GOOGLE"], + STORAGE_WORKER_NAMES["POSTGRES"], + FINALIZER_WORKER_NAME, + ].map((queueName) => new Queue(queueName, { connection })); + + let teardownPromise = Promise.all([ + ...queues.map((q) => q.obliterate({ force: true })), + blobFileManager.removeFolder(), + ]); + + queues.forEach((q) => { + // eslint-disable-next-line @typescript-eslint/no-misused-promises + teardownPromise = teardownPromise.finally(async () => { + await q.close(); + }); + }); + + await teardownPromise; +}); diff --git a/packages/blob-propagator/src/BlobPropagator.ts b/packages/blob-propagator/src/BlobPropagator.ts index af45af965..69be81b7f 100644 --- a/packages/blob-propagator/src/BlobPropagator.ts +++ b/packages/blob-propagator/src/BlobPropagator.ts @@ -1,4 +1,5 @@ -import { FlowProducer, Worker } from "bullmq"; +/* eslint-disable @typescript-eslint/no-misused-promises */ +import { FlowProducer, Queue, Worker } from "bullmq"; import type { ConnectionOptions, FlowChildJob, @@ -19,7 +20,6 @@ import { FINALIZER_WORKER_NAME, STORAGE_WORKER_NAMES, buildJobId, - emptyWorkerJobQueue, } from "./utils"; import { finalizerProcessor, @@ -91,42 +91,49 @@ export class BlobPropagator { ); } - close(opts?: { emptyJobs: boolean }) { + async empty({ force }: { force: boolean } = { force: false }) { + const workers = this.#getWorkers(); + const queues = await Promise.all( + workers.map( + async (w) => new Queue(w.name, { connection: await w.client }) + ) + ); + + let emptyPromise = Promise.all([ + ...queues.map((q) => q.obliterate({ force })), + blobFileManager.removeFolder(), + ]); + + queues.forEach((q) => { + emptyPromise = emptyPromise.finally(async () => { + await q.close(); + }); + }); + + await emptyPromise; + } + + close() { let teardownPromise: Promise = Promise.resolve(); - const emptyJobs = opts?.emptyJobs; Object.values(this.storageWorkers).forEach((w) => { - // eslint-disable-next-line @typescript-eslint/no-misused-promises teardownPromise = teardownPromise.finally(async () => { - if (emptyJobs) { - await emptyWorkerJobQueue(w); - } - await w.close(); }); }); - return ( - teardownPromise - // eslint-disable-next-line @typescript-eslint/no-misused-promises - .finally(async () => { - if (emptyJobs) { - await emptyWorkerJobQueue(this.finalizerWorker); - } - - await this.finalizerWorker.close(); - }) - // eslint-disable-next-line @typescript-eslint/no-misused-promises - .finally(async () => { - const redisClient = await this.blobPropagationFlowProducer.client; - - await redisClient.quit(); - }) - // eslint-disable-next-line @typescript-eslint/no-misused-promises - .finally(async () => { - await this.blobPropagationFlowProducer.close(); - }) - ); + return teardownPromise + .finally(async () => { + await this.finalizerWorker.close(); + }) + .finally(async () => { + const redisClient = await this.blobPropagationFlowProducer.client; + + await redisClient.quit(); + }) + .finally(async () => { + await this.blobPropagationFlowProducer.close(); + }); } async propagateBlob(blob: Blob) { @@ -149,6 +156,10 @@ export class BlobPropagator { await this.blobPropagationFlowProducer.addBulk(blobPropagationFlowJobs); } + #getWorkers() { + return [...Object.values(this.storageWorkers), this.finalizerWorker]; + } + #createBlobPropagationFlowProducer(connection?: ConnectionOptions) { /* * Instantiating a new `FlowProducer` appears to create two separate `RedisConnection` instances. diff --git a/packages/blob-propagator/src/blob-file-manager.ts b/packages/blob-propagator/src/blob-file-manager.ts index 728fe1c94..eff26d5c8 100644 --- a/packages/blob-propagator/src/blob-file-manager.ts +++ b/packages/blob-propagator/src/blob-file-manager.ts @@ -7,5 +7,5 @@ const basePath = !env.TEST ? os.tmpdir() : undefined; export const blobFileManager = new BlobFileManager({ basePath, - folderName: "blob-files-test", + folderName: ".blob-files-test", }); diff --git a/packages/blob-propagator/src/blob-propagator.ts b/packages/blob-propagator/src/blob-propagator.ts index 04d009d01..9f4b8ef41 100644 --- a/packages/blob-propagator/src/blob-propagator.ts +++ b/packages/blob-propagator/src/blob-propagator.ts @@ -33,9 +33,7 @@ function createBlobPropagator() { }, }); } - -const blobPropagator = env.BLOB_PROPAGATOR_ENABLED - ? createBlobPropagator() - : undefined; +const blobPropagator = + env.BLOB_PROPAGATOR_ENABLED === true ? createBlobPropagator() : undefined; export { blobPropagator, createBlobPropagator }; diff --git a/packages/blob-propagator/src/utils.ts b/packages/blob-propagator/src/utils.ts index d9da5e872..1e363a10a 100644 --- a/packages/blob-propagator/src/utils.ts +++ b/packages/blob-propagator/src/utils.ts @@ -1,7 +1,7 @@ import type { Worker } from "bullmq"; -import { Queue } from "bullmq"; +import type { Queue } from "bullmq"; -import { getBlobStorageManager } from "@blobscan/blob-storage-manager"; +import type { BlobStorageManager } from "@blobscan/blob-storage-manager"; import { prisma, $Enums } from "@blobscan/db"; import { blobFileManager } from "./blob-file-manager"; @@ -28,19 +28,11 @@ export function getStorageFromjobId(jobId: string) { )?.[0] as $Enums.BlobStorage; } -export async function emptyWorkerJobQueue(worker: Worker) { - const q = new Queue(worker.name, { - connection: await worker.client, - }); - - return q.drain(); -} - export async function propagateBlob( versionedHash: string, - targetStorage: $Enums.BlobStorage + targetStorage: $Enums.BlobStorage, + { blobStorageManager }: { blobStorageManager: BlobStorageManager } ) { - const blobStorageManager = await getBlobStorageManager(); const blobData = await blobFileManager.readFile(versionedHash); const result = await blobStorageManager.storeBlob( diff --git a/packages/blob-propagator/src/worker-processors/gcs.ts b/packages/blob-propagator/src/worker-processors/gcs.ts index 9a38e7a75..af1022108 100644 --- a/packages/blob-propagator/src/worker-processors/gcs.ts +++ b/packages/blob-propagator/src/worker-processors/gcs.ts @@ -1,6 +1,12 @@ +import { getBlobStorageManager } from "@blobscan/blob-storage-manager"; + import type { BlobPropagationWorkerProcessor } from "../types"; import { propagateBlob } from "../utils"; -export const gcsProcessor: BlobPropagationWorkerProcessor = (job) => { - return propagateBlob(job.data.versionedHash, "GOOGLE"); +export const gcsProcessor: BlobPropagationWorkerProcessor = async (job) => { + const blobStorageManager = await getBlobStorageManager(); + + return propagateBlob(job.data.versionedHash, "GOOGLE", { + blobStorageManager, + }); }; diff --git a/packages/blob-propagator/src/worker-processors/postgres.ts b/packages/blob-propagator/src/worker-processors/postgres.ts index 2bcb4f705..371e6d219 100644 --- a/packages/blob-propagator/src/worker-processors/postgres.ts +++ b/packages/blob-propagator/src/worker-processors/postgres.ts @@ -1,6 +1,14 @@ +import { getBlobStorageManager } from "@blobscan/blob-storage-manager"; + import type { BlobPropagationWorkerProcessor } from "../types"; import { propagateBlob } from "../utils"; -export const postgresProcessor: BlobPropagationWorkerProcessor = (job) => { - return propagateBlob(job.data.versionedHash, "POSTGRES"); +export const postgresProcessor: BlobPropagationWorkerProcessor = async ( + job +) => { + const blobStorageManager = await getBlobStorageManager(); + + return propagateBlob(job.data.versionedHash, "POSTGRES", { + blobStorageManager, + }); }; diff --git a/packages/blob-propagator/src/worker-processors/swarm.ts b/packages/blob-propagator/src/worker-processors/swarm.ts index e5ca26ee7..946257ad7 100644 --- a/packages/blob-propagator/src/worker-processors/swarm.ts +++ b/packages/blob-propagator/src/worker-processors/swarm.ts @@ -1,6 +1,12 @@ +import { getBlobStorageManager } from "@blobscan/blob-storage-manager"; + import type { BlobPropagationWorkerProcessor } from "../types"; import { propagateBlob } from "../utils"; -export const swarmProcessor: BlobPropagationWorkerProcessor = function (job) { - return propagateBlob(job.data.versionedHash, "SWARM"); +export const swarmProcessor: BlobPropagationWorkerProcessor = async function ( + job +) { + const blobStorageManager = await getBlobStorageManager(); + + return propagateBlob(job.data.versionedHash, "SWARM", { blobStorageManager }); }; diff --git a/packages/blob-propagator/test/swarm-worker.test.ts b/packages/blob-propagator/test/swarm-worker.test.ts index 78cd479dd..8563c59f7 100644 --- a/packages/blob-propagator/test/swarm-worker.test.ts +++ b/packages/blob-propagator/test/swarm-worker.test.ts @@ -1,4 +1,4 @@ -import { afterAll, describe, vi } from "vitest"; +import { describe, vi } from "vitest"; import type { BlobStorageManager } from "@blobscan/blob-storage-manager"; @@ -10,10 +10,6 @@ const fixtures = { blobVersionedHash: "swarmWorkerVersionedHash", }; -afterAll(() => { - vi.resetModules(); -}); - describe( "Swarm Worker", runStorageWorkerTestSuite("SWARM", { diff --git a/packages/blob-propagator/tsconfig.json b/packages/blob-propagator/tsconfig.json index 304322ee1..d4f59efdd 100644 --- a/packages/blob-propagator/tsconfig.json +++ b/packages/blob-propagator/tsconfig.json @@ -1,4 +1,4 @@ { "extends": "../../tsconfig.json", - "include": ["src/**/*.ts", "test/**/*.ts", "vitest.config.ts"] + "include": ["src/**/*.ts", "test/**/*.ts", "vitest.config.ts", "setup.ts"] } diff --git a/packages/blob-propagator/vitest.config.ts b/packages/blob-propagator/vitest.config.ts index 8fdeaf1f5..3ad620897 100644 --- a/packages/blob-propagator/vitest.config.ts +++ b/packages/blob-propagator/vitest.config.ts @@ -1,5 +1,12 @@ -import { defineProject } from "vitest/config"; +import { defineConfig, mergeConfig } from "vitest/config"; import { sharedProjectConfig } from "../../vitest.shared"; -export default defineProject(sharedProjectConfig); +export default mergeConfig( + sharedProjectConfig, + defineConfig({ + test: { + setupFiles: ["./setup.ts"], + }, + }) +); diff --git a/vitest.shared.ts b/vitest.shared.ts index 78fffae9a..eb131e14b 100644 --- a/vitest.shared.ts +++ b/vitest.shared.ts @@ -1,6 +1,6 @@ -import { UserProjectConfigExport } from "vitest/config"; +import { UserWorkspaceConfig } from "vitest/config"; -export const sharedProjectConfig: UserProjectConfigExport = { +export const sharedProjectConfig: UserWorkspaceConfig = { test: { include: ["test/**/*.test.ts"], setupFiles: ["@blobscan/test/src/setup.ts"],