Merge pull request #3831 from asgerf/asgerf/streaming-jsonl
Add streaming 'jsonl' parser
extensions/ql-vscode/src/common/jsonl-reader.ts
@@ -1,26 +1,56 @@
-import { readFile } from "fs-extra";
+import { stat } from "fs/promises";
+import { createReadStream } from "fs-extra";
+import type { BaseLogger } from "./logging";
+
+const doubleLineBreakRegexp = /\n\r?\n/;
 
 /**
  * Read a file consisting of multiple JSON objects. Each object is separated from the previous one
  * by a double newline sequence. This is basically a more human-readable form of JSONL.
  *
- * The current implementation reads the entire text of the document into memory, but in the future
- * it will stream the document to improve the performance with large documents.
- *
  * @param path The path to the file.
  * @param handler Callback to be invoked for each top-level JSON object in order.
  */
 export async function readJsonlFile<T>(
   path: string,
   handler: (value: T) => Promise<void>,
+  logger?: BaseLogger,
 ): Promise<void> {
-  const logSummary = await readFile(path, "utf-8");
-
-  // Remove newline delimiters because summary is in .jsonl format.
-  const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
-
-  for (const obj of jsonSummaryObjects) {
-    const jsonObj = JSON.parse(obj) as T;
-    await handler(jsonObj);
-  }
+  // Stream the data as large evaluator logs won't fit in memory.
+  // Also avoid using 'readline' as it is slower than our manual line splitting.
+  void logger?.log(
+    `Parsing ${path} (${(await stat(path)).size / 1024 / 1024} MB)...`,
+  );
+  return new Promise((resolve, reject) => {
+    const stream = createReadStream(path, { encoding: "utf8" });
+    let buffer = "";
+    stream.on("data", async (chunk: string) => {
+      const parts = (buffer + chunk).split(doubleLineBreakRegexp);
+      buffer = parts.pop()!;
+      if (parts.length > 0) {
+        try {
+          stream.pause();
+          for (const part of parts) {
+            await handler(JSON.parse(part));
+          }
+          stream.resume();
+        } catch (e) {
+          stream.destroy();
+          reject(e);
+        }
+      }
+    });
+    stream.on("end", async () => {
+      try {
+        if (buffer.trim().length > 0) {
+          await handler(JSON.parse(buffer));
+        }
+        void logger?.log(`Finished parsing ${path}`);
+        resolve();
+      } catch (e) {
+        reject(e);
+      }
+    });
+    stream.on("error", reject);
+  });
 }
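For context, a minimal usage sketch of the new API (hypothetical caller, not part of this commit; the SummaryEvent shape and countEvents helper are invented for illustration, while readJsonlFile, its awaited handler, and the optional BaseLogger argument come from the diff above):

import { readJsonlFile } from "./jsonl-reader";

// Hypothetical event shape; real evaluator-log objects carry more fields.
interface SummaryEvent {
  type?: string;
}

// Count the top-level JSON objects in a double-newline-separated log.
// The parser awaits the handler for each object and pauses the stream
// while it runs, so a slow handler applies backpressure instead of
// forcing the whole file into memory.
async function countEvents(path: string): Promise<number> {
  let count = 0;
  await readJsonlFile<SummaryEvent>(path, async () => {
    count++;
  });
  return count;
}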
extensions/ql-vscode/test/benchmarks/jsonl-reader.bench.ts (new file, 87 lines)
@@ -0,0 +1,87 @@
+/**
+ * Benchmarks the jsonl-parser against a reference implementation and checks that it generates
+ * the same output.
+ *
+ * Usage:
+ *
+ * ts-node json-reader.bench.ts [evaluator-log.summary.jsonl] [count]
+ *
+ * The log file defaults to a small checked-in log and count defaults to 100
+ * (and should be lowered significantly for large files).
+ *
+ * At the time of writing it is about as fast as the synchronous reference implementation,
+ * but doesn't run out of memory for large files.
+ */
+import { readFile } from "fs-extra";
+import { readJsonlFile } from "../../src/common/jsonl-reader";
+import { performance } from "perf_hooks";
+import { join } from "path";
+
+/** An "obviously correct" implementation to test against. */
+async function readJsonlReferenceImpl<T>(
+  path: string,
+  handler: (value: T) => Promise<void>,
+): Promise<void> {
+  const logSummary = await readFile(path, "utf-8");
+
+  // Remove newline delimiters because summary is in .jsonl format.
+  const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
+
+  for (const obj of jsonSummaryObjects) {
+    const jsonObj = JSON.parse(obj) as T;
+    await handler(jsonObj);
+  }
+}
+
+type ParserFn = (
+  text: string,
+  callback: (v: unknown) => Promise<void>,
+) => Promise<void>;
+
+const parsers: Record<string, ParserFn> = {
+  readJsonlReferenceImpl,
+  readJsonlFile,
+};
+
+async function main() {
+  const args = process.argv.slice(2);
+  const file =
+    args.length > 0
+      ? args[0]
+      : join(
+          __dirname,
+          "../unit-tests/data/evaluator-log-summaries/bad-join-order.jsonl",
+        );
+  const numTrials = args.length > 1 ? Number(args[1]) : 100;
+  const referenceValues: any[] = [];
+  await readJsonlReferenceImpl(file, async (event) => {
+    referenceValues.push(event);
+  });
+  const referenceValueString = JSON.stringify(referenceValues);
+  // Do warm-up runs and check against reference implementation
+  for (const [name, parser] of Object.entries(parsers)) {
+    const values: unknown[] = [];
+    await parser(file, async (event) => {
+      values.push(event);
+    });
+    if (JSON.stringify(values) !== referenceValueString) {
+      console.error(`${name}: failed to match reference implementation`);
+    }
+  }
+  for (const [name, parser] of Object.entries(parsers)) {
+    const startTime = performance.now();
+    for (let i = 0; i < numTrials; ++i) {
+      await Promise.all([
+        parser(file, async () => {}),
+        parser(file, async () => {}),
+      ]);
+    }
+    const duration = performance.now() - startTime;
+    const durationPerTrial = duration / numTrials;
+    console.log(`${name}: ${durationPerTrial.toFixed(1)} ms`);
+  }
+}
+
+main().catch((err: unknown) => {
+  console.error(err);
+});
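A note on the chunk-boundary handling exercised by this benchmark: in the data listener, everything before the last double line break of buffer + chunk is a complete object and safe to parse, while the trailing part is carried over into the next chunk. A standalone sketch of just that splitting step (illustrative only, not part of this commit):

// Same regexp as the parser: a blank line (optionally with \r) ends an object.
const doubleLineBreakRegexp = /\n\r?\n/;

// Splits accumulated text into complete objects plus a leftover tail.
function feed(
  buffer: string,
  chunk: string,
): { complete: string[]; rest: string } {
  const parts = (buffer + chunk).split(doubleLineBreakRegexp);
  const rest = parts.pop()!; // possibly incomplete trailing object
  return { complete: parts, rest };
}

// An object split across two chunks is only parsed once it is complete:
const first = feed("", '{"a": 1}\n\n{"b":');
console.log(first.complete); // [ '{"a": 1}' ]
const second = feed(first.rest, ' 2}\n\n');
console.log(second.complete); // [ '{"b": 2}' ]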