Skip to content

Commit

Permalink
chore(NODE-6722): migrate parallel benchmarks (#4404)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored Feb 10, 2025
1 parent a79a13d commit 7d744be
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 15 deletions.
2 changes: 1 addition & 1 deletion test/benchmarks/driver_bench/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"version": "0.0.0",
"private": true,
"scripts": {
"prestart": "tsc",
"prestart": "rm -rf lib && tsc",
"start": "node lib/main.mjs"
}
}
15 changes: 13 additions & 2 deletions test/benchmarks/driver_bench/src/driver.mts
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,18 @@ export function snakeToCamel(name: string) {
import type mongodb from '../../../../mongodb.js';
export type { mongodb };

const { MongoClient, GridFSBucket } = require(path.join(MONGODB_DRIVER_PATH));
const { MongoClient, GridFSBucket, BSON } = require(path.join(MONGODB_DRIVER_PATH));

export { BSON };
export const EJSON = BSON.EJSON;

const DB_NAME = 'perftest';
const COLLECTION_NAME = 'corpus';

const SPEC_DIRECTORY = path.resolve(__dirname, '..', '..', 'driverBench', 'spec');
// TODO(NODE-6729): move spec into this folder: export const SPEC_DIRECTORY = path.resolve(__dirname, '..', 'spec');
export const SPEC_DIRECTORY = path.resolve(__dirname, '..', '..', 'driverBench', 'spec');
export const PARALLEL_DIRECTORY = path.resolve(SPEC_DIRECTORY, 'parallel');
export const TEMP_DIRECTORY = path.resolve(SPEC_DIRECTORY, 'tmp');

export type Metric = {
name: 'megabytes_per_second';
Expand Down Expand Up @@ -186,6 +192,11 @@ export class DriverTester {
throw new Error('unknown type: ' + type);
}

// Deletes TEMP_DIRECTORY (force: true ignores a missing directory) and
// recreates it empty, so each benchmark iteration starts from a clean
// scratch area.
async resetTmpDir() {
await fs.rm(TEMP_DIRECTORY, { recursive: true, force: true });
await fs.mkdir(TEMP_DIRECTORY);
}

async insertManyOf(document: Record<string, any>, length: number, addId = false) {
const utilClient = new MongoClient(MONGODB_URI, MONGODB_CLIENT_OPTIONS);
const db = utilClient.db(DB_NAME);
Expand Down
24 changes: 12 additions & 12 deletions test/benchmarks/driver_bench/src/main.mts
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,27 @@ function calculateCompositeBenchmarks(results: MetricInfo[]) {
'largeDocBulkInsert',
'smallDocBulkInsert'
],
// parallelBench: [
// 'ldjsonMultiFileUpload',
// 'ldjsonMultiFileExport',
// 'gridfsMultiFileUpload',
// 'gridfsMultiFileDownload'
// ],
parallelBench: [
'ldjsonMultiFileUpload',
'ldjsonMultiFileExport',
'gridfsMultiFileUpload',
'gridfsMultiFileDownload'
],
readBench: [
'findOne',
'findManyAndEmptyCursor',
'gridFsDownload'
// 'gridfsMultiFileDownload',
// 'ldjsonMultiFileExport'
'gridFsDownload',
'gridfsMultiFileDownload',
'ldjsonMultiFileExport'
],
writeBench: [
'smallDocInsertOne',
'largeDocInsertOne',
'smallDocBulkInsert',
'largeDocBulkInsert',
'gridFsUpload'
// 'ldjsonMultiFileUpload',
// 'gridfsMultiFileUpload'
'gridFsUpload',
'ldjsonMultiFileUpload',
'gridfsMultiFileUpload'
]
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { createReadStream, createWriteStream, promises as fs } from 'node:fs';
import path from 'node:path';
import { pipeline } from 'node:stream/promises';

import { driver, type mongodb, PARALLEL_DIRECTORY, TEMP_DIRECTORY } from '../../driver.mjs';

// Benchmark task size — presumably MB of data transferred per iteration
// (see driver benchmark spec); TODO confirm.
export const taskSize = 262.144;

// GridFS bucket shared across hooks; assigned in before().
let bucket: mongodb.GridFSBucket;

/**
 * One-time setup: recreate the database and temp directory, then seed the
 * GridFS bucket with every file in the `gridfs_multi` spec directory.
 * All uploads run concurrently.
 */
export async function before() {
  await driver.drop();
  await driver.create();
  await driver.resetTmpDir();

  bucket = driver.bucket(driver.client.db(driver.DB_NAME));
  // A bucket left over from a previous run may not exist; ignore that error.
  await bucket.drop().catch(() => null);

  const sourceDirectory = path.resolve(PARALLEL_DIRECTORY, 'gridfs_multi');
  const entries = await fs.readdir(sourceDirectory);

  await Promise.all(
    entries.map(entry => {
      const absolutePath = path.resolve(sourceDirectory, entry);
      // NOTE(review): the absolute path (not the basename) is used as the
      // GridFS filename — confirm this matches the benchmark spec.
      return pipeline(createReadStream(absolutePath), bucket.openUploadStream(absolutePath));
    })
  );
}

// Empty the temp directory before each timed iteration.
export async function beforeEach() {
await driver.resetTmpDir();
}

/**
 * Timed task: download every file in the bucket concurrently, writing each
 * one to `<TEMP_DIRECTORY>/<_id>.txt`.
 */
export async function run() {
  const ids = await bucket
    .find()
    .map(({ _id }) => _id)
    .toArray();

  await Promise.all(
    ids.map(id => {
      const destination = createWriteStream(path.resolve(TEMP_DIRECTORY, `${id}.txt`));
      return pipeline(bucket.openDownloadStream(id), destination);
    })
  );
}

// Empty the temp directory after each timed iteration.
export async function afterEach() {
await driver.resetTmpDir();
}

// Final teardown: drop the benchmark database, then close the client.
// Order matters — the drop needs the connection still open.
export async function after() {
await driver.drop();
await driver.close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { createReadStream, promises as fs } from 'node:fs';
import path from 'node:path';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

import { driver, type mongodb, PARALLEL_DIRECTORY } from '../../driver.mjs';

// Benchmark task size — presumably MB of data transferred per iteration
// (see driver benchmark spec); TODO confirm.
export const taskSize = 262.144;

// GridFS bucket shared across hooks; assigned in before().
let bucket: mongodb.GridFSBucket;

// Spec directory containing the files uploaded during each run.
const directory = path.resolve(PARALLEL_DIRECTORY, 'gridfs_multi');

/**
 * One-time setup: recreate the benchmark database and obtain a fresh GridFS
 * bucket handle, discarding any bucket left over from a previous run.
 */
export async function before() {
  await driver.drop();
  await driver.create();

  const database = driver.client.db(driver.DB_NAME);
  bucket = driver.bucket(database);

  // The bucket may not exist yet; swallow the resulting error.
  await bucket.drop().catch(() => null);
}

/**
 * Per-iteration setup: upload a single one-byte file so the bucket exists
 * before the timed run starts (mirrors the original "Create the bucket"
 * step).
 */
export async function beforeEach() {
  const singleByte = Readable.from('a');
  await pipeline(singleByte, bucket.openUploadStream('setup-file.txt'));
}

/**
 * Timed task: upload every file in the `gridfs_multi` spec directory to the
 * bucket, all uploads running concurrently.
 */
export async function run() {
  const entries = await fs.readdir(directory);

  await Promise.all(
    entries.map(entry => {
      const absolutePath = path.resolve(directory, entry);
      // NOTE(review): the absolute path (not the basename) is used as the
      // GridFS filename — confirm this matches the benchmark spec.
      return pipeline(createReadStream(absolutePath), bucket.openUploadStream(absolutePath));
    })
  );
}

// Final teardown: drop the benchmark database, then close the client.
// Order matters — the drop needs the connection still open.
export async function after() {
await driver.drop();
await driver.close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { createReadStream, createWriteStream, promises as fs } from 'node:fs';
import path from 'node:path';
import readline from 'node:readline/promises';
import stream from 'node:stream/promises';

import { driver, EJSON, type mongodb, PARALLEL_DIRECTORY, TEMP_DIRECTORY } from '../../driver.mjs';

// Benchmark task size — presumably MB of data transferred per iteration
// (see driver benchmark spec); TODO confirm.
export const taskSize = 565;

// Corpus collection shared across hooks; assigned in before().
let collection: mongodb.Collection;

/**
 * One-time setup: recreate the database and temp directory, then load every
 * LDJSON spec file into the corpus collection — one bulkWrite per file, with
 * all files processed concurrently.
 */
export async function before() {
  await driver.drop();
  await driver.create();
  await driver.resetTmpDir();

  collection = driver.client.db(driver.DB_NAME).collection(driver.COLLECTION_NAME);

  const sourceDirectory = path.resolve(PARALLEL_DIRECTORY, 'ldjson_multi');
  const entries = await fs.readdir(sourceDirectory);

  await Promise.all(entries.map(entry => importLdjsonFile(path.resolve(sourceDirectory, entry))));
}

/** Parse one LDJSON file line-by-line and insert its documents via bulkWrite. */
async function importLdjsonFile(absolutePath: string) {
  const input = createReadStream(absolutePath);
  const lines = readline.createInterface({ input });

  const operations = [];
  for await (const line of lines) {
    operations.push({ insertOne: { document: JSON.parse(line) } });
  }

  input.close();
  lines.close();

  return await collection.bulkWrite(operations);
}

// Empty the temp directory before each timed iteration.
export async function beforeEach() {
await driver.resetTmpDir();
}

/**
 * Timed task: export the corpus in 100 concurrent batches of 5000 documents
 * each, EJSON-stringifying every document and streaming each batch to its own
 * temp file.
 */
export async function run() {
  const skips = Array.from({ length: 100 }, (_, index) => index * 5000);

  const promises = skips.map(async skip => {
    // Use the cursor returned by `.map()` so the string transform is tracked
    // in the cursor's type, instead of discarding the return value and
    // relying on the driver happening to mutate the cursor in place.
    const stringCursor = collection
      .find({}, { skip, limit: 5000 })
      .map(doc => EJSON.stringify(doc));
    // NOTE(review): documents are written back-to-back with no newline
    // delimiter — confirm the benchmark spec does not require LDJSON output.
    const outputStream = createWriteStream(path.resolve(TEMP_DIRECTORY, `tmp-${skip}.txt`));
    return await stream.pipeline(stringCursor.stream(), outputStream);
  });

  await Promise.all(promises);
}

// Empty the temp directory after each timed iteration.
export async function afterEach() {
await driver.resetTmpDir();
}

// Final teardown: drop the benchmark database, then close the client.
// Order matters — the drop needs the connection still open.
export async function after() {
await driver.drop();
await driver.close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { createReadStream, promises as fs } from 'node:fs';
import path from 'node:path';
import readline from 'node:readline/promises';

import { driver, type mongodb, PARALLEL_DIRECTORY } from '../../driver.mjs';

// Benchmark task size — presumably MB of data transferred per iteration
// (see driver benchmark spec); TODO confirm.
export const taskSize = 565;

// Spec directory containing the LDJSON files imported during each run.
const directory = path.resolve(PARALLEL_DIRECTORY, 'ldjson_multi');
// Corpus collection shared across hooks; assigned in beforeEach().
let collection: mongodb.Collection;

// Per-iteration setup: recreate the database and re-acquire the corpus
// collection handle so each timed run starts from an empty collection.
export async function beforeEach() {
await driver.drop();
await driver.create();

collection = driver.client.db(driver.DB_NAME).collection(driver.COLLECTION_NAME);
}

/**
 * Timed task: read every LDJSON spec file concurrently and bulk-insert the
 * documents from each file into the corpus collection (one bulkWrite per
 * file).
 */
export async function run() {
  const entries = await fs.readdir(directory);

  await Promise.all(
    entries.map(async entry => {
      const input = createReadStream(path.resolve(directory, entry));
      const lines = readline.createInterface({ input });

      const operations = [];
      for await (const line of lines) {
        operations.push({ insertOne: { document: JSON.parse(line) } });
      }

      input.close();
      lines.close();

      return await collection.bulkWrite(operations);
    })
  );
}

// Final teardown: drop the benchmark database, then close the client.
// Order matters — the drop needs the connection still open.
export async function after() {
await driver.drop();
await driver.close();
}

0 comments on commit 7d744be

Please sign in to comment.