Skip to content

Commit

Permalink
[teraslice] fix multi-job deletion bug in force job stop (#3751)
Browse files Browse the repository at this point in the history
This PR makes the following changes:

- Ensure that `K8s._deleteObjByExId()` contains an object in
`objList.items[]` before calling `K8s.deleted()`.
- Throw if `K8s.delete()` receives a `name` argument that is undefined
or an empty string.
- Refactor `K8s._deleteObjByExId()` to delete multiple objects and not
handle `force logic`
- Refactor `k8s.deleteExecution() to handle `force` logic by calling
`K8s._deleteObjByExId()` on multiple resources.
- bump teraslice from version 2.3.1 to 2.3.2
- pin the kind image used by k8s to:
`kindest/node:v1.28.12@sha256:fa0e48b1e83bb8688a5724aa7eebffbd6337abd7909ad089a2700bf08c30c6ea`.
Note that the version in the image tag refers to the kubernetes server
version that will be used. We should keep this in sync with prod.

ref: #3750
  • Loading branch information
busma13 authored Sep 19, 2024
1 parent 0066cb3 commit 2f1f453
Show file tree
Hide file tree
Showing 15 changed files with 462 additions and 260 deletions.
1 change: 1 addition & 0 deletions e2e/k8s/kindConfigDefaultPorts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: k8s-env
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
image: kindest/node:v1.28.12@sha256:fa0e48b1e83bb8688a5724aa7eebffbd6337abd7909ad089a2700bf08c30c6ea
extraPortMappings:
- containerPort: 30200 # Map internal elasticsearch service to host port
hostPort: 9200
Expand Down
1 change: 1 addition & 0 deletions e2e/k8s/kindConfigDefaultPortsDev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ name: k8s-env
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
image: kindest/node:v1.28.12@sha256:fa0e48b1e83bb8688a5724aa7eebffbd6337abd7909ad089a2700bf08c30c6ea
extraPortMappings:
- containerPort: 30200 # Map internal elasticsearch service to host port
hostPort: 9200
Expand Down
1 change: 1 addition & 0 deletions e2e/k8s/kindConfigTestPorts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: k8s-e2e
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
image: kindest/node:v1.28.12@sha256:fa0e48b1e83bb8688a5724aa7eebffbd6337abd7909ad089a2700bf08c30c6ea
extraPortMappings:
- containerPort: 30200 # Map internal elasticsearch service to host port
hostPort: 49200
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice-workspace",
"displayName": "Teraslice",
"version": "2.3.1",
"version": "2.3.2",
"private": true,
"homepage": "https://github.com/terascope/teraslice",
"bugs": {
Expand Down
2 changes: 1 addition & 1 deletion packages/job-components/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/job-components",
"displayName": "Job Components",
"version": "1.3.0",
"version": "1.3.1",
"description": "A teraslice library for validating jobs schemas, registering apis, and defining and running new Job APIs",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/job-components#readme",
"bugs": {
Expand Down
2 changes: 1 addition & 1 deletion packages/scripts/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/scripts",
"displayName": "Scripts",
"version": "1.1.1",
"version": "1.1.2",
"description": "A collection of terascope monorepo scripts",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/scripts#readme",
"bugs": {
Expand Down
4 changes: 2 additions & 2 deletions packages/teraslice-test-harness/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
"fs-extra": "^11.2.0"
},
"devDependencies": {
"@terascope/job-components": "^1.3.0"
"@terascope/job-components": "^1.3.1"
},
"peerDependencies": {
"@terascope/job-components": ">=1.3.0"
"@terascope/job-components": ">=1.3.1"
},
"engines": {
"node": ">=18.18.0",
Expand Down
4 changes: 2 additions & 2 deletions packages/teraslice/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice",
"displayName": "Teraslice",
"version": "2.3.1",
"version": "2.3.2",
"description": "Distributed computing platform for processing JSON data",
"homepage": "https://github.com/terascope/teraslice#readme",
"bugs": {
Expand Down Expand Up @@ -40,7 +40,7 @@
"dependencies": {
"@kubernetes/client-node": "^0.21.0",
"@terascope/elasticsearch-api": "^4.1.0",
"@terascope/job-components": "^1.3.0",
"@terascope/job-components": "^1.3.1",
"@terascope/teraslice-messaging": "^1.4.0",
"@terascope/types": "^1.1.0",
"@terascope/utils": "^1.1.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ export class KubernetesClusterBackend {
*/
async listResourcesForJobId(jobId: string) {
const resources = [];
const resourceTypes = ['pods', 'deployments', 'services', 'jobs'];
const resourceTypes = ['pods', 'deployments', 'services', 'jobs', 'replicasets'];
for (const type of resourceTypes) {
const list = await this.k8s.list(`teraslice.terascope.io/jobId=${jobId}`, type);
if (list.items.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import KubeClient from 'kubernetes-client';
// @ts-expect-error
import Request from 'kubernetes-client/backends/request/index.js';
import { getRetryConfig } from './utils.js';
import { IncomingMessage } from 'node:http';

// @ts-expect-error
const { Client, KubeConfig } = KubeClient;
Expand Down Expand Up @@ -98,7 +99,7 @@ export class K8s {
const namespace = ns || this.defaultNamespace;
let now = Date.now();
const end = now + timeout;

while (true) {
const result = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
Expand Down Expand Up @@ -136,7 +137,7 @@ export class K8s {
const namespace = ns || this.defaultNamespace;
let now = Date.now();
const end = now + timeout;

while (true) {
const result = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
Expand All @@ -163,7 +164,7 @@ export class K8s {
* returns list of k8s objects matching provided selector
* @param {String} selector kubernetes selector, like 'app=teraslice'
* @param {String} objType Type of k8s object to get, valid options:
* 'pods', 'deployment', 'services', 'jobs'
* 'pods', 'deployment', 'services', 'jobs', 'replicasets'
* @param {String} ns namespace to search, this will override the default
* @return {Object} body of k8s get response.
*/
Expand All @@ -188,6 +189,10 @@ export class K8s {
response = await pRetry(() => this.client
.apis.batch.v1.namespaces(namespace).jobs()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else if (objType === 'replicasets') {
response = await pRetry(() => this.client
.apis.apps.v1.namespaces(namespace).replicasets()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else {
const error = new Error(`Wrong objType provided to get: ${objType}`);
this.logger.error(error);
Expand Down Expand Up @@ -298,12 +303,15 @@ export class K8s {
* Deletes k8s object of specified objType
* @param {String} name Name of the resource to delete
* @param {String} objType Type of k8s object to get, valid options:
* 'deployments', 'services', 'jobs'
* 'deployments', 'services', 'jobs', 'pods', 'replicasets'
* @param {Boolean} force Forcefully delete resource by setting gracePeriodSeconds to 1
* to be forcefully stopped.
* @return {Object} k8s delete response body.
*/
async delete(name: string, objType: string, force?: boolean) {
if (name === undefined || name.trim() === '') {
throw new Error(`Name of resource to delete must be specified. Received: "${name}".`);
}

let response;

// To get a Job to remove the associated pods you have to
Expand All @@ -321,23 +329,53 @@ export class K8s {
deleteOptions.body.gracePeriodSeconds = 1;
}

const deleteWithErrorHandling = async (deleteFn: () => Promise<{
response: IncomingMessage,
body: Record<string, any>
}>) => {
try {
const res = await deleteFn();
return res;
} catch (e) {
if (e.statusCode) {
// 404 should be an acceptable response to a delete request, not an error
if (e.statusCode === 404) {
this.logger.info(`No ${objType} with name ${name} found while attempting to delete.`);
return e;
}

if (e.statusCode >= 400) {
const err = new TSError(`Unexpected response code (${e.statusCode}), when deleting name: ${name}`);
this.logger.error(err);
err.code = e.statusCode.toString();
return Promise.reject(err);
}
}
throw e;
}
}

try {
if (objType === 'services') {
response = await pRetry(() => this.client
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.api.v1.namespaces(this.defaultNamespace).services(name)
.delete(), getRetryConfig());
.delete(deleteOptions)), getRetryConfig());
} else if (objType === 'deployments') {
response = await pRetry(() => this.client
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
.delete(), getRetryConfig());
.delete(deleteOptions)), getRetryConfig());
} else if (objType === 'jobs') {
response = await pRetry(() => this.client
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.apis.batch.v1.namespaces(this.defaultNamespace).jobs(name)
.delete(deleteOptions), getRetryConfig());
.delete(deleteOptions)), getRetryConfig());
} else if (objType === 'pods') {
response = await pRetry(() => this.client
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.api.v1.namespaces(this.defaultNamespace).pods(name)
.delete(deleteOptions), getRetryConfig());
.delete(deleteOptions)), getRetryConfig());
} else if (objType === 'replicasets') {
response = await pRetry(() => deleteWithErrorHandling(() => this.client
.apis.apps.v1.namespaces(this.defaultNamespace).replicasets(name)
.delete(deleteOptions)), getRetryConfig());
} else {
throw new Error(`Invalid objType: ${objType}`);
}
Expand All @@ -347,44 +385,47 @@ export class K8s {
return Promise.reject(err);
}

if (response.statusCode >= 400) {
const err = new TSError(`Unexpected response code (${response.statusCode}), when deleting name: ${name}`);
this.logger.error(err);
err.code = response.statusCode;
return Promise.reject(err);
}

return response.body;
}

/**
* Delete all of Kubernetes resources related to the specified exId
* @param {String} exId ID of the execution
* @param {Boolean} force Forcefully stop all related pod, deployment, and job resources
* @param {Boolean} force Forcefully stop all pod, deployment,
* service, replicaset and job resources
* @return {Promise}
*/
async deleteExecution(exId: string, force = false) {
if (!exId) {
throw new Error('deleteExecution requires an executionId');
}

if (force) {
// Order matters. If we delete a parent resource before its children it
// will be marked for background deletion and then can't be force deleted.
await this._deleteObjByExId(exId, 'worker', 'pods', force);
await this._deleteObjByExId(exId, 'worker', 'replicasets', force);
await this._deleteObjByExId(exId, 'worker', 'deployments', force);
await this._deleteObjByExId(exId, 'execution_controller', 'pods', force);
await this._deleteObjByExId(exId, 'execution_controller', 'services', force);
}

await this._deleteObjByExId(exId, 'execution_controller', 'jobs', force);
}

/**
* Finds the k8s object by nodeType and exId and then deletes it
* Finds the k8s objects by nodeType and exId and then deletes them
* @param {String} exId Execution ID
* @param {String} nodeType valid Teraslice k8s node type:
* 'worker', 'execution_controller'
* @param {String} objType valid object type: `services`, `deployments`,
* 'jobs'
* @param {Boolean} force Forcefully stop all related pod, deployment, and job resources
* `jobs`, `pods`, `replicasets`
* @param {Boolean} force Forcefully stop all resources
* @return {Promise}
*/
async _deleteObjByExId(exId: string, nodeType: string, objType: string, force?: boolean) {
let objList;
let forcePodsList;
let deleteResponse;
const deleteResponses = [];

try {
objList = await this.list(`app.kubernetes.io/component=${nodeType},teraslice.terascope.io/exId=${exId}`, objType);
Expand All @@ -394,52 +435,38 @@ export class K8s {
return Promise.reject(err);
}

if (force) {
try {
forcePodsList = await this.list(`teraslice.terascope.io/exId=${exId}`, 'pods');
} catch (e) {
const err = new Error(`Request pods list in _deleteObjByExId with exId: ${exId} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
}

if (isEmpty(objList.items) && isEmpty(forcePodsList?.items)) {
if (isEmpty(objList.items)) {
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} has already been deleted`);
return Promise.resolve();
}

const deletePodResponses = [];
if (forcePodsList?.items) {
this.logger.info(`k8s._deleteObjByExId: ${exId} force deleting all pods`);
for (const pod of forcePodsList.items) {
const podName = pod.metadata.name;

try {
deletePodResponses.push(await this.delete(podName, 'pods', force));
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${podName} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
for (const obj of objList.items) {
const { name, deletionTimestamp } = obj.metadata;

if (!name) {
const err = new Error(`Cannot delete ${objType} for ExId: ${exId} by name because it has no name`);
this.logger.error(err);
return Promise.reject(err);
}
}

const name = get(objList, 'items[0].metadata.name');
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} deleting: ${name}`);
// If deletionTimestamp is present then the resource is already terminating.
// K8s will not change the grace period in this case, so force deletion is not possible
if (force && deletionTimestamp) {
this.logger.warn(`Cannot force delete ${name} for ExId: ${exId}. It will finish deleting gracefully by ${deletionTimestamp}`);
return Promise.resolve();
}

try {
deleteResponse = await this.delete(name, objType, force);
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${name} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} ${force ? 'force' : ''} deleting: ${name}`);
try {
deleteResponses.push(await this.delete(name, objType, force));
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${name} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
}

if (deletePodResponses.length > 0) {
deleteResponse.deletePodResponses = deletePodResponses;
}
return deleteResponse;
return deleteResponses;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,12 @@ export class KubernetesClusterBackendV2 {
/**
* Returns a list of all k8s resources associated with a job ID
* @param {string} jobId The job ID of the job to list associated resources
* @returns {Array<any>}
* @returns {Array<K8sClient.V1PodList | K8sClient.V1DeploymentList | K8sClient.V1ServiceList
* | K8sClient.V1JobList | K8sClient.V1ReplicaSetList>}
*/
async listResourcesForJobId(jobId: string) {
const resources = [];
const resourceTypes = ['pods', 'deployments', 'services', 'jobs'];
const resourceTypes = ['pods', 'deployments', 'services', 'jobs', 'replicasets'];
for (const type of resourceTypes) {
const list = await this.k8s.list(`teraslice.terascope.io/jobId=${jobId}`, type);
if (list.items.length > 0) {
Expand Down
Loading

0 comments on commit 2f1f453

Please sign in to comment.