Skip to content

Commit

Permalink
Added method to enable_shared_memory in kubernetes_platform sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
hsteude committed Apr 17, 2024
1 parent aab7510 commit 95fb2ae
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 0 deletions.
2 changes: 2 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
'use_config_map_as_volume',
'use_secret_as_env',
'use_secret_as_volume',
'enable_shared_memory',
]

from kfp.kubernetes.config_map import use_config_map_as_env
Expand All @@ -49,3 +50,4 @@
from kfp.kubernetes.volume import CreatePVC
from kfp.kubernetes.volume import DeletePVC
from kfp.kubernetes.volume import mount_pvc
from kfp.kubernetes.shared_memory import enable_shared_memory
49 changes: 49 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/shared_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb


def enable_shared_memory(
task: PipelineTask, volume_name: str = "shm", size: str = ""
) -> PipelineTask:
"""Add shared memory configuration to the task's Kubernetes configuration.
This function adds a shared memory volume with optional name and size
parameters to the task's Kubernetes Executor config. Size should be
specified in standard Kubernetes format (e.g., '1Gi', '500Mi').
Args:
task: Pipeline task.
volume_name: Name of the shared memory volume, defaults to 'shm'.
size: Size of the shared memory, defaults to an empty string, ''.
Returns:
Task object with updated shared memory configuration.
"""

# get existing k8s config
msg = common.get_existing_kubernetes_config_as_message(task)

# set new values
msg.enabled_shared_memory.size = size if size is not None else ""
msg.enabled_shared_memory.volume_name = volume_name

# update task specific k8s config
task.platform_config["kubernetes"] = json_format.MessageToDict(msg)

return task
33 changes: 33 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/shared_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
pass


@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.enable_shared_memory(task, volume_name="Random-Name", size="2Gi")


if __name__ == "__main__":
from kfp import compiler

compiler.Compiler().compile(my_pipeline, __file__.replace(".py", ".yaml"))
54 changes: 54 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/shared_memory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# PIPELINE DEFINITION
# Name: my-pipeline
components:
comp-comp:
executorLabel: exec-comp
deploymentSpec:
executors:
exec-comp:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- comp
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef comp():\n pass\n\n"
image: python:3.7
pipelineInfo:
name: my-pipeline
root:
dag:
tasks:
comp:
cachingOptions:
enableCache: true
componentRef:
name: comp-comp
taskInfo:
name: comp
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-comp:
enabledSharedMemory:
size: 2Gi
volumeName: Random-Name
111 changes: 111 additions & 0 deletions kubernetes_platform/python/test/unit/test_shared_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp import dsl
from kfp import kubernetes


class TestEnableSharedMemory:

def test_default_options(self):
@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.enable_shared_memory(task)

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
"platforms": {
"kubernetes": {
"deploymentSpec": {
"executors": {
"exec-comp": {"enabledSharedMemory": {"volumeName": "shm"}}
}
}
}
}
}

def test_custom_volume_name(self):
@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.enable_shared_memory(task, volume_name="Random-Name")

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
"platforms": {
"kubernetes": {
"deploymentSpec": {
"executors": {
"exec-comp": {
"enabledSharedMemory": {"volumeName": "Random-Name"}
}
}
}
}
}
}

def test_custom_volume_size(self):
@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.enable_shared_memory(task, size="100Mi")

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
"platforms": {
"kubernetes": {
"deploymentSpec": {
"executors": {
"exec-comp": {
"enabledSharedMemory": {
"volumeName": "shm",
"size": "100Mi",
}
}
}
}
}
}
}

def test_custom_volume_name_and_size(self):
@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.enable_shared_memory(
task, volume_name="Random-Name", size="100Mi"
)

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
"platforms": {
"kubernetes": {
"deploymentSpec": {
"executors": {
"exec-comp": {
"enabledSharedMemory": {
"volumeName": "Random-Name",
"size": "100Mi",
}
}
}
}
}
}
}


@dsl.component
def comp():
pass

0 comments on commit 95fb2ae

Please sign in to comment.