-
Notifications
You must be signed in to change notification settings - Fork 248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[query] Improve error handling and job tracking in ServiceBackend #14751
base: qob-fast-cancel
Are you sure you want to change the base?
Conversation
32dfbf9
to
7cb9d44
Compare
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala
Outdated
Show resolved
Hide resolved
hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala
Outdated
Show resolved
Hide resolved
7cb9d44
to
5e8dd07
Compare
5e8dd07
to
429847a
Compare
54aae49
to
aa8a2e8
Compare
429847a
to
03a6c93
Compare
new CancellationException( | ||
s"Job group ${jobGroup.job_group_id} for batch ${batchConfig.batchId} was cancelled" | ||
) | ||
} | ||
|
||
r = (Some(error), results) | ||
} | ||
def streamSuccessfulJobResults: Stream[(Array[Byte], Int)] = | ||
for { | ||
successes <- batchClient.getJobGroupJobs( | ||
jobGroup.batch_id, | ||
jobGroup.job_group_id, | ||
Some(JobStates.Success), | ||
) | ||
job <- successes | ||
partIdx = job.job_id - startJobId | ||
} yield (readPartitionResult(root, partIdx), partIdx) | ||
|
||
val r @ (_, results) = | ||
jobGroup.state match { | ||
case Success => | ||
runAllKeepFirstError(executor) { | ||
(partIdxs, parts.indices).zipped.map { (partIdx, jobIndex) => | ||
(() => readPartitionResult(root, jobIndex), partIdx) | ||
} | ||
} | ||
case Failure => | ||
val failedEntries = batchClient.getJobGroupJobs( | ||
jobGroup.batch_id, | ||
jobGroup.job_group_id, | ||
Some(JobStates.Failed), | ||
) | ||
assert( | ||
failedEntries.nonEmpty, | ||
s"Job group ${jobGroup.job_group_id} failed, but no failed jobs found.", | ||
) | ||
val error = readPartitionError(root, failedEntries.head.head.job_id - startJobId) | ||
|
||
(Some(error), streamSuccessfulJobResults.toIndexedSeq) | ||
case Cancelled => | ||
val error = | ||
new CancellationException(s"Job Group ${jobGroup.job_group_id} was cancelled.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert this - there was more context in the last message
Change Description
Security Assessment
Delete all except the correct answer:
Impact Description
Mainly just code for testing, nothing security related
(Reviewers: please confirm the security impact before approving)