Apply reviews v2
Signed-off-by: davidmirror-ops <[email protected]>
davidmirror-ops committed May 15, 2024
1 parent 9ae180f commit 234c98f
Showing 1 changed file with 24 additions and 24 deletions: docs/deployment/configuration/performance.rst
@@ -32,7 +32,7 @@ It's implemented as a ``goroutine``, and illustrated here as a hard-working goph

1. Pulls from the ``WorkQueue`` and loads what it needs to do the job: the workflow specification (desired state) and the previously recorded execution status.
2. Observes the actual state by querying the Kubernetes API (or the Informer cache).
3. Calculates the difference between desired and observed state, and triggers an effect to reconcile both states (eg. Launch/kill a Pod, handle failures, schedule a node execution, etc), interacting with the Propeller executors to process Inputs, Outputs and Offloaded data as indicated in the workflow spec.
3. Calculates the difference between desired and observed state, and triggers an effect to reconcile both states (eg. Launch/kill a Pod, handle failures, schedule a node execution, etc), interacting with the Propeller executors to process inputs, outputs and offloaded data as indicated in the workflow spec.
4. Keeps a local copy of the execution status, in addition to what the K8s API stores in ``etcd``.
5. Reports status to the control plane and, hence, to the user.

@@ -43,8 +43,8 @@ Optimizing ``round_latency`` is one of the main goals of the recommendations pro
Performance tuning at each stage
--------------------------------

1. Workers, the WorkQueue and the evaluation loop
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1. Workers, the WorkQueue, and the evaluation loop
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. list-table:: Important Properties
:widths: 25 50 25 50 25
@@ -58,7 +58,7 @@ Performance tuning at each stage
* - ``workers``
- Number of processes that can work concurrently. This also implies the number of workflows that can be executed in parallel. Since FlytePropeller uses ``goroutines``, it can accommodate significantly more processes than the number of physical cores.
- ``flyte:propeller:all:free_workers_count``
- A low number may result in higher overall latency for each workflow evaluation round. Larger the number, implies more workflows can be evaluated in parallel. But it should depend on number of CPU cores assigned to FlytePropeller and evaluated against the cost of context switching. A number around 500 - 800 with 4-8 CPU cores usually works fine.
- A low number may result in higher overall latency for each workflow evaluation loop, while a higher number implies that more workflows can be evaluated in parallel, reducing latency. The number of workers depends on the number of CPU cores assigned to the FlytePropeller pod, and should be evaluated against the cost of context switching. A number around 500 - 800 workers with 4-8 CPU cores is usually adequate.
- ``plugins.workqueue.config.workers``. Default value: ``10``.
* - Workqueue depth
- Current number of workflow IDs in the queue awaiting processing
@@ -69,9 +69,9 @@
2. Query observed state
^^^^^^^^^^^^^^^^^^^^^^^

The Kube client config controls the request throughput from FlytePropeller to the Kube API server. These requests may include creating/monitoring Pods or creating/updating FlyteWorkflow CRDs to track workflow execution.
The Kube client config controls the request throughput from FlytePropeller to the Kube API server. These requests may include creating/monitoring pods or creating/updating FlyteWorkflow CRDs to track workflow execution.
The `default configuration provided by K8s <https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/client/config#GetConfigWithContext>`__ results in very conservative rate-limiting. FlytePropeller provides a default configuration that may offer better performance.
However, if your workload involves larger scales (e.g., >5k fanout dynamic or map tasks, >8k concurrent workflows, etc.,) the kube-client rate limiting config may still contribute to a noticeable drop in performance.
However, if your workload involves larger scales (e.g., >5k fanout dynamic or map tasks, >8k concurrent workflows, etc.), the kube-client rate limiting config provided by FlytePropeller may still contribute to a noticeable drop in performance.
Increasing the ``qps`` and ``burst`` values may help alleviate back pressure and improve FlytePropeller performance. The following is an example kube-client config applied to Propeller:

.. code-block:: yaml
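
   # Sketch of the elided example (collapsed in this diff view), reconstructed from the
   # values described below: 100 queries per second, bursts of up to 120, and a 30s timeout.
   # The exact nesting under propeller.kube-client-config is an assumption.
   propeller:
     kube-client-config:
       qps: 100
       burst: 120
       timeout: 30s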
@@ -86,8 +86,8 @@ Increasing the ``qps`` and ``burst`` values may help alleviate back pressure and

In the previous example, FlytePropeller's Kube client will issue at most ``100`` queries per second, temporarily allowing bursts of up to ``120`` before blocking any subsequent query. A query blocked for ``30s`` will time out.

It is worth noting that the Kube API server tends to throttle requests transparently. This means that even increasing the allowed frequency of API requests (e.g., increasing FlytePropeller workers or relaxing Kube client config rate-limiting), there may be steep performance decreases for no apparent reason.
While it's possible to easily monitor Kube API saturation using system-level metrics like CPU, memory and network usage; it's recommended to look at kube-apiserver-specific metrics like ``workqueue_depth`` which can assist in identifying whether throttling is to blame. Unfortunately, there is no one-size-fits-all solution here, and customizing these parameters for your workload will require trial and error.
It is worth noting that the Kube API server tends to throttle requests transparently. This means that even after increasing the allowed frequency of API requests (e.g., increasing FlytePropeller workers or relaxing Kube client config rate-limiting), there may be steep performance decreases for no apparent reason.
While it's possible to easily monitor Kube API saturation using system-level metrics like CPU, memory, and network usage, we recommend looking at kube-apiserver-specific metrics like ``workqueue_depth``, which can assist in identifying whether throttling is to blame. Unfortunately, there is no one-size-fits-all solution here, and customizing these parameters for your workload will require trial and error.
`Learn more about Kubernetes metrics <https://kubernetes.io/docs/reference/instrumentation/metrics/>`__

3. Evaluate the DAG and reconcile state as needed
@@ -111,7 +111,7 @@ While it's possible to easily monitor Kube API saturation using system-level met
- ``propeller.downstream-eval-duration``. Default value: ``5s``.
* - ``max-streak-length``
- Maximum number of consecutive evaluation rounds that one propeller worker can use for one workflow.
- A large value can lead to faster completion times for workflows that benefit from continuous processing, especially cached or computationally intensive workflows, but at the cost of lower throughput and higher latency as workers would be spending most of their time on a few workflows. If set to ``1``, the worker adds the workflowID back to the WorkQueue immediately after a single evaluation loop is completed, and waits for another worker to pick it up before processing again, effectively prioritizing fast-changing or "hot" workflows.
- A large value can lead to faster completion times for workflows that benefit from continuous processing, especially cached or computationally intensive workflows, but at the cost of lower throughput and higher latency as workers will spend most of their time on a few workflows. If set to ``1``, the worker adds the workflowID back to the WorkQueue immediately after a single evaluation loop is completed, and waits for another worker to pick it up before processing again, effectively prioritizing fast-changing or "hot" workflows.
- ``propeller.max-streak-length``. Default value: ``8``.
* - ``max-size_mbs``
- Max size of the write-through in-memory cache that FlytePropeller can use to store Inputs/Outputs metadata for faster read operations.
@@ -170,16 +170,16 @@ Sample output (excerpt):
...
resourceVersion: "1055227"
Every time a resource (e.g. a Pod, a flyteworkflow CR, etc.) is modified, this counter is incremented.
Every time a resource (e.g. a pod, a flyteworkflow CR, etc.) is modified, this counter is incremented.
As ``etcd`` is a distributed key-value store, it needs to manage writes from multiple clients (controllers in this case)
in a way that maintains consistency and performance.
That's why, in addition to using ``Revisions`` (implemented in Kubernetes as ``Resource Version``), ``etcd`` also prevents clients from writing if they're using
an outdated ``ResourceVersion``; something that could happen after a temporary client disconnection or whenever a status replication from the Kubernetes API to
the Informer cache hasn't completed yet. Poorly handled by a controller, this could result into kube-server and FlytePropeller worker overload by repeatedly attempting to perform outdated (or "stale") writes.
an outdated ``ResourceVersion``, which could happen after a temporary client disconnection or whenever a status replication from the Kubernetes API to
the Informer cache hasn't completed yet. Poorly handled by a controller, this could result in kube-server and FlytePropeller worker overload by repeatedly attempting to perform outdated (or "stale") writes.

FlytePropeller handles these situations by keeping a record of the last known ``ResourceVersion``. In the event that ``etcd`` denies a write operation due to an outdated version, FlytePropeller continues the Workflow
evaluation loop, waiting for the Informer cache to become consistent. This mechanism, enabled by default and known as ``ResourceVersionCache``, prevents from both overloading the K8s API and wasting ``workers`` resources on invalid operations.
It also mitigates the impact of cache propagation latency, which can be order of seconds.
FlytePropeller handles these situations by keeping a record of the last known ``ResourceVersion``. In the event that ``etcd`` denies a write operation due to an outdated version, FlytePropeller continues the workflow
evaluation loop, waiting for the Informer cache to become consistent. This mechanism, enabled by default and known as ``ResourceVersionCache``, avoids both overloading the K8s API and wasting ``workers`` resources on invalid operations.
It also mitigates the impact of cache propagation latency, which can be on the order of seconds.

If ``max-streak-length`` is enabled, instead of waiting for the Informer cache to become consistent during the evaluation loop, FlytePropeller runs multiple evaluation loops using its in-memory copy of the ``ResourceVersion`` and corresponding Resource state, as long
as there are mutations in any of the resources associated with that particular workflow. When the ``max-streak-length`` limit is reached, the evaluation loop is done and, if further evaluation is required, the cycle will start again by trying to get the most recent ``Resource Version`` as stored in ``etcd``.
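
As a rough illustration, this streak behavior is tuned through the key listed in the table above; a minimal sketch, assuming it nests under the ``propeller`` section of the FlytePropeller configuration:

.. code-block:: yaml

   propeller:
     # Maximum consecutive evaluation rounds a single worker may spend on one workflow
     # before it returns the workflow to the WorkQueue (a value of 1 requeues after every round).
     max-streak-length: 8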
@@ -207,11 +207,11 @@ Other supported options for ``workflowStore.policy`` are described below:
Concurrency vs parallelism
==========================

While FlytePropeller is designed to efficiently handle concurrency using the mechanisms described in this section; parallel executions -not only concurrent, but evaluated at the same time-, pose an additional challenge, especially with workflows that have an extremely large fan-out.
This is because FlytePropeller implements a greedy traversal algorithm, that tries to evaluate the entire unblocked nodes within a workflow in every round.
While FlytePropeller is designed to efficiently handle concurrency using the mechanisms described in this section, parallel executions (not only concurrent, but evaluated at the same time) pose an additional challenge, especially with workflows that have an extremely large fan-out.
This is because FlytePropeller implements a greedy traversal algorithm that tries to evaluate all unblocked nodes within a workflow in every round.
A way to mitigate the potential performance impact is to limit the maximum number of nodes that can be evaluated simultaneously. This can be done by setting ``max-parallelism`` using any of the following methods:

a. Platform default: This allows to set platform-wide defaults for maximum parallelism within a Workflow execution evaluation loop. This can be overridden per Launch plan or per execution.
a. Platform default: This allows you to set platform-wide defaults for maximum parallelism within a workflow execution evaluation loop. This can be overridden per launch plan or per execution.
The default `maxParallelism is configured to be 25 <https://github.com/flyteorg/flyteadmin/blob/master/pkg/runtime/application_config_provider.go#L40>`_.
It can be overridden with this config block in flyteadmin:
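
A minimal sketch of such a block (the original example is collapsed in this diff view), assuming the setting lives under the ``flyteadmin`` section of the flyteadmin configuration:

.. code-block:: yaml

   flyteadmin:
     # Platform-wide cap on the number of nodes evaluated in parallel per workflow execution
     maxParallelism: 25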

@@ -236,25 +236,25 @@ b. Default for a specific launch plan. For any launch plan, the ``max_parallelis

Scaling out FlyteAdmin
=======================
FlyteAdmin is a stateless service. Often time before needing to scale FlyteAdmin, you need to scale the backing database.
Check out the `FlyteAdmin Dashboard <https://github.com/flyteorg/flyte/blob/master/deployment/stats/prometheus/flyteadmin-dashboard.json>`__ to see signs of database or API latency degradation.
FlyteAdmin is a stateless service. Often, before needing to scale FlyteAdmin, you need to scale the backing database.
Check the `FlyteAdmin Dashboard <https://github.com/flyteorg/flyte/blob/master/deployment/stats/prometheus/flyteadmin-dashboard.json>`__ for signs of database or API latency degradation.
PostgreSQL scaling techniques like connection pooling can help alleviate pressure on the database instance.
If needed, change the number of replicas of the FlyteAdmin K8s deployment to allow higher throughput.
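
For example, with the flyte-core Helm chart this can be a simple values override; a sketch, assuming the chart exposes a ``replicaCount`` value for flyteadmin:

.. code-block:: yaml

   flyteadmin:
     # Number of FlyteAdmin replicas behind the Kubernetes Service
     replicaCount: 2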

Scaling out Datacatalog
========================
Datacatalog is a stateless service and it connects to the same database as FlyteAdmin, so the recommendations to scale out the backing PostgreSQL database also apply here.
Datacatalog is a stateless service that connects to the same database as FlyteAdmin, so the recommendation to scale out the backing PostgreSQL database also applies here.

Scaling out FlytePropeller
===========================

Sharded scale-out
-------------------
FlytePropeller Manager is a new component introduced to facilitate horizontal scaling of FlytePropeller through sharding. Effectively, the Manager is responsible for maintaining liveness and proper configuration over a collection of FlytePropeller instances. This scheme uses K8s label selectors to deterministically assign FlyteWorkflow CRD responsibilities to FlytePropeller instances, effectively distributing processing load over the shards.
FlytePropeller Manager facilitates horizontal scaling of FlytePropeller through sharding. Effectively, the Manager is responsible for maintaining liveness and proper configuration over a collection of FlytePropeller instances. This scheme uses K8s label selectors to deterministically assign FlyteWorkflow CRD responsibilities to FlytePropeller instances, distributing the processing load over the shards.

Deployment of FlytePropeller Manager requires K8s configuration updates including a modified FlytePropeller Deployment and a new PodTemplate defining managed FlytePropeller instances. The easiest way to apply these updates is by setting the ``flytepropeller.manager`` value to ``true`` in the Helm values and setting the manager config at ``configmap.core.manager``.
Deployment of FlytePropeller Manager requires K8s configuration updates including a modified FlytePropeller deployment and a new PodTemplate defining managed FlytePropeller instances. The easiest way to apply these updates is to set the ``flytepropeller.manager`` value to ``true`` in the Helm values and set the manager config at ``configmap.core.manager``.

Flyte provides a variety of Shard Strategies to configure how FlyteWorkflows are sharded among managed FlytePropeller instances. These include ``hash``, which uses consistent hashing to load-balance evaluation over shards, and ``project`` / ``domain``, which map the respective IDs to specific managed FlytePropeller instances. Below we include examples of Helm configurations for each of the existing Shard Strategies.
Flyte provides a variety of shard strategies to configure how FlyteWorkflows are sharded among managed FlytePropeller instances. These include ``hash``, which uses consistent hashing to load balance evaluation over shards, and ``project`` / ``domain``, which map the respective IDs to specific managed FlytePropeller instances. Below we include examples of Helm configurations for each of the existing shard strategies.

The hash shard strategy, denoted by ``type: Hash`` in the configuration below, uses consistent hashing to evenly distribute Flyte workflows over managed FlytePropeller instances. This configuration requires a ``shard-count`` variable, which defines the number of managed FlytePropeller instances. You may change the shard count without impacting existing workflows. Note that changing the ``shard-count`` is a manual step; it is not auto-scaling.
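
A minimal sketch of such a Helm configuration (the full example is collapsed in this diff view), assuming the shard settings nest under the ``configmap.core.manager`` path mentioned above:

.. code-block:: yaml

   flytepropeller:
     manager: true
   configmap:
     core:
       manager:
         shard:
           # Consistent hashing distributes FlyteWorkflow CRDs across the managed instances
           type: Hash
           # Number of managed FlytePropeller instances
           shard-count: 4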

