Skip to content

Commit

Permalink
feat: Rework paging algorithm of kubernetes-api (hawtio#496)
Browse files Browse the repository at this point in the history
* Current paging algorithm buggy and unnecessarily complex. Using limit,
  we are trying to save on network data load but this is not the issue with
  discover tab slowdown. Rather the slowdown is the number of pods being
  displayed and handled in the UI.

* Better methodology:
  1) Watch namespace and collect names of pods that are jolokia and add to
     a set
  2) Take a page/chunk of pod names and watch them individually
  3) Return only those pods watched for display in the UI
  4) Prev/Next will move on the page/chunk in the pod name list and
     re-init the pod watchers accordingly

* Removes the PagingMetadata

* Fetches all the pods in a namespace using namespace watcher and adds
  names to list

* Takes chunk from list determined by 'current' page and 'limit' size and
  starts watching each pod using fieldSelector. Provides only those pods as
  the collection of pods that k8Service displays for the namespace

* Restores the wss handler and connections since no longer using limit and
  continue references.

* No longer need to refresh continue tokens since not used

* Changes name of Collection interface to Watched since it also now handles
  single entities being returned from k8 fieldSelector name API calls

* NamespaceClients no longer need to be destroyed and recreated in order
  to work with pod changes. The pod watchers inside it are renewed if
  prev/next is called but otherwise those pod watchers already initialised
  are retained. Only those no longer in the 'page' are removed

* Ref key from ClientFactor uses options name rather than continue ref

* PagingMetadata interface refactored to Paging and applied to K8 Service
  and each namespace client
  • Loading branch information
phantomjinx committed Sep 25, 2024
1 parent 0d544a3 commit 0393b5c
Show file tree
Hide file tree
Showing 13 changed files with 497 additions and 346 deletions.
21 changes: 4 additions & 17 deletions packages/kubernetes-api/src/client/client-factory.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { KubeObject } from '../globals'
import { ClientInstance } from './client-instance'
import { CollectionImpl } from './collection'
import { log, Collection, KOptions, ClientFactory, ProcessDataCallback } from './globals'
import { log, Watched, KOptions, ClientFactory, ProcessDataCallback } from './globals'
import { getKey } from './support'

/*
Expand All @@ -10,8 +10,8 @@ import { getKey } from './support'
export class ClientFactoryImpl implements ClientFactory {
private _clients: Record<string, unknown> = {}

create<T extends KubeObject>(options: KOptions): Collection<T> {
const key = getKey(options.kind, options.namespace, options.continueRef)
create<T extends KubeObject>(options: KOptions): Watched<T> {
const key = getKey(options.kind, options.namespace, options.name)
if (this._clients[key]) {
const client = this._clients[key] as ClientInstance<T>
client.addRef()
Expand All @@ -26,18 +26,11 @@ export class ClientFactoryImpl implements ClientFactory {
}
}

destroy<T extends KubeObject>(client: Collection<T>, ...handles: Array<ProcessDataCallback<T>>) {
destroy<T extends KubeObject>(client: Watched<T>, ...handles: Array<ProcessDataCallback<T>>) {
handles.forEach(handle => {
client.unwatch(handle)
})
const key = client.getKey()

console.log(`XXX destroying client with key: ${key}`)
console.log(`XXX Number of clients: `, Object.getOwnPropertyNames(this._clients).length)
for (const key of Object.getOwnPropertyNames(this._clients)) {
console.log(`XXX client keys: ${key}`)
}

if (this._clients[key]) {
const c = this._clients[key] as ClientInstance<T>
c.removeRef()
Expand All @@ -46,14 +39,8 @@ export class ClientFactoryImpl implements ClientFactory {
delete this._clients[key]
c.destroy()
log.debug('Destroyed client for key:', key)
} else {
console.log('Not destroying client because key ref count too high: ', key, c.refCount)
}
}

for (const key of Object.getOwnPropertyNames(this._clients)) {
console.log(`XXX Remaining client keys: ${key}`)
}
}
}

Expand Down
14 changes: 7 additions & 7 deletions packages/kubernetes-api/src/client/client-instance.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { KubeObject } from '../globals'
import { Collection } from './globals'
import { Watched } from './globals'

/*
* Manages references to collection instances to allow them to be shared between views
*/
export class ClientInstance<T extends KubeObject> {
private _refCount = 0
private _collection: Collection<T>
private _watched: Watched<T>

constructor(_collection: Collection<T>) {
this._collection = _collection
constructor(_watched: Watched<T>) {
this._watched = _watched
}

get refCount() {
Expand All @@ -25,15 +25,15 @@ export class ClientInstance<T extends KubeObject> {
}

get collection() {
return this._collection
return this._watched
}

disposable() {
return this._refCount <= 0
}

destroy() {
this._collection.destroy()
// delete this._collection
this._watched.destroy()
// delete this._watched
}
}
53 changes: 30 additions & 23 deletions packages/kubernetes-api/src/client/collection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ServiceSpec } from 'kubernetes-types/core/v1'
import { K8S_EXT_PREFIX, KubeSearchMetadata, KubeObject } from '../globals'
import { K8S_EXT_PREFIX, KubeObject } from '../globals'
import {} from '../kubernetes-service'
import { fetchPath, FetchPathCallback, isFunction, joinPaths } from '../utils'
import { getClusterIP, getName, getNamespace, namespaced, prefixForKind, toCollectionName, wsUrl } from '../helpers'
Expand All @@ -8,13 +8,13 @@ import { k8Api } from '../init'
import {
log,
UNKNOWN_VALUE,
Collection,
KOptions,
ObjectList,
WSHandler,
ProcessDataCallback,
ErrorDataCallback,
POLLING_INTERVAL,
Watched,
} from './globals'
import { ObjectListImpl } from './object-list'
import { WSHandlerImpl } from './ws-handler'
Expand All @@ -23,11 +23,10 @@ import { getKey } from './support'
/*
* Implements the external API for working with k8s collections of objects
*/
export class CollectionImpl<T extends KubeObject> implements Collection<T> {
export class CollectionImpl<T extends KubeObject> implements Watched<T> {
private _namespace?: string
private _path: string
private _apiVersion: string
private metadata: KubeSearchMetadata
private list: ObjectList<T>
private handler: WSHandler<T>
private _isOpenshift: boolean
Expand All @@ -41,6 +40,7 @@ export class CollectionImpl<T extends KubeObject> implements Collection<T> {

const pref = this.getPrefix()

// /apis/apps/v1/namespaces/{namespace}/pods/{name}
if (this._namespace) {
this._path = joinPaths(pref, 'namespaces', this._namespace, this.kind)
} else {
Expand All @@ -52,9 +52,6 @@ export class CollectionImpl<T extends KubeObject> implements Collection<T> {
this.handler = new WSHandlerImpl(this)
const list = (this.list = new ObjectListImpl(_options.kind, _options.namespace))
this.handler.list = list

const metadata = (this.metadata = { count: -1 })
this.handler.metadata = metadata
}

get oAuthToken(): string {
Expand All @@ -78,18 +75,19 @@ export class CollectionImpl<T extends KubeObject> implements Collection<T> {
url = new URL(joinPaths(k8Api.masterUri(), this._path))
}

if (this.options.nsLimit !== undefined) {
url.searchParams.append('limit', `${this.options.nsLimit}`)
}

if (this.options.continueRef !== undefined) {
url.searchParams.append('continue', this.options.continueRef)
}

if (this.options.labelSelector) {
url.searchParams.append('labelSelector', this.options.labelSelector)
}

if (this.options.name) {
/*
* Must add fieldSelector to pin it to a singular entity.
* Has to be done this way to allow for watch parameter to be added.
* See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#read-operations-pod-v1-core
*/
url.searchParams.append('fieldSelector', `metadata.name=${this.options.name}`)
}

return url
}

Expand Down Expand Up @@ -118,6 +116,15 @@ export class CollectionImpl<T extends KubeObject> implements Collection<T> {
url = wsUrl(urlStr)
}

if (this.options.name) {
/*
* Must add fieldSelector to pin it to the singular entity.
* Has to be done this way to allow for watch parameter to be added.
* See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#read-operations-pod-v1-core
*/
url.searchParams.append('fieldSelector', `metadata.name=${this.options.name}`)
}

url.searchParams.append('watch', 'true')

if (this.options.labelSelector) {
Expand All @@ -127,7 +134,7 @@ export class CollectionImpl<T extends KubeObject> implements Collection<T> {
}

getKey() {
return getKey(this.kind, this._namespace, this._options.continueRef)
return getKey(this.kind, this._namespace, this._options.name)
}

get wsURL() {
Expand Down Expand Up @@ -170,7 +177,7 @@ export class CollectionImpl<T extends KubeObject> implements Collection<T> {
this.list.doOnce(WatchActions.INIT, cb)
} else {
setTimeout(() => {
cb(this.list.objects, this.metadata)
cb(this.list.objects)
}, 10)
}
}
Expand Down Expand Up @@ -229,15 +236,15 @@ export class CollectionImpl<T extends KubeObject> implements Collection<T> {
if (this.list.initialized) {
setTimeout(() => {
log.debug(this.kind, 'passing existing objects:', this.list.objects)
cb(this.list.objects, this.metadata)
cb(this.list.objects)
}, POLLING_INTERVAL)
}
log.debug(this.kind, 'adding watch callback:', cb)

this.list.doOn(WatchActions.ANY, ((data: T[]) => {
this.list.doOn(WatchActions.ANY, (data: T[]) => {
log.debug(this.kind, 'got data:', data)
cb(data, this.metadata)
}))
cb(data)
})
return cb
}

Expand Down Expand Up @@ -287,7 +294,7 @@ export class CollectionImpl<T extends KubeObject> implements Collection<T> {
success: data => {
try {
const response = JSON.parse(data)
cb(response, this.metadata)
cb(response)
} catch (err) {
log.error(err)
if (error && err instanceof Error) {
Expand Down Expand Up @@ -330,7 +337,7 @@ export class CollectionImpl<T extends KubeObject> implements Collection<T> {
success: data => {
try {
const response = JSON.parse(data)
cb(response, this.metadata)
cb(response)
} catch (err) {
log.error(err)
if (error && err instanceof Error) {
Expand Down
12 changes: 6 additions & 6 deletions packages/kubernetes-api/src/client/globals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const pollingOnly = [WatchTypes.IMAGE_STREAM_TAGS]
export const POLLING_INTERVAL = 60000

export const UNKNOWN_VALUE = '<unknown>'
export const UNKNOWN_NAME_VALUE = 'unknown'
export const NO_KIND = 'No kind in supplied options'
export const NO_OBJECT = 'No object in supplied options'
export const NO_OBJECTS = 'No objects in list object'
Expand All @@ -22,6 +23,7 @@ export interface CompareResult<T> {

export interface KOptions extends Record<string, unknown> {
kind: string
name?: string
namespace?: string
apiVersion?: string
labelSelector?: string
Expand All @@ -30,14 +32,13 @@ export interface KOptions extends Record<string, unknown> {
error?: ErrorDataCallback
urlFunction?: (options: KOptions) => string
nsLimit?: number
continueRef?: string
}

export type ProcessDataCallback<T extends KubeObject> = (data: T[], metadata?: KubeSearchMetadata) => void

export type ErrorDataCallback = (err: Error, response?: SimpleResponse) => void

export interface Collection<T extends KubeObject> {
export interface Watched<T extends KubeObject> {
options: KOptions
kind: string
wsURL: string
Expand Down Expand Up @@ -74,9 +75,8 @@ export interface ObjectList<T extends KubeObject> {
export interface WSHandler<T extends KubeObject> {
connected: boolean
kind: string
metadata: { count: number }
list: ObjectList<T>
collection: Collection<T>
watched: Watched<T>
error: ErrorDataCallback | undefined
connect(): void
onOpen(event: Event): void
Expand All @@ -87,6 +87,6 @@ export interface WSHandler<T extends KubeObject> {
}

export interface ClientFactory {
create<T extends KubeObject>(options: KOptions, namespace?: string): Collection<T>
destroy<T extends KubeObject>(client: Collection<T>, ...handles: Array<ProcessDataCallback<T>>): void
create<T extends KubeObject>(options: KOptions, namespace?: string): Watched<T>
destroy<T extends KubeObject>(client: Watched<T>, ...handles: Array<ProcessDataCallback<T>>): void
}
Loading

0 comments on commit 0393b5c

Please sign in to comment.