Implement streaming for compare view

This commit is contained in:
Koen Vlaswinkel
2024-10-22 15:19:03 +02:00
parent 4ed616769c
commit a69ef15594
3 changed files with 176 additions and 2 deletions

View File

@@ -360,6 +360,9 @@ interface ChangeCompareMessage {
export type ToCompareViewMessage = export type ToCompareViewMessage =
| SetComparisonQueryInfoMessage | SetComparisonQueryInfoMessage
| SetComparisonsMessage | SetComparisonsMessage
| StreamingComparisonSetupMessage
| StreamingComparisonAddResultsMessage
| StreamingComparisonCompleteMessage
| SetUserSettingsMsg; | SetUserSettingsMsg;
/** /**
@@ -419,6 +422,24 @@ export type InterpretedQueryCompareResult = {
to: Result[]; to: Result[];
}; };
export interface StreamingComparisonSetupMessage {
readonly t: "streamingComparisonSetup";
readonly currentResultSetName: string;
readonly message: string | undefined;
// The from and to fields will only contain a chunk of the results
readonly result: QueryCompareResult;
}
interface StreamingComparisonAddResultsMessage {
readonly t: "streamingComparisonAddResults";
// The from and to fields will only contain a chunk of the results
readonly result: QueryCompareResult;
}
interface StreamingComparisonCompleteMessage {
readonly t: "streamingComparisonComplete";
}
/** /**
* Extract the name of the default result. Prefer returning * Extract the name of the default result. Prefer returning
* 'alerts', or '#select'. Otherwise return the first in the list. * 'alerts', or '#select'. Otherwise return the first in the list.

View File

@@ -183,13 +183,92 @@ export class CompareView extends AbstractWebview<
message = getErrorMessage(e); message = getErrorMessage(e);
} }
await this.streamResults(result, currentResultSetDisplayName, message);
}
}
private async streamResults(
result: QueryCompareResult | undefined,
currentResultSetName: string,
message: string | undefined,
) {
// Since there is a string limit of 1GB in Node.js, the comparison is send as a JSON.stringified string to the webview
// and some comparisons may be larger than that, we sometimes need to stream results. This uses a heuristic of 2,000 results
// to determine if we should stream results.
if (!this.shouldStreamResults(result)) {
await this.postMessage({ await this.postMessage({
t: "setComparisons", t: "setComparisons",
result, result,
currentResultSetName: currentResultSetDisplayName, currentResultSetName,
message, message,
}); });
return;
} }
// Streaming itself is implemented like this:
// - 1 setup message which contains the first 1,000 results
// - n "add results" messages which contain 1,000 results each
// - 1 complete message which just tells the webview that we're done
await this.postMessage({
t: "streamingComparisonSetup",
result: this.chunkResults(result, 0, 1000),
currentResultSetName,
message,
});
const { from, to } = result;
const maxResults = Math.max(from.length, to.length);
for (let i = 1000; i < maxResults; i += 1000) {
const chunk = this.chunkResults(result, i, i + 1000);
await this.postMessage({
t: "streamingComparisonAddResults",
result: chunk,
});
}
await this.postMessage({
t: "streamingComparisonComplete",
});
}
private shouldStreamResults(
result: QueryCompareResult | undefined,
): result is QueryCompareResult {
if (result === undefined) {
return false;
}
// We probably won't run into limits if we have less than 2,000 total results
const totalResults = result.from.length + result.to.length;
return totalResults > 2000;
}
private chunkResults(
result: QueryCompareResult,
start: number,
end: number,
): QueryCompareResult {
if (result.kind === "raw") {
return {
...result,
from: result.from.slice(start, end),
to: result.to.slice(start, end),
};
}
if (result.kind === "interpreted") {
return {
...result,
from: result.from.slice(start, end),
to: result.to.slice(start, end),
};
}
assertNever(result);
} }
protected getPanelConfig(): WebviewPanelConfig { protected getPanelConfig(): WebviewPanelConfig {

View File

@@ -1,4 +1,4 @@
import { useState, useEffect } from "react"; import { useState, useEffect, useRef } from "react";
import { styled } from "styled-components"; import { styled } from "styled-components";
import type { import type {
@@ -6,6 +6,8 @@ import type {
SetComparisonsMessage, SetComparisonsMessage,
SetComparisonQueryInfoMessage, SetComparisonQueryInfoMessage,
UserSettings, UserSettings,
StreamingComparisonSetupMessage,
QueryCompareResult,
} from "../../common/interface-types"; } from "../../common/interface-types";
import { DEFAULT_USER_SETTINGS } from "../../common/interface-types"; import { DEFAULT_USER_SETTINGS } from "../../common/interface-types";
import CompareSelector from "./CompareSelector"; import CompareSelector from "./CompareSelector";
@@ -37,6 +39,12 @@ export function Compare(_: Record<string, never>): React.JSX.Element {
DEFAULT_USER_SETTINGS, DEFAULT_USER_SETTINGS,
); );
// This is a ref because we don't need to re-render when we get a new streaming comparison message
// and we don't want to change the listener every time we get a new message
const streamingComparisonRef = useRef<StreamingComparisonSetupMessage | null>(
null,
);
const message = comparison?.message || "Empty comparison"; const message = comparison?.message || "Empty comparison";
const hasRows = const hasRows =
comparison?.result && comparison?.result &&
@@ -53,6 +61,72 @@ export function Compare(_: Record<string, never>): React.JSX.Element {
case "setComparisons": case "setComparisons":
setComparison(msg); setComparison(msg);
break; break;
case "streamingComparisonSetup":
setComparison(null);
streamingComparisonRef.current = msg;
break;
case "streamingComparisonAddResults": {
const prev = streamingComparisonRef.current;
if (prev === null) {
console.warn(
'Received "streamingComparisonAddResults" before "streamingComparisonSetup"',
);
break;
}
let result: QueryCompareResult;
switch (prev.result.kind) {
case "raw":
if (msg.result.kind !== "raw") {
throw new Error(
"Streaming comparison: expected raw results, got interpreted results",
);
}
result = {
...prev.result,
from: [...prev.result.from, ...msg.result.from],
to: [...prev.result.to, ...msg.result.to],
};
break;
case "interpreted":
if (msg.result.kind !== "interpreted") {
throw new Error(
"Streaming comparison: expected interpreted results, got raw results",
);
}
result = {
...prev.result,
from: [...prev.result.from, ...msg.result.from],
to: [...prev.result.to, ...msg.result.to],
};
break;
default:
throw new Error("Unexpected comparison result kind");
}
streamingComparisonRef.current = {
...prev,
result,
};
break;
}
case "streamingComparisonComplete":
if (streamingComparisonRef.current === null) {
console.warn(
'Received "streamingComparisonComplete" before "streamingComparisonSetup"',
);
setComparison(null);
break;
}
setComparison({
...streamingComparisonRef.current,
t: "setComparisons",
});
streamingComparisonRef.current = null;
break;
case "setUserSettings": case "setUserSettings":
setUserSettings(msg.userSettings); setUserSettings(msg.userSettings);
break; break;