Merge pull request #3155 from github/koesie10/yauzl-concurrent
Add concurrency to unzipping using `yauzl`
This commit is contained in:
@@ -20,7 +20,7 @@ import { spawnSync } from "child_process";
|
|||||||
import { basename, resolve } from "path";
|
import { basename, resolve } from "path";
|
||||||
import { pathExists, readJSON } from "fs-extra";
|
import { pathExists, readJSON } from "fs-extra";
|
||||||
import { RawSourceMap, SourceMapConsumer } from "source-map";
|
import { RawSourceMap, SourceMapConsumer } from "source-map";
|
||||||
import { unzipToDirectory } from "../src/common/unzip";
|
import { unzipToDirectorySequentially } from "../src/common/unzip";
|
||||||
|
|
||||||
if (process.argv.length !== 4) {
|
if (process.argv.length !== 4) {
|
||||||
console.error(
|
console.error(
|
||||||
@@ -78,7 +78,7 @@ async function extractSourceMap() {
|
|||||||
releaseAssetsDirectory,
|
releaseAssetsDirectory,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
await unzipToDirectory(
|
await unzipToDirectorySequentially(
|
||||||
resolve(releaseAssetsDirectory, sourcemapAsset.name),
|
resolve(releaseAssetsDirectory, sourcemapAsset.name),
|
||||||
sourceMapsDirectory,
|
sourceMapsDirectory,
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ import {
|
|||||||
showAndLogErrorMessage,
|
showAndLogErrorMessage,
|
||||||
showAndLogWarningMessage,
|
showAndLogWarningMessage,
|
||||||
} from "../common/logging";
|
} from "../common/logging";
|
||||||
import { unzipToDirectory } from "../common/unzip";
|
import { unzipToDirectoryConcurrently } from "../common/unzip-concurrently";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* distribution.ts
|
* distribution.ts
|
||||||
@@ -420,7 +420,10 @@ class ExtensionSpecificDistributionManager {
|
|||||||
void extLogger.log(
|
void extLogger.log(
|
||||||
`Extracting CodeQL CLI to ${this.getDistributionStoragePath()}`,
|
`Extracting CodeQL CLI to ${this.getDistributionStoragePath()}`,
|
||||||
);
|
);
|
||||||
await unzipToDirectory(archivePath, this.getDistributionStoragePath());
|
await unzipToDirectoryConcurrently(
|
||||||
|
archivePath,
|
||||||
|
this.getDistributionStoragePath(),
|
||||||
|
);
|
||||||
} finally {
|
} finally {
|
||||||
await remove(tmpDirectory);
|
await remove(tmpDirectory);
|
||||||
}
|
}
|
||||||
|
|||||||
16
extensions/ql-vscode/src/common/unzip-concurrently.ts
Normal file
16
extensions/ql-vscode/src/common/unzip-concurrently.ts
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
import { availableParallelism } from "os";
|
||||||
|
import { unzipToDirectory } from "./unzip";
|
||||||
|
import PQueue from "p-queue";
|
||||||
|
|
||||||
|
export async function unzipToDirectoryConcurrently(
|
||||||
|
archivePath: string,
|
||||||
|
destinationPath: string,
|
||||||
|
): Promise<void> {
|
||||||
|
const queue = new PQueue({
|
||||||
|
concurrency: availableParallelism(),
|
||||||
|
});
|
||||||
|
|
||||||
|
return unzipToDirectory(archivePath, destinationPath, async (tasks) => {
|
||||||
|
await queue.addAll(tasks);
|
||||||
|
});
|
||||||
|
}
|
||||||
@@ -97,9 +97,57 @@ async function copyStream(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unzips a single file from a zip archive.
|
||||||
|
*
|
||||||
|
* @param zipFile
|
||||||
|
* @param entry
|
||||||
|
* @param rootDestinationPath
|
||||||
|
*/
|
||||||
|
async function unzipFile(
|
||||||
|
zipFile: ZipFile,
|
||||||
|
entry: ZipEntry,
|
||||||
|
rootDestinationPath: string,
|
||||||
|
): Promise<void> {
|
||||||
|
const path = join(rootDestinationPath, entry.fileName);
|
||||||
|
|
||||||
|
if (/\/$/.test(entry.fileName)) {
|
||||||
|
// Directory file names end with '/'
|
||||||
|
|
||||||
|
await ensureDir(path);
|
||||||
|
} else {
|
||||||
|
// Ensure the directory exists
|
||||||
|
await ensureDir(dirname(path));
|
||||||
|
|
||||||
|
const readable = await openZipReadStream(zipFile, entry);
|
||||||
|
|
||||||
|
let mode: number | undefined = entry.externalFileAttributes >>> 16;
|
||||||
|
if (mode <= 0) {
|
||||||
|
mode = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
const writeStream = createWriteStream(path, {
|
||||||
|
autoClose: true,
|
||||||
|
mode,
|
||||||
|
});
|
||||||
|
|
||||||
|
await copyStream(readable, writeStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unzips all files from a zip archive. Please use
|
||||||
|
* `unzipToDirectoryConcurrently` or `unzipToDirectorySequentially` instead
|
||||||
|
* of this function.
|
||||||
|
*
|
||||||
|
* @param archivePath
|
||||||
|
* @param destinationPath
|
||||||
|
* @param taskRunner A function that runs the tasks (either sequentially or concurrently).
|
||||||
|
*/
|
||||||
export async function unzipToDirectory(
|
export async function unzipToDirectory(
|
||||||
archivePath: string,
|
archivePath: string,
|
||||||
destinationPath: string,
|
destinationPath: string,
|
||||||
|
taskRunner: (tasks: Array<() => Promise<void>>) => Promise<void>,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const zipFile = await openZip(archivePath, {
|
const zipFile = await openZip(archivePath, {
|
||||||
autoClose: false,
|
autoClose: false,
|
||||||
@@ -110,33 +158,29 @@ export async function unzipToDirectory(
|
|||||||
try {
|
try {
|
||||||
const entries = await readZipEntries(zipFile);
|
const entries = await readZipEntries(zipFile);
|
||||||
|
|
||||||
for (const entry of entries) {
|
await taskRunner(
|
||||||
const path = join(destinationPath, entry.fileName);
|
entries.map((entry) => () => unzipFile(zipFile, entry, destinationPath)),
|
||||||
|
);
|
||||||
if (/\/$/.test(entry.fileName)) {
|
|
||||||
// Directory file names end with '/'
|
|
||||||
|
|
||||||
await ensureDir(path);
|
|
||||||
} else {
|
|
||||||
// Ensure the directory exists
|
|
||||||
await ensureDir(dirname(path));
|
|
||||||
|
|
||||||
const readable = await openZipReadStream(zipFile, entry);
|
|
||||||
|
|
||||||
let mode: number | undefined = entry.externalFileAttributes >>> 16;
|
|
||||||
if (mode <= 0) {
|
|
||||||
mode = undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
const writeStream = createWriteStream(path, {
|
|
||||||
autoClose: true,
|
|
||||||
mode,
|
|
||||||
});
|
|
||||||
|
|
||||||
await copyStream(readable, writeStream);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
zipFile.close();
|
zipFile.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sequentially unzips all files from a zip archive. Please use
|
||||||
|
* `unzipToDirectoryConcurrently` if you can. This function is only
|
||||||
|
* provided because Jest cannot import `p-queue`.
|
||||||
|
*
|
||||||
|
* @param archivePath
|
||||||
|
* @param destinationPath
|
||||||
|
*/
|
||||||
|
export async function unzipToDirectorySequentially(
|
||||||
|
archivePath: string,
|
||||||
|
destinationPath: string,
|
||||||
|
): Promise<void> {
|
||||||
|
return unzipToDirectory(archivePath, destinationPath, async (tasks) => {
|
||||||
|
for (const task of tasks) {
|
||||||
|
await task();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ import {
|
|||||||
} from "./shared/variant-analysis";
|
} from "./shared/variant-analysis";
|
||||||
import { DisposableObject, DisposeHandler } from "../common/disposable-object";
|
import { DisposableObject, DisposeHandler } from "../common/disposable-object";
|
||||||
import { EventEmitter } from "vscode";
|
import { EventEmitter } from "vscode";
|
||||||
import { unzipToDirectory } from "../common/unzip";
|
import { unzipToDirectoryConcurrently } from "../common/unzip-concurrently";
|
||||||
import { readRepoTask, writeRepoTask } from "./repo-tasks-store";
|
import { readRepoTask, writeRepoTask } from "./repo-tasks-store";
|
||||||
|
|
||||||
type CacheKey = `${number}/${string}`;
|
type CacheKey = `${number}/${string}`;
|
||||||
@@ -106,7 +106,7 @@ export class VariantAnalysisResultsManager extends DisposableObject {
|
|||||||
VariantAnalysisResultsManager.RESULTS_DIRECTORY,
|
VariantAnalysisResultsManager.RESULTS_DIRECTORY,
|
||||||
);
|
);
|
||||||
|
|
||||||
await unzipToDirectory(zipFilePath, unzippedFilesDirectory);
|
await unzipToDirectoryConcurrently(zipFilePath, unzippedFilesDirectory);
|
||||||
|
|
||||||
this._onResultDownloaded.fire({
|
this._onResultDownloaded.fire({
|
||||||
variantAnalysisId,
|
variantAnalysisId,
|
||||||
|
|||||||
@@ -8,9 +8,10 @@ import {
|
|||||||
openZip,
|
openZip,
|
||||||
openZipBuffer,
|
openZipBuffer,
|
||||||
readZipEntries,
|
readZipEntries,
|
||||||
unzipToDirectory,
|
unzipToDirectorySequentially,
|
||||||
} from "../../../src/common/unzip";
|
} from "../../../src/common/unzip";
|
||||||
import { walkDirectory } from "../../../src/common/files";
|
import { walkDirectory } from "../../../src/common/files";
|
||||||
|
import { unzipToDirectoryConcurrently } from "../../../src/common/unzip-concurrently";
|
||||||
|
|
||||||
const zipPath = resolve(__dirname, "../data/unzip/test-zip.zip");
|
const zipPath = resolve(__dirname, "../data/unzip/test-zip.zip");
|
||||||
|
|
||||||
@@ -88,7 +89,16 @@ describe("openZipBuffer", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("unzipToDirectory", () => {
|
describe.each([
|
||||||
|
{
|
||||||
|
name: "unzipToDirectorySequentially",
|
||||||
|
unzipToDirectory: unzipToDirectorySequentially,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unzipToDirectoryConcurrently",
|
||||||
|
unzipToDirectory: unzipToDirectoryConcurrently,
|
||||||
|
},
|
||||||
|
])("$name", ({ unzipToDirectory }) => {
|
||||||
let tmpDir: DirectoryResult;
|
let tmpDir: DirectoryResult;
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
@@ -186,6 +196,8 @@ async function expectFile(
|
|||||||
if (expectedContents) {
|
if (expectedContents) {
|
||||||
expect(contents.toString("utf-8")).toEqual(expectedContents);
|
expect(contents.toString("utf-8")).toEqual(expectedContents);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await file.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
async function computeHash(contents: Buffer) {
|
async function computeHash(contents: Buffer) {
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import {
|
|||||||
getRequiredAssetName,
|
getRequiredAssetName,
|
||||||
codeQlLauncherName,
|
codeQlLauncherName,
|
||||||
} from "../../src/common/distribution";
|
} from "../../src/common/distribution";
|
||||||
import { unzipToDirectory } from "../../src/common/unzip";
|
import { unzipToDirectorySequentially } from "../../src/common/unzip";
|
||||||
import fetch from "node-fetch";
|
import fetch from "node-fetch";
|
||||||
import supportedCliVersions from "../../supported_cli_versions.json";
|
import supportedCliVersions from "../../supported_cli_versions.json";
|
||||||
|
|
||||||
@@ -126,7 +126,7 @@ export async function ensureCli(useCli: boolean) {
|
|||||||
|
|
||||||
console.log(`Unzipping into '${unzipDir}'`);
|
console.log(`Unzipping into '${unzipDir}'`);
|
||||||
mkdirpSync(unzipDir);
|
mkdirpSync(unzipDir);
|
||||||
await unzipToDirectory(downloadedFilePath, unzipDir);
|
await unzipToDirectorySequentially(downloadedFilePath, unzipDir);
|
||||||
console.log("Done.");
|
console.log("Done.");
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error("Failed to download CLI.");
|
console.error("Failed to download CLI.");
|
||||||
|
|||||||
Reference in New Issue
Block a user