diff --git a/packages/kubernetes-api/src/client/client-factory.ts b/packages/kubernetes-api/src/client/client-factory.ts index 72fafcb6..d72b0d1d 100644 --- a/packages/kubernetes-api/src/client/client-factory.ts +++ b/packages/kubernetes-api/src/client/client-factory.ts @@ -1,7 +1,7 @@ import { KubeObject } from '../globals' import { ClientInstance } from './client-instance' import { CollectionImpl } from './collection' -import { log, Collection, KOptions, ClientFactory, ProcessDataCallback } from './globals' +import { log, Watched, KOptions, ClientFactory, ProcessDataCallback } from './globals' import { getKey } from './support' /* @@ -10,8 +10,8 @@ import { getKey } from './support' export class ClientFactoryImpl implements ClientFactory { private _clients: Record = {} - create(options: KOptions): Collection { - const key = getKey(options.kind, options.namespace, options.continueRef) + create(options: KOptions): Watched { + const key = getKey(options.kind, options.namespace, options.name) if (this._clients[key]) { const client = this._clients[key] as ClientInstance client.addRef() @@ -26,18 +26,11 @@ export class ClientFactoryImpl implements ClientFactory { } } - destroy(client: Collection, ...handles: Array>) { + destroy(client: Watched, ...handles: Array>) { handles.forEach(handle => { client.unwatch(handle) }) const key = client.getKey() - - console.log(`XXX destroying client with key: ${key}`) - console.log(`XXX Number of clients: `, Object.getOwnPropertyNames(this._clients).length) - for (const key of Object.getOwnPropertyNames(this._clients)) { - console.log(`XXX client keys: ${key}`) - } - if (this._clients[key]) { const c = this._clients[key] as ClientInstance c.removeRef() @@ -46,14 +39,8 @@ export class ClientFactoryImpl implements ClientFactory { delete this._clients[key] c.destroy() log.debug('Destroyed client for key:', key) - } else { - console.log('Not destroying client because key ref count too high: ', key, c.refCount) } } - - for (const key of Object.getOwnPropertyNames(this._clients)) { - console.log(`XXX Remaining client keys: ${key}`) - } } } diff --git a/packages/kubernetes-api/src/client/client-instance.ts b/packages/kubernetes-api/src/client/client-instance.ts index fb18ac1e..b2f30437 100644 --- a/packages/kubernetes-api/src/client/client-instance.ts +++ b/packages/kubernetes-api/src/client/client-instance.ts @@ -1,15 +1,15 @@ import { KubeObject } from '../globals' -import { Collection } from './globals' +import { Watched } from './globals' /* * Manages references to collection instances to allow them to be shared between views */ export class ClientInstance { private _refCount = 0 - private _collection: Collection + private _watched: Watched - constructor(_collection: Collection) { - this._collection = _collection + constructor(_watched: Watched) { + this._watched = _watched } get refCount() { @@ -25,7 +25,7 @@ export class ClientInstance { } get collection() { - return this._collection + return this._watched } disposable() { @@ -33,7 +33,7 @@ export class ClientInstance { } destroy() { - this._collection.destroy() - // delete this._collection + this._watched.destroy() + // delete this._watched } } diff --git a/packages/kubernetes-api/src/client/collection.ts b/packages/kubernetes-api/src/client/collection.ts index 7c88a7eb..55b8f221 100644 --- a/packages/kubernetes-api/src/client/collection.ts +++ b/packages/kubernetes-api/src/client/collection.ts @@ -1,5 +1,5 @@ import { ServiceSpec } from 'kubernetes-types/core/v1' -import { K8S_EXT_PREFIX, KubeSearchMetadata, KubeObject } from '../globals' +import { K8S_EXT_PREFIX, KubeObject } from '../globals' import {} from '../kubernetes-service' import { fetchPath, FetchPathCallback, isFunction, joinPaths } from '../utils' import { getClusterIP, getName, getNamespace, namespaced, prefixForKind, toCollectionName, wsUrl } from '../helpers' @@ -8,13 +8,13 @@ import { k8Api } from '../init' import { log, UNKNOWN_VALUE, - Collection, KOptions, ObjectList, WSHandler, ProcessDataCallback, ErrorDataCallback, POLLING_INTERVAL, + Watched, } from './globals' import { ObjectListImpl } from './object-list' import { WSHandlerImpl } from './ws-handler' @@ -23,11 +23,10 @@ import { getKey } from './support' /* * Implements the external API for working with k8s collections of objects */ -export class CollectionImpl implements Collection { +export class CollectionImpl implements Watched { private _namespace?: string private _path: string private _apiVersion: string - private metadata: KubeSearchMetadata private list: ObjectList private handler: WSHandler private _isOpenshift: boolean @@ -41,6 +40,7 @@ export class CollectionImpl implements Collection { const pref = this.getPrefix() + // /apis/apps/v1/namespaces/{namespace}/pods/{name} if (this._namespace) { this._path = joinPaths(pref, 'namespaces', this._namespace, this.kind) } else { @@ -52,9 +52,6 @@ export class CollectionImpl implements Collection { this.handler = new WSHandlerImpl(this) const list = (this.list = new ObjectListImpl(_options.kind, _options.namespace)) this.handler.list = list - - const metadata = (this.metadata = { count: -1 }) - this.handler.metadata = metadata } get oAuthToken(): string { @@ -78,18 +75,19 @@ export class CollectionImpl implements Collection { url = new URL(joinPaths(k8Api.masterUri(), this._path)) } - if (this.options.nsLimit !== undefined) { - url.searchParams.append('limit', `${this.options.nsLimit}`) - } - - if (this.options.continueRef !== undefined) { - url.searchParams.append('continue', this.options.continueRef) - } - if (this.options.labelSelector) { url.searchParams.append('labelSelector', this.options.labelSelector) } + if (this.options.name) { + /* + * Must add fieldSelector to pin it to a singular entity. + * Has to be done this way to allow for watch parameter to be added. + * See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#read-operations-pod-v1-core + */ + url.searchParams.append('fieldSelector', `metadata.name=${this.options.name}`) + } + return url } @@ -118,6 +116,15 @@ export class CollectionImpl implements Collection { url = wsUrl(urlStr) } + if (this.options.name) { + /* + * Must add fieldSelector to pin it to the singular entity. + * Has to be done this way to allow for watch parameter to be added. + * See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#read-operations-pod-v1-core + */ + url.searchParams.append('fieldSelector', `metadata.name=${this.options.name}`) + } + url.searchParams.append('watch', 'true') if (this.options.labelSelector) { @@ -127,7 +134,7 @@ export class CollectionImpl implements Collection { } getKey() { - return getKey(this.kind, this._namespace, this._options.continueRef) + return getKey(this.kind, this._namespace, this._options.name) } get wsURL() { @@ -170,7 +177,7 @@ export class CollectionImpl implements Collection { this.list.doOnce(WatchActions.INIT, cb) } else { setTimeout(() => { - cb(this.list.objects, this.metadata) + cb(this.list.objects) }, 10) } } @@ -229,15 +236,15 @@ export class CollectionImpl implements Collection { if (this.list.initialized) { setTimeout(() => { log.debug(this.kind, 'passing existing objects:', this.list.objects) - cb(this.list.objects, this.metadata) + cb(this.list.objects) }, POLLING_INTERVAL) } log.debug(this.kind, 'adding watch callback:', cb) - this.list.doOn(WatchActions.ANY, ((data: T[]) => { + this.list.doOn(WatchActions.ANY, (data: T[]) => { log.debug(this.kind, 'got data:', data) - cb(data, this.metadata) - })) + cb(data) + }) return cb } @@ -287,7 +294,7 @@ export class CollectionImpl implements Collection { success: data => { try { const response = JSON.parse(data) - cb(response, this.metadata) + cb(response) } catch (err) { log.error(err) if (error && err instanceof Error) { @@ -330,7 +337,7 @@ export class CollectionImpl implements Collection { success: data => { try { const response = JSON.parse(data) - cb(response, this.metadata) + cb(response) } catch (err) { log.error(err) if (error && err instanceof Error) { diff --git a/packages/kubernetes-api/src/client/globals.ts b/packages/kubernetes-api/src/client/globals.ts index 7a429bd5..b97feb1c 100644 --- a/packages/kubernetes-api/src/client/globals.ts +++ b/packages/kubernetes-api/src/client/globals.ts @@ -10,6 +10,7 @@ export const pollingOnly = [WatchTypes.IMAGE_STREAM_TAGS] export const POLLING_INTERVAL = 60000 export const UNKNOWN_VALUE = '' +export const UNKNOWN_NAME_VALUE = 'unknown' export const NO_KIND = 'No kind in supplied options' export const NO_OBJECT = 'No object in supplied options' export const NO_OBJECTS = 'No objects in list object' @@ -22,6 +23,7 @@ export interface CompareResult { export interface KOptions extends Record { kind: string + name?: string namespace?: string apiVersion?: string labelSelector?: string @@ -30,14 +32,13 @@ export interface KOptions extends Record { error?: ErrorDataCallback urlFunction?: (options: KOptions) => string nsLimit?: number - continueRef?: string } export type ProcessDataCallback = (data: T[], metadata?: KubeSearchMetadata) => void export type ErrorDataCallback = (err: Error, response?: SimpleResponse) => void -export interface Collection { +export interface Watched { options: KOptions kind: string wsURL: string @@ -74,9 +75,8 @@ export interface ObjectList { export interface WSHandler { connected: boolean kind: string - metadata: { count: number } list: ObjectList - collection: Collection + watched: Watched error: ErrorDataCallback | undefined connect(): void onOpen(event: Event): void @@ -87,6 +87,6 @@ export interface WSHandler { } export interface ClientFactory { - create(options: KOptions, namespace?: string): Collection - destroy(client: Collection, ...handles: Array>): void + create(options: KOptions, namespace?: string): Watched + destroy(client: Watched, ...handles: Array>): void } diff --git a/packages/kubernetes-api/src/client/namespace-client.ts b/packages/kubernetes-api/src/client/namespace-client.ts index 7397abd9..974410bc 100644 --- a/packages/kubernetes-api/src/client/namespace-client.ts +++ b/packages/kubernetes-api/src/client/namespace-client.ts @@ -1,199 +1,218 @@ import jsonpath from 'jsonpath' -import { JOLOKIA_PORT_QUERY, KubeObject, KubePod, PagingMetadata } from "../globals" +import { JOLOKIA_PORT_QUERY, KubeObject, KubePod, Paging } from "../globals" import { WatchTypes } from "../model" -import { SimpleResponse, isObject, isString } from "../utils" -import { PagingMetadataImpl } from '../paging-metadata-impl' -import { Collection, KOptions, ProcessDataCallback } from './globals' +import { isObject } from "../utils" +import { Watched, KOptions, ProcessDataCallback } from './globals' import { clientFactory } from './client-factory' -export type NamespaceClientCallback = (jolokiaPods: KubePod[], pagingMetadata: PagingMetadata, error?: Error) => void +export type NamespaceClientCallback = (jolokiaPods: KubePod[], error?: Error) => void export interface Client { - collection: Collection + watched: Watched watch: ProcessDataCallback } -interface PodsResult { - id: number - podsClient: Client - jolokiaPods: KubePod[] +interface PodWatcher { + client: Client + jolokiaPod: KubePod | undefined } -interface PodsResults { - [key: string]: PodsResult +interface PodWatchers { + [key: string]: PodWatcher } -export class NamespaceClient { - private _namespace - private _limit - private _pagingMetadata: PagingMetadataImpl - private _podsResults: PodsResults = {} - private _callback: NamespaceClientCallback - - constructor(namespace: string, limit: number, callback: NamespaceClientCallback, pagingMetadata?: PagingMetadata) { - this._namespace = namespace - this._limit = limit - this._callback = callback - if (isObject(pagingMetadata)) { - this._pagingMetadata = pagingMetadata as PagingMetadataImpl - /* - * 2 use-cases: - * - 1. First Execution of pods in namespace so current is uninitialized - * - 2. PagingRef returned to by using Prev/Next so need to refresh except - * for continueRef - */ - this._pagingMetadata.refresh() +export class NamespaceClient implements Paging { - } else { - // pagingMetadata never been defined for the namespace - this._pagingMetadata = new PagingMetadataImpl() - } + private _current = 0 + private _podList: Set = new Set() + private _podWatchers: PodWatchers = {} + private _nsWatcher?: Client + private _refreshing = 0 + + constructor(private _namespace: string, private _limit: number, private _callback: NamespaceClientCallback) { } - /* - * Updates the paging metadata with the results of the pod search - */ - private updatePaging(iteration: number, jolokiaPods: KubePod[], limit: number, continueRef?: string): string|undefined { - if (! this._pagingMetadata.pageSelected()) { - // current page not yet defined in pagingMetadata (1st execution) - // Add paging reference for the current query of pods - return this._pagingMetadata.addPage(jolokiaPods.length, limit, continueRef) + private handleError(error: Error, name?: string) { + let cbError = error + if (name) { + cbError = new Error(`Failed to connect to pod ${name}`) + cbError.cause = error } - // current page already defined so update with latest information + this._callback(this.getJolokiaPods(), cbError) + } - if (iteration === 0) { - // As the 1st iteration, store the count of the number of pods - this._pagingMetadata.setCount(jolokiaPods.length) + private initPodOptions(kind: string, name?: string): KOptions { + const podOptions: KOptions = { + kind: kind, + name: name, + namespace: this._namespace, + error: (err: Error) => { this.handleError(err, name)} } - return this._pagingMetadata.resolveContinue(jolokiaPods.length, limit, continueRef) + return podOptions } - private handleError(iteration: number, error: Error, response: SimpleResponse|undefined) { - console.log('Error: pods_watch error', error) - - if (isObject(response) && response.status === 410) { - console.log('Renewing gone connection 410: ', iteration) - // Need to renew the continueRef property given the continue in the response - - if (iteration === 0 && this._pagingMetadata.hasContinue()) { - console.log('Refreshing iteration 0') - // The first execution used a continue ref that is now invalid - let continueRef - if (response.data) { - const dataObj = JSON.parse(response.data) - continueRef = dataObj?.metadata?.continue - } + private createPodWatchers() { + if (this._podList.size === 0 || this._current >= this._podList.size) + return - console.log('Old continueRef: ', this._pagingMetadata.continue()) - console.log('New continueRef: ', continueRef) - this._pagingMetadata.setContinue(continueRef) + const podNames = Array.from(this._podList).sort().slice(this._current, (this._current + this._limit)) + + // Remove watchers for pods not in the slice of the sorted list + Object.entries(this._podWatchers) + .filter(([name, _]) => { return (! podNames.includes(name)) }) + .forEach(([name, podWatcher]) => { + clientFactory.destroy(podWatcher.client.watched, podWatcher.client.watch) + delete this._podWatchers[name] + }) + + this._refreshing = podNames.length + podNames.forEach(name => { + // Already watching this pod + if (isObject(this._podWatchers[name])) { + this._refreshing-- + return } - // Destroy the pod results - this.destroy() - // Refresh the metadata - this._pagingMetadata.refresh() - // Restart the execution - this.execute() - } else { - // Some other error has occurred - this._callback([], this._pagingMetadata, error) - - // Destroy the polling connections - this.destroy() - } - } + // Set up new watcher for this pod + const _podClient = clientFactory.create(this.initPodOptions(WatchTypes.PODS, name)) + const _podWatcher = _podClient.watch((podList) => { + if (this._refreshing > 0) this._refreshing-- - initPodOptions(iteration: number, limit: number, continueRef: string|undefined): KOptions { - const podOptions: KOptions = { - kind: WatchTypes.PODS, - namespace: this._namespace, - nsLimit: limit, - error: (err: Error, response?: SimpleResponse) => { this.handleError(iteration, err, response)} - } + if (podList.length === 0) return - /* - * See if a continue reference is needed and add to podOptions - * The query requires a continue reference in order to start the search - * at the correct index of the pod list - */ - if (isString(continueRef) && continueRef.length > 0) - podOptions.continueRef = continueRef + // podList should only contain 1 pod (due to name) + this._podWatchers[name].jolokiaPod = podList[0] - return podOptions - } + if (this._refreshing === 0) { + // Limit callback to final watch returning + this._callback(this.getJolokiaPods()) + } + }) - private executeInternal(iteration: number, limit: number, podOptions: KOptions) { + /* + * Pod is part of the current page so connect its pod watcher + */ + _podClient.connect() + + this._podWatchers[name] = { + client: { + watch: _podWatcher, + watched: _podClient + }, + jolokiaPod: undefined + } + }) + } - // Query the namespace for pods - const pods_client = clientFactory.create(podOptions) - const pods_watch = pods_client.watch((pods, resultMetadata) => { - const jolokiaPods = pods.filter(pod => jsonpath.query(pod, JOLOKIA_PORT_QUERY).length > 0) + isConnected(): boolean { + return isObject(this._nsWatcher) && this._nsWatcher.watched.connected + } - // Add the found jolokia pods - const podResult = this._podsResults[iteration.toString()] - if (isObject(podResult)) - podResult.jolokiaPods = [...jolokiaPods] + connect() { + if (this.isConnected()) return + const _nsClient = clientFactory.create(this.initPodOptions(WatchTypes.PODS)) + const _nsWatch = _nsClient.watch((pods) => { /* - * If a continueRef is returned then it means that - * not enough jolokia pods were found to match the limit but - * that there are more pods available to be fetched and tested. - * So prepare another execution to fetch another set of pods + * Filter out any non-jolokia pods immediately and add + * the applicable pods to the pod name list */ - const continueRef = this.updatePaging(iteration, jolokiaPods, limit, resultMetadata?.continue) - if (isString(continueRef)) { - const newLimit = limit - jolokiaPods.length - const newOptions = this.initPodOptions((iteration + 1), newLimit, continueRef) - this.executeInternal((iteration + 1), newLimit, newOptions) - } else { - /* - * Execution is completed so return the callback. - * Should be called just once from the main execute call and not - * any subsequent chained executions - */ - this._callback(this.getJolokiaPods(), this._pagingMetadata) - } + const podNames: string[] = [] + pods + .filter(pod => jsonpath.query(pod, JOLOKIA_PORT_QUERY).length > 0) + .forEach(pod => { + const name = pod.metadata?.name || undefined + if (! name) return + + podNames.push(name) + }) + + // Initialise the sorted set list of pod names + this._podList = new Set(podNames.sort()) + + // Create the first set of pod watchers + this.createPodWatchers() }) - pods_client.connect() + /* + * Track any changes to the namespace + * Deleted pods will be removed from the pod list and pod watchers disposed of + * Added pods will be inserted into the pod list and pod watchers will be created + * when the page they appear in is required + */ + _nsClient.connect() - this._podsResults[iteration.toString()] = { - id: iteration, - podsClient: { - collection: pods_client, - watch: pods_watch, - }, - jolokiaPods: [] + this._nsWatcher = { + watch: _nsWatch, + watched: _nsClient } } - execute() { - const podOptions = this.initPodOptions(0, this._limit, this._pagingMetadata.continueRef()) - this.executeInternal(0, this._limit, podOptions) - } - + /* + * Collection of jolokia pods returned is an aggregate of the + * pods currently been watched by the pod watchers + */ getJolokiaPods() { const pods: KubePod[] = [] - for (const pr of Object.values(this._podsResults)) { - pods.push(...pr.jolokiaPods) + for (const pw of Object.values(this._podWatchers)) { + if (! pw.jolokiaPod) continue + + pods.push(pw.jolokiaPod) } return pods } destroy() { - if (Object.values(this._podsResults).length === 0) - return + if (isObject(this._nsWatcher)) { + clientFactory.destroy(this._nsWatcher.watched, this._nsWatcher.watch) + delete this._nsWatcher + } - for (const pr of Object.values(this._podsResults)) { - const pods_client = pr.podsClient - clientFactory.destroy(pods_client.collection, pods_client.watch) - pr.jolokiaPods = [] + for (const pr of Object.values(this._podWatchers)) { + const pods_client = pr.client + clientFactory.destroy(pods_client.watched, pods_client.watch) + delete pr.jolokiaPod } + this._podWatchers = {} + } - this._podsResults = {} + hasPrevious(): boolean { + return this._current > 0 + } + + previous() { + if (this._current === 0) return + + // Ensure current never goes below 0 + this._current = Math.max(this._current - this._limit, 0) + + /* + * If already connected then recreate the pod watchers + * according to the new position of _current + */ + if (this.isConnected()) + this.createPodWatchers() + } + + hasNext(): boolean { + const nextPage = this._current + this._limit + return nextPage < this._podList.size + } + + next() { + const nextPage = this._current + this._limit + if (nextPage >= this._podList.size) return + + this._current = nextPage + + /* + * If already connected then recreate the pod watchers + * according to the new position of _current + */ + if (this.isConnected()) + this.createPodWatchers() } } diff --git a/packages/kubernetes-api/src/client/object-list.ts b/packages/kubernetes-api/src/client/object-list.ts index 2db9bddf..78b63646 100644 --- a/packages/kubernetes-api/src/client/object-list.ts +++ b/packages/kubernetes-api/src/client/object-list.ts @@ -7,7 +7,7 @@ import { debounce } from '../utils' import { log, ObjectList, ProcessDataCallback } from './globals' /** - * Manages the array of k8s objects for a client instance + * Manages the k8s object(s) for a client instance **/ export class ObjectListImpl extends EventEmitter implements ObjectList { triggerChangedEvent = debounce(() => { diff --git a/packages/kubernetes-api/src/client/object-poller.ts b/packages/kubernetes-api/src/client/object-poller.ts index 1e0d24d2..974d8b7f 100644 --- a/packages/kubernetes-api/src/client/object-poller.ts +++ b/packages/kubernetes-api/src/client/object-poller.ts @@ -1,7 +1,7 @@ -import { KubeObject, KubeObjectList } from '../globals' -import { fetchPath, SimpleResponse } from '../utils' +import { KubeObject } from '../globals' +import { fetchPath, hasProperty, SimpleResponse } from '../utils' import { log, WSHandler, POLLING_INTERVAL } from './globals' -import { compare } from './support' +import { compare, isKubeObject } from './support' /* * Manages polling the server for objects that don't support websocket connections @@ -35,8 +35,14 @@ export class ObjectPoller { return } - const kObjList: KubeObjectList = JSON.parse(data) - const items = kObjList && kObjList.items ? kObjList.items : [] + const objectOrList = JSON.parse(data) + const items: T[] = [] + if (hasProperty(objectOrList, 'items')) { + items.push(...objectOrList.items) + } else if (isKubeObject(objectOrList)) { + items.push(objectOrList as T) + } + const result = compare(this._lastFetch, items) this._lastFetch = items @@ -45,9 +51,6 @@ export class ObjectPoller { const event = { data: JSON.stringify({ type: action.toUpperCase(), - metadata: { - continue: kObjList.metadata.continue - }, object: item, }), } diff --git a/packages/kubernetes-api/src/client/support.ts b/packages/kubernetes-api/src/client/support.ts index 0153950d..c420de60 100644 --- a/packages/kubernetes-api/src/client/support.ts +++ b/packages/kubernetes-api/src/client/support.ts @@ -2,13 +2,23 @@ import { KubeObject } from '../globals' import { CompareResult } from './globals' import { equals } from '../helpers' -export function getKey(kind: string, namespace?: string, continueRef?: string) { +export function getKey(kind: string, namespace?: string, name?: string) { let key: string = kind key = namespace ? namespace + '-' + key : key - key = continueRef ? continueRef + '-' + key : key + key = name ? name + '-' + key : key return key } +export function isKubeObject(obj: unknown): obj is KubeObject { + if (!obj) return false + + return ( + (obj as KubeObject).kind !== undefined && + (obj as KubeObject).metadata !== undefined && + (obj as KubeObject).spec !== undefined + ) +} + export function compare(old: KubeObject[], _new: KubeObject[]): CompareResult { const answer = { added: [], diff --git a/packages/kubernetes-api/src/client/ws-handler.ts b/packages/kubernetes-api/src/client/ws-handler.ts index 59edf369..c685509f 100644 --- a/packages/kubernetes-api/src/client/ws-handler.ts +++ b/packages/kubernetes-api/src/client/ws-handler.ts @@ -1,6 +1,6 @@ -import { KubeSearchMetadata, KubeObject } from '../globals' -import { isFunction, isObject } from '../utils' -import { Collection, ErrorDataCallback, log, ObjectList, WSHandler } from './globals' +import { KubeObject } from '../globals' +import { SimpleResponse, fetchPath, hasProperty, isFunction, isString } from '../utils' +import { Watched, ErrorDataCallback, log, ObjectList, pollingOnly, WSHandler } from './globals' import { ObjectListImpl } from './object-list' import { ObjectPoller } from './object-poller' @@ -8,15 +8,17 @@ import { ObjectPoller } from './object-poller' * Manages the polling connection to the backend and passes events to the ObjectListImpl */ export class WSHandlerImpl implements WSHandler { - private _collection: Collection + private _watched: Watched private _list?: ObjectList - private _metadata?: KubeSearchMetadata + private retries = 0 + private connectTime = 0 + private socket?: WebSocket private poller?: ObjectPoller private destroyed = false - constructor(collection: Collection) { - this._collection = collection + constructor(collection: Watched) { + this._watched = collection } set list(_list: ObjectList) { @@ -27,28 +29,102 @@ export class WSHandlerImpl implements WSHandler { return this._list || new ObjectListImpl() } - set metadata(_metadata: KubeSearchMetadata) { - this._metadata = _metadata + get watched() { + return this._watched } - get metadata(): KubeSearchMetadata { - return this._metadata || { count: 0 } + get error(): ErrorDataCallback | undefined { + return this._watched.options.error } - get collection() { - return this._collection + get kind() { + return this._watched.kind } - get error(): ErrorDataCallback | undefined { - return this._collection.options.error + private createWebSocket(url: string): WebSocket { + /* + * Pass the bearer token via WebSocket sub-protocol + * An extra sub-protocol is required along with the authentication one, that gets removed + * See https://github.com/kubernetes/kubernetes/commit/714f97d7baf4975ad3aa47735a868a81a984d1f0 + * (Update 2023: this commit is from 2017 but still holds good) + */ + const token = this.watched.oAuthToken + const bearerProtocol = `base64url.bearer.authorization.k8s.io.${btoa(token).replace(/=/g, '')}` + + /* + * The binary protocol is required for correct authentication. + * Otherwise, connection fails with a 400 or 401 authentication error + */ + const protocols = ['base64.binary.k8s.io', bearerProtocol] + + return new WebSocket(url, protocols) } - get kind() { - return this._collection.kind + private setHandlers(self: WSHandler, ws: WebSocket) { + log.debug("Adding WebSocket event handler for 'open'") + ws.addEventListener('open', (event: Event) => self.onOpen(event)) + + log.debug("Adding WebSocket event handler for 'message'") + ws.addEventListener('message', (event: MessageEvent) => { + if (!event.origin || event.origin.length === 0) { + log.warn('Ignoring WebSocket message as origin is not defined') + return + } + + try { + const originUrl = new URL(event.origin) + if (!window.location || window.location.hostname !== originUrl.hostname) { + log.warn('The origin of the WebSocket message is not recognized') + return + } + } catch (error) { + log.warn('The origin of the WebSocket message is invalid', error) + return + } + + self.onMessage(event) + }) + + log.debug("Adding WebSocket event handler for 'close'") + ws.addEventListener('close', (event: CloseEvent) => self.onClose(event)) + + log.debug("Adding WebSocket event handler for 'error'") + ws.addEventListener('error', (event: Event) => self.onError(event)) + } + + send(data: string | KubeObject) { + if (!isString(data)) { + data = JSON.stringify(data) + } + + if (this.socket) this.socket.send(data) + } + + shouldClose(event: Event): boolean { + if (this.destroyed && this.socket && this.socket.readyState === WebSocket.OPEN) { + log.debug( + 'Connection destroyed but still receiving messages, closing websocket, kind:', + this.watched.kind, + 'namespace:', + this.watched.namespace, + ) + try { + log.debug('Closing websocket for kind:', this.watched.kind) + this.socket.close() + } catch (err) { + // nothing to do, assume it's already closed + } + return true + } + return false } onMessage(event: MessageEvent | { data: string }) { - log.debug('Receiving message from polling: ', event) + log.debug('Receiving message from web socket: ', event) + if (event instanceof MessageEvent && this.shouldClose(event)) { + log.debug('Should be closed!') + return + } if (!this.list) { log.debug('Cannot onmessage as no object list') return @@ -58,31 +134,58 @@ export class WSHandlerImpl implements WSHandler { const eventType: keyof ObjectList = data.type.toLowerCase() if (eventType !== 'added' && eventType !== 'modified' && eventType !== 'deleted') return - this.metadata.continue = data.metadata?.continue ?? undefined - if (isFunction(this.list[eventType])) this.list[eventType](data.object) else log.debug(`Property ${data.object} is not a function`) } onOpen(event: Event) { - log.debug('Received open event for kind:', this.collection.kind, 'namespace:', this.collection.namespace) + log.debug('Received open event for kind:', this.watched.kind, 'namespace:', this.watched.namespace) + if (this.shouldClose(event)) { + return + } + this.retries = 0 + this.connectTime = new Date().getTime() } onClose(event: CloseEvent) { - log.debug('Received close event for kind:', this.collection.kind, 'namespace:', this.collection.namespace) + log.debug('Received close event for kind:', this.watched.kind, 'namespace:', this.watched.namespace) if (this.destroyed) { - log.debug('polling destroyed for kind:', this.collection.kind, 'namespace:', this.collection.namespace) - // delete this.socket + log.debug('websocket destroyed for kind:', this.watched.kind, 'namespace:', this.watched.namespace) + delete this.socket return } + if (this.retries < 3 && this.connectTime && new Date().getTime() - this.connectTime > 5000) { + setTimeout(() => { + log.debug('Retrying after connection closed:', event) + this.retries = this.retries + 1 + log.debug('watch ', this.watched.kind, 'disconnected, retry #', this.retries) + const ws = this.createWebSocket(this.watched.wsURL) + this.setHandlers(this, ws) + }, 5000) + } else { + log.debug('websocket for ', this.watched.kind, 'closed, event:', event) + if (!event.wasClean) { + log.debug('Switching to polling mode') + delete this.socket + this.poller = new ObjectPoller(this.watched.restURL, this) + this.poller.start() + } + } } onError(event: Event) { - log.debug('polling for kind:', this.collection.kind, 'received an error:', event) + log.debug('websocket for kind:', this.watched.kind, 'received an error:', event) + if (this.shouldClose(event)) { + return + } } get connected(): boolean { - return isObject(this.poller) && this.poller.connected + if (this.socket && this.socket.readyState === WebSocket.OPEN) return true + + if (this.poller && this.poller.connected) return true + + return false } connect() { @@ -93,24 +196,83 @@ export class WSHandlerImpl implements WSHandler { } // in case a custom URL is going to be used - if (this.collection.restURL === '') { + if (this.watched.restURL === '' && this.watched.wsURL === '') { setTimeout(() => { this.connect() }, 500) return } - if (!this.poller) { - log.info('Using polling for kind:', this.collection.kind) - this.poller = new ObjectPoller(this.collection.restURL, this) - this.poller.start() + if (!this.socket && !this.poller) { + if (pollingOnly.some(kind => kind === this.watched.kind)) { + log.info('Using polling for kind:', this.watched.kind) + this.poller = new ObjectPoller(this.watched.restURL, this) + this.poller.start() + } else { + const doConnect = () => { + const wsURL = this.watched.wsURL + if (wsURL) { + log.debug('Connecting websocket for kind:', this.watched.kind) + this.socket = this.createWebSocket(wsURL) + this.setHandlers(this, this.socket) + } else { + log.info('No wsURL for kind: ' + this.watched.kind) + } + } + + log.debug('Fetching initial collection of object from web socket: ' + this.watched.restURL) + + fetchPath(this.watched.restURL, { + success: (data: string) => { + const objectOrList = JSON.parse(data) + if (hasProperty(objectOrList, 'items')) { + this.list.objects = objectOrList.items || [] + } else { + this.list.objects = [objectOrList] + } + + setTimeout(() => { + doConnect() + }, 10) + }, + error: (err: Error, response?: SimpleResponse) => { + if (response?.status === 403) { + log.info( + 'Failed to fetch data while connecting to backend for type:', + this.watched.kind, + ', user is not authorized', + ) + this.list.objects = [] + + if (this.error) this.error(err, response) + } else { + log.info('Failed to fetch data while connecting to backend for type:', this.watched.kind, 'error:', err) + + if (this.error) this.error(err, response) + + setTimeout(() => { + doConnect() + }, 10) + } + }, + }) + } } } destroy() { this.destroyed = true + if (this.socket && this.socket.readyState === WebSocket.OPEN) { + try { + log.debug('Closing websocket for kind:', this.watched.kind, 'namespace:', this.watched.namespace) + this.socket.close() + log.debug('Close called on websocket for kind:', this.watched.kind, 'namespace:', this.watched.namespace) + } catch (err) { + // nothing to do, assume it's already closed + } + } if (this.poller) { - log.debug('Destroying poller for kind:', this.collection.kind, 'namespace:', this.collection.namespace) + log.debug('Destroying poller for kind:', this.watched.kind, 'namespace:', this.watched.namespace) this.poller.destroy() } } diff --git a/packages/kubernetes-api/src/globals.ts b/packages/kubernetes-api/src/globals.ts index 47071b78..254e3576 100644 --- a/packages/kubernetes-api/src/globals.ts +++ b/packages/kubernetes-api/src/globals.ts @@ -49,20 +49,13 @@ export type KubePodsOrError = { export type KubePodsByProject = { [key: string]: KubePodsOrError } -export interface KubeSearchMetadata { - count: number - continue?: string +export interface Paging { + hasPrevious: (namespace?: string) => boolean + hasNext: (namespace?: string) => boolean + previous: (namespace?: string) => void + next: (namespace?: string) => void } -export interface PagingMetadata { - hasPrevious: () => boolean, - hasNext: () => boolean, - previous: () => void, - next: () => void -} - -export type KMetadataByProject = { [namespace: string]: PagingMetadata } - /* * States emitted by the Kubernetes Service */ diff --git a/packages/kubernetes-api/src/kubernetes-service.ts b/packages/kubernetes-api/src/kubernetes-service.ts index 52e6e4c5..b902785a 100644 --- a/packages/kubernetes-api/src/kubernetes-service.ts +++ b/packages/kubernetes-api/src/kubernetes-service.ts @@ -11,19 +11,18 @@ import { import { WatchTypes } from './model' import { isError, pathGet } from './utils' import { clientFactory, log, Client, NamespaceClient } from './client' -import { K8Actions, KMetadataByProject, KubePod, KubePodsByProject, KubeProject, PagingMetadata } from './globals' +import { K8Actions, KubePod, KubePodsByProject, KubeProject, Paging } from './globals' import { k8Api } from './init' -export class KubernetesService extends EventEmitter { +export class KubernetesService extends EventEmitter implements Paging { private _loading = 0 private _initialized = false private _error: Error | null = null private _oAuthProfile: UserProfile | null = null private projects: KubeProject[] = [] - private metadataByProject: KMetadataByProject = {} private podsByProject: KubePodsByProject = {} private projects_client: Client | null = null - private namespaceClients: { [namespace: string]: NamespaceClient} = {} + private namespace_clients: { [namespace: string]: NamespaceClient } = {} private _nsLimit = 3 @@ -56,19 +55,11 @@ export class KubernetesService extends EventEmitter { } private initNamespaceClient(namespace: string) { - - /* - * When called for first query pagingMetadata is undefined. - * When called after findNextPods then pagingMetadata is defined and - * current has moved to 'next' page containing a continueRef - */ - const pagingMetadata = this.metadataByProject[namespace] - const cb = (jolokiaPods: KubePod[], pagingMetadata: PagingMetadata, error?: Error) => { - this._loading-- + const cb = (jolokiaPods: KubePod[], error?: Error) => { + this._loading = this._loading > 0 ? this._loading-- : 0 if (isError(error)) { this.podsByProject[namespace] = { pods: [], error: error } - console.log(this.podsByProject[namespace]) this.emit(K8Actions.CHANGED) return } @@ -77,33 +68,17 @@ export class KubernetesService extends EventEmitter { for (const jpod of jolokiaPods) { const pos = projectPods.findIndex(pod => pod.metadata?.uid === jpod.metadata?.uid) if (pos > -1) { - // replace the pod - not sure we need to ...? projectPods.splice(pos, 1) } projectPods.push(jpod) } - this.metadataByProject[namespace] = pagingMetadata - - if (projectPods.length > 0) { - this.podsByProject[namespace] = { pods: projectPods } - } else { - // No jolokia pods but need to check whether we continue to list the namespace - if (! pagingMetadata.hasPrevious() && !pagingMetadata.hasNext()) { - // No other pods to query so okay to delete this namespace - delete this.podsByProject[namespace] - } else { - // This namespace has either previous pods or next pods - // so keep to allow for retrieving other pages - this.podsByProject[namespace] = { pods: [] } - } - } - + this.podsByProject[namespace] = { pods: projectPods } this.emit(K8Actions.CHANGED) } - this.namespaceClients[namespace] = new NamespaceClient(namespace, this._nsLimit, cb, pagingMetadata) - this.namespaceClients[namespace].execute() + this.namespace_clients[namespace] = new NamespaceClient(namespace, this._nsLimit, cb) + this.namespace_clients[namespace].connect() } private initNamespaceConfig(profile: UserProfile) { @@ -142,7 +117,7 @@ export class KubernetesService extends EventEmitter { // handle delete projects filtered = this.projects.filter(project => !projects.some(p => p.metadata?.uid === project.metadata?.uid)) for (const project of filtered) { - this.namespaceClients[project.metadata?.name as string].destroy() + this.namespace_clients[project.metadata?.name as string].destroy() } this.projects.splice(0, this.projects.length) // clear the array @@ -150,7 +125,7 @@ export class KubernetesService extends EventEmitter { this._loading-- }) - this.projects_client = { collection: projects_client, watch: projects_watch } + this.projects_client = { watched: projects_client, watch: projects_watch } projects_client.connect() } @@ -196,11 +171,6 @@ export class KubernetesService extends EventEmitter { return mode === this._oAuthProfile?.metadataValue(HAWTIO_MODE_KEY) } - getMetadata(): KMetadataByProject { - this.checkInitOrError() - return this.metadataByProject - } - getPods(): KubePodsByProject { this.checkInitOrError() return this.podsByProject @@ -219,10 +189,10 @@ export class KubernetesService extends EventEmitter { disconnect() { this.checkInitOrError() if (this.is(HawtioMode.Cluster) && this.projects_client) { - clientFactory.destroy(this.projects_client.collection, this.projects_client.watch) + clientFactory.destroy(this.projects_client.watched, this.projects_client.watch) } - Object.values(this.namespaceClients).forEach(client => { + Object.values(this.namespace_clients).forEach(client => { client.destroy() }) } @@ -309,27 +279,39 @@ export class KubernetesService extends EventEmitter { return reason || 'unknown' } - findPrevPods(namespace: string) { - const namespaceClient = this.namespaceClients[namespace] - if (! namespaceClient) return + hasPrevious(namespace?: string): boolean { + if (!namespace) return false - const pagingMetadata = this.metadataByProject[namespace] - if (!pagingMetadata) return + const namespaceClient = this.namespace_clients[namespace] + if (! namespaceClient) return false - namespaceClient.destroy() - pagingMetadata.previous() - this.initNamespaceClient(namespace) + return this.namespace_clients[namespace].hasPrevious() } - findNextPods(namespace: string) { - const namespaceClient = this.namespaceClients[namespace] + previous(namespace?: string) { + if (!namespace) return + + const namespaceClient = this.namespace_clients[namespace] if (! namespaceClient) return - const pagingMetadata = this.metadataByProject[namespace] - if (!pagingMetadata) return + this.namespace_clients[namespace].previous() + } + + hasNext(namespace?: string): boolean { + if (!namespace) return false - namespaceClient.destroy() - pagingMetadata.next() - this.initNamespaceClient(namespace) + const namespaceClient = this.namespace_clients[namespace] + if (! namespaceClient) return false + + return this.namespace_clients[namespace].hasNext() + } + + next(namespace?: string) { + if (!namespace) return + + const namespaceClient = this.namespace_clients[namespace] + if (! namespaceClient) return + + this.namespace_clients[namespace].next() } } diff --git a/packages/kubernetes-api/src/paging-metadata-impl.ts b/packages/kubernetes-api/src/paging-metadata-impl.ts index f817cfd9..de386be1 100644 --- a/packages/kubernetes-api/src/paging-metadata-impl.ts +++ b/packages/kubernetes-api/src/paging-metadata-impl.ts @@ -1,5 +1,5 @@ -import { PagingMetadata } from "./globals" -import { isObject, isString } from "./utils" +import { PagingMetadata } from './globals' +import { isObject, isString } from './utils' export interface PagingRef { count: number @@ -7,7 +7,7 @@ export interface PagingRef { } export class PagingMetadataImpl implements PagingMetadata { - private _current: number = 0 + private _current = 0 private pagingRefs: PagingRef[] = [] private numPages(): number { @@ -24,49 +24,44 @@ export class PagingMetadataImpl implements PagingMetadata { } hasContinue(): boolean { - if (this.numPages() <= this._current) - return false + if (this.numPages() <= this._current) return false const pagingRef = this.pagingRefs[this._current] return this.isContinueRef(pagingRef.continue) } - continue(): string|undefined { - if (this.numPages() <= this._current) - return undefined + continue(): string | undefined { + if (this.numPages() <= this._current) return undefined const pagingRef = this.pagingRefs[this._current] return pagingRef.continue } setContinue(continueRef?: string) { - if (this.numPages() <= this._current) - return + if (this.numPages() <= this._current) return const pagingRef = this.pagingRefs[this._current] pagingRef.continue = continueRef } refresh() { - if (this.numPages() <= this._current) - return + if (this.numPages() <= this._current) return const pagingRef = this.pagingRefs[this._current] pagingRef.count = -1 // Retain the continueRef since that was set by previous to current - if ((this._current + 1) < this.numPages()) { + if (this._current + 1 < this.numPages()) { // Remove all next paging refs as they are potentially out-of-date - this.pagingRefs = this.pagingRefs.slice(0, (this._current + 1)) + this.pagingRefs = this.pagingRefs.slice(0, this._current + 1) } } - resolveContinue(count: number, required: number, continueRef: string|undefined): string|undefined { - if (! this.isContinueRef(continueRef)) - return undefined + resolveContinue(count: number, required: number, continueRef: string | undefined): string | undefined { + if (!this.isContinueRef(continueRef)) return undefined - const enoughPods = (required - count) === 0 + const enoughPods = required - count === 0 if (!enoughPods) { // Not enough pods were found to satisfy the required number // so need to return the continueRef to find some more @@ -93,7 +88,7 @@ export class PagingMetadataImpl implements PagingMetadata { return undefined } - addPage(count: number, required: number, continueRef: string|undefined): string|undefined { + addPage(count: number, required: number, continueRef: string | undefined): string | undefined { const pagingRefLen = this.pagingRefs.push({ count: count }) this._current = pagingRefLen - 1 @@ -103,16 +98,14 @@ export class PagingMetadataImpl implements PagingMetadata { count(index?: number): number { if (!index) index = this._current - if (this.numPages() <= index) - return -1 + if (this.numPages() <= index) return -1 const pagingRef = this.pagingRefs[index] return pagingRef.count } setCount(count: number) { - if (this.numPages() <= this._current) - return + if (this.numPages() <= this._current) return const pagingRef = this.pagingRefs[this._current] pagingRef.count = count @@ -121,8 +114,7 @@ export class PagingMetadataImpl implements PagingMetadata { continueRef(index?: number): string | undefined { if (!index) index = this._current - if (this.numPages() <= index) - return undefined + if (this.numPages() <= index) return undefined const pagingRef = this.pagingRefs[index] return pagingRef.continue @@ -141,15 +133,13 @@ export class PagingMetadataImpl implements PagingMetadata { previous() { if (this._current === 0) return - if (! this.hasPrevious()) - return + if (!this.hasPrevious()) return --this._current } hasNext(): boolean { - if (this.numPages() <= this._current + 1) - return false + if (this.numPages() <= this._current + 1) return false const nextRef = this.pagingRefs[this._current + 1] return isObject(nextRef) && this.isContinueRef(nextRef.continue) @@ -159,11 +149,9 @@ export class PagingMetadataImpl implements PagingMetadata { * Moves current forward 1 to get the next paging metadata */ next() { - if (this._current === (this.numPages() - 1)) - return + if (this._current === this.numPages() - 1) return - if (! this.hasNext()) - return + if (!this.hasNext()) return ++this._current } diff --git a/packages/kubernetes-api/src/utils/objects.ts b/packages/kubernetes-api/src/utils/objects.ts index bdfc2664..a9550293 100644 --- a/packages/kubernetes-api/src/utils/objects.ts +++ b/packages/kubernetes-api/src/utils/objects.ts @@ -13,9 +13,9 @@ export function isEmpty(obj: object): boolean { return Object.keys(obj).length === 0 } -export function hasProperty(obj: object|undefined, property: string): boolean { - if (! obj) return false - return obj.hasOwnProperty(property) +export function hasProperty(obj: object | undefined, property: string): boolean { + if (!obj) return false + return Object.hasOwn(obj, property) } export function isArray(obj: T | T[]): obj is T[] {