Skip to content

Commit

Permalink
mcap-record: Fix race condition when adding messages (#598)
Browse files Browse the repository at this point in the history
### Public-Facing Changes

mcap-record: Fix race condition when adding messages

### Description
Does not directly call `writer.addMessage` in the websocket message
handler but instead pushes it to a queue. This makes sure that we do not
call `writer.addMessage` concurrently which could happen due to a
certain `addMessage` taking longer when a chunk is ready to be
compressed & written. This PR also allows to set the message queue size
and to opt out of chunk compression.

Fixes #592
Resolves FG-5775
  • Loading branch information
achim-k authored Nov 28, 2023
1 parent 3379031 commit b4d7dff
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 15 deletions.
2 changes: 2 additions & 0 deletions typescript/ws-protocol-examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"@types/debug": "^4.1.9",
"@types/lodash": "^4.14.200",
"@types/node": "^20.6.2",
"@types/promise-queue": "^2.2.3",
"@types/ws": "^8.5.5",
"@typescript-eslint/eslint-plugin": "6.13.0",
"@typescript-eslint/parser": "6.13.0",
Expand Down Expand Up @@ -73,6 +74,7 @@
"commander": "^11.1.0",
"debug": "^4",
"eventemitter3": "^5.0.1",
"promise-queue": "^2.2.5",
"protobufjs": "^7.2.5",
"pureimage": "^0.4.13",
"tslib": "^2.6.2",
Expand Down
54 changes: 42 additions & 12 deletions typescript/ws-protocol-examples/src/examples/mcap-record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Command } from "commander";
import Debug from "debug";
import fs, { FileHandle } from "fs/promises";
import path from "path";
import Queue from "promise-queue";
import { WebSocket } from "ws";

const log = Debug("foxglove:mcap-record");
Expand Down Expand Up @@ -74,19 +75,28 @@ async function waitForServer(
return undefined;
}

async function main(address: string, options: { output: string }): Promise<void> {
async function main(
address: string,
options: { output: string; compression: boolean; queueSize: number },
): Promise<void> {
await Zstd.isLoaded;
await fs.mkdir(path.dirname(options.output), { recursive: true });
const fileHandle = await fs.open(options.output, "w");
const fileHandleWritable = new FileHandleWritable(fileHandle);
const textEncoder = new TextEncoder();
const maxPendingPromises = 1;
const maxQueuedPromises = options.queueSize > 0 ? options.queueSize : Infinity;
/** Used to ensure all operations on the McapWriter are sequential */
const writeMsgQueue = new Queue(maxPendingPromises, maxQueuedPromises);

const writer = new McapWriter({
writable: fileHandleWritable,
compressChunk: (data) => ({
compression: "zstd",
compressedData: Zstd.compress(data, 19),
}),
compressChunk: options.compression
? (data) => ({
compression: "zstd",
compressedData: Zstd.compress(data, 19),
})
: undefined,
});

await writer.start({
Expand All @@ -105,6 +115,7 @@ async function main(address: string, options: { output: string }): Promise<void>
if (!client) {
return;
}

await new Promise<void>((resolve) => {
const wsChannelsByMcapChannel = new Map<McapChannelId, WsChannelId>();
const subscriptionsById = new Map<
Expand Down Expand Up @@ -190,13 +201,20 @@ async function main(address: string, options: { output: string }): Promise<void>
log("received message for unknown subscription %s", event.subscriptionId);
return;
}
void writer.addMessage({
channelId: subscription.mcapChannelId,
sequence: subscription.messageCount++,
logTime: BigInt(Date.now()) * 1_000_000n,
publishTime: event.timestamp,
data: new Uint8Array(event.data.buffer, event.data.byteOffset, event.data.byteLength),
});

writeMsgQueue
.add(async () => {
await writer.addMessage({
channelId: subscription.mcapChannelId,
sequence: subscription.messageCount++,
logTime: BigInt(Date.now()) * 1_000_000n,
publishTime: event.timestamp,
data: new Uint8Array(event.data.buffer, event.data.byteOffset, event.data.byteLength),
});
})
.catch((error) => {
log((error as Error).message);
});
});

client.on("close", (event) => {
Expand All @@ -209,6 +227,11 @@ async function main(address: string, options: { output: string }): Promise<void>
resolve();
});
});

// Wait until all queued messages have been written.
while (writeMsgQueue.getPendingLength() + writeMsgQueue.getQueueLength() > 0) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
} finally {
await writer.end();
}
Expand All @@ -218,4 +241,11 @@ export default new Command("mcap-record")
.description("connect to a WebSocket server and record an MCAP file")
.argument("<address>", "WebSocket address, e.g. ws://localhost:8765")
.option("-o, --output <file>", "path to write MCAP file")
.option("-n, --no-compression", "do not compress chunks")
.option(
"-q, --queue-size <value>",
"Size of incoming message queue. Choose 0 for unlimited queue length (default)",
parseInt,
0,
)
.action(main);
16 changes: 13 additions & 3 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,11 @@
resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.1.tgz#27f7559836ad796cea31acb63163b203756a5b4e"
integrity sha512-3YmXzzPAdOTVljVMkTMBdBEvlOLg2cDQaDhnnhT3nT9uDbnJzjWhKlzb+desT12Y7tGqaN6d+AbozcKzyL36Ng==

"@types/promise-queue@^2.2.3":
version "2.2.3"
resolved "https://registry.yarnpkg.com/@types/promise-queue/-/promise-queue-2.2.3.tgz#e58e17eaf855db5f6fedc47ac57f85600bf54230"
integrity sha512-CuEQpGSYKvHr3SQ7C7WkluLg9CFjVORbn8YFRsQ5u6mqGbZVfSOv03ic9t95HtZuMchnlNqnIsQGFOpxqdhjTQ==

"@types/prop-types@*", "@types/prop-types@^15.7.9":
version "15.7.9"
resolved "https://registry.yarnpkg.com/@types/prop-types/-/prop-types-15.7.9.tgz#b6f785caa7ea1fe4414d9df42ee0ab67f23d8a6d"
Expand Down Expand Up @@ -3888,9 +3893,9 @@ he@^1.2.0:
integrity sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw==

heap-js@^2.2.0:
version "2.2.0"
resolved "https://registry.yarnpkg.com/heap-js/-/heap-js-2.2.0.tgz#f4418874cd2aedc2cf3a7492d579afe23b627c5d"
integrity sha512-G3uM72G9F/zo9Hph/T7m4ZZVlVu5bx2f5CiCS78TBHz2mNIXnB5KRdEEYssXZJ7ldLDqID29bZ1D5ezCKQD2Zw==
version "2.3.0"
resolved "https://registry.yarnpkg.com/heap-js/-/heap-js-2.3.0.tgz#8eed2cede31ec312aa696eef1d4df0565841f183"
integrity sha512-E5303mzwQ+4j/n2J0rDvEPBN7GKjhis10oHiYOgjxsmxYgqG++hz9NyLLOXttzH8as/DyiBHYpUrJTZWYaMo8Q==

hoist-non-react-statics@^3.3.1:
version "3.3.2"
Expand Down Expand Up @@ -5487,6 +5492,11 @@ process-nextick-args@~2.0.0:
resolved "https://registry.yarnpkg.com/process-nextick-args/-/process-nextick-args-2.0.1.tgz#7820d9b16120cc55ca9ae7792680ae7dba6d7fe2"
integrity sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==

promise-queue@^2.2.5:
version "2.2.5"
resolved "https://registry.yarnpkg.com/promise-queue/-/promise-queue-2.2.5.tgz#2f6f5f7c0f6d08109e967659c79b88a9ed5e93b4"
integrity sha512-p/iXrPSVfnqPft24ZdNNLECw/UrtLTpT3jpAAMzl/o5/rDsGCPo3/CQS2611flL6LkoEJ3oQZw7C8Q80ZISXRQ==

prompts@^2.0.1:
version "2.4.2"
resolved "https://registry.yarnpkg.com/prompts/-/prompts-2.4.2.tgz#7b57e73b3a48029ad10ebd44f74b01722a4cb069"
Expand Down

0 comments on commit b4d7dff

Please sign in to comment.