import { createReadStream } from "fs";
import { stat } from "fs/promises";

/**
 * Separator between two consecutive JSON objects: an empty line, with either
 * LF or CRLF line endings. A `\r` that precedes the separator stays attached
 * to the preceding object, where `JSON.parse` ignores it as whitespace.
 */
const doubleLineBreakRegexp = /\n\r?\n/;

/**
 * Read a file consisting of multiple JSON objects. Each object is separated from the previous one
 * by a double newline sequence. This is basically a more human-readable form of JSONL.
 *
 * The file is streamed: only the text of the object currently being assembled is buffered, and
 * no further data is pulled from the file while `handler` is running. Segments that contain
 * only whitespace (for example, produced by extra blank lines) are skipped.
 *
 * @param path The path to the file.
 * @param handler Callback to be invoked for each top-level JSON object in order. The next object
 *   is not passed to the callback until the promise returned for the previous one has resolved.
 * @param logger Optional sink for progress messages. The file size is only queried when a logger
 *   is supplied.
 * @returns A promise that resolves once every object has been handled.
 * @throws Rejects with the underlying error if the file cannot be read, if a segment is not valid
 *   JSON, or if `handler` rejects. Reading stops at the first failure.
 */
export async function readJsonlFile<T>(
  path: string,
  handler: (value: T) => Promise<void>,
  logger?: { log: (message: string) => void },
): Promise<void> {
  void logger?.log(
    `Parsing ${path} (${(await stat(path)).size / 1024 / 1024} MB)...`,
  );

  // Parses one segment and hands it to the callback; blank segments are ignored
  // so that stray empty lines do not abort the whole read.
  const parseAndHandle = async (segment: string): Promise<void> => {
    if (segment.trim().length === 0) {
      return;
    }
    await handler(JSON.parse(segment) as T);
  };

  // With an explicit encoding the stream yields strings and never splits a
  // multi-byte character across chunks.
  const stream = createReadStream(path, {
    encoding: "utf8",
  }) as AsyncIterable<string>;

  // Text after the last separator seen so far. It may be an incomplete object,
  // or the first half of a separator, so it is re-scanned with the next chunk.
  let buffer = "";

  // Async iteration only pulls the next chunk after the loop body has finished,
  // and destroys the stream if the body throws.
  for await (const chunk of stream) {
    const parts = (buffer + chunk).split(doubleLineBreakRegexp);
    buffer = parts.pop() ?? "";
    for (const part of parts) {
      await parseAndHandle(part);
    }
  }

  // Whatever is left after the final separator is the last object (if any).
  await parseAndHandle(buffer);

  void logger?.log(`Finishing parsing ${path}`);
}