Skip to content

Commit

Permalink
feat(clouddriver): make fetching properties file more resilient for k…
Browse files Browse the repository at this point in the history
…8s jobs (#4783)

* tests(orca/clouddriver): refactor tests to remove some test code duplications and make them more descriptive

* feat(orca/clouddriver): make waitOnJobCompletion retries configurable

* feat(clouddriver): make fetching properties file more resilient for k8s jobs

If a k8s run job is marked as succeeded, and property file is defined in the stage context,
then it can so happen that multiple pods are created for that job.
 See https://kubernetes.io/docs/concepts/workloads/controllers/job/#handling-pod-and-container-failures

In extreme edge cases, the first pod may be around before the second one succeeds.
That leads to kubectl logs job/ command failing as seen below:
kubectl -n test logs job/test-run-job-5j2vl -c parser
Found 2 pods, using pod/test-run-job-5j2vl-fj8hd
Error from server (BadRequest): container "parser" in pod "test-run-job-5j2vl-fj8hd" is terminated
or
Found 2 pods, using pod/test-run-job-5j2vl-fj8hd
Error from server (BadRequest): container "parser" in pod "test-run-job-5j2vl-fj8hd" is waiting to start: PodInitializing

where that commands defaults to using one of the two pods.

To fix this issue, if we encounter an error from the kubectl logs job/ command, we
find a successful pod in the job and directly query it for logs.

---------

Co-authored-by: Apoorv Mahajan <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 27, 2024
1 parent 54ba109 commit 9428996
Show file tree
Hide file tree
Showing 13 changed files with 1,279 additions and 1,045 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ public class WaitOnJobCompletion implements CloudProviderAware, OverridableTimeo
InputStream jobStream
retrySupport.retry({
jobStream = katoRestService.collectJob(appName, account, location, name).body.in()
}, 6, 5000, false) // retry for 30 seconds
},
configProperties.getJobStatusRetry().maxAttempts,
Duration.ofMillis(configProperties.getJobStatusRetry().getBackOffInMs()),
configProperties.getJobStatusRetry().exponentialBackoffEnabled
)
Map job = objectMapper.readValue(jobStream, new TypeReference<Map>() {})
outputs.jobStatus = job

Expand All @@ -165,24 +169,16 @@ public class WaitOnJobCompletion implements CloudProviderAware, OverridableTimeo

if ((status == ExecutionStatus.SUCCEEDED) || (status == ExecutionStatus.TERMINAL)) {
if (stage.context.propertyFile) {
Map<String, Object> properties = [:]
try {
retrySupport.retry({
properties = katoRestService.getFileContents(appName, account, location, name, stage.context.propertyFile)
}, 6, 5000, false) // retry for 30 seconds
} catch (Exception e) {
if (status == ExecutionStatus.SUCCEEDED) {
throw new ConfigurationException("Property File: ${stage.context.propertyFile} contents could not be retrieved. Error: " + e)
}
log.warn("failed to get file contents for ${appName}, account: ${account}, namespace: ${location}, " +
"manifest: ${name} from propertyFile: ${stage.context.propertyFile}. Error: ", e)
}

if (properties.size() == 0) {
if (status == ExecutionStatus.SUCCEEDED) {
throw new ConfigurationException("Expected properties file ${stage.context.propertyFile} but it was either missing, empty or contained invalid syntax")
}
} else if (properties.size() > 0) {
Map<String, Object> properties = getPropertyFileContents(
job,
appName,
status,
account,
location,
name,
stage.context.propertyFile as String)

if (properties.size() > 0) {
outputs << properties
outputs.propertyFileContents = properties
}
Expand Down Expand Up @@ -251,4 +247,197 @@ public class WaitOnJobCompletion implements CloudProviderAware, OverridableTimeo
}
throw new JobFailedException(errorMessage)
}

/**
* <p>this method attempts to get property file from clouddriver and then parses its contents. Depending
* on the job itself, it could be handled by any job provider in clouddriver. This method should only be
* called for jobs with ExecutionStatus as either SUCCEEDED or TERMINAL.
*
* <p>If property file contents could not be retrieved from clouddriver, then the error handling depends
* on the job's ExecutionStatus. If it is SUCCEEDED, then an exception is thrown. Otherwise, no exception
* is thrown since we don't want to mask the real reason behind the job failure.
*
* <p>If ExecutionStatus == SUCCEEDED, and especially for kubernetes run jobs, it can so happen that a user
* has configured the job spec to run 1 pod, have completions and parallelism == 1, and
* restartPolicy == Never. Despite that, kubernetes may end up running another pod as stated here:
* https://kubernetes.io/docs/concepts/workloads/controllers/job/#handling-pod-and-container-failures
* In such a scenario, it may so happen that two pods are created for that job. The first pod may still be
* around, such as in a PodInitializing state and the second pod could complete before the first one is
* terminated. This leads to the getFileContents() call failing, since under the covers, kubernetes job
* provider runs kubectl logs job/<jobName> command, which picks one out of the two pods to obtain the
* logs as seen here:
*
* <p>kubectl -n test logs job/test-run-job-5j2vl -c parser
* Found 2 pods, using pod/test-run-job-5j2vl-fj8hd
* Error from server (BadRequest): container "parser" in pod "test-run-job-5j2vl-fj8hd" is PodInitializing
*
* <p>That means, even if kubernetes and clouddriver marked the job as successful, since number of
* succeeded pods >= number of completions, the kubectl command shown above could still end using
* the failed pod for obtaining the logs.
*
* <p>To handle this case, if we get an error while making the getFileContents() call or if we don't receive
* any properties, then for kubernetes jobs, we figure out if the job status has any pod with phase
* SUCCEEDED. If we find such a pod, then we directly get the logs from this succeeded pod. Otherwise,
* we throw an exception as before.
*
* <p> we aren't handling the above case for ExecutionStatus == TERMINAL, because at that point, we wouldn't
* know which pod to query for properties file contents. It could so happen that all the pods in such a job
* have failed, then we would have to loop over each pod and see what it generated. Then if say, two pods
* generated different property values for the same key, which one do we choose? Bearing this complexity
* in mind, and knowing that for succeeded jobs, this solution actually helps prevent a pipeline failure,
* we are limiting this logic to succeeded jobs only for now.
*
* @param job - job status returned by clouddriver
* @param appName - application name where the job is run
* @param status - Execution status of the job. Should either be SUCCEEDED or TERMINAL
* @param account - account under which this job is run
* @param location - where this job is run
* @param name - name of the job
* @param propertyFile - file name to query from the job
* @return map of property file contents
*/
private Map<String, Object> getPropertyFileContents(
Map job,
String appName,
ExecutionStatus status,
String account,
String location,
String name,
String propertyFile
) {
Map<String, Object> properties = [:]
try {
retrySupport.retry({
properties = katoRestService.getFileContents(appName, account, location, name, propertyFile)
},
configProperties.getFileContentRetry().maxAttempts,
Duration.ofMillis(configProperties.getFileContentRetry().getBackOffInMs()),
configProperties.getFileContentRetry().exponentialBackoffEnabled
)
} catch (Exception e) {
log.warn("Error occurred while retrieving property file contents from job: ${name}" +
" in application: ${appName}, in account: ${account}, location: ${location}," +
" using propertyFile: ${propertyFile}. Error: ", e
)

// For succeeded kubernetes jobs, let's try one more time to get property file contents.
if (status == ExecutionStatus.SUCCEEDED) {
properties = getPropertyFileContentsForSucceededKubernetesJob(
job,
appName,
account,
location,
propertyFile
)
if (properties.size() == 0) {
// since we didn't get any properties, we fail with this exception
throw new ConfigurationException("Expected properties file: ${propertyFile} in " +
"job: ${name}, application: ${appName}, location: ${location}, account: ${account} " +
"but it was either missing, empty or contained invalid syntax. Error: ${e}")
}
}
}

if (properties.size() == 0) {
log.warn("Could not parse propertyFile: ${propertyFile} in job: ${name}" +
" in application: ${appName}, in account: ${account}, location: ${location}." +
" It is either missing, empty or contains invalid syntax"
)

// For succeeded kubernetes jobs, let's try one more time to get property file contents.
if (status == ExecutionStatus.SUCCEEDED) {
// let's try one more time to get properties from a kubernetes pod
properties = getPropertyFileContentsForSucceededKubernetesJob(
job,
appName,
account,
location,
propertyFile
)
if (properties.size() == 0) {
// since we didn't get any properties, we fail with this exception
throw new ConfigurationException("Expected properties file: ${propertyFile} in " +
"job: ${name}, application: ${appName}, location: ${location}, account: ${account} " +
"but it was either missing, empty or contained invalid syntax")
}
}
}
return properties
}

/**
* This method is supposed to be called from getPropertyFileContents(). This is only applicable for
* Kubernetes jobs. It finds a successful pod in the job and directly queries it for property file
* contents.
*
* <p>It is meant to handle the following case:
*
* <p> if ExecutionStatus == SUCCEEDED, and especially for kubernetes run jobs, it can so happen that a
* user has configured the job spec to run 1 pod, have completions and parallelism == 1, and
* restartPolicy == Never. Despite that, kubernetes may end up running another pod as stated here:
* https://kubernetes.io/docs/concepts/workloads/controllers/job/#handling-pod-and-container-failures
* In such a scenario, it may so happen that two pods are created for that job. The first pod may still be
* around, such as in a PodInitializing state and the second pod could complete before the first one is
* terminated. This leads to the getFileContents() call failing, since under the covers, kubernetes job
* provider runs kubectl logs job/<jobName> command, which picks one out of the two pods to obtain the
* logs as seen here:
*
* <p>kubectl -n test logs job/test-run-job-5j2vl -c parser
* Found 2 pods, using pod/test-run-job-5j2vl-fj8hd
* Error from server (BadRequest): container "parser" in pod "test-run-job-5j2vl-fj8hd" is PodInitializing
*
* <p>That means, even if kubernetes and clouddriver marked the job as successful, since number of
* succeeded pods >= number of completions, the kubectl command shown above could still end using
* the failed pod for obtaining the logs.
*
* <p>To handle this case, if we get an error while making the getFileContents() call or if we don't receive
* any properties, then for kubernetes jobs, we figure out if the job status has any pod with phase
* SUCCEEDED. If we find such a pod, then we directly get the logs from this succeeded pod. Otherwise,
* we throw an exception as before.
*
* <p>To keep it simple, and not worry about how to deal with property file
* contents obtained from various successful pods in a job, if that may happen, we simply query the first
* successful pod in that job.
*
* @param job - job status returned by clouddriver
* @param appName - application in which this job is run
* @param account - account under which this job is run
* @param namespace - where this job is run
* @param propertyFile - file name to query from the job
* @return map of property file contents
*/
private Map<String, Object> getPropertyFileContentsForSucceededKubernetesJob(
Map job,
String appName,
String account,
String namespace,
String propertyFile
) {
Map<String, Object> properties = [:]
if (job.get("provider", "unknown") == "kubernetes") {
Optional<Map> succeededPod = job.get("pods", [])
.stream()
.filter({ Map pod -> pod.get("status", [:]).get("phase", "Running") == "Succeeded"
})
.findFirst()

if (succeededPod.isPresent()) {
String podName = (succeededPod.get() as Map).get("name")
retrySupport.retry({
properties = katoRestService.getFileContentsFromKubernetesPod(
appName,
account,
namespace,
podName,
propertyFile
)
},
configProperties.getFileContentRetry().maxAttempts,
Duration.ofMillis(configProperties.getFileContentRetry().getBackOffInMs()),
configProperties.getFileContentRetry().exponentialBackoffEnabled
)
}
}
return properties
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ public Map<String, Object> getFileContents(
return getService().getFileContents(app, account, region, id, fileName);
}

@Override
public Map<String, Object> getFileContentsFromKubernetesPod(
String app, String account, String namespace, String podName, String fileName) {
return getService()
.getFileContentsFromKubernetesPod(app, account, namespace, podName, fileName);
}

@Override
public Task lookupTask(String id) {
return getService().lookupTask(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ Map<String, Object> getFileContents(
@Path("id") String id,
@Path("fileName") String fileName);

@GET("/applications/{app}/kubernetes/pods/{account}/{namespace}/{podName}/{fileName}")
Map<String, Object> getFileContentsFromKubernetesPod(
@Path("app") String app,
@Path("account") String account,
@Path("namespace") String namespace,
@Path("podName") String podName,
@Path("fileName") String fileName);

/**
* This should _only_ be called if there is a problem retrieving the Task from
* CloudDriverTaskStatusService (ie. a clouddriver replica).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ public static class WaitOnJobCompletionTaskConfig {
* Default or empty set means that no keys will be excluded.
*/
private Set<String> excludeKeysFromOutputs = Set.of();

private Retries jobStatusRetry = new Retries();

private Retries fileContentRetry = new Retries();

@Data
public static class Retries {
// total number of attempts
int maxAttempts = 6;

// time in ms to wait before subsequent retry attempts
long backOffInMs = 5000;

// flag to enable exponential backoff
boolean exponentialBackoffEnabled = false;
}
}

@Data
Expand Down
Loading

0 comments on commit 9428996

Please sign in to comment.