Skip to content

Commit

Permalink
Loki: use structured metadata rather than a JSON line
Browse files Browse the repository at this point in the history
Change-type: major
  • Loading branch information
Page- committed Dec 24, 2024
1 parent bea6440 commit a3ee777
Showing 1 changed file with 83 additions and 47 deletions.
130 changes: 83 additions & 47 deletions src/features/device-logs/lib/backends/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,10 @@ import {
incrementPublishCallTotal,
} from './metrics.js';
import { setTimeout } from 'timers/promises';
import { omitNanoTimestamp } from '../config.js';
import { requestAsync } from '../../../../infra/request-promise/index.js';

const { BadRequestError } = errors;

interface LokiDeviceLog extends Omit<InternalDeviceLog, 'nanoTimestamp'> {
version?: number;
createdAt?: number;
}

// invert status object for quick lookup of status identifier using status code
const statusKeys = _.transform(
loki.status,
Expand All @@ -62,7 +56,6 @@ const lokiIngesterAddress = `${LOKI_INGESTER_HOST}:${LOKI_INGESTER_GRPC_PORT}`;

const MIN_BACKOFF = 100;
const MAX_BACKOFF = 10 * 1000;
const VERSION = 2;

function createTimestampFromDate(date = new Date()) {
const timestamp = new loki.Timestamp();
Expand Down Expand Up @@ -215,21 +208,43 @@ export class LokiBackend implements DeviceLogsBackend {

return _(
body.data.result as Array<{
stream: {
application_id: string;
device_id: string;
[name: string]: string;
};
values: Array<[timestamp: string, logLine: string]>;
}>,
)
.flatMap(({ values }) => values)
.map(([timestamp, logLine]): [bigint, OutputDeviceLog] => {
const log: LokiDeviceLog = JSON.parse(logLine);
if (log.version !== VERSION) {
throw new Error(
`Invalid Loki serialization version: ${JSON.stringify(log)}`,
);
.flatMap(({ stream, values }) => {
const baseLog: Partial<OutputDeviceLog> = {};
for (const [key, value] of Object.entries(stream)) {
switch (key) {
case 'timestamp':
baseLog.timestamp = Number(value);
break;
case 'is_system':
baseLog.isSystem = value === 'true';
break;
case 'is_stderr':
baseLog.isStdErr = value === 'true';
break;
case 'service_id':
baseLog.serviceId = Number(value);
break;
}
}
delete log.version;
const nanoTimestamp = BigInt(timestamp);
log.createdAt = Math.floor(Number(nanoTimestamp / 1000000n));
return [nanoTimestamp, log as OutputDeviceLog];
return values.map(([timestamp, message]): [bigint, OutputDeviceLog] => {
const nanoTimestamp = BigInt(timestamp);
return [
nanoTimestamp,
{
...baseLog,
createdAt: Math.floor(Number(nanoTimestamp / 1000000n)),
message,
} as OutputDeviceLog,
];
});
})
.sortBy(([timestamp]) => timestamp)
.map(([, log]) => log)
Expand All @@ -238,7 +253,7 @@ export class LokiBackend implements DeviceLogsBackend {

public async publish(
ctx: LogContext,
logs: Array<InternalDeviceLog & { version?: number }>,
logs: InternalDeviceLog[],
): Promise<any> {
const logEntries = this.fromDeviceLogsToEntries(ctx, logs);

Expand Down Expand Up @@ -349,12 +364,6 @@ export class LokiBackend implements DeviceLogsBackend {
return `o${ctx.orgId}:a${ctx.appId}:d${ctx.id}`;
}

private getStructuredMetadata(ctx: LogContext): loki.LabelPairAdapter[] {
return [
new loki.LabelPairAdapter().setName('device_id').setValue(`${ctx.id}`),
];
}

private getLabels(ctx: LokiLogContext): string {
return `{fleet_id="${ctx.appId}"}`;
}
Expand All @@ -364,18 +373,32 @@ export class LokiBackend implements DeviceLogsBackend {
): OutputDeviceLog[] {
try {
return stream.getEntriesList().map((entry) => {
const log: LokiDeviceLog = JSON.parse(entry.getLine());
if (log.version !== VERSION) {
throw new Error(
`Invalid Loki serialization version: ${JSON.stringify(log)}`,
);
}
delete log.version;
const timestampEntry = entry.getTimestamp()!;
const message = entry.getLine();
const structuredMetadataList = entry.getStructuredmetadataList();
const timestamp = entry.getTimestamp()!;
const nanoTimestamp =
BigInt(timestampEntry.getSeconds()) * 1000000000n +
BigInt(timestampEntry.getNanos());
log.createdAt = Math.floor(Number(nanoTimestamp / 1000000n));
BigInt(timestamp.getSeconds()) * 1000000000n +
BigInt(timestamp.getNanos());
const log: Partial<OutputDeviceLog> = {
createdAt: Math.floor(Number(nanoTimestamp / 1000000n)),
message,
};
for (const structuredMetadata of structuredMetadataList) {
switch (structuredMetadata.getName()) {
case 'timestamp':
log.timestamp = Number(structuredMetadata.getValue());
break;
case 'is_system':
log.isSystem = structuredMetadata.getValue() === 'true';
break;
case 'is_stderr':
log.isStdErr = structuredMetadata.getValue() === 'true';
break;
case 'service_id':
log.serviceId = Number(structuredMetadata.getValue());
break;
}
}
return log as OutputDeviceLog;
});
} catch (err) {
Expand All @@ -384,23 +407,36 @@ export class LokiBackend implements DeviceLogsBackend {
}
}

private fromDeviceLogsToEntries(
ctx: LogContext,
logs: Array<InternalDeviceLog & { version?: number }>,
) {
const structuredMetadata = this.getStructuredMetadata(ctx);
private fromDeviceLogsToEntries(ctx: LogContext, logs: InternalDeviceLog[]) {
const deviceId = new loki.LabelPairAdapter()
.setName('device_id')
.setValue(`${ctx.id}`);
return logs.map((log) => {
const timestamp = new loki.Timestamp();
timestamp.setSeconds(Math.floor(Number(log.nanoTimestamp / 1000000000n)));
timestamp.setNanos(Number(log.nanoTimestamp % 1000000000n));
// store log line as JSON
const logJson = JSON.stringify(
{ ...log, version: VERSION },
omitNanoTimestamp,
);
const structuredMetadata = [
deviceId,
new loki.LabelPairAdapter()
.setName('timestamp')
.setValue(`${log.timestamp}`),
new loki.LabelPairAdapter()
.setName('is_system')
.setValue(`${log.isSystem}`),
new loki.LabelPairAdapter()
.setName('is_stderr')
.setValue(`${log.isStdErr}`),
];
if (log.serviceId) {
structuredMetadata.push(
new loki.LabelPairAdapter()
.setName('service_id')
.setValue(`${log.serviceId}`),
);
}
// create entry with labels, line and timestamp
return new loki.EntryAdapter()
.setLine(logJson)
.setLine(log.message)
.setTimestamp(timestamp)
.setStructuredmetadataList(structuredMetadata);
});
Expand Down

0 comments on commit a3ee777

Please sign in to comment.