From ceca8d275740d851167a506f730961fd631d29c3 Mon Sep 17 00:00:00 2001 From: Kautilya Tripathi <ktripathi@microsoft.com> Date: Mon, 25 Nov 2024 11:30:43 +0530 Subject: [PATCH] frontend: Websocket backward compatibility This adds a new way to use new way to run websocket multiplexer. Default way would be the legacy way which creates multiple websocket connection. This adds a new flag `REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER` to run the new API. Signed-off-by: Kautilya Tripathi <ktripathi@microsoft.com> --- frontend/package.json | 1 + .../components/common/Resource/index.test.ts | 2 +- frontend/src/components/common/index.test.ts | 2 +- frontend/src/helpers/index.ts | 9 + .../src/lib/k8s/api/v2/useKubeObjectList.ts | 108 +++++++++- frontend/src/lib/k8s/api/v2/webSocket.ts | 203 ++++++++++++++++-- .../plugin/__snapshots__/pluginLib.snapshot | 28 +-- 7 files changed, 319 insertions(+), 34 deletions(-) diff --git a/frontend/package.json b/frontend/package.json index 55efa85ff4..c12bf83ba8 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -120,6 +120,7 @@ "build": "cross-env PUBLIC_URL=./ NODE_OPTIONS=--max-old-space-size=8096 vite build && npx shx rm -f build/frontend/index.baseUrl.html", "pretest": "npm run make-version", "test": "vitest", + "multiplexer": "cross-env REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER=true npm run start", "lint": "eslint --cache -c package.json --ext .js,.ts,.tsx src/ ../app/electron ../plugins/headlamp-plugin --ignore-pattern ../plugins/headlamp-plugin/template --ignore-pattern ../plugins/headlamp-plugin/lib/", "format": "prettier --config package.json --write --cache src ../app/electron ../app/tsconfig.json ../app/scripts ../plugins/headlamp-plugin/bin ../plugins/headlamp-plugin/config ../plugins/headlamp-plugin/template ../plugins/headlamp-plugin/test*.js ../plugins/headlamp-plugin/*.json ../plugins/headlamp-plugin/*.js", "format-check": "prettier --config package.json --check --cache src ../app/electron ../app/tsconfig.json ../app/scripts ../plugins/headlamp-plugin/bin ../plugins/headlamp-plugin/config ../plugins/headlamp-plugin/template ../plugins/headlamp-plugin/test*.js ../plugins/headlamp-plugin/*.json ../plugins/headlamp-plugin/*.js", diff --git a/frontend/src/components/common/Resource/index.test.ts b/frontend/src/components/common/Resource/index.test.ts index e4573b53a2..0b9f1ab355 100644 --- a/frontend/src/components/common/Resource/index.test.ts +++ b/frontend/src/components/common/Resource/index.test.ts @@ -39,7 +39,7 @@ function getFilesToVerify() { const filesToVerify: string[] = []; fs.readdirSync(__dirname).forEach(file => { const fileNoSuffix = file.replace(/\.[^/.]+$/, ''); - if (!avoidCheck.find(suffix => fileNoSuffix.endsWith(suffix))) { + if (fileNoSuffix && !avoidCheck.find(suffix => fileNoSuffix.endsWith(suffix))) { filesToVerify.push(fileNoSuffix); } }); diff --git a/frontend/src/components/common/index.test.ts b/frontend/src/components/common/index.test.ts index 0af5c688a8..cb3737dd62 100644 --- a/frontend/src/components/common/index.test.ts +++ b/frontend/src/components/common/index.test.ts @@ -49,7 +49,7 @@ function getFilesToVerify() { const filesToVerify: string[] = []; fs.readdirSync(__dirname).forEach(file => { const fileNoSuffix = file.replace(/\.[^/.]+$/, ''); - if (!avoidCheck.find(suffix => fileNoSuffix.endsWith(suffix))) { + if (fileNoSuffix && !avoidCheck.find(suffix => fileNoSuffix.endsWith(suffix))) { filesToVerify.push(fileNoSuffix); } }); diff --git a/frontend/src/helpers/index.ts b/frontend/src/helpers/index.ts index 40780c37e9..6528ef3642 100644 --- a/frontend/src/helpers/index.ts +++ b/frontend/src/helpers/index.ts @@ -352,6 +352,14 @@ function loadTableSettings(tableId: string): { id: string; show: boolean }[] { return settings; } +/** + * @returns true if the websocket multiplexer is enabled. + * defaults to false + */ +export function getWebsocketMultiplexerStatus(): boolean { + return import.meta.env.REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER === 'true'; +} + /** * The backend token to use when making API calls from Headlamp when running as an app. * The app opens the index.html?backendToken=... and passes the token to the frontend @@ -393,6 +401,7 @@ const exportFunctions = { storeClusterSettings, loadClusterSettings, getHeadlampAPIHeaders, + getWebsocketMultiplexerStatus, storeTableSettings, loadTableSettings, }; diff --git a/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts b/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts index 0eff96ca8b..395010dd9a 100644 --- a/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts +++ b/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts @@ -1,14 +1,15 @@ import { QueryObserverOptions, useQueries, useQueryClient } from '@tanstack/react-query'; import { useEffect, useMemo, useRef, useState } from 'react'; +import { getWebsocketMultiplexerStatus } from '../../../../helpers'; import { KubeObject, KubeObjectClass } from '../../KubeObject'; import { ApiError } from '../v1/clusterRequests'; import { QueryParameters } from '../v1/queryParameters'; import { clusterFetch } from './fetch'; import { QueryListResponse, useEndpoints } from './hooks'; -import { KubeList } from './KubeList'; +import { KubeList, KubeListUpdateEvent } from './KubeList'; import { KubeObjectEndpoint } from './KubeObjectEndpoint'; import { makeUrl } from './makeUrl'; -import { BASE_WS_URL, WebSocketManager } from './webSocket'; +import { BASE_WS_URL, useWebSockets, WebSocketManager } from './webSocket'; /** * Object representing a List of Kube object @@ -111,6 +112,44 @@ export function useWatchKubeObjectLists<K extends KubeObject>({ endpoint?: KubeObjectEndpoint | null; /** Which clusters and namespaces to watch */ lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>; +}) { + const websocketMultiplexerStatus = getWebsocketMultiplexerStatus(); + + if (websocketMultiplexerStatus) { + return useWatchKubeObjectListsMultiplexed({ + kubeObjectClass, + endpoint, + lists, + queryParams, + }); + } else { + return useWatchKubeObjectListsLegacy({ + kubeObjectClass, + endpoint, + lists, + queryParams, + }); + } +} + +/** + * Accepts a list of lists to watch. + * Upon receiving update it will modify query data for list query + * @param kubeObjectClass - KubeObject class of the watched resource list + * @param endpoint - Kube resource API endpoint information + * @param lists - Which clusters and namespaces to watch + * @param queryParams - Query parameters for the WebSocket connection URL + */ +function useWatchKubeObjectListsMultiplexed<K extends KubeObject>({ + kubeObjectClass, + endpoint, + lists, + queryParams, +}: { + kubeObjectClass: (new (...args: any) => K) & typeof KubeObject<any>; + endpoint?: KubeObjectEndpoint | null; + lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>; + queryParams?: QueryParameters; }) { const client = useQueryClient(); const latestResourceVersions = useRef<Record<string, string>>({}); @@ -121,7 +160,6 @@ export function useWatchKubeObjectLists<K extends KubeObject>({ return lists.map(list => { const key = `${list.cluster}:${list.namespace || ''}`; - // Only update resourceVersion if it's newer if ( !latestResourceVersions.current[key] || parseInt(list.resourceVersion) > parseInt(latestResourceVersions.current[key]) @@ -153,7 +191,6 @@ export function useWatchKubeObjectLists<K extends KubeObject>({ WebSocketManager.subscribe(cluster, parsedUrl.pathname, parsedUrl.search.slice(1), update => { if (!update || typeof update !== 'object') return; - // Update latest resourceVersion if (update.object?.metadata?.resourceVersion) { latestResourceVersions.current[key] = update.object.metadata.resourceVersion; } @@ -184,6 +221,69 @@ export function useWatchKubeObjectLists<K extends KubeObject>({ }, [connections, endpoint, client, kubeObjectClass, queryParams]); } +/** + * Accepts a list of lists to watch. + * Upon receiving update it will modify query data for list query + * @param kubeObjectClass - KubeObject class of the watched resource list + * @param endpoint - Kube resource API endpoint information + * @param lists - Which clusters and namespaces to watch + * @param queryParams - Query parameters for the WebSocket connection URL + */ +function useWatchKubeObjectListsLegacy<K extends KubeObject>({ + kubeObjectClass, + endpoint, + lists, + queryParams, +}: { + /** KubeObject class of the watched resource list */ + kubeObjectClass: (new (...args: any) => K) & typeof KubeObject<any>; + /** Query parameters for the WebSocket connection URL */ + queryParams?: QueryParameters; + /** Kube resource API endpoint information */ + endpoint?: KubeObjectEndpoint | null; + /** Which clusters and namespaces to watch */ + lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>; +}) { + const client = useQueryClient(); + + const connections = useMemo(() => { + if (!endpoint) return []; + + return lists.map(({ cluster, namespace, resourceVersion }) => { + const url = makeUrl([KubeObjectEndpoint.toUrl(endpoint!, namespace)], { + ...queryParams, + watch: 1, + resourceVersion, + }); + + return { + cluster, + url, + onMessage(update: KubeListUpdateEvent<K>) { + const key = kubeObjectListQuery<K>( + kubeObjectClass, + endpoint, + namespace, + cluster, + queryParams ?? {} + ).queryKey; + client.setQueryData(key, (oldResponse: ListResponse<any> | undefined | null) => { + if (!oldResponse) return oldResponse; + + const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass); + return { ...oldResponse, list: newList }; + }); + }, + }; + }); + }, [lists, kubeObjectClass, endpoint]); + + useWebSockets<KubeListUpdateEvent<K>>({ + enabled: !!endpoint, + connections, + }); +} + /** * Creates multiple requests to list Kube objects * Handles multiple clusters, namespaces and allowed namespaces diff --git a/frontend/src/lib/k8s/api/v2/webSocket.ts b/frontend/src/lib/k8s/api/v2/webSocket.ts index 021ad7c5ed..217df6c3fa 100644 --- a/frontend/src/lib/k8s/api/v2/webSocket.ts +++ b/frontend/src/lib/k8s/api/v2/webSocket.ts @@ -1,8 +1,11 @@ import { useEffect, useMemo } from 'react'; -import { getUserIdFromLocalStorage } from '../../../../stateless'; +import { findKubeconfigByClusterName, getUserIdFromLocalStorage } from '../../../../stateless'; +import { getToken } from '../../../auth'; +import { getCluster } from '../../../cluster'; import { KubeObjectInterface } from '../../KubeObject'; import { BASE_HTTP_URL } from './fetch'; import { KubeListUpdateEvent } from './KubeList'; +import { makeUrl } from './makeUrl'; // Constants for WebSocket connection export const BASE_WS_URL = BASE_HTTP_URL.replace('http', 'ws'); @@ -32,7 +35,7 @@ interface WebSocketMessage { */ export const WebSocketManager = { /** Current WebSocket connection instance */ - socket: null as WebSocket | null, + socketMultiplexer: null as WebSocket | null, /** Flag to track if a connection attempt is in progress */ connecting: false, @@ -72,17 +75,17 @@ export const WebSocketManager = { */ async connect(): Promise<WebSocket> { // Return existing connection if available - if (this.socket?.readyState === WebSocket.OPEN) { - return this.socket; + if (this.socketMultiplexer?.readyState === WebSocket.OPEN) { + return this.socketMultiplexer; } // Wait for existing connection attempt to complete if (this.connecting) { return new Promise(resolve => { const checkConnection = setInterval(() => { - if (this.socket?.readyState === WebSocket.OPEN) { + if (this.socketMultiplexer?.readyState === WebSocket.OPEN) { clearInterval(checkConnection); - resolve(this.socket); + resolve(this.socketMultiplexer); } }, 100); }); @@ -95,7 +98,7 @@ export const WebSocketManager = { const socket = new WebSocket(wsUrl); socket.onopen = () => { - this.socket = socket; + this.socketMultiplexer = socket; this.connecting = false; resolve(socket); }; @@ -163,7 +166,7 @@ export const WebSocketManager = { */ handleCompletionMessage(data: any, key: string): void { this.completedPaths.add(key); - if (this.socket?.readyState === WebSocket.OPEN) { + if (this.socketMultiplexer?.readyState === WebSocket.OPEN) { const closeMsg: WebSocketMessage = { clusterId: data.clusterId, path: data.path, @@ -171,7 +174,7 @@ export const WebSocketManager = { userId: data.userId || '', type: 'CLOSE', }; - this.socket.send(JSON.stringify(closeMsg)); + this.socketMultiplexer.send(JSON.stringify(closeMsg)); } }, @@ -181,7 +184,7 @@ export const WebSocketManager = { */ handleWebSocketClose(): void { console.log('WebSocket closed, attempting reconnect...'); - this.socket = null; + this.socketMultiplexer = null; this.connecting = false; if (this.listeners.size > 0) { setTimeout(() => this.connect(), 1000); @@ -263,7 +266,7 @@ export const WebSocketManager = { this.completedPaths.delete(key); this.activeSubscriptions.delete(key); - if (this.socket?.readyState === WebSocket.OPEN) { + if (this.socketMultiplexer?.readyState === WebSocket.OPEN) { const [clusterId] = key.split(':'); const closeMsg: WebSocketMessage = { clusterId, @@ -272,13 +275,13 @@ export const WebSocketManager = { userId: userId || '', type: 'CLOSE', }; - this.socket.send(JSON.stringify(closeMsg)); + this.socketMultiplexer.send(JSON.stringify(closeMsg)); } } if (this.listeners.size === 0) { - this.socket?.close(); - this.socket = null; + this.socketMultiplexer?.close(); + this.socketMultiplexer = null; } }, }; @@ -362,6 +365,178 @@ export function useWebSocket<T extends KubeObjectInterface>({ }, [enabled, url, cluster, onMessage, onError]); } +export type WebSocketConnectionRequest<T> = { + cluster: string; + url: string; + onMessage: (data: T) => void; +}; + +// Keeps track of open WebSocket connections and active listeners +const sockets = new Map<string, WebSocket | 'pending'>(); +const listeners = new Map<string, Array<(update: any) => void>>(); + +/** + * Create new WebSocket connection to the backend + * + * @param url - WebSocket URL + * @param options - Connection options + * + * @returns WebSocket connection + */ +export async function openWebSocket<T>( + url: string, + { + protocols: moreProtocols = [], + type = 'binary', + cluster = getCluster() ?? '', + onMessage, + }: { + /** + * Any additional protocols to include in WebSocket connection + */ + protocols?: string | string[]; + /** + * + */ + type: 'json' | 'binary'; + /** + * Cluster name + */ + cluster?: string; + /** + * Message callback + */ + onMessage: (data: T) => void; + } +) { + const path = [url]; + const protocols = ['base64.binary.k8s.io', ...(moreProtocols ?? [])]; + + const token = getToken(cluster); + if (token) { + const encodedToken = btoa(token).replace(/=/g, ''); + protocols.push(`base64url.bearer.authorization.k8s.io.${encodedToken}`); + } + + if (cluster) { + path.unshift('clusters', cluster); + + try { + const kubeconfig = await findKubeconfigByClusterName(cluster); + + if (kubeconfig !== null) { + const userID = getUserIdFromLocalStorage(); + protocols.push(`base64url.headlamp.authorization.k8s.io.${userID}`); + } + } catch (error) { + console.error('Error while finding kubeconfig:', error); + } + } + + const socket = new WebSocket(makeUrl([BASE_WS_URL, ...path], {}), protocols); + socket.binaryType = 'arraybuffer'; + socket.addEventListener('message', (body: MessageEvent) => { + const data = type === 'json' ? JSON.parse(body.data) : body.data; + onMessage(data); + }); + socket.addEventListener('error', error => { + console.error('WebSocket error:', error); + }); + + return socket; +} + +/** + * Creates or joins mutiple existing WebSocket connections + * + * @param url - endpoint URL + * @param options - WebSocket options + */ +export function useWebSockets<T>({ + connections, + enabled = true, + protocols, + type = 'json', +}: { + enabled?: boolean; + /** Make sure that connections value is stable between renders */ + connections: Array<WebSocketConnectionRequest<T>>; + /** + * Any additional protocols to include in WebSocket connection + * make sure that the value is stable between renders + */ + protocols?: string | string[]; + /** + * Type of websocket data + */ + type?: 'json' | 'binary'; +}) { + useEffect(() => { + if (!enabled) return; + + let isCurrent = true; + + /** Open a connection to websocket */ + function connect({ cluster, url, onMessage }: WebSocketConnectionRequest<T>) { + const connectionKey = cluster + url; + + if (!sockets.has(connectionKey)) { + // Add new listener for this URL + listeners.set(connectionKey, [...(listeners.get(connectionKey) ?? []), onMessage]); + + // Mark socket as pending, so we don't open more than one + sockets.set(connectionKey, 'pending'); + + let ws: WebSocket | undefined; + openWebSocket(url, { protocols, type, cluster, onMessage }) + .then(socket => { + ws = socket; + + // Hook was unmounted while it was connecting to WebSocket + // so we close the socket and clean up + if (!isCurrent) { + ws.close(); + sockets.delete(connectionKey); + return; + } + + sockets.set(connectionKey, ws); + }) + .catch(err => { + console.error(err); + }); + } + + return () => { + const connectionKey = cluster + url; + + // Clean up the listener + const newListeners = listeners.get(connectionKey)?.filter(it => it !== onMessage) ?? []; + listeners.set(connectionKey, newListeners); + + // No one is listening to the connection + // so we can close it + if (newListeners.length === 0) { + const maybeExisting = sockets.get(connectionKey); + if (maybeExisting) { + if (maybeExisting !== 'pending') { + maybeExisting.close(); + } + sockets.delete(connectionKey); + } + } + }; + } + + const disconnectCallbacks = connections.map(endpoint => connect(endpoint)); + + return () => { + isCurrent = false; + disconnectCallbacks.forEach(fn => fn()); + }; + }, [enabled, type, connections, protocols]); +} + /** * Type guard to check if a message is a valid Kubernetes list update event * @param data - The data to check diff --git a/frontend/src/plugin/__snapshots__/pluginLib.snapshot b/frontend/src/plugin/__snapshots__/pluginLib.snapshot index 7bb73c5a37..f19d47969a 100644 --- a/frontend/src/plugin/__snapshots__/pluginLib.snapshot +++ b/frontend/src/plugin/__snapshots__/pluginLib.snapshot @@ -2073,7 +2073,7 @@ "defaultDefaultValue": false, "defaultValueSource": undefined, "description": "Controls whether the Find Widget should read or modify the shared find clipboard on macOS.", - "included": false, + "included": true, "restricted": false, "scope": 5, "source": undefined, @@ -2209,12 +2209,12 @@ }, }, "fontFamily": EditorStringOption { - "defaultValue": "'Droid Sans Mono', 'monospace', monospace", + "defaultValue": "Menlo, Monaco, 'Courier New', monospace", "id": 49, "name": "fontFamily", "schema": { - "default": "'Droid Sans Mono', 'monospace', monospace", - "defaultDefaultValue": "'Droid Sans Mono', 'monospace', monospace", + "default": "Menlo, Monaco, 'Courier New', monospace", + "defaultDefaultValue": "Menlo, Monaco, 'Courier New', monospace", "defaultValueSource": undefined, "description": "Controls the font family.", "restricted": false, @@ -2254,12 +2254,12 @@ }, }, "fontSize": EditorFontSize { - "defaultValue": 14, + "defaultValue": 12, "id": 52, "name": "fontSize", "schema": { - "default": 14, - "defaultDefaultValue": 14, + "default": 12, + "defaultDefaultValue": 12, "defaultValueSource": undefined, "description": "Controls the font size in pixels.", "maximum": 100, @@ -2740,8 +2740,8 @@ ], "markdownEnumDescriptions": [ "Inlay hints are enabled", - "Inlay hints are showing by default and hide when holding Ctrl+Alt", - "Inlay hints are hidden by default and show when holding Ctrl+Alt", + "Inlay hints are showing by default and hide when holding Ctrl+Option", + "Inlay hints are hidden by default and show when holding Ctrl+Option", "Inlay hints are disabled", ], "restricted": false, @@ -3284,7 +3284,7 @@ "default": false, "defaultDefaultValue": false, "defaultValueSource": undefined, - "markdownDescription": "Zoom the font of the editor when using mouse wheel and holding `Ctrl`.", + "markdownDescription": "Zoom the font of the editor when using mouse wheel and holding `Cmd`.", "restricted": false, "scope": 5, "source": undefined, @@ -3739,12 +3739,12 @@ "on", "dimmed", ], - "defaultValue": "dimmed", + "defaultValue": "on", "id": 96, "name": "renderFinalNewline", "schema": { - "default": "dimmed", - "defaultDefaultValue": "dimmed", + "default": "on", + "defaultDefaultValue": "on", "defaultValueSource": undefined, "description": "Render last line number when the file ends with a newline.", "enum": [ @@ -4099,7 +4099,7 @@ "defaultDefaultValue": true, "defaultValueSource": undefined, "description": "Controls whether the Linux primary clipboard should be supported.", - "included": true, + "included": false, "restricted": false, "scope": 5, "source": undefined,