Merge pull request #3157 from github/koesie10/yauzl-progress

Add progress reporting for unzipping files
This commit is contained in:
Koen Vlaswinkel
2023-12-21 14:40:53 +01:00
committed by GitHub
8 changed files with 252 additions and 49 deletions

View File

@@ -26,6 +26,7 @@ import {
showAndLogWarningMessage,
} from "../common/logging";
import { unzipToDirectoryConcurrently } from "../common/unzip-concurrently";
import { reportUnzipProgress } from "../common/vscode/unzip-progress";
/**
* distribution.ts
@@ -423,6 +424,12 @@ class ExtensionSpecificDistributionManager {
await unzipToDirectoryConcurrently(
archivePath,
this.getDistributionStoragePath(),
progressCallback
? reportUnzipProgress(
`Extracting CodeQL CLI ${release.name}`,
progressCallback,
)
: undefined,
);
} finally {
await remove(tmpDirectory);

View File

@@ -0,0 +1,3 @@
/**
 * Formats a byte count as a human-readable number of megabytes.
 *
 * @param numBytes The number of bytes to format.
 * @returns The size in MB (1 MB = 1024 * 1024 bytes) with one decimal place,
 * followed by the " MB" suffix.
 */
export function readableBytesMb(numBytes: number): string {
  const bytesPerMb = 1024 * 1024;
  const megabytes = numBytes / bytesPerMb;
  return megabytes.toFixed(1) + " MB";
}

View File

@@ -1,16 +1,22 @@
import { availableParallelism } from "os";
import { unzipToDirectory } from "./unzip";
import { UnzipProgressCallback, unzipToDirectory } from "./unzip";
import PQueue from "p-queue";
/**
 * Unzips a zip archive into a directory, running the extraction tasks
 * concurrently.
 *
 * The tasks are scheduled on a queue whose concurrency is the value returned
 * by `availableParallelism()`.
 *
 * @param archivePath Path of the zip archive to extract.
 * @param destinationPath Directory the archive is extracted into.
 * @param progress Optional callback that is forwarded to `unzipToDirectory`
 * to receive extraction progress.
 */
export async function unzipToDirectoryConcurrently(
  archivePath: string,
  destinationPath: string,
  progress?: UnzipProgressCallback,
): Promise<void> {
  const queue = new PQueue({
    concurrency: availableParallelism(),
  });

  // A single call that forwards `progress`. The earlier three-argument call
  // returned first, which left this call unreachable and dropped the callback.
  return unzipToDirectory(
    archivePath,
    destinationPath,
    progress,
    async (tasks) => {
      await queue.addAll(tasks);
    },
  );
}

View File

@@ -1,5 +1,5 @@
import { Entry as ZipEntry, open, Options as ZipOptions, ZipFile } from "yauzl";
import { Readable } from "stream";
import { Readable, Transform } from "stream";
import { dirname, join } from "path";
import { WriteStream } from "fs";
import { createWriteStream, ensureDir } from "fs-extra";
@@ -25,6 +25,10 @@ export function excludeDirectories(entries: ZipEntry[]): ZipEntry[] {
return entries.filter((entry) => !/\/$/.test(entry.fileName));
}
/**
 * Sums the uncompressed sizes of the given zip entries.
 *
 * @param entries The zip entries to total.
 * @returns The sum of `uncompressedSize` over all entries (0 for an empty list).
 */
function calculateTotalUncompressedByteSize(entries: ZipEntry[]): number {
  let totalBytes = 0;
  for (const { uncompressedSize } of entries) {
    totalBytes += uncompressedSize;
  }
  return totalBytes;
}
export function readZipEntries(zipFile: ZipFile): Promise<ZipEntry[]> {
return new Promise((resolve, reject) => {
const files: ZipEntry[] = [];
@@ -84,6 +88,7 @@ export async function openZipBuffer(
async function copyStream(
readable: Readable,
writeStream: WriteStream,
bytesExtractedCallback?: (bytesExtracted: number) => void,
): Promise<void> {
return new Promise((resolve, reject) => {
readable.on("error", (err) => {
@@ -93,28 +98,53 @@ async function copyStream(
resolve();
});
readable.pipe(writeStream);
readable
.pipe(
new Transform({
transform(chunk, _encoding, callback) {
bytesExtractedCallback?.(chunk.length);
this.push(chunk);
callback();
},
}),
)
.pipe(writeStream);
});
}
/**
 * A snapshot of how far the extraction of a zip archive has progressed.
 */
type UnzipProgress = {
/** Number of archive entries whose extraction has completed so far. */
filesExtracted: number;
/** Total number of entries read from the archive. */
totalFiles: number;
/** Number of uncompressed bytes extracted so far. */
bytesExtracted: number;
/** Sum of the uncompressed sizes of all entries in the archive. */
totalBytes: number;
};
/** Callback that receives an `UnzipProgress` snapshot while an archive is being extracted. */
export type UnzipProgressCallback = (progress: UnzipProgress) => void;
/**
* Unzips a single file from a zip archive.
*
* @param zipFile
* @param entry
* @param rootDestinationPath
* @param bytesExtractedCallback Called when bytes are extracted.
* @return The number of bytes extracted.
*/
async function unzipFile(
zipFile: ZipFile,
entry: ZipEntry,
rootDestinationPath: string,
): Promise<void> {
bytesExtractedCallback?: (bytesExtracted: number) => void,
): Promise<number> {
const path = join(rootDestinationPath, entry.fileName);
if (/\/$/.test(entry.fileName)) {
// Directory file names end with '/'
await ensureDir(path);
return 0;
} else {
// Ensure the directory exists
await ensureDir(dirname(path));
@@ -131,7 +161,9 @@ async function unzipFile(
mode,
});
await copyStream(readable, writeStream);
await copyStream(readable, writeStream, bytesExtractedCallback);
return entry.uncompressedSize;
}
}
@@ -143,10 +175,12 @@ async function unzipFile(
* @param archivePath
* @param destinationPath
* @param taskRunner A function that runs the tasks (either sequentially or concurrently).
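* @param progress Optional callback that is invoked with the number of files and bytes extracted so far, and their totals.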
*/
export async function unzipToDirectory(
archivePath: string,
destinationPath: string,
progress: UnzipProgressCallback | undefined,
taskRunner: (tasks: Array<() => Promise<void>>) => Promise<void>,
): Promise<void> {
const zipFile = await openZip(archivePath, {
@@ -158,8 +192,43 @@ export async function unzipToDirectory(
try {
const entries = await readZipEntries(zipFile);
let filesExtracted = 0;
const totalFiles = entries.length;
let bytesExtracted = 0;
const totalBytes = calculateTotalUncompressedByteSize(entries);
const reportProgress = () => {
progress?.({
filesExtracted,
totalFiles,
bytesExtracted,
totalBytes,
});
};
reportProgress();
await taskRunner(
entries.map((entry) => () => unzipFile(zipFile, entry, destinationPath)),
entries.map((entry) => async () => {
let entryBytesExtracted = 0;
const totalEntryBytesExtracted = await unzipFile(
zipFile,
entry,
destinationPath,
(thisBytesExtracted) => {
entryBytesExtracted += thisBytesExtracted;
bytesExtracted += thisBytesExtracted;
reportProgress();
},
);
// Should be 0, but just in case.
bytesExtracted += -entryBytesExtracted + totalEntryBytesExtracted;
filesExtracted++;
reportProgress();
}),
);
} finally {
zipFile.close();
@@ -173,14 +242,21 @@ export async function unzipToDirectory(
*
* @param archivePath
* @param destinationPath
* @param progress Optional callback that is invoked with the extraction progress.
*/
export async function unzipToDirectorySequentially(
  archivePath: string,
  destinationPath: string,
  progress?: UnzipProgressCallback,
): Promise<void> {
  // A single call that forwards `progress`. The earlier three-argument call
  // returned first, which left this call unreachable and dropped the callback.
  return unzipToDirectory(
    archivePath,
    destinationPath,
    progress,
    async (tasks) => {
      // Run the extraction tasks one at a time, in order.
      for (const task of tasks) {
        await task();
      }
    },
  );
}

View File

@@ -4,6 +4,7 @@ import {
ProgressOptions as VSCodeProgressOptions,
window as Window,
} from "vscode";
import { readableBytesMb } from "../bytes";
export class UserCancellationException extends Error {
/**
@@ -125,15 +126,13 @@ export function reportStreamProgress(
) {
if (progress && totalNumBytes) {
let numBytesDownloaded = 0;
const bytesToDisplayMB = (numBytes: number): string =>
`${(numBytes / (1024 * 1024)).toFixed(1)} MB`;
const updateProgress = () => {
progress({
step: numBytesDownloaded,
maxStep: totalNumBytes,
message: `${messagePrefix} [${bytesToDisplayMB(
message: `${messagePrefix} [${readableBytesMb(
numBytesDownloaded,
)} of ${bytesToDisplayMB(totalNumBytes)}]`,
)} of ${readableBytesMb(totalNumBytes)}]`,
});
};

View File

@@ -0,0 +1,18 @@
import { readableBytesMb } from "../bytes";
import { UnzipProgressCallback } from "../unzip";
import { ProgressCallback } from "./progress";
/**
 * Adapts a generic progress callback into an unzip progress callback.
 *
 * Each unzip progress update is reported with the extracted byte count as the
 * step, the total byte count as the maximum step, and a message of the form
 * `<messagePrefix> [<extracted> of <total>]` with both sizes formatted in MB.
 *
 * @param messagePrefix Text placed at the start of every progress message.
 * @param progress The progress callback that receives the converted updates.
 * @returns An unzip progress callback that forwards to `progress`.
 */
export function reportUnzipProgress(
  messagePrefix: string,
  progress: ProgressCallback,
): UnzipProgressCallback {
  const onUnzipProgress: UnzipProgressCallback = (unzipProgress) => {
    const { bytesExtracted, totalBytes } = unzipProgress;
    const extracted = readableBytesMb(bytesExtracted);
    const total = readableBytesMb(totalBytes);

    progress({
      step: bytesExtracted,
      maxStep: totalBytes,
      message: `${messagePrefix} [${extracted} of ${total}]`,
    });
  };

  return onUnzipProgress;
}

View File

@@ -164,6 +164,75 @@ describe.each([
expect(await pathExists(join(tmpDir.path, "empty-directory"))).toBe(true);
expect(await readdir(join(tmpDir.path, "empty-directory"))).toEqual([]);
});
describe("with reported progress", () => {
  // A single mock shared by all tests; it is reset and repopulated by a fresh
  // extraction before each test.
  const progressCallback = jest.fn();

  beforeEach(async () => {
    progressCallback.mockReset();
    await unzipToDirectory(zipPath, tmpDir.path, progressCallback);
  });

  it("has at least as many progress callbacks as files", () => {
    const numCalls = progressCallback.mock.calls.length;
    expect(numCalls).toBeGreaterThanOrEqual(11);
  });

  it("has an incrementing files extracted value", () => {
    let lastFilesExtracted = 0;
    for (const [reported] of progressCallback.mock.calls) {
      expect(reported.filesExtracted).toBeGreaterThanOrEqual(
        lastFilesExtracted,
      );
      lastFilesExtracted = reported.filesExtracted;
    }
  });

  it("has an incrementing bytes extracted value", () => {
    let lastBytesExtracted = 0;
    for (const [reported] of progressCallback.mock.calls) {
      expect(reported.bytesExtracted).toBeGreaterThanOrEqual(
        lastBytesExtracted,
      );
      lastBytesExtracted = reported.bytesExtracted;
    }
  });

  it("always increments either bytes or files extracted", () => {
    // The first call is always 0, 0, so the comparison starts from the second
    // call with a previous combined value of 0.
    let previousCombined = 0;
    for (const [reported] of progressCallback.mock.calls.slice(1)) {
      const combined = reported.bytesExtracted + reported.filesExtracted;
      expect(combined).toBeGreaterThan(previousCombined);
      previousCombined = combined;
    }
  });

  it("has a first call with the correct values", () => {
    const expectedFirst = {
      filesExtracted: 0,
      totalFiles: 11,
      bytesExtracted: 0,
      totalBytes: 87,
    };
    expect(progressCallback).toHaveBeenNthCalledWith(1, expectedFirst);
  });

  it("has a last call with the correct values", () => {
    const expectedLast = {
      filesExtracted: 11,
      totalFiles: 11,
      bytesExtracted: 87,
      totalBytes: 87,
    };
    expect(progressCallback).toHaveBeenLastCalledWith(expectedLast);
  });
});
});
async function expectFile(

View File

@@ -36,6 +36,7 @@ import supportedCliVersions from "../../supported_cli_versions.json";
const _1MB = 1024 * 1024;
const _10MB = _1MB * 10;
const _100MB = _10MB * 10;
// CLI version to test. Use the latest supported version by default.
// And be sure to update the env if it is not otherwise set.
@@ -88,36 +89,7 @@ export async function ensureCli(useCli: boolean) {
`CLI version ${CLI_VERSION} zip file not found. Downloading from '${url}' into '${downloadedFilePath}'.`,
);
const assetStream = await fetch(url);
const contentLength = Number(
assetStream.headers.get("content-length") || 0,
);
console.log("Total content size", Math.round(contentLength / _1MB), "MB");
const archiveFile = createWriteStream(downloadedFilePath);
const body = assetStream.body;
await new Promise<void>((resolve, reject) => {
let numBytesDownloaded = 0;
let lastMessage = 0;
body.on("data", (data) => {
numBytesDownloaded += data.length;
if (numBytesDownloaded - lastMessage > _10MB) {
console.log(
"Downloaded",
Math.round(numBytesDownloaded / _1MB),
"MB",
);
lastMessage = numBytesDownloaded;
}
archiveFile.write(data);
});
body.on("finish", () => {
archiveFile.end(() => {
console.log("Finished download into", downloadedFilePath);
resolve();
});
});
body.on("error", reject);
});
await downloadWithProgress(url, downloadedFilePath);
} else {
console.log(
`CLI version ${CLI_VERSION} zip file found at '${downloadedFilePath}'.`,
@@ -126,7 +98,7 @@ export async function ensureCli(useCli: boolean) {
console.log(`Unzipping into '${unzipDir}'`);
mkdirpSync(unzipDir);
await unzipToDirectorySequentially(downloadedFilePath, unzipDir);
await unzipWithProgress(downloadedFilePath, unzipDir);
console.log("Done.");
} catch (e) {
console.error("Failed to download CLI.");
@@ -135,6 +107,59 @@ export async function ensureCli(useCli: boolean) {
}
}
/**
 * Downloads `url` into `filePath`, logging the downloaded size to the console
 * each time more than 10 MB has arrived since the last message.
 *
 * @param url URL of the asset to download.
 * @param filePath Path of the file the response body is written to.
 * @throws If the server responds with a non-success status, or if reading the
 * response body or writing the file fails.
 */
async function downloadWithProgress(
  url: string,
  filePath: string,
): Promise<void> {
  const assetStream = await fetch(url);
  if (!assetStream.ok) {
    // Fail before creating the file, so an error page is never saved as the
    // archive.
    throw new Error(
      `Failed to download '${url}': ${assetStream.status} ${assetStream.statusText}`,
    );
  }

  const contentLength = Number(assetStream.headers.get("content-length") || 0);
  console.log("Total content size", Math.round(contentLength / _1MB), "MB");
  const archiveFile = createWriteStream(filePath);
  const body = assetStream.body;
  await new Promise<void>((resolve, reject) => {
    let numBytesDownloaded = 0;
    let lastMessage = 0;

    // Without this handler a failed write (e.g. a full disk) would leave the
    // promise pending forever.
    archiveFile.on("error", reject);

    body.on("data", (data) => {
      numBytesDownloaded += data.length;
      if (numBytesDownloaded - lastMessage > _10MB) {
        console.log("Downloaded", Math.round(numBytesDownloaded / _1MB), "MB");
        lastMessage = numBytesDownloaded;
      }
      // Respect backpressure: stop reading until the file stream has drained
      // instead of buffering the whole download in memory.
      if (!archiveFile.write(data)) {
        body.pause();
        archiveFile.once("drain", () => body.resume());
      }
    });
    // "end" is the Readable event that fires once every chunk has been
    // delivered to the "data" handler, including chunks delayed by a pause.
    body.on("end", () => {
      archiveFile.end(() => {
        console.log("Finished download into", filePath);
        resolve();
      });
    });
    body.on("error", (err) => {
      // Release the file handle; the partial download is not usable.
      archiveFile.destroy();
      reject(err);
    });
  });
}
/**
 * Unzips `filePath` into `unzipDir` sequentially, logging the extracted size
 * to the console each time more than 100 MB has been extracted since the last
 * message, and logging a final message once extraction has finished.
 *
 * @param filePath Path of the zip archive to extract.
 * @param unzipDir Directory the archive is extracted into.
 */
async function unzipWithProgress(
  filePath: string,
  unzipDir: string,
): Promise<void> {
  // Value of `bytesExtracted` at the time of the most recent log message.
  let lastLoggedBytes = 0;
  const toRoundedMb = (numBytes: number): number =>
    Math.round(numBytes / _1MB);

  await unzipToDirectorySequentially(filePath, unzipDir, (unzipProgress) => {
    const { bytesExtracted, totalBytes } = unzipProgress;

    const dueForLog = bytesExtracted - lastLoggedBytes > _100MB;
    if (!dueForLog) {
      return;
    }

    console.log(
      "Extracted",
      toRoundedMb(bytesExtracted),
      "MB /",
      toRoundedMb(totalBytes),
      "MB",
    );
    lastLoggedBytes = bytesExtracted;
  });

  console.log("Finished unzipping into", unzipDir);
}
/**
* Url to download from
*/