Extract process output handling in CLI server

This commit is contained in:
Koen Vlaswinkel
2024-03-08 11:34:50 +01:00
parent c32065e895
commit 23003ec8bf

View File

@@ -369,9 +369,6 @@ export class CodeQLCliServer implements Disposable {
onLine?: OnLineCallback,
silent?: boolean,
): Promise<string> {
const stderrBuffers: Buffer[] = [];
// The current buffer of stderr of a single line. To be used for logging.
let currentLineStderrBuffer: Buffer = Buffer.alloc(0);
if (this.commandInProcess) {
throw new Error("runCodeQlCliInternal called while cli was running");
}
@@ -383,8 +380,6 @@ export class CodeQLCliServer implements Disposable {
}
// Grab the process so that typescript know that it is always defined.
const process = this.process;
// The array of fragments of stdout
const stdoutBuffers: Buffer[] = [];
// Compute the full args array
const args = command.concat(LOGGING_FLAGS).concat(commandArgs);
@@ -396,26 +391,83 @@ export class CodeQLCliServer implements Disposable {
);
}
try {
await new Promise<void>((resolve, reject) => {
// Start listening to stdout
process.stdout.addListener("data", (newData: Buffer) => {
if (onLine) {
void (async () => {
const response = await onLine(newData.toString("utf-8"));
return await this.handleProcessOutput(process, {
handleNullTerminator: true,
onListenStart: (process) => {
// Write the command followed by a null terminator.
process.stdin.write(JSON.stringify(args), "utf8");
process.stdin.write(this.nullBuffer);
},
description,
args,
silent,
onLine,
});
} catch (err) {
// Kill the process if it isn't already dead.
this.killProcessIfRunning();
if (!response) {
return;
}
throw err;
}
} finally {
this.commandInProcess = false;
// start running the next command immediately
this.runNext();
}
}
process.stdin.write(`${response}${EOL}`);
private async handleProcessOutput(
process: ChildProcessWithoutNullStreams,
{
handleNullTerminator,
args,
description,
onLine,
onListenStart,
silent,
}: {
handleNullTerminator: boolean;
args: string[];
description: string;
onLine?: OnLineCallback;
onListenStart?: (process: ChildProcessWithoutNullStreams) => void;
silent?: boolean;
},
): Promise<string> {
const stderrBuffers: Buffer[] = [];
// The current buffer of stderr of a single line. To be used for logging.
let currentLineStderrBuffer: Buffer = Buffer.alloc(0);
// Remove newData from stdoutBuffers because the data has been consumed
// by the onLine callback.
stdoutBuffers.splice(stdoutBuffers.indexOf(newData), 1);
})();
}
// The listeners of the process. Declared here so they can be removed in the finally block.
let stdoutListener: ((newData: Buffer) => void) | undefined = undefined;
let stderrListener: ((newData: Buffer) => void) | undefined = undefined;
let closeListener: ((code: number | null) => void) | undefined = undefined;
stdoutBuffers.push(newData);
try {
// The array of fragments of stdout
const stdoutBuffers: Buffer[] = [];
await new Promise<void>((resolve, reject) => {
stdoutListener = (newData: Buffer) => {
if (onLine) {
void (async () => {
const response = await onLine(newData.toString("utf-8"));
if (!response) {
return;
}
process.stdin.write(`${response}${EOL}`);
// Remove newData from stdoutBuffers because the data has been consumed
// by the onLine callback.
stdoutBuffers.splice(stdoutBuffers.indexOf(newData), 1);
})();
}
stdoutBuffers.push(newData);
if (handleNullTerminator) {
// If the buffer ends in '0' then exit.
// We don't have to check the middle as no output will be written after the null until
// the next command starts
@@ -425,89 +477,104 @@ export class CodeQLCliServer implements Disposable {
) {
resolve();
}
});
// Listen to stderr
process.stderr.addListener("data", (newData: Buffer) => {
stderrBuffers.push(newData);
}
};
stderrListener = (newData: Buffer) => {
stderrBuffers.push(newData);
if (!silent) {
currentLineStderrBuffer = Buffer.concat([
currentLineStderrBuffer,
newData,
]);
if (!silent) {
currentLineStderrBuffer = Buffer.concat([
currentLineStderrBuffer,
newData,
]);
// Print the stderr to the logger as it comes in. We need to ensure that
// we don't split messages on the same line, so we buffer the stderr and
// split it on EOLs.
const eolBuffer = Buffer.from(EOL);
// Print the stderr to the logger as it comes in. We need to ensure that
// we don't split messages on the same line, so we buffer the stderr and
// split it on EOLs.
const eolBuffer = Buffer.from(EOL);
let hasCreatedSubarray = false;
let hasCreatedSubarray = false;
let eolIndex;
while (
(eolIndex = currentLineStderrBuffer.indexOf(eolBuffer)) !== -1
) {
const line = currentLineStderrBuffer.subarray(0, eolIndex);
void this.logger.log(line.toString("utf-8"));
currentLineStderrBuffer = currentLineStderrBuffer.subarray(
eolIndex + eolBuffer.length,
);
hasCreatedSubarray = true;
}
// We have created a subarray, which means that the complete original buffer is now referenced
// by the subarray. We need to create a new buffer to avoid memory leaks.
if (hasCreatedSubarray) {
currentLineStderrBuffer = Buffer.from(currentLineStderrBuffer);
}
let eolIndex;
while (
(eolIndex = currentLineStderrBuffer.indexOf(eolBuffer)) !== -1
) {
const line = currentLineStderrBuffer.subarray(0, eolIndex);
void this.logger.log(line.toString("utf-8"));
currentLineStderrBuffer = currentLineStderrBuffer.subarray(
eolIndex + eolBuffer.length,
);
hasCreatedSubarray = true;
}
});
// Listen for process exit.
process.addListener("close", (code) =>
reject(new ExitCodeError(code)),
);
// Write the command followed by a null terminator.
process.stdin.write(JSON.stringify(args), "utf8");
process.stdin.write(this.nullBuffer);
});
// Join all the data together
const fullBuffer = Buffer.concat(stdoutBuffers);
// Make sure we remove the terminator;
const data = fullBuffer.toString("utf8", 0, fullBuffer.length - 1);
if (!silent) {
void this.logger.log(currentLineStderrBuffer.toString("utf8"));
currentLineStderrBuffer = Buffer.alloc(0);
void this.logger.log("CLI command succeeded.");
}
return data;
} catch (err) {
// Kill the process if it isn't already dead.
this.killProcessIfRunning();
// Report the error (if there is a stderr then use that otherwise just report the error code or nodejs error)
const cliError = getCliError(
err,
stderrBuffers.length > 0
? Buffer.concat(stderrBuffers).toString("utf8")
: undefined,
description,
args,
);
cliError.stack += getErrorStack(err);
throw cliError;
} finally {
if (!silent && currentLineStderrBuffer.length > 0) {
void this.logger.log(currentLineStderrBuffer.toString("utf8"));
}
// Remove the listeners we set up.
process.stdout.removeAllListeners("data");
process.stderr.removeAllListeners("data");
process.removeAllListeners("close");
// We have created a subarray, which means that the complete original buffer is now referenced
// by the subarray. We need to create a new buffer to avoid memory leaks.
if (hasCreatedSubarray) {
currentLineStderrBuffer = Buffer.from(currentLineStderrBuffer);
}
}
};
closeListener = (code) => {
if (handleNullTerminator) {
reject(new ExitCodeError(code));
} else {
if (code === 0) {
resolve();
} else {
reject(new ExitCodeError(code));
}
}
};
// Start listening to stdout
process.stdout.addListener("data", stdoutListener);
// Listen to stderr
process.stderr.addListener("data", stderrListener);
// Listen for process exit.
process.addListener("close", closeListener);
onListenStart?.(process);
});
// Join all the data together
const fullBuffer = Buffer.concat(stdoutBuffers);
// Make sure we remove the terminator
const data = fullBuffer.toString(
"utf8",
0,
handleNullTerminator ? fullBuffer.length - 1 : fullBuffer.length,
);
if (!silent) {
void this.logger.log(currentLineStderrBuffer.toString("utf8"));
currentLineStderrBuffer = Buffer.alloc(0);
void this.logger.log("CLI command succeeded.");
}
return data;
} catch (err) {
// Report the error (if there is a stderr then use that otherwise just report the error code or nodejs error)
const cliError = getCliError(
err,
stderrBuffers.length > 0
? Buffer.concat(stderrBuffers).toString("utf8")
: undefined,
description,
args,
);
cliError.stack += getErrorStack(err);
throw cliError;
} finally {
this.commandInProcess = false;
// start running the next command immediately
this.runNext();
if (!silent && currentLineStderrBuffer.length > 0) {
void this.logger.log(currentLineStderrBuffer.toString("utf8"));
}
// Remove the listeners we set up.
if (stdoutListener) {
process.stdout.removeListener("data", stdoutListener);
}
if (stderrListener) {
process.stderr.removeListener("data", stderrListener);
}
if (closeListener) {
process.removeListener("close", closeListener);
}
}
}