Skip to content

Commit

Permalink
feat: add kubernetes models and operator
Browse files Browse the repository at this point in the history
  • Loading branch information
GUANGHUIW\74777 committed Dec 23, 2024
1 parent b9b904a commit 79fd965
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 0 deletions.
4 changes: 4 additions & 0 deletions projects/ngx-lift/src/lib/models/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
export * from './async-state.model';
export * from './kubernetes-list.model';
export * from './kubernetes-object.model';
export * from './kubernetes-object-condition.model';
export * from './kubernetes-object-meta.model';
8 changes: 8 additions & 0 deletions projects/ngx-lift/src/lib/models/kubernetes-list.model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import {KubernetesObject} from './kubernetes-object.model';

export interface KubernetesList<T extends KubernetesObject> {
apiVersion: string;
metadata: {continue: string; resourceVersion: string};
kind: string;
items: T[];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
export interface KubernetesObjectCondition {
/**
* lastTransitionTime is a string representing the last time the condition transitioned from one status to another.
* If the underlying condition change is unknown, the time when the API field changed is used.
*/
lastTransitionTime: string;

/**
* message is a string providing a human - readable message about the transition. It can be an empty string.
*/
message: string;

/**
* reason is a string containing a programmatic identifier indicating the reason for the condition's last transition.
* Producers of specific condition types may define expected values and meanings for this field.
*/
reason: string;

/**
* status is an enum with possible values 'True', 'False', or 'Unknown' representing the current status of the condition.
*/
status: 'True' | 'False' | 'Unknown';

/**
* type is a string representing the type of condition in CamelCase or in foo.example.com/CamelCase format.
*/
type: string;

/**
* observedGeneration is an integer representing the.metadata.generation that the condition was set based upon.
*/
observedGeneration?: number;
}
39 changes: 39 additions & 0 deletions projects/ngx-lift/src/lib/models/kubernetes-object-meta.model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.23/#objectmeta-v1-meta
*/
export interface KubernetesObjectMetaV1 {
name: string;
namespace?: string; // can be undefined for cluster-scoped resources
selfLink?: string;
uid?: string;
resourceVersion?: string;
generation?: number;
creationTimestamp?: string;
deletionTimestamp?: string; // only present if the object is being deleted
deletionGracePeriodSeconds?: number; // only present if the object is being deleted
labels?: Record<string, string>;
annotations?: Record<string, string>;
ownerReferences?: OwnerReference[];
finalizers?: string[];
clusterName?: string; // only present for objects in a cluster
managedFields?: ManagedField[];
}

interface OwnerReference {
apiVersion: string;
kind: string;
name: string;
uid: string;
controller: boolean;
blockOwnerDeletion?: boolean;
}

interface ManagedField {
apiVersion: string;
fieldsType: string;
fieldsV1: Record<string, unknown>;
manager: string;
operation: string;
subresource?: string;
time: string;
}
10 changes: 10 additions & 0 deletions projects/ngx-lift/src/lib/models/kubernetes-object.model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import {KubernetesObjectMetaV1} from './kubernetes-object-meta.model';

export interface KubernetesObject {
apiVersion: string;
kind: string;
metadata: KubernetesObjectMetaV1;
spec?: any;
status?: any;
}
1 change: 1 addition & 0 deletions projects/ngx-lift/src/lib/operators/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from './combine-latest-eager.operator';
export * from './create-async-state.operator';
export * from './distinct-on-change.operator';
export * from './kubernetes-pagination.operator';
export * from './logger.operator';
export * from './poll.operator';
export * from './start-with-tap.operator';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* ******************************************************************
* Copyright (c) 2024 Broadcom. All Rights Reserved.
* Broadcom Confidential. The term "Broadcom" refers to Broadcom Inc.
* and/or its subsidiaries.
* ******************************************************************
*/
import {HttpClient, provideHttpClient} from '@angular/common/http';
import {HttpTestingController, provideHttpClientTesting} from '@angular/common/http/testing';
import {TestBed} from '@angular/core/testing';
import {of} from 'rxjs';

import {KubernetesList, KubernetesObject} from '../models';
import {
aggregatePaginatedKubernetesResources,
fetchPaginatedKubernetesResources,
} from './kubernetes-pagination.operator';

describe('aggregatePaginatedKubernetesResources', () => {
let httpClient: HttpClient;
let httpTestingController: HttpTestingController;

beforeEach(() => {
TestBed.configureTestingModule({
imports: [],
providers: [provideHttpClient(), provideHttpClientTesting()],
});

httpClient = TestBed.inject(HttpClient);
httpTestingController = TestBed.inject(HttpTestingController);
});

afterEach(() => {
httpTestingController.verify();
});

it('should aggregate paginated Kubernetes resources correctly', () => {
const endpoint = '/test-endpoint';
const initialParams = {};
const mockResponse1: KubernetesList<KubernetesObject> = {
apiVersion: 'v1',
kind: 'List',
metadata: {continue: 'token1', resourceVersion: '1'},
items: [
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 1'}},
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 2'}},
],
};
const mockResponse2: KubernetesList<KubernetesObject> = {
apiVersion: 'v1',
kind: 'List',
metadata: {continue: '', resourceVersion: '1'},
items: [
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 3'}},
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 4'}},
],
};

const source$ = of(mockResponse1);

const result$ = source$.pipe(aggregatePaginatedKubernetesResources(httpClient, endpoint, initialParams));

result$.subscribe((aggregatedList) => {
expect(aggregatedList.items.length).toBe(4);
});

const req1 = httpTestingController.expectOne(`${endpoint}?continue=token1`);
expect(req1.request.method).toBe('GET');
req1.flush(mockResponse2);
});

it('should not call API if no more pages', (done) => {
const endpoint = '/test-endpoint';
const initialParams = {};
const mockResponse: KubernetesList<KubernetesObject> = {
apiVersion: 'v1',
kind: 'List',
metadata: {continue: '', resourceVersion: '1'},
items: [
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 1'}},
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 2'}},
],
};

const source$ = of(mockResponse);

const result$ = source$.pipe(aggregatePaginatedKubernetesResources(httpClient, endpoint, initialParams));

result$.subscribe((aggregatedList) => {
expect(aggregatedList.items.length).toBe(2);
done();
});

httpTestingController.expectNone(endpoint);
});
});

describe('fetchPaginatedKubernetesResources', () => {
let httpClient: HttpClient;
let httpTestingController: HttpTestingController;

beforeEach(() => {
TestBed.configureTestingModule({
imports: [],
providers: [provideHttpClient(), provideHttpClientTesting()],
});

httpClient = TestBed.inject(HttpClient);
httpTestingController = TestBed.inject(HttpTestingController);
});

afterEach(() => {
httpTestingController.verify();
});

it('should fetch paginated Kubernetes resources correctly', () => {
const endpoint = '/test-endpoint';
const initialParams = {};
const mockResponse1: KubernetesList<KubernetesObject> = {
apiVersion: 'v1',
kind: 'List',
metadata: {continue: 'token1', resourceVersion: '1'},
items: [
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 1'}},
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 2'}},
],
};
const mockResponse2: KubernetesList<KubernetesObject> = {
apiVersion: 'v1',
kind: 'List',
metadata: {continue: '', resourceVersion: '1'},
items: [
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 3'}},
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 4'}},
],
};

const result$ = fetchPaginatedKubernetesResources(httpClient, endpoint, initialParams);

result$.subscribe((aggregatedList) => {
expect(aggregatedList.items.length).toBe(4);
});

const req1 = httpTestingController.expectOne(endpoint);
expect(req1.request.method).toBe('GET');
req1.flush(mockResponse1);

const req2 = httpTestingController.expectOne(`${endpoint}?continue=token1`);
expect(req2.request.method).toBe('GET');
req2.flush(mockResponse2);
});

it('should make the 1st API call if no more pages', () => {
const endpoint = '/test-endpoint';
const initialParams = {};
const mockResponse: KubernetesList<KubernetesObject> = {
apiVersion: 'v1',
kind: 'List',
metadata: {continue: '', resourceVersion: '1'},
items: [
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 1'}},
{apiVersion: 'v1', kind: 'List', metadata: {name: 'Item 2'}},
],
};

const result$ = fetchPaginatedKubernetesResources(httpClient, endpoint, initialParams);

result$.subscribe((aggregatedList) => {
expect(aggregatedList.items.length).toBe(2);
});

const req1 = httpTestingController.expectOne(endpoint);
expect(req1.request.method).toBe('GET');
req1.flush(mockResponse);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* ******************************************************************
* Copyright (c) 2024 Broadcom. All Rights Reserved.
* Broadcom Confidential. The term "Broadcom" refers to Broadcom Inc.
* and/or its subsidiaries.
* ******************************************************************
*/

import {HttpClient} from '@angular/common/http';
import {EMPTY, expand, Observable, OperatorFunction, reduce} from 'rxjs';

import {KubernetesList, KubernetesObject} from '../models';

/**
* Fetches paginated Kubernetes resources by continually making requests
* until all pages have been retrieved, and aggregates the items from all pages
* into a single KubernetesList.
*
* @template T The type of the items contained within the KubernetesList.
* @param http The HttpClient instance used to make the HTTP requests.
* @param endpoint The API endpoint to fetch the resources from.
* @param initialParams Optional initial parameters to include in the request.
* Can include query parameters like filters and pagination settings.
* `limit` and `continue` parameters are parameters for kubernetes
* @returns An observable that emits a single KubernetesList containing all items from all pages.
*/
export function aggregatePaginatedKubernetesResources<T extends KubernetesObject>(
http: HttpClient,
endpoint: string,
initialParams: Record<string, string | number | boolean | (string | number | boolean)[]> = {},
): OperatorFunction<KubernetesList<T>, KubernetesList<T>> {
return (source$: Observable<KubernetesList<T>>) => {
return source$.pipe(
expand((response) => {
const {metadata} = response;
const {continue: continueToken} = metadata;
if (continueToken) {
const params = {...initialParams, continue: continueToken};
return http.get<KubernetesList<T>>(endpoint, {params});
}
return EMPTY; // No more pages
}),
reduce((acc, current) => {
const {items: currentPageItems} = current;
if (currentPageItems) {
acc.items = acc.items.concat(currentPageItems);
}
return acc;
}),
);
};
}

/**
* Fetches paginated Kubernetes resources by continually making requests
* until all pages have been retrieved.
*
* @template T The type of the items contained within the KubernetesList.
* @param http The HttpClient instance used to make the HTTP requests.
* @param endpoint The API endpoint to fetch the resources from.
* @param initialParams Optional initial parameters to include in the request.
* Can include query parameters like filters and pagination settings.
* `limit` and `continue` parameters are parameters for kubernetes
* @returns An observable that emits a single KubernetesList containing all items from all pages.
*/

export function fetchPaginatedKubernetesResources<T extends KubernetesObject>(
http: HttpClient,
endpoint: string,
initialParams: Record<string, string | number | boolean | (string | number | boolean)[]> = {},
) {
const initialRequest$ = http.get<KubernetesList<T>>(endpoint, {params: initialParams});

return initialRequest$.pipe(
expand((response) => {
const {metadata} = response;
const {continue: continueToken} = metadata;
if (continueToken) {
const params = {...initialParams, continue: continueToken};
return http.get<KubernetesList<T>>(endpoint, {params});
}
return EMPTY; // No more pages
}),
reduce((acc, current) => {
const {items: currentPageItems} = current;
if (currentPageItems) {
acc.items = acc.items.concat(currentPageItems);
}
return acc;
}),
);
}

0 comments on commit 79fd965

Please sign in to comment.