Skip to content

Commit e27a402

Browse files
authored
Merge pull request #103 from windingtree/feat/level-queue-test
feat: add levelDB + queue test
2 parents 5abd470 + 13e9c42 commit e27a402

File tree

3 files changed

+2653
-7284
lines changed

3 files changed

+2653
-7284
lines changed

packages/storage/package.json

+5
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@
5757
"publishConfig": {
5858
"access": "public"
5959
},
60+
"devDependencies": {
61+
"@types/luxon": "^3.3.5",
62+
"@windingtree/sdk-queue": "workspace:*",
63+
"luxon": "^1.23.0"
64+
},
6065
"dependencies": {
6166
"@windingtree/sdk-logger": "workspace:*",
6267
"buffer": "^6.0.3",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import { describe, expect, it } from 'vitest';
2+
import { GenericStorageOptions } from '../src/index.js';
3+
import { createInitializer } from '../src/level.js';
4+
import { JobHandler, Queue } from '@windingtree/sdk-queue';
5+
import { DateTime } from 'luxon';
6+
7+
describe('Level Queue Test', () => {
8+
const createJobHandler =
9+
<JobData = unknown, HandlerOptions = unknown>(
10+
handler: JobHandler<JobData, HandlerOptions>,
11+
) =>
12+
(options: HandlerOptions = {} as HandlerOptions) =>
13+
(data: JobData) =>
14+
handler(data, options);
15+
16+
it('Initializer queue jobs test', async () => {
17+
const options: GenericStorageOptions = { scope: 'queueTestInitializer' };
18+
const initializer = createInitializer(options);
19+
const initializedStorage = await initializer();
20+
21+
let queue = new Queue({
22+
storage: initializedStorage,
23+
idsKeyName: 'jobsIds',
24+
concurrencyLimit: 3,
25+
});
26+
27+
const idsSet: Set<number> = new Set();
28+
let hasDoubleJobs = false;
29+
const testHandler = createJobHandler<{ id: number }>(async (data) => {
30+
if (data?.id) {
31+
const { id } = data;
32+
33+
if (idsSet.has(id)) {
34+
hasDoubleJobs = true;
35+
}
36+
37+
idsSet.add(data?.id);
38+
}
39+
40+
return Promise.resolve(false);
41+
});
42+
43+
queue.registerHandler('test', testHandler());
44+
45+
const id = Math.floor(Math.random() * 100);
46+
const timeoutSeconds = 3;
47+
48+
DateTime.fromJSDate(new Date()).plus({ minutes: 5 }).toSeconds();
49+
50+
const jobId = queue.add({
51+
handlerName: 'test',
52+
data: { id },
53+
maxRetries: 1,
54+
expire: DateTime.fromJSDate(new Date()).plus({ minutes: 5 }).toSeconds(),
55+
scheduledTime: DateTime.fromJSDate(new Date())
56+
.plus({ seconds: timeoutSeconds })
57+
.toMillis(),
58+
});
59+
60+
await new Promise<void>((resolve) => {
61+
return setTimeout(() => resolve(), 500);
62+
});
63+
64+
await new Promise<void>((resolve) => {
65+
return setTimeout(() => resolve(), (timeoutSeconds + 1) * 1000);
66+
});
67+
68+
await queue.stop();
69+
70+
queue = new Queue({
71+
storage: initializedStorage,
72+
idsKeyName: 'jobsIds',
73+
concurrencyLimit: 3,
74+
});
75+
76+
await new Promise<void>((resolve) => {
77+
return setTimeout(() => resolve(), 1000);
78+
});
79+
80+
const data = (await queue.get(jobId))?.data;
81+
82+
expect((data as { id: number }).id).toBe(id);
83+
expect(hasDoubleJobs).toBe(false);
84+
});
85+
});

0 commit comments

Comments
 (0)