diff --git a/packages/telemetry/src/flight-recorder.js b/packages/telemetry/src/flight-recorder.js index 1b28f4cc5b74..4267bce5f407 100644 --- a/packages/telemetry/src/flight-recorder.js +++ b/packages/telemetry/src/flight-recorder.js @@ -22,13 +22,13 @@ const { details: X } = assert; export const DEFAULT_CIRCULAR_BUFFER_SIZE = 100 * 1024 * 1024; export const DEFAULT_CIRCULAR_BUFFER_FILE = 'flight-recorder.bin'; -export const SLOG_MAGIC = 0x21474f4c532d4741n; // 'AG-SLOG!' +export const SLOG_MAGIC = 0x41472d534c4f4721n; // 'AG-SLOG!' -const I_MAGIC = 0; -const I_ARENA_SIZE = 1; -const I_CIRC_START = 2; -const I_CIRC_END = 3; -const HEADER_LENGTH = 4; +const I_MAGIC = 0 * BigUint64Array.BYTES_PER_ELEMENT; +const I_ARENA_SIZE = 1 * BigUint64Array.BYTES_PER_ELEMENT; +const I_CIRC_START = 2 * BigUint64Array.BYTES_PER_ELEMENT; +const I_CIRC_END = 3 * BigUint64Array.BYTES_PER_ELEMENT; +const I_ARENA_START = 4 * BigUint64Array.BYTES_PER_ELEMENT; const initializeCircularBuffer = async (bufferFile, circularBufferSize) => { if (!circularBufferSize) { @@ -41,25 +41,24 @@ const initializeCircularBuffer = async (bufferFile, circularBufferSize) => { } throw e; }); - const arenaSize = BigInt( - circularBufferSize - HEADER_LENGTH * BigUint64Array.BYTES_PER_ELEMENT, - ); + const arenaSize = BigInt(circularBufferSize - I_ARENA_START); const writeHeader = async () => { - if ( - stbuf && - stbuf.size >= HEADER_LENGTH * BigUint64Array.BYTES_PER_ELEMENT - ) { + if (stbuf && stbuf.size >= I_ARENA_START) { // Header already exists. return; } // Write the header. - const header = new Array(HEADER_LENGTH).fill(0n); - header[I_MAGIC] = SLOG_MAGIC; - header[I_ARENA_SIZE] = arenaSize; + const headerBuf = new Uint8Array(I_ARENA_START); + const header = new DataView(headerBuf.buffer); + header.setBigUint64(I_MAGIC, SLOG_MAGIC); + header.setBigUint64(I_ARENA_SIZE, arenaSize); + header.setBigUint64(I_CIRC_START, 0n); + header.setBigUint64(I_CIRC_END, 0n); + await fsPromises.mkdir(path.dirname(bufferFile), { recursive: true }); - await fsPromises.writeFile(bufferFile, BigUint64Array.from(header)); + await fsPromises.writeFile(bufferFile, headerBuf); }; await writeHeader(); @@ -92,20 +91,22 @@ export const makeMemoryMappedCircularBuffer = async ({ /** @type {Uint8Array} */ const fileBuf = BufferFromFile(bufferFile).Uint8Array(); - const header = new BigUint64Array(fileBuf.buffer, 0, HEADER_LENGTH); + const header = new DataView(fileBuf.buffer, 0, I_ARENA_START); // Detect the arena size from the header, if not initialized. - const arenaSize = newArenaSize || header[I_ARENA_SIZE]; + const hdrArenaSize = header.getBigUint64(I_ARENA_SIZE); + const arenaSize = newArenaSize || hdrArenaSize; + const hdrMagic = header.getBigUint64(I_MAGIC); assert.equal( SLOG_MAGIC, - header[I_MAGIC], - X`${bufferFile} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${header[I_MAGIC]}`, + hdrMagic, + X`${bufferFile} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${hdrMagic}`, ); assert.equal( arenaSize, - header[I_ARENA_SIZE], - X`${bufferFile} arena size mismatch; wanted ${arenaSize}, got ${header[I_ARENA_SIZE]}`, + hdrArenaSize, + X`${bufferFile} arena size mismatch; wanted ${arenaSize}, got ${hdrArenaSize}`, ); const arena = new Uint8Array( fileBuf.buffer, @@ -126,21 +127,19 @@ export const makeMemoryMappedCircularBuffer = async ({ // Read the data to the end of the arena. let firstReadLength = data.byteLength; - const circStart = Number(header[I_CIRC_START]); - const readStart = (circStart + offset) % Number(arenaSize); - if (header[I_CIRC_START] > header[I_CIRC_END]) { + const circStart = header.getBigUint64(I_CIRC_START); + const circEnd = header.getBigUint64(I_CIRC_END); + const readStart = (Number(circStart) + offset) % Number(arenaSize); + if (circStart > circEnd) { // The data is wrapped around the end of the arena, like BBB---AAA firstReadLength = Math.min( firstReadLength, Number(arenaSize) - readStart, ); - if (readStart >= header[I_CIRC_END] && readStart < header[I_CIRC_START]) { + if (readStart >= circEnd && readStart < circStart) { return { done: true, value: undefined }; } - } else if ( - readStart < header[I_CIRC_START] || - readStart >= header[I_CIRC_END] - ) { + } else if (readStart < circStart || readStart >= circEnd) { // The data is contiguous, like ---AAABBB--- return { done: true, value: undefined }; } @@ -168,58 +167,64 @@ export const makeMemoryMappedCircularBuffer = async ({ const record = new Uint8Array( BigUint64Array.BYTES_PER_ELEMENT + data.byteLength, ); - const lengthPrefix = new BigUint64Array(record.buffer, 0, 1); - lengthPrefix[0] = BigInt(data.byteLength); record.set(data, BigUint64Array.BYTES_PER_ELEMENT); + const lengthPrefix = new DataView(record.buffer); + lengthPrefix.setBigUint64(0, BigInt(data.byteLength)); + // Check if we need to wrap around. /** @type {bigint} */ let capacity; - if (header[I_CIRC_START] <= header[I_CIRC_END]) { + let circStart = header.getBigUint64(I_CIRC_START); + const circEnd = header.getBigUint64(I_CIRC_END); + if (circStart <= circEnd) { // ---AAAABBBB---- - capacity = - header[I_ARENA_SIZE] - header[I_CIRC_END] + header[I_CIRC_START]; + capacity = arenaSize - circEnd + circStart; } else { // BBB---AAAA - capacity = header[I_CIRC_START] - header[I_CIRC_END]; + capacity = circStart - circEnd; } // Advance the start pointer until we have space to write the record. let overlap = BigInt(record.byteLength) - capacity; while (overlap > 0n) { - const startRecordLength = new BigUint64Array(1); - const { done } = readCircBuf(new Uint8Array(startRecordLength.buffer)); + const startRecordLength = new Uint8Array( + BigUint64Array.BYTES_PER_ELEMENT, + ); + const { done } = readCircBuf(startRecordLength); if (done) { break; } + const dv = new DataView(startRecordLength.buffer); const totalRecordLength = BigInt(startRecordLength.byteLength) + // size of the length field - startRecordLength[0]; // size of the record + dv.getBigUint64(0); // size of the record - header[I_CIRC_START] = - (header[I_CIRC_START] + totalRecordLength) % header[I_ARENA_SIZE]; + circStart = (circStart + totalRecordLength) % arenaSize; + header.setBigUint64(I_CIRC_START, circStart); overlap -= totalRecordLength; } // Append the record. let firstWriteLength = record.byteLength; - if (header[I_CIRC_START] < header[I_CIRC_END]) { + if (circStart < circEnd) { // May need to wrap, it's ---AAAABBBB--- firstWriteLength = Math.min( firstWriteLength, - Number(header[I_ARENA_SIZE] - header[I_CIRC_END]), + Number(arenaSize - circEnd), ); } - const circEnd = Number(header[I_CIRC_END]); - arena.set(record.subarray(0, firstWriteLength), circEnd); + arena.set(record.subarray(0, firstWriteLength), Number(circEnd)); if (firstWriteLength < record.byteLength) { // Write to the beginning of the arena. arena.set(record.subarray(firstWriteLength, record.byteLength), 0); } - header[I_CIRC_END] = - (header[I_CIRC_END] + BigInt(record.byteLength)) % header[I_ARENA_SIZE]; + header.setBigUint64( + I_CIRC_END, + (circEnd + BigInt(record.byteLength)) % arenaSize, + ); }; const writeJSON = (obj, jsonObj) => { @@ -231,7 +236,7 @@ export const makeMemoryMappedCircularBuffer = async ({ } // Prepend a newline so that the file can be more easily manipulated. const data = new TextEncoder().encode(`\n${jsonObj}`); - // console.log('have obj', obj); + // console.log('have obj', obj, data); writeCircBuf(data); }; diff --git a/packages/telemetry/src/frcat-entrypoint.js b/packages/telemetry/src/frcat-entrypoint.js index 25466f464547..e114ce86365a 100755 --- a/packages/telemetry/src/frcat-entrypoint.js +++ b/packages/telemetry/src/frcat-entrypoint.js @@ -13,20 +13,21 @@ const main = async () => { } for await (const file of files) { - const { readCircBuf } = makeMemoryMappedCircularBuffer({ + const { readCircBuf } = await makeMemoryMappedCircularBuffer({ circularBufferFile: file, circularBufferSize: null, }); let offset = 0; for (;;) { - const lenBuf = new BigUint64Array(1); - const { done } = readCircBuf(new Uint8Array(lenBuf.buffer), offset); + const lenBuf = new Uint8Array(BigUint64Array.BYTES_PER_ELEMENT); + const { done } = readCircBuf(lenBuf, offset); if (done) { break; } offset += 8; - const len = Number(lenBuf[0]); + const dv = new DataView(lenBuf.buffer); + const len = Number(dv.getBigUint64(0)); const { done: done2, value: buf } = readCircBuf( new Uint8Array(len), diff --git a/packages/telemetry/test/test-flight-recorder.js b/packages/telemetry/test/test-flight-recorder.js index f8a04526fed9..071dd10863bb 100644 --- a/packages/telemetry/test/test-flight-recorder.js +++ b/packages/telemetry/test/test-flight-recorder.js @@ -14,14 +14,15 @@ test('flight-recorder sanity', async t => { }); slogSender({ type: 'start' }); - const len0 = new BigUint64Array(1); - const { done: done0 } = readCircBuf(new Uint8Array(len0.buffer)); + const len0 = new Uint8Array(BigUint64Array.BYTES_PER_ELEMENT); + const { done: done0 } = readCircBuf(len0); t.false(done0, 'readCircBuf should not be done'); - const buf0 = new Uint8Array(Number(len0[0])); + const dv0 = new DataView(len0.buffer); + const buf0 = new Uint8Array(Number(dv0.getBigUint64(0))); const { done: done0b } = readCircBuf(buf0, len0.byteLength); t.false(done0b, 'readCircBuf should not be done'); const buf0Str = new TextDecoder().decode(buf0); - t.is(buf0Str, `\n{"type":"start"}`); + t.is(buf0Str, `\n{"type":"start"}`, `start compare failed`); const last = 500; for (let i = 0; i < last; i += 1) { @@ -29,20 +30,26 @@ test('flight-recorder sanity', async t => { } let offset = 0; - const len1 = new BigUint64Array(1); + const len1 = new Uint8Array(BigUint64Array.BYTES_PER_ELEMENT); for (let i = 490; i < last; i += 1) { - const { done: done1 } = readCircBuf(new Uint8Array(len1.buffer), offset); + const { done: done1 } = readCircBuf(len1, offset); offset += len1.byteLength; t.false(done1, `readCircBuf ${i} should not be done`); - const buf1 = new Uint8Array(Number(len1[0])); + const dv1 = new DataView(len1.buffer); + const buf1 = new Uint8Array(Number(dv1.getBigUint64(0))); const { done: done1b } = readCircBuf(buf1, offset); offset += buf1.byteLength; t.false(done1b, `readCircBuf ${i} should not be done`); const buf1Str = new TextDecoder().decode(buf1); - t.is(buf1Str, `\n{"type":"iteration","iteration":${i}}`); + t.is( + buf1Str, + `\n{"type":"iteration","iteration":${i}}`, + `iteration ${i} compare failed`, + ); } - const { done: done2 } = readCircBuf(new Uint8Array(len1.buffer), offset); + const { done: done2 } = readCircBuf(len1, offset); t.assert(done2, `readCircBuf ${last} should be done`); - removeCallback(); + console.log({ tmpFile }); + // removeCallback(); });