Skip to content

Commit

Permalink
feat(Backend + SDK): Update kfp backend and kubernetes sdk to support…
Browse files Browse the repository at this point in the history
… EmptyDir

Update kfp backend and kubernetes sdk to support mounting EmptyDir
volumes to task pods.

Inspired by #10427

Fixes: #10656

Signed-off-by: Greg Sheremeta <[email protected]>
  • Loading branch information
gregsheremeta committed Jul 25, 2024
1 parent d911c8b commit 1063e9c
Show file tree
Hide file tree
Showing 9 changed files with 484 additions and 6 deletions.
28 changes: 28 additions & 0 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
k8sres "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -665,6 +666,33 @@ func extendPodSpecPatch(
podSpec.Volumes = append(podSpec.Volumes, ephemeralVolume)
podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, ephemeralVolumeMount)
}

// EmptyDirMounts
for _, emptyDirVolumeSpec := range kubernetesExecutorConfig.GetEmptyDirMounts() {
var sizeLimitResource *resource.Quantity
if emptyDirVolumeSpec.GetSizeLimit() != "" {
r := k8sres.MustParse(emptyDirVolumeSpec.GetSizeLimit())
sizeLimitResource = &r
}

emptyDirVolume := k8score.Volume{
Name: emptyDirVolumeSpec.GetVolumeName(),
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{
Medium: k8score.StorageMedium(emptyDirVolumeSpec.GetMedium()),
SizeLimit: sizeLimitResource,
},
},
}
emptyDirVolumeMount := k8score.VolumeMount{
Name: emptyDirVolumeSpec.GetVolumeName(),
MountPath: emptyDirVolumeSpec.GetMountPath(),
}

podSpec.Volumes = append(podSpec.Volumes, emptyDirVolume)
podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, emptyDirVolumeMount)
}

return nil
}

Expand Down
166 changes: 163 additions & 3 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ package driver

import (
"encoding/json"
"testing"

k8sres "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"

"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
Expand Down Expand Up @@ -532,7 +533,7 @@ func Test_extendPodSpecPatch_Secret(t *testing.T) {
{
Name: "secret1",
VolumeSource: k8score.VolumeSource{
Secret: &k8score.SecretVolumeSource{SecretName: "secret1", Optional: &[]bool{false}[0],},
Secret: &k8score.SecretVolumeSource{SecretName: "secret1", Optional: &[]bool{false}[0]},
},
},
},
Expand Down Expand Up @@ -730,7 +731,7 @@ func Test_extendPodSpecPatch_ConfigMap(t *testing.T) {
VolumeSource: k8score.VolumeSource{
ConfigMap: &k8score.ConfigMapVolumeSource{
LocalObjectReference: k8score.LocalObjectReference{Name: "cm1"},
Optional: &[]bool{false}[0],},
Optional: &[]bool{false}[0]},
},
},
},
Expand Down Expand Up @@ -890,6 +891,165 @@ func Test_extendPodSpecPatch_ConfigMap(t *testing.T) {
}
}

func Test_extendPodSpecPatch_EmptyVolumeMount(t *testing.T) {
medium := "Memory"
sizeLimit := "1Gi"
var sizeLimitResource *k8sres.Quantity
r := k8sres.MustParse(sizeLimit)
sizeLimitResource = &r

tests := []struct {
name string
k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
podSpec *k8score.PodSpec
expected *k8score.PodSpec
}{
{
"Valid - emptydir mount with no medium or size limit",
&kubernetesplatform.KubernetesExecutorConfig{
EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{
{
VolumeName: "emptydir1",
MountPath: "/data/path",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
VolumeMounts: []k8score.VolumeMount{
{
Name: "emptydir1",
MountPath: "/data/path",
},
},
},
},
Volumes: []k8score.Volume{
{
Name: "emptydir1",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{},
},
},
},
},
},
{
"Valid - emptydir mount with medium and size limit",
&kubernetesplatform.KubernetesExecutorConfig{
EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{
{
VolumeName: "emptydir1",
MountPath: "/data/path",
Medium: &medium,
SizeLimit: &sizeLimit,
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
VolumeMounts: []k8score.VolumeMount{
{
Name: "emptydir1",
MountPath: "/data/path",
},
},
},
},
Volumes: []k8score.Volume{
{
Name: "emptydir1",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{
Medium: k8score.StorageMedium(medium),
SizeLimit: sizeLimitResource,
},
},
},
},
},
},
{
"Valid - multiple emptydir mounts",
&kubernetesplatform.KubernetesExecutorConfig{
EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{
{
VolumeName: "emptydir1",
MountPath: "/data/path",
},
{
VolumeName: "emptydir2",
MountPath: "/data/path2",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
VolumeMounts: []k8score.VolumeMount{
{
Name: "emptydir1",
MountPath: "/data/path",
},
{
Name: "emptydir2",
MountPath: "/data/path2",
},
},
},
},
Volumes: []k8score.Volume{
{
Name: "emptydir1",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{},
},
},
{
Name: "emptydir2",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := extendPodSpecPatch(tt.podSpec, tt.k8sExecCfg, nil, nil)
assert.Nil(t, err)
assert.Equal(t, tt.expected, tt.podSpec)
})
}
}

func Test_extendPodSpecPatch_ImagePullSecrets(t *testing.T) {
tests := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler v0.0.0-20231127195001-a75d4b3711ff
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20231127195001-a75d4b3711ff
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240403164522-8b2a099e8c9f
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240725205754-d911c8b73b49
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20230810215105-e1f0c010f800
github.com/lestrrat-go/strftime v1.0.4
github.com/mattn/go-sqlite3 v1.14.19
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
'add_toleration',
'CreatePVC',
'DeletePVC',
'empty_dir_mount',
'mount_pvc',
'set_image_pull_policy',
'use_field_path_as_env',
Expand Down Expand Up @@ -49,3 +50,4 @@
from kfp.kubernetes.volume import CreatePVC
from kfp.kubernetes.volume import DeletePVC
from kfp.kubernetes.volume import mount_pvc
from kfp.kubernetes.empty_dir import empty_dir_mount
56 changes: 56 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/empty_dir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb


def empty_dir_mount(
task: PipelineTask,
volume_name: str,
mount_path: str,
medium: Optional[str] = None,
size_limit: Optional[str] = None,
) -> PipelineTask:
"""Mount an EmptyDir volume to the task's container.
Args:
task: Pipeline task.
volume_name: Name of the EmptyDir volume.
mount_path: Path within the container at which the EmptyDir should be mounted.
medium: Storage medium to back the EmptyDir. Must be one of `Memory` or `HugePages`. Defaults to `None`.
size_limit: Maximum size of the EmptyDir. For example, `5Gi`. Defaults to `None`.
Returns:
Task object with updated EmptyDir mount configuration.
"""

msg = common.get_existing_kubernetes_config_as_message(task)

empty_dir_mount = pb.EmptyDirMount(
volume_name=volume_name,
mount_path=mount_path,
medium=medium,
size_limit=size_limit,
)

msg.empty_dir_mounts.append(empty_dir_mount)

task.platform_config['kubernetes'] = json_format.MessageToDict(msg)

return task
36 changes: 36 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
pass

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.empty_dir_mount(
task,
volume_name='emptydir-vol-1',
mount_path='/mnt/my_vol_1',
medium='Memory',
size_limit='1Gi'
)

if __name__ == '__main__':
from kfp import compiler
compiler.Compiler().compile(my_pipeline, __file__.replace('.py', '.yaml'))
Loading

0 comments on commit 1063e9c

Please sign in to comment.