fix(server&web): fix logs disorder issue (#1982)

This commit is contained in:
limbo 2024-05-30 17:15:14 +08:00 committed by GitHub
parent 07019af62e
commit b2f01152f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 11757 additions and 4971 deletions

5626
pnpm-lock.yaml generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -6,6 +6,7 @@ import {
Query,
UseGuards,
Sse,
MessageEvent,
} from '@nestjs/common'
import http from 'http'
import { ApiBearerAuth, ApiOperation, ApiQuery, ApiTags } from '@nestjs/swagger'
@ -103,7 +104,7 @@ export class LogController {
@Param('podName') podName: string,
@Query('containerName') containerName: string,
@Param('appid') appid: string,
) {
): Promise<Observable<MessageEvent>> {
if (!containerName) {
containerName = appid
}
@ -114,12 +115,7 @@ export class LogController {
if (!podNameList.includes(podName) && podName !== 'all') {
return new Observable<MessageEvent>((subscriber) => {
subscriber.next(
JSON.stringify({
error: 'podName not exist',
}) as unknown as MessageEvent,
)
subscriber.complete()
subscriber.error(new Error('podName not exist'))
})
}
@ -136,19 +132,34 @@ export class LogController {
const logs = new Log(kc)
const streamsEnded = new Set<string>()
const timerId = setInterval(() => {
subscriber.next('\u200B' as unknown as MessageEvent)
}, 30000)
const k8sLogResponses: http.IncomingMessage[] = []
const podLogStreams: PassThrough[] = []
const destroyStream = () => {
combinedLogStream?.removeAllListeners()
combinedLogStream?.destroy()
clearInterval(timerId)
combinedLogStream.removeAllListeners()
combinedLogStream.destroy()
k8sLogResponses.forEach((response) => {
response.removeAllListeners()
response.destroy()
})
podLogStreams.forEach((stream) => {
stream.removeAllListeners()
stream.destroy()
})
}
let idCounter = 1
combinedLogStream.on('data', (chunk) => {
subscriber.next(chunk.toString() as MessageEvent)
const dataString = chunk.toString()
const messageEvent: MessageEvent = {
id: idCounter.toString(),
data: dataString,
type: 'log',
}
idCounter++
subscriber.next(messageEvent)
})
combinedLogStream.on('error', (error) => {
@ -157,18 +168,18 @@ export class LogController {
destroyStream()
})
combinedLogStream.on('end', () => {
combinedLogStream.on('close', () => {
subscriber.complete()
destroyStream()
})
const fetchLog = async (podName: string) => {
let k8sResponse: http.IncomingMessage | undefined
const podLogStream = new PassThrough()
streamsEnded.add(podName)
podLogStreams.push(podLogStream)
try {
k8sResponse = await logs.log(
const k8sResponse: http.IncomingMessage = await logs.log(
namespaceOfApp,
podName,
containerName,
@ -181,26 +192,33 @@ export class LogController {
tailLines: 1000,
},
)
k8sLogResponses.push(k8sResponse)
podLogStream.pipe(combinedLogStream, { end: false })
podLogStream.on('error', (error) => {
combinedLogStream.emit('error', error)
podLogStream.removeAllListeners()
podLogStream.destroy()
subscriber.error(error)
this.logger.error(`podLogStream error for pod ${podName}`, error)
destroyStream()
})
podLogStream.once('end', () => {
k8sResponse.on('close', () => {
streamsEnded.delete(podName)
if (streamsEnded.size === 0) {
combinedLogStream.end()
combinedLogStream.emit('close')
}
})
podLogStream.on('close', () => {
streamsEnded.delete(podName)
if (streamsEnded.size === 0) {
combinedLogStream.emit('close')
}
})
} catch (error) {
this.logger.error(`Failed to get logs for pod ${podName}`, error)
subscriber.error(error)
k8sResponse?.destroy()
podLogStream.removeAllListeners()
podLogStream.destroy()
this.logger.error(`Failed to get logs for pod ${podName}`, error)
destroyStream()
}
}
@ -212,8 +230,11 @@ export class LogController {
} else {
fetchLog(podName)
}
// Clean up when the client disconnects
return () => destroyStream()
return () => {
destroyStream()
}
})
}
}

11
web/package-lock.json generated
View File

@ -18,6 +18,7 @@
"@codingame/monaco-vscode-typescript-basics-default-extension": "~1.82.3",
"@emotion/react": "^11.11.0",
"@emotion/styled": "^11.11.0",
"@microsoft/fetch-event-source": "^2.0.1",
"@monaco-editor/react": "^4.6.0",
"@patternfly/react-log-viewer": "^5.0.0",
"@sentry/integrations": "^7.73.0",
@ -3771,6 +3772,11 @@
"@jridgewell/sourcemap-codec": "1.4.14"
}
},
"node_modules/@microsoft/fetch-event-source": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz",
"integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA=="
},
"node_modules/@monaco-editor/loader": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/@monaco-editor/loader/-/loader-1.4.0.tgz",
@ -14875,6 +14881,11 @@
"@jridgewell/sourcemap-codec": "1.4.14"
}
},
"@microsoft/fetch-event-source": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz",
"integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA=="
},
"@monaco-editor/loader": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/@monaco-editor/loader/-/loader-1.4.0.tgz",

View File

@ -23,6 +23,7 @@
"@codingame/monaco-vscode-typescript-basics-default-extension": "~1.82.3",
"@emotion/react": "^11.11.0",
"@emotion/styled": "^11.11.0",
"@microsoft/fetch-event-source": "^2.0.1",
"@monaco-editor/react": "^4.6.0",
"@patternfly/react-log-viewer": "^5.0.0",
"@sentry/integrations": "^7.73.0",

10835
web/pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
import React, { useCallback, useEffect, useState } from "react";
import React, { useCallback, useEffect, useRef, useState } from "react";
import { useTranslation } from "react-i18next";
import {
Button,
@ -15,14 +15,12 @@ import {
useColorMode,
useDisclosure,
} from "@chakra-ui/react";
import { EventStreamContentType, fetchEventSource } from "@microsoft/fetch-event-source";
import { LogViewer, LogViewerSearch } from "@patternfly/react-log-viewer";
import { useQuery } from "@tanstack/react-query";
import clsx from "clsx";
import { debounce } from "lodash";
import { DownIcon, RefreshIcon } from "@/components/CommonIcon";
import { formatDate } from "@/utils/format";
import { streamFetch } from "@/utils/streamFetch";
import "./index.scss";
@ -30,47 +28,36 @@ import { PodControllerGetContainerNameList, PodControllerGetPodNameList } from "
import useCustomSettingStore from "@/pages/customSetting";
import useGlobalStore from "@/pages/globalStore";
type Log = {
data: string;
event: string;
id: string;
retry?: number;
};
const MAX_RETRIES = 5;
export default function LogsModal(props: { children: React.ReactElement }) {
const { children } = props;
const { isOpen, onOpen, onClose } = useDisclosure();
const { t } = useTranslation();
const settingStore = useCustomSettingStore();
const { showWarning } = useGlobalStore(({ showWarning }) => ({ showWarning }));
const { currentApp } = useGlobalStore((state) => state);
const [logs, setLogs] = useState("");
const [podName, setPodName] = useState("");
const [containerName, setContainerName] = useState("");
const [isLoading, setIsLoading] = useState(true);
const [rowNumber, setRowNumber] = useState(0);
const [isPaused, setIsPaused] = useState(false);
const [pausedRowNumber, setPausedRowNumber] = useState(0);
const [paused, setPaused] = useState(false);
const [logs, setLogs] = useState<Log[]>([]);
const [renderLogs, setRenderLogs] = useState("");
const [refresh, setRefresh] = useState(true);
const retryCountRef = useRef(0);
const darkMode = useColorMode().colorMode === "dark";
useEffect(() => {
const resizeHandler = debounce(() => {
if (!isPaused) {
setRefresh((pre) => !pre);
}
}, 200);
window.addEventListener("resize", resizeHandler);
return () => {
window.removeEventListener("resize", resizeHandler);
};
}, [isPaused]);
useEffect(() => {
if (!isPaused) {
setRenderLogs(logs.trim());
}
}, [isPaused, logs]);
const { data: podData } = useQuery(
["GetPodQuery"],
() => {
@ -103,46 +90,75 @@ export default function LogsModal(props: { children: React.ReactElement }) {
);
const fetchLogs = useCallback(() => {
if (!podName && !containerName) return;
const controller = new AbortController();
streamFetch({
url: `/v1/apps/${currentApp.appid}/logs/${podName}?containerName=${containerName}`,
abortSignal: controller,
firstResponse() {
setIsLoading(false);
},
onMessage(text) {
const regex = /id:\s\d+\s+data:\s(.*)\s+data:/g;
const logs = [...text.matchAll(regex)];
const regexTime = /(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z)/g;
if (!podName || !containerName) return;
const ctrl = new AbortController();
const logStr = logs
.map((log) =>
log[1].replace(regexTime, (str) => formatDate(str, "YYYY-MM-DD HH:mm:ss.SSS")),
)
.join("\n");
fetchEventSource(
`/v1/apps/${currentApp.appid}/logs/${podName}?containerName=${containerName}`,
{
method: "GET",
headers: {
Authorization: "Bearer " + localStorage.getItem("token"),
},
signal: ctrl.signal,
async onopen(response) {
if (response.ok && response.headers.get("content-type") === EventStreamContentType) {
setIsLoading(false);
} else {
throw new Error(`Unexpected response: ${response.status} ${response.statusText}`);
}
},
setRowNumber((pre) => pre + logs.length);
setLogs((pre) => pre + logStr + "\n");
onmessage(msg) {
if (msg.event === "error") {
showWarning(msg.data);
}
if (msg.event === "log") {
const newLineCount = (msg.data.match(/\n/g) || []).length;
setLogs((pre) => [...pre, msg]);
setRowNumber((prevRowNumber) => prevRowNumber + newLineCount);
retryCountRef.current = 0;
}
},
onclose() {
// if the server closes the connection unexpectedly, retry:
if (retryCountRef.current < MAX_RETRIES) {
retryCountRef.current += 1;
setRefresh((pre) => !pre);
setPaused(false);
}
},
onerror(err) {
showWarning(err.message);
// auto retry fetch
},
},
}).catch((e) => {
if (e.includes("BodyStreamBuffer was aborted")) {
return;
}
throw e;
});
return controller;
}, [podName, containerName, currentApp.appid]);
);
return ctrl;
}, [podName, containerName, currentApp.appid, showWarning]);
useEffect(() => {
if (!isOpen) return;
setLogs("");
setLogs([]);
setIsLoading(true);
const controller = fetchLogs();
const ctrl = fetchLogs();
return () => {
controller?.abort();
ctrl?.abort();
};
}, [podName, containerName, isOpen, refresh]);
}, [podName, containerName, isOpen, refresh, fetchLogs]);
useEffect(() => {
const sortedLogs = [...logs].sort((a, b) => parseInt(a.id) - parseInt(b.id));
const concatenatedLogs = sortedLogs.map((log) => log.data).join("");
setRenderLogs(concatenatedLogs);
}, [logs]);
useEffect(() => {
retryCountRef.current = 0;
}, [isOpen]);
return (
<>
@ -163,8 +179,6 @@ export default function LogsModal(props: { children: React.ReactElement }) {
className="ml-4 !h-8 !w-64"
onChange={(e) => {
setPodName(e.target.value);
setIsLoading(true);
setLogs("");
}}
value={podName}
>
@ -185,8 +199,6 @@ export default function LogsModal(props: { children: React.ReactElement }) {
className="ml-1 !h-8 !w-32"
onChange={(e) => {
setContainerName(e.target.value);
setIsLoading(true);
setLogs("");
}}
value={containerName}
>
@ -205,7 +217,7 @@ export default function LogsModal(props: { children: React.ReactElement }) {
px={2}
onClick={() => {
setRefresh((pre) => !pre);
setIsPaused(false);
setPaused(false);
}}
>
{t("Refresh")}
@ -224,20 +236,17 @@ export default function LogsModal(props: { children: React.ReactElement }) {
className="text-sm flex h-full flex-col px-2 font-mono"
style={{ fontSize: settingStore.commonSettings.fontSize - 1 }}
onWheel={(e) => {
if (e.deltaY < 0 && !isPaused) {
setIsPaused(true);
setPausedRowNumber(rowNumber);
}
setPaused(true);
}}
>
<LogViewer
data={renderLogs}
hasLineNumbers={false}
scrollToRow={isPaused ? pausedRowNumber : rowNumber + 1}
scrollToRow={!paused ? rowNumber + 1 : undefined}
height={"98%"}
onScroll={(e) => {
if (e.scrollOffsetToBottom <= 0) {
setIsPaused(false);
setPaused(false);
}
}}
toolbar={
@ -251,10 +260,10 @@ export default function LogsModal(props: { children: React.ReactElement }) {
}
/>
<div className="absolute bottom-1 w-[95%]">
{isPaused && (
{paused && (
<HStack
onClick={() => {
setIsPaused(false);
setPaused(false);
}}
className={clsx(
"flex w-full cursor-pointer items-center justify-center",

View File

@ -21,11 +21,12 @@ type SITE_KEY =
| "enable_web_promo_page"
| "sealaf_notification";
export type SiteSettings = {
// eslint-disable-next-line no-unused-vars
[key in SITE_KEY]?: TSetting;
};
type State = {
siteSettings: {
// eslint-disable-next-line no-unused-vars
[key in SITE_KEY]?: TSetting;
};
siteSettings: SiteSettings;
getSiteSettings: () => void;
};

View File

@ -1,12 +1,12 @@
import { lazy, Suspense } from "react";
import { useRoutes } from "react-router-dom";
import { wrapUseRoutes } from "@sentry/react";
import AuthLayout from "@/layouts/Auth";
import BasicLayout from "@/layouts/Basic";
import FunctionLayout from "@/layouts/Function";
import TemplateLayout from "@/layouts/Template";
import useSiteSettingStore from "@/pages/siteSetting";
import { useRoutes } from "react-router-dom";
import { wrapUseRoutes } from "@sentry/react";
import useSiteSettingStore, { SiteSettings } from "@/pages/siteSetting";
const route404 = {
path: "*",
@ -148,10 +148,9 @@ function LazyElement(props: any) {
);
}
function dealRoutes(routesArr: any) {
const { siteSettings } = useSiteSettingStore();
function dealRoutes(routesArr: any, siteSettings: SiteSettings) {
if (routesArr && Array.isArray(routesArr) && routesArr.length > 0) {
if (siteSettings.enable_web_promo_page?.value === "false") {
if (siteSettings.enable_web_promo_page?.value === "false") {
for (let i = 0; i < routesArr.length; i++) {
const route = routesArr[i];
if (route.index) {
@ -176,16 +175,17 @@ function dealRoutes(routesArr: any) {
route.element = <LazyElement importFunc={importFunc} />;
}
if (route.children) {
dealRoutes(route.children);
dealRoutes(route.children, siteSettings);
}
});
}
}
function RouteElement() {
const { siteSettings } = useSiteSettingStore();
const useSentryRoutes = wrapUseRoutes(useRoutes);
dealRoutes(routes);
dealRoutes(routes, siteSettings);
const element = useSentryRoutes(routes as any);
return element;
}