Skip to content

Commit

Permalink
fix(telemetry): make flight recorder big-endian on all platforms
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Feb 7, 2022
1 parent b453697 commit d3fb18a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 63 deletions.
103 changes: 54 additions & 49 deletions packages/telemetry/src/flight-recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ const { details: X } = assert;

export const DEFAULT_CIRCULAR_BUFFER_SIZE = 100 * 1024 * 1024;
export const DEFAULT_CIRCULAR_BUFFER_FILE = 'flight-recorder.bin';
export const SLOG_MAGIC = 0x21474f4c532d4741n; // 'AG-SLOG!'
export const SLOG_MAGIC = 0x41472d534c4f4721n; // 'AG-SLOG!'

const I_MAGIC = 0;
const I_ARENA_SIZE = 1;
const I_CIRC_START = 2;
const I_CIRC_END = 3;
const HEADER_LENGTH = 4;
const I_MAGIC = 0 * BigUint64Array.BYTES_PER_ELEMENT;
const I_ARENA_SIZE = 1 * BigUint64Array.BYTES_PER_ELEMENT;
const I_CIRC_START = 2 * BigUint64Array.BYTES_PER_ELEMENT;
const I_CIRC_END = 3 * BigUint64Array.BYTES_PER_ELEMENT;
const I_ARENA_START = 4 * BigUint64Array.BYTES_PER_ELEMENT;

const initializeCircularBuffer = async (bufferFile, circularBufferSize) => {
if (!circularBufferSize) {
Expand All @@ -41,25 +41,24 @@ const initializeCircularBuffer = async (bufferFile, circularBufferSize) => {
}
throw e;
});
const arenaSize = BigInt(
circularBufferSize - HEADER_LENGTH * BigUint64Array.BYTES_PER_ELEMENT,
);
const arenaSize = BigInt(circularBufferSize - I_ARENA_START);

const writeHeader = async () => {
if (
stbuf &&
stbuf.size >= HEADER_LENGTH * BigUint64Array.BYTES_PER_ELEMENT
) {
if (stbuf && stbuf.size >= I_ARENA_START) {
// Header already exists.
return;
}

// Write the header.
const header = new Array(HEADER_LENGTH).fill(0n);
header[I_MAGIC] = SLOG_MAGIC;
header[I_ARENA_SIZE] = arenaSize;
const headerBuf = new Uint8Array(I_ARENA_START);
const header = new DataView(headerBuf.buffer);
header.setBigUint64(I_MAGIC, SLOG_MAGIC);
header.setBigUint64(I_ARENA_SIZE, arenaSize);
header.setBigUint64(I_CIRC_START, 0n);
header.setBigUint64(I_CIRC_END, 0n);

await fsPromises.mkdir(path.dirname(bufferFile), { recursive: true });
await fsPromises.writeFile(bufferFile, BigUint64Array.from(header));
await fsPromises.writeFile(bufferFile, headerBuf);
};
await writeHeader();

Expand Down Expand Up @@ -92,20 +91,22 @@ export const makeMemoryMappedCircularBuffer = async ({

/** @type {Uint8Array} */
const fileBuf = BufferFromFile(bufferFile).Uint8Array();
const header = new BigUint64Array(fileBuf.buffer, 0, HEADER_LENGTH);
const header = new DataView(fileBuf.buffer, 0, I_ARENA_START);

// Detect the arena size from the header, if not initialized.
const arenaSize = newArenaSize || header[I_ARENA_SIZE];
const hdrArenaSize = header.getBigUint64(I_ARENA_SIZE);
const arenaSize = newArenaSize || hdrArenaSize;

const hdrMagic = header.getBigUint64(I_MAGIC);
assert.equal(
SLOG_MAGIC,
header[I_MAGIC],
X`${bufferFile} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${header[I_MAGIC]}`,
hdrMagic,
X`${bufferFile} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${hdrMagic}`,
);
assert.equal(
arenaSize,
header[I_ARENA_SIZE],
X`${bufferFile} arena size mismatch; wanted ${arenaSize}, got ${header[I_ARENA_SIZE]}`,
hdrArenaSize,
X`${bufferFile} arena size mismatch; wanted ${arenaSize}, got ${hdrArenaSize}`,
);
const arena = new Uint8Array(
fileBuf.buffer,
Expand All @@ -126,21 +127,19 @@ export const makeMemoryMappedCircularBuffer = async ({

// Read the data to the end of the arena.
let firstReadLength = data.byteLength;
const circStart = Number(header[I_CIRC_START]);
const readStart = (circStart + offset) % Number(arenaSize);
if (header[I_CIRC_START] > header[I_CIRC_END]) {
const circStart = header.getBigUint64(I_CIRC_START);
const circEnd = header.getBigUint64(I_CIRC_END);
const readStart = (Number(circStart) + offset) % Number(arenaSize);
if (circStart > circEnd) {
// The data is wrapped around the end of the arena, like BBB---AAA
firstReadLength = Math.min(
firstReadLength,
Number(arenaSize) - readStart,
);
if (readStart >= header[I_CIRC_END] && readStart < header[I_CIRC_START]) {
if (readStart >= circEnd && readStart < circStart) {
return { done: true, value: undefined };
}
} else if (
readStart < header[I_CIRC_START] ||
readStart >= header[I_CIRC_END]
) {
} else if (readStart < circStart || readStart >= circEnd) {
// The data is contiguous, like ---AAABBB---
return { done: true, value: undefined };
}
Expand Down Expand Up @@ -168,58 +167,64 @@ export const makeMemoryMappedCircularBuffer = async ({
const record = new Uint8Array(
BigUint64Array.BYTES_PER_ELEMENT + data.byteLength,
);
const lengthPrefix = new BigUint64Array(record.buffer, 0, 1);
lengthPrefix[0] = BigInt(data.byteLength);
record.set(data, BigUint64Array.BYTES_PER_ELEMENT);

const lengthPrefix = new DataView(record.buffer);
lengthPrefix.setBigUint64(0, BigInt(data.byteLength));

// Check if we need to wrap around.
/** @type {bigint} */
let capacity;
if (header[I_CIRC_START] <= header[I_CIRC_END]) {
let circStart = header.getBigUint64(I_CIRC_START);
const circEnd = header.getBigUint64(I_CIRC_END);
if (circStart <= circEnd) {
// ---AAAABBBB----
capacity =
header[I_ARENA_SIZE] - header[I_CIRC_END] + header[I_CIRC_START];
capacity = arenaSize - circEnd + circStart;
} else {
// BBB---AAAA
capacity = header[I_CIRC_START] - header[I_CIRC_END];
capacity = circStart - circEnd;
}

// Advance the start pointer until we have space to write the record.
let overlap = BigInt(record.byteLength) - capacity;
while (overlap > 0n) {
const startRecordLength = new BigUint64Array(1);
const { done } = readCircBuf(new Uint8Array(startRecordLength.buffer));
const startRecordLength = new Uint8Array(
BigUint64Array.BYTES_PER_ELEMENT,
);
const { done } = readCircBuf(startRecordLength);
if (done) {
break;
}

const dv = new DataView(startRecordLength.buffer);
const totalRecordLength =
BigInt(startRecordLength.byteLength) + // size of the length field
startRecordLength[0]; // size of the record
dv.getBigUint64(0); // size of the record

header[I_CIRC_START] =
(header[I_CIRC_START] + totalRecordLength) % header[I_ARENA_SIZE];
circStart = (circStart + totalRecordLength) % arenaSize;
header.setBigUint64(I_CIRC_START, circStart);
overlap -= totalRecordLength;
}

// Append the record.
let firstWriteLength = record.byteLength;
if (header[I_CIRC_START] < header[I_CIRC_END]) {
if (circStart < circEnd) {
// May need to wrap, it's ---AAAABBBB---
firstWriteLength = Math.min(
firstWriteLength,
Number(header[I_ARENA_SIZE] - header[I_CIRC_END]),
Number(arenaSize - circEnd),
);
}

const circEnd = Number(header[I_CIRC_END]);
arena.set(record.subarray(0, firstWriteLength), circEnd);
arena.set(record.subarray(0, firstWriteLength), Number(circEnd));
if (firstWriteLength < record.byteLength) {
// Write to the beginning of the arena.
arena.set(record.subarray(firstWriteLength, record.byteLength), 0);
}
header[I_CIRC_END] =
(header[I_CIRC_END] + BigInt(record.byteLength)) % header[I_ARENA_SIZE];
header.setBigUint64(
I_CIRC_END,
(circEnd + BigInt(record.byteLength)) % arenaSize,
);
};

const writeJSON = (obj, jsonObj) => {
Expand All @@ -231,7 +236,7 @@ export const makeMemoryMappedCircularBuffer = async ({
}
// Prepend a newline so that the file can be more easily manipulated.
const data = new TextEncoder().encode(`\n${jsonObj}`);
// console.log('have obj', obj);
// console.log('have obj', obj, data);
writeCircBuf(data);
};

Expand Down
9 changes: 5 additions & 4 deletions packages/telemetry/src/frcat-entrypoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@ const main = async () => {
}

for await (const file of files) {
const { readCircBuf } = makeMemoryMappedCircularBuffer({
const { readCircBuf } = await makeMemoryMappedCircularBuffer({
circularBufferFile: file,
circularBufferSize: null,
});

let offset = 0;
for (;;) {
const lenBuf = new BigUint64Array(1);
const { done } = readCircBuf(new Uint8Array(lenBuf.buffer), offset);
const lenBuf = new Uint8Array(BigUint64Array.BYTES_PER_ELEMENT);
const { done } = readCircBuf(lenBuf, offset);
if (done) {
break;
}
offset += 8;
const len = Number(lenBuf[0]);
const dv = new DataView(lenBuf.buffer);
const len = Number(dv.getBigUint64(0));

const { done: done2, value: buf } = readCircBuf(
new Uint8Array(len),
Expand Down
27 changes: 17 additions & 10 deletions packages/telemetry/test/test-flight-recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,42 @@ test('flight-recorder sanity', async t => {
});
slogSender({ type: 'start' });

const len0 = new BigUint64Array(1);
const { done: done0 } = readCircBuf(new Uint8Array(len0.buffer));
const len0 = new Uint8Array(BigUint64Array.BYTES_PER_ELEMENT);
const { done: done0 } = readCircBuf(len0);
t.false(done0, 'readCircBuf should not be done');
const buf0 = new Uint8Array(Number(len0[0]));
const dv0 = new DataView(len0.buffer);
const buf0 = new Uint8Array(Number(dv0.getBigUint64(0)));
const { done: done0b } = readCircBuf(buf0, len0.byteLength);
t.false(done0b, 'readCircBuf should not be done');
const buf0Str = new TextDecoder().decode(buf0);
t.is(buf0Str, `\n{"type":"start"}`);
t.is(buf0Str, `\n{"type":"start"}`, `start compare failed`);

const last = 500;
for (let i = 0; i < last; i += 1) {
slogSender({ type: 'iteration', iteration: i });
}

let offset = 0;
const len1 = new BigUint64Array(1);
const len1 = new Uint8Array(BigUint64Array.BYTES_PER_ELEMENT);
for (let i = 490; i < last; i += 1) {
const { done: done1 } = readCircBuf(new Uint8Array(len1.buffer), offset);
const { done: done1 } = readCircBuf(len1, offset);
offset += len1.byteLength;
t.false(done1, `readCircBuf ${i} should not be done`);
const buf1 = new Uint8Array(Number(len1[0]));
const dv1 = new DataView(len1.buffer);
const buf1 = new Uint8Array(Number(dv1.getBigUint64(0)));
const { done: done1b } = readCircBuf(buf1, offset);
offset += buf1.byteLength;
t.false(done1b, `readCircBuf ${i} should not be done`);
const buf1Str = new TextDecoder().decode(buf1);
t.is(buf1Str, `\n{"type":"iteration","iteration":${i}}`);
t.is(
buf1Str,
`\n{"type":"iteration","iteration":${i}}`,
`iteration ${i} compare failed`,
);
}

const { done: done2 } = readCircBuf(new Uint8Array(len1.buffer), offset);
const { done: done2 } = readCircBuf(len1, offset);
t.assert(done2, `readCircBuf ${last} should be done`);
removeCallback();
console.log({ tmpFile });
// removeCallback();
});

0 comments on commit d3fb18a

Please sign in to comment.