Flink tuning #14
This is exciting!
Recommended Flink configuration

Currently, this will configure Flink to handle task manager failures, but we will need to make additional changes to the deployment in order to handle job manager failures and avoid the ImagePullBackOff failure mode that can occur as the number of nodes approaches the cluster's max limit. For now, using larger instances with more slots will help keep the cluster small and avoid ImagePullBackOffs.

Issues
Config

```python
c.FlinkOperatorBakery.flink_configuration = {
    # https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
    "execution.runtime-mode": "BATCH",
    # recommended: set slots equal to the number of cores; t3.large = 2
    "taskmanager.numberOfTaskSlots": "2",
    # configure according to your instance; we assume >= t3.large
    # https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/
    "taskmanager.memory.flink.size": "1536m",
    "taskmanager.memory.task.off-heap.size": "256m",
    "taskmanager.memory.jvm-overhead.max": "1024m",
    # BROKEN: job restart (HA)
    # https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
    # broken because the cluster can't write to S3; need to create another store
    "high-availability.type": "kubernetes",
    "high-availability.storageDir": "s3://usgs-pforge-us-west-2-flink-cache/recovery",  # created manually
    # task restart
    # https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/
    "restart-strategy": "fixed-delay",
    "restart-strategy.fixed-delay.attempts": "3",
    "restart-strategy.fixed-delay.delay": "10 s",
}
```

Tests

Next step will be to test job manager failures and HA mode (one way to simulate these failures is sketched below).

Test 1: Kill the jobmanager at startup.
Test 2: Kill the jobmanager during a job.
Test 3 (edge case): Kill the jobmanager just as tasks complete.
Test 4 (edge case): Simultaneously kill a job manager and a task manager during a job.

If at least tests 1 and 2 pass, I believe we're safe to run on SPOT.

Props
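For Tests 1 and 2, here is a minimal sketch of one way to kill the jobmanager with the Kubernetes Python client. The `component=jobmanager` label selector and the namespace are assumptions about how the operator labels its pods, not something confirmed in this thread; adjust both to match the actual deployment.

```python
# Hypothetical failure-injection helper for Tests 1-2: force-delete the Flink
# jobmanager pod to approximate a SPOT interruption.
# Assumes pods are labeled "component=jobmanager" and run in `namespace`.
from kubernetes import client, config


def kill_jobmanager(namespace: str = "default") -> None:
    config.load_kube_config()  # use load_incluster_config() when run in-cluster
    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace, label_selector="component=jobmanager")
    for pod in pods.items:
        # grace_period_seconds=0 makes the kill abrupt, like a node disappearing
        v1.delete_namespaced_pod(pod.metadata.name, namespace, grace_period_seconds=0)
        print(f"deleted {pod.metadata.name}")


if __name__ == "__main__":
    kill_jobmanager()
```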
Test Logs

Test 2 log: (collapsed log output omitted)
Nice of you to look into all this stuff (especially the HA things) @thodson-usgs 🥳 Thanks for doing this. Thinking through some edge cases here. I often think of HA as being most useful in streaming workflows b/c if something bad goes wrong the job can recover and magically pick up from where the last checkpoint was. But in batch (let's assume you're using SPOT instances for this) I'm not sure how you'd differentiate between a job that failed b/c there's a valid bug and a job that failed b/c the SPOT instance was terminated. In the former scenario wouldn't the job just infinitely restart and fail?
I haven't run into this yet b/c the rate limit is decent. Is your …
Good point! I won't know until I test this. I think a valid bug would cause a task failure, which is handled by a separate strategy, like retry 3 times. Hopefully, the job won't restart when the task restart strategy fails. (In my testing, a failing task doesn't cause the job manager to fail, so task failures shouldn't cause the job to rerun 🤞) As for ImagePullBackOffs, I would typically get several setting …
yummy, like that
yeah, let's look at this b/c we can probably tune it to work differently, but I agree that certain patterns will violate it eventually
ah, might you be referring to the adaptive scheduler?
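If that is the scheduler in question, here is a hedged sketch of how it would presumably be switched on in the same traitlets config shown earlier; `jobmanager.scheduler` is Flink's standard option for this, but whether the adaptive scheduler actually helps a BATCH-mode job here is not established in this thread.

```python
# Sketch only: turn on Flink's adaptive scheduler alongside the settings above.
# Treat the interaction with "execution.runtime-mode": "BATCH" as untested.
c.FlinkOperatorBakery.flink_configuration.update({
    "jobmanager.scheduler": "adaptive",
})
```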
Looks like …
Interesting, I've never seen that ☝️. I'm talking about how the … I don't think any of the Flink images injected into the manifests use …

Assuming we have a decent-sized node, all the above means is that during scheduling of many pods per node, only the first pod would need to request the image; all the other worker pods would be returned the cached image from the container runtime (per node). This of course assumes they are using the same image tag too.

In your case, when 40 workers get scheduled, we should only really be hitting Docker Hub once per node, for as many nodes as your workers get scheduled on. So that would be a good thing to concretize and get numbers on (see the sketch below).

Then there is the upper limit on node autoscaling to think about. It's probably something reasonable like 10. So it's an interesting problem that you seem to be running into.
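As one way to concretize those numbers, here is a rough sketch using the Kubernetes Python client that lists each worker pod's image, pull policy, and node, then counts pods per node. The `component=taskmanager` label and the `default` namespace are assumptions, not something taken from this thread.

```python
# Rough check of image-pull behavior: one registry pull per node is the best
# case when every pod uses the same image tag and the cached image is reused.
# The label selector and namespace below are assumptions; adjust to the deployment.
from collections import Counter

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()
pods = v1.list_namespaced_pod("default", label_selector="component=taskmanager")

pods_per_node = Counter()
for pod in pods.items:
    pods_per_node[pod.spec.node_name] += 1
    for container in pod.spec.containers:
        print(pod.metadata.name, pod.spec.node_name, container.image, container.image_pull_policy)

# Upper bound on Docker Hub hits ~= number of distinct nodes, not number of pods.
print(f"{sum(pods_per_node.values())} worker pods across {len(pods_per_node)} nodes")
```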
I think we need to tune Flink before this forge can really cook. Opening this issue to start the discussion.
Here are some initial ideas:
- `org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy`. By default, I don't think Flink will handle any failures. Once we configure this, I think execution will be much more reliable.
- `flink-config.yaml` that's read in like `yamlencode(file(flink-config.yaml))`. Some of this config may be tuned per runner, which is a reason for separating it from the static Terraform (see the sketch after this list).
- `config.py`. But before we can enable job manager restarts (called High Availability in Flink), we'll need to add a shared filesystem where Flink can store job metadata.
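Here is a minimal sketch of the flink-config.yaml idea, assuming the per-runner overrides get merged into the c.FlinkOperatorBakery.flink_configuration dict used elsewhere in this thread; the file name comes from the list above, and everything else is illustrative.

```python
# Sketch only: load per-runner Flink overrides from flink-config.yaml and merge
# them over a set of shared defaults. Assumes this runs inside a traitlets
# config file where `c` is already defined (as in the config shown earlier).
import yaml

with open("flink-config.yaml") as f:
    per_runner_overrides = yaml.safe_load(f) or {}

shared_defaults = {
    "execution.runtime-mode": "BATCH",
    "restart-strategy": "fixed-delay",
    "restart-strategy.fixed-delay.attempts": "3",
}

c.FlinkOperatorBakery.flink_configuration = {**shared_defaults, **per_runner_overrides}
```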