Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Backend + SDK): Update kfp backend and kubernetes sdk to support tolerations #10471

Merged
merged 4 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,28 @@ func extendPodSpecPatch(
podSpec.NodeSelector = kubernetesExecutorConfig.GetNodeSelector().GetLabels()
}

if tolerations := kubernetesExecutorConfig.GetTolerations(); tolerations != nil {
var k8sTolerations []k8score.Toleration

glog.Infof("Tolerations passed: %+v", tolerations)

for _, toleration := range tolerations {
if toleration != nil {
k8sToleration := k8score.Toleration{
droctothorpe marked this conversation as resolved.
Show resolved Hide resolved
Key: toleration.Key,
Operator: k8score.TolerationOperator(toleration.Operator),
Value: toleration.Value,
Effect: k8score.TaintEffect(toleration.Effect),
droctothorpe marked this conversation as resolved.
Show resolved Hide resolved
TolerationSeconds: toleration.TolerationSeconds,
}

k8sTolerations = append(k8sTolerations, k8sToleration)
}
}

podSpec.Tolerations = k8sTolerations
}

// Get secret mount information
for _, secretAsVolume := range kubernetesExecutorConfig.GetSecretAsVolume() {
secretVolume := k8score.Volume{
Expand Down
84 changes: 84 additions & 0 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,3 +671,87 @@ func Test_extendPodSpecPatch_ImagePullSecrets(t *testing.T) {
})
}
}

func Test_extendPodSpecPatch_Tolerations(t *testing.T) {
droctothorpe marked this conversation as resolved.
Show resolved Hide resolved
tests := []struct {
name string
k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
expected *k8score.PodSpec
}{
{
"Valid - toleration",
&kubernetesplatform.KubernetesExecutorConfig{
Tolerations: []*kubernetesplatform.Toleration{
{
Key: "key1",
Operator: "Equal",
Value: "value1",
Effect: "NoSchedule",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
Tolerations: []k8score.Toleration{
{
Key: "key1",
Operator: "Equal",
Value: "value1",
Effect: "NoSchedule",
TolerationSeconds: nil,
},
},
},
},
{
"Valid - no tolerations",
&kubernetesplatform.KubernetesExecutorConfig{},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
},
{
"Valid - only pass operator",
&kubernetesplatform.KubernetesExecutorConfig{
Tolerations: []*kubernetesplatform.Toleration{
{
Operator: "Contains",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
Tolerations: []k8score.Toleration{
{
Operator: "Contains",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := &k8score.PodSpec{Containers: []k8score.Container{
{
Name: "main",
},
}}
err := extendPodSpecPatch(got, tt.k8sExecCfg, nil, nil)
assert.Nil(t, err)
assert.NotNil(t, got)
assert.Equal(t, tt.expected, got)
})
}
}
2 changes: 1 addition & 1 deletion backend/third_party_licenses/apiserver.csv
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ github.com/klauspost/cpuid,https://github.com/klauspost/cpuid/blob/v1.3.1/LICENS
github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.5/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/758c91f76784/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/f51dc39614e4/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/e129b0501379/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/e1f0c010f800/third_party/ml-metadata/LICENSE,Apache-2.0
github.com/lann/builder,https://github.com/lann/builder/blob/47ae307949d0/LICENSE,MIT
github.com/lann/ps,https://github.com/lann/ps/blob/62de8c46ede0/LICENSE,MIT
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/driver.csv
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice
github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/758c91f76784/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/f51dc39614e4/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/e129b0501379/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/e1f0c010f800/third_party/ml-metadata/LICENSE,Apache-2.0
github.com/mailru/easyjson,https://github.com/mailru/easyjson/blob/v0.7.7/LICENSE,MIT
github.com/modern-go/concurrent,https://github.com/modern-go/concurrent/blob/bacd9c7ef1dd/LICENSE,Apache-2.0
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.4 // indirect
github.com/kubeflow/pipelines/api v0.0.0-20230331215358-758c91f76784
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240207171236-f51dc39614e4
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240216222951-e129b0501379
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20230810215105-e1f0c010f800
github.com/lestrrat-go/strftime v1.0.4
github.com/mattn/go-sqlite3 v1.14.16
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions kubernetes_platform/python/kfp/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,25 @@
__version__ = '1.1.0'

__all__ = [
'add_node_selector',
'add_pod_annotation',
'add_pod_label',
'add_toleration',
'CreatePVC',
'DeletePVC',
'mount_pvc',
'set_image_pull_secrets',
'use_secret_as_env',
'use_secret_as_volume',
'add_node_selector',
'add_pod_label',
'add_pod_annotation',
'set_image_pull_secrets'
]

from kfp.kubernetes.pod_metadata import add_pod_label
from kfp.kubernetes.pod_metadata import add_pod_annotation
from kfp.kubernetes.image import set_image_pull_secrets
from kfp.kubernetes.node_selector import add_node_selector
from kfp.kubernetes.pod_metadata import add_pod_annotation
from kfp.kubernetes.pod_metadata import add_pod_label
from kfp.kubernetes.secret import use_secret_as_env
from kfp.kubernetes.secret import use_secret_as_volume
from kfp.kubernetes.toleration import add_toleration
from kfp.kubernetes.volume import CreatePVC
from kfp.kubernetes.volume import DeletePVC
from kfp.kubernetes.volume import mount_pvc
from kfp.kubernetes.image import set_image_pull_secrets
81 changes: 81 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/toleration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb

try:
from typing import Literal
except ImportError:
from typing_extensions import Literal


def add_toleration(
task: PipelineTask,
key: Optional[str] = None,
operator: Optional[Literal["Equal", "Exists"]] = None,
value: Optional[str] = None,
effect: Optional[Literal["NoExecute", "NoSchedule", "PreferNoSchedule"]] = None,
toleration_seconds: Optional[int] = None,
):
"""Add a `toleration<https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/>`_. to a task.

Args:
task:
Pipeline task.
key:
key is the taint key that the toleration applies to. Empty means
match all taint keys. If the key is empty, operator must be Exists;
this combination means to match all values and all keys.
operator:
operator represents a key's relationship to the value. Valid
operators are Exists and Equal. Defaults to Equal. Exists is
equivalent to wildcard for value, so that a pod can tolerate all
taints of a particular category.
value:
value is the taint value the toleration matches to. If the operator
is Exists, the value should be empty, otherwise just a regular
string.
effect:
effect indicates the taint effect to match. Empty means match all
taint effects. When specified, allowed values are NoSchedule,
PreferNoSchedule and NoExecute.
toleration_seconds:
toleration_seconds represents the period of time the toleration
(which must be of effect NoExecute, otherwise this field is ignored)
tolerates the taint. By default, it is not set, which means tolerate
the taint forever (do not evict). Zero and negative values will be
treated as 0 (evict immediately) by the system.

Returns:
Task object with added toleration.
"""

msg = common.get_existing_kubernetes_config_as_message(task)
msg.tolerations.append(
pb.Toleration(
key=key,
operator=operator,
value=value,
effect=effect,
toleration_seconds=toleration_seconds,
)
)
task.platform_config["kubernetes"] = json_format.MessageToDict(msg)

return task
41 changes: 41 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/toleration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes
from kubernetes.client import V1Toleration


@dsl.component
def comp():
pass


@dsl.pipeline
def my_pipeline():
droctothorpe marked this conversation as resolved.
Show resolved Hide resolved
task = comp()
kubernetes.add_toleration(
task,
key="key1",
operator="Equal",
value="value1",
effect="NoExecute",
toleration_seconds=10,
)


if __name__ == "__main__":
from kfp import compiler

compiler.Compiler().compile(my_pipeline, __file__.replace(".py", ".yaml"))
61 changes: 61 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/toleration.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# PIPELINE DEFINITION
# Name: my-pipeline
components:
comp-comp:
executorLabel: exec-comp
deploymentSpec:
executors:
exec-comp:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- comp
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)


printf "%s" "$0" > "$program_path/ephemeral_component.py"

_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef comp():\n pass\n\n"
image: python:3.7
pipelineInfo:
name: my-pipeline
root:
dag:
tasks:
comp:
cachingOptions:
enableCache: true
componentRef:
name: comp-comp
taskInfo:
name: comp
schemaVersion: 2.1.0
sdkVersion: kfp-2.6.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-comp:
tolerations:
- effect: NoExecute
key: key1
operator: Equal
tolerationSeconds: '10'
value: value1
Loading