
feat(backend): Add Parallelism Limit to ParallelFor tasks. Fixes #8718 (#10798)

Merged: 7 commits from v2-loop-parallelism into kubeflow:master on Dec 3, 2024

Conversation

@gmfrasca (Member) commented May 7, 2024

Description of your changes:
Fixes #8718

Adds the Parallelism item to a DAG template if specified by a task's IteratorPolicy (ie ParallelFor w/ a parallelism limit).

Checklist:

@google-oss-prow google-oss-prow bot requested review from chensun and rimolive May 7, 2024 19:56
@gmfrasca (Member Author) commented May 7, 2024

looks like a CI infra issue?

/retest just to check

google-oss-prow bot:

@gmfrasca: The /retest command does not accept any targets.
The following commands are available to trigger required jobs:

  • /test kfp-kubernetes-test-python310
  • /test kfp-kubernetes-test-python311
  • /test kfp-kubernetes-test-python312
  • /test kfp-kubernetes-test-python38
  • /test kfp-kubernetes-test-python39
  • /test kubeflow-pipeline-backend-test
  • /test kubeflow-pipeline-frontend-test
  • /test kubeflow-pipeline-mkp-snapshot-test
  • /test kubeflow-pipeline-mkp-test
  • /test kubeflow-pipelines-backend-visualization
  • /test kubeflow-pipelines-component-yaml
  • /test kubeflow-pipelines-components-google-cloud-python38
  • /test kubeflow-pipelines-integration-v2
  • /test kubeflow-pipelines-manifests
  • /test kubeflow-pipelines-sdk-docformatter
  • /test kubeflow-pipelines-sdk-execution-tests
  • /test kubeflow-pipelines-sdk-isort
  • /test kubeflow-pipelines-sdk-python310
  • /test kubeflow-pipelines-sdk-python311
  • /test kubeflow-pipelines-sdk-python312
  • /test kubeflow-pipelines-sdk-python38
  • /test kubeflow-pipelines-sdk-python39
  • /test kubeflow-pipelines-sdk-yapf
  • /test test-kfp-runtime-code-python310
  • /test test-kfp-runtime-code-python311
  • /test test-kfp-runtime-code-python312
  • /test test-kfp-runtime-code-python38
  • /test test-kfp-runtime-code-python39
  • /test test-run-all-gcpc-modules
  • /test test-upgrade-kfp-sdk

The following commands are available to trigger optional jobs:

  • /test kfp-kubernetes-execution-tests
  • /test kubeflow-pipeline-e2e-test
  • /test kubeflow-pipeline-upgrade-test
  • /test kubeflow-pipeline-upgrade-test-v2
  • /test kubeflow-pipelines-samples-v2

Use /test all to run the following jobs that were automatically triggered:

  • kubeflow-pipeline-backend-test
  • kubeflow-pipeline-e2e-test
  • kubeflow-pipeline-upgrade-test
  • kubeflow-pipeline-upgrade-test-v2
  • kubeflow-pipelines-samples-v2

In response to this:

looks like a CI infra issue?

/retest just to check

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@gmfrasca (Member Author) commented May 7, 2024

/retest

@HumairAK (Collaborator) commented May 9, 2024

tested with this pipeline:

parallelfor.py
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def preprocess(
        message: Input[str],
        output_model: Output[Model]
):
    import random
    line = "some_model"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    output_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def train(
        model: Input[Model],
        epoch: Input[int],
        trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Train for epoch: {epoch}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')

Worked successfully:

~ $ kubectl -n ${kfp_ns} get workflow tutorial-data-passing-drsfz -o yaml | yq '.spec.templates[-2]'
dag:
  tasks:
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-2}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
          - name: task
            value: '{"componentRef":{"name":"comp-for-loop-2"},"dependentTasks":["preprocess"],"inputs":{"artifacts":{"pipelinechannel--preprocess-output_model":{"taskOutputArtifact":{"outputArtifactKey":"output_model","producerTask":"preprocess"}}}},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[1, 5, 10, 25]"}},"taskInfo":{"name":"for-loop-2"}}'
      depends: preprocess.Succeeded
      name: for-loop-2-driver
      template: system-dag-driver
    - arguments:
        parameters:
          - name: parent-dag-id
            value: '{{tasks.for-loop-2-driver.outputs.parameters.execution-id}}'
          - name: iteration-index
            value: '{{item}}'
      depends: for-loop-2-driver.Succeeded
      name: for-loop-2-iterations
      template: comp-for-loop-2-for-loop-2
      withSequence:
        count: '{{tasks.for-loop-2-driver.outputs.parameters.iteration-count}}'
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-preprocess}}'
          - name: task
            value: '{"cachingOptions":{},"componentRef":{"name":"comp-preprocess"},"inputs":{"parameters":{"message":{"runtimeValue":{"constant":"dataset"}}}},"taskInfo":{"name":"preprocess"}}'
          - name: container
            value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-preprocess}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
      name: preprocess-driver
      template: system-container-driver
    - arguments:
        parameters:
          - name: pod-spec-patch
            value: '{{tasks.preprocess-driver.outputs.parameters.pod-spec-patch}}'
          - default: "false"
            name: cached-decision
            value: '{{tasks.preprocess-driver.outputs.parameters.cached-decision}}'
      depends: preprocess-driver.Succeeded
      name: preprocess
      template: system-container-executor
inputs:
  parameters:
    - name: parent-dag-id
metadata:
  annotations:
    sidecar.istio.io/inject: "false"
name: root
outputs: {}
parallelism: 2

Note the:

parallelism: 2

The UI feedback on this could be better:

(screenshot of the run graph omitted)

Currently all 4 iterations show as executing at the same time (and they never really show the done checkmark even once they finish). But this problem existed prior to this change, seems out of scope for this task, and should be tracked in a follow-up issue.

However, I confirmed the pods are scheduled two at a time, since parallelism = 2 in this example.

@HumairAK (Collaborator) commented May 9, 2024

Hmm, testing it with the above pipeline amended to:

@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

    with dsl.ParallelFor(items=[6, 12, 24, 48], parallelism=4) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

It looks like the workflow will use parallelism = 4 for both loops.

@gmfrasca (Member Author) commented May 9, 2024

As @HumairAK noted, it looks like there's a problem when there are multiple ParallelFor components in a single DAG: the parallelism mechanism applies to the entire DAG, and as it stands there's no way to assign different parallelismLimits to individual tasks within a DAG template.

The obvious solution/workaround for this is to add another layer of abstraction on iterator tasks (i.e., the root DAG calls a new "XYZ-iterator" DAG, which contains the withSequence iterator and the parallelism limit value), but this will make the DAG a bit more complex than it already is, and I'm not sure at this moment what the consequences of updating the workflow path this way may be. Will need a bit to investigate.

@hsteude (Contributor) commented May 10, 2024

> [quotes @gmfrasca's comment above about the per-DAG parallelism limitation and the iterator-template workaround]

Hi @gmfrasca, I think it's really important to have a limit that applies to the individual for loop, not the entire DAG. Have you considered using Argo's implementation of loop parallelism? Using that might even simplify the DAG. However, implementing this could lead to issues with other backends.
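For intuition, a template-level parallelism limit behaves like a counting semaphore over that template's tasks: at most N iterations hold a slot at once, and a limit attached to one template cannot throttle a loop living in a different scope. A rough pure-Python analogy of this throttling behavior (not KFP or Argo code; all names here are illustrative):

```python
import threading
import time

def run_loop_iterations(items, parallelism):
    """Emulate a DAG template's `parallelism` field with a semaphore."""
    sem = threading.Semaphore(parallelism)
    lock = threading.Lock()
    running = 0
    peak = 0

    def iteration(item):
        nonlocal running, peak
        with sem:  # at most `parallelism` iterations hold a slot at once
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.05)  # stand-in for the real work of one iteration
            with lock:
                running -= 1

    threads = [threading.Thread(target=iteration, args=(i,)) for i in items]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak  # highest number of concurrent iterations observed

print(run_loop_iterations([1, 5, 10, 25], parallelism=2))  # never exceeds 2
```

Note that the semaphore lives with the loop it guards; two loops sharing one semaphore would be forced onto a single limit, which is exactly the problem with one `parallelism` value on the root DAG.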

@gmfrasca (Member Author):

Hey @hsteude - so this implementation actually already leverages the Argo loop parallelism mechanism. The issue here is that the current compiled architecture of a pipeline aggregates all KFP pipeline steps into sequential tasks of a top-level root DAG Template, but the finest granularity at which you can specify that limit is the Template level, not an individual DAGTask. Essentially, we do not have the concept of parallelism on a per-step basis in the current state.

The workaround/DAG re-architecture I mentioned above would bump out each of these steps to call its own intermediate Template, each with its own DAG and iterator, and that template would simply call the component Template itself. With that, we could specify individual parallelism limits for individual steps, since each is now encapsulated in a Template, at the cost of introducing another layer of abstraction/templating.
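The intermediate-template idea would compile to something shaped roughly like the following hand-written Argo sketch (not actual compiler output; the template names and the second loop are illustrative), where each loop's template carries its own `parallelism` instead of one limit on the root DAG:

```yaml
# Hypothetical shape only: each ParallelFor gets a wrapper template
# so its parallelism limit applies to that loop alone.
- name: root
  dag:
    tasks:
      - name: for-loop-2-iterator
        template: for-loop-2-iterator   # intermediate layer for loop 1
      - name: for-loop-4-iterator
        template: for-loop-4-iterator   # intermediate layer for loop 2
- name: for-loop-2-iterator
  parallelism: 2                        # throttles only this loop's iterations
  dag:
    tasks:
      - name: iteration
        template: comp-for-loop-2
        withSequence:
          count: '{{inputs.parameters.iteration-count}}'
- name: for-loop-4-iterator
  parallelism: 4
  dag:
    tasks:
      - name: iteration
        template: comp-for-loop-4
        withSequence:
          count: '{{inputs.parameters.iteration-count}}'
```

Since Argo evaluates `parallelism` per template instance, hoisting each `withSequence` iterator into its own template scopes the limit to that loop.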

@google-oss-prow google-oss-prow bot added size/XL and removed size/XS labels May 13, 2024
@gmfrasca (Member Author):

/test kubeflow-pipelines-samples-v2

google-oss-prow bot:

@gmfrasca: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name | Commit | Required | Rerun command
kubeflow-pipeline-e2e-test | 9780650 | false | /test kubeflow-pipeline-e2e-test
kubeflow-pipelines-samples-v2 | fb8a3ff | false | /test kubeflow-pipelines-samples-v2
kubeflow-pipeline-upgrade-test | fb8a3ff | false | /test kubeflow-pipeline-upgrade-test

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

@rimolive (Member):

/lgtm

@rimolive (Member):

I reviewed this PR a month ago and it is still open. I think it would be good to have one more lgtm to ensure this PR is still good to merge.

@gregsheremeta (Contributor):

/hold

does hold work here? I'd like to review this before it's merged 😄

@gregsheremeta (Contributor):

they also never really show the done checkmark even once they finish

I've seen this in other pipelines too -- ones without loops. Might be anything with a sub-DAG.

@gregsheremeta (Contributor):

I'm caught off guard a bit because it looks like ParallelFor was mostly already working, except for the limit? And this PR fixes the limit and that's about it, right?

Is #8718 the correct Issue to fix, then? Should it be tweaked to call out that only parallelization limit was left? Or should 8718 be closed and a new Issue opened?

@gregsheremeta (Contributor):

While running this PR in an attempt to validate it and understand it better, I found that I can reliably cause an apiserver panic. The same pipeline works fine on both upstream master of https://github.com/kubeflow/pipelines and https://github.com/opendatahub-io/data-science-pipelines.

test pipeline (adapted from what Humair posted above):

from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, Output, Model

@dsl.component(base_image="python:3.12")
def preprocess(
        message: Input[str],
        output_model: Output[Model]
):
    line = "useful_data"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))

@dsl.component(base_image="python:3.12")
def train(
        model: Input[Model],
        counter: Input[int],
        trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"train loop counter: {counter}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)

@dsl.pipeline(pipeline_root='', name='pl-1')
def parallel_limit_test_pipeline():
    preprocess_task = preprocess(message="hello-world").set_caching_options(enable_caching=False)
    
    with dsl.ParallelFor(items=[1, 2, 3, 4], parallelism=2) as counter:
        train(model=preprocess_task.outputs['output_model'], counter=counter).set_caching_options(enable_caching=False)

if __name__ == '__main__':
    compiler.Compiler().compile(parallel_limit_test_pipeline, __file__ + '.yaml')

compiled version of that pipeline:

# PIPELINE DEFINITION
# Name: pl-1
components:
  comp-for-loop-2:
    dag:
      tasks:
        train:
          cachingOptions: {}
          componentRef:
            name: comp-train
          inputs:
            artifacts:
              model:
                componentInputArtifact: pipelinechannel--preprocess-output_model
            parameters:
              counter:
                componentInputParameter: pipelinechannel--loop-item-param-1
          taskInfo:
            name: train
    inputDefinitions:
      artifacts:
        pipelinechannel--preprocess-output_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        pipelinechannel--loop-item-param-1:
          parameterType: NUMBER_INTEGER
  comp-preprocess:
    executorLabel: exec-preprocess
    inputDefinitions:
      parameters:
        message:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        output_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
  comp-train:
    executorLabel: exec-train
    inputDefinitions:
      artifacts:
        model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        counter:
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      artifacts:
        trained_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
deploymentSpec:
  executors:
    exec-preprocess:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - preprocess
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\nfrom builtins import str\n\ndef preprocess(\n        message: Input[str],\n\
          \        output_model: Output[Model]\n):\n    line = \"useful_data\"\n \
          \   print(f\"Message: {message}\")\n    with open(output_model.path, 'w')\
          \ as output_file:\n        output_file.write('line: {}'.format(line))\n\n"
        image: python:3.12
    exec-train:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - train
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\nfrom builtins import int\n\ndef train(\n        model: Input[Model],\n\
          \        counter: Input[int],\n        trained_model: Output[Model],\n):\n\
          \    import random\n    line = \"some_model\"\n    print(f\"train loop counter:\
          \ {counter}\")\n    with open(trained_model.path, 'w') as output_file:\n\
          \        output_file.write('line: {}'.format(line))\n    trained_model.metadata['accuracy']\
          \ = random.uniform(0, 1)\n\n"
        image: python:3.12
pipelineInfo:
  name: pl-1
root:
  dag:
    tasks:
      for-loop-2:
        componentRef:
          name: comp-for-loop-2
        dependentTasks:
        - preprocess
        inputs:
          artifacts:
            pipelinechannel--preprocess-output_model:
              taskOutputArtifact:
                outputArtifactKey: output_model
                producerTask: preprocess
        iteratorPolicy:
          parallelismLimit: 2
        parameterIterator:
          itemInput: pipelinechannel--loop-item-param-1
          items:
            raw: '[1, 2, 3, 4]'
        taskInfo:
          name: for-loop-2
      preprocess:
        cachingOptions: {}
        componentRef:
          name: comp-preprocess
        inputs:
          parameters:
            message:
              runtimeValue:
                constant: hello-world
        taskInfo:
          name: preprocess
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0

The pipeline uploads fine and renders fine. However, when I run it, I get the following error in the UI:

Run creation failed
{"error":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): 
templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop templates.comp-for-loop-2-loop-for-loop-2 
sorting failed: invalid dependency preprocess","code":13,"message":"Failed to create a new run: InternalServerError: 
Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop 
templates.comp-for-loop-2-loop-for-loop-2 sorting failed: invalid dependency preprocess","details":
[{"@type":"type.googleapis.com/google.rpc.Status","code":13,"message":"Internal Server Error"}]}

I can see the following in the apiserver logs:


I0801 17:54:36.656761       1 interceptor.go:29] /kubeflow.pipelines.backend.api.v2beta1.RunService/CreateRun handler starting
I0801 17:54:36.670814       1 error.go:278] templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop templates.comp-for-loop-2-loop-for-loop-2 sorting failed: invalid dependency preprocess
InternalServerError: Failed to validate workflow for ()
github.com/kubeflow/pipelines/backend/src/common/util.NewInternalServerError
	/opt/app-root/src/backend/src/common/util/error.go:144
github.com/kubeflow/pipelines/backend/src/apiserver/resource.(*ResourceManager).CreateRun
	/opt/app-root/src/backend/src/apiserver/resource/resource_manager.go:516
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).createRun
	/opt/app-root/src/backend/src/apiserver/server/run_server.go:131
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).CreateRun
	/opt/app-root/src/backend/src/apiserver/server/run_server.go:500
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler.func1
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2711
main.apiServerInterceptor
	/opt/app-root/src/backend/src/apiserver/interceptor.go:30
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
	/usr/lib/golang/src/runtime/asm_amd64.s:1650
Failed to create a new run
github.com/kubeflow/pipelines/backend/src/common/util.(*UserError).wrap
	/opt/app-root/src/backend/src/common/util/error.go:271
github.com/kubeflow/pipelines/backend/src/common/util.Wrap
	/opt/app-root/src/backend/src/common/util/error.go:350
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).CreateRun
	/opt/app-root/src/backend/src/apiserver/server/run_server.go:502
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler.func1
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2711
main.apiServerInterceptor
	/opt/app-root/src/backend/src/apiserver/interceptor.go:30
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
	/usr/lib/golang/src/runtime/asm_amd64.s:1650
/kubeflow.pipelines.backend.api.v2beta1.RunService/CreateRun call failed
github.com/kubeflow/pipelines/backend/src/common/util.(*UserError).wrapf
	/opt/app-root/src/backend/src/common/util/error.go:266
github.com/kubeflow/pipelines/backend/src/common/util.Wrapf
	/opt/app-root/src/backend/src/common/util/error.go:337
main.apiServerInterceptor
	/opt/app-root/src/backend/src/apiserver/interceptor.go:32
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
	/usr/lib/golang/src/runtime/asm_amd64.s:1650

removing the parallelism=2 like so doesn't help:

    with dsl.ParallelFor(items=[1, 2, 3, 4]) as counter:
        train(model=preprocess_task.outputs['output_model'], counter=counter).set_caching_options(enable_caching=False)

removing the loop entirely does help, i.e. no more panic:

    train(model=preprocess_task.outputs['output_model'], counter=1).set_caching_options(enable_caching=False)

leaving the hold in place :)

@gmfrasca force-pushed the v2-loop-parallelism branch 3 times, most recently from 1166016 to e74b9ce on October 3, 2024 22:05
@HumairAK (Collaborator) commented Oct 4, 2024

@gmfrasca this pipeline failed for the 2nd loop:

pipeline.py
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def preprocess(
        message: Input[str],
        output_model: Output[Model]
):
    import random
    line = "some_model"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    output_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def train(
        model: Input[Model],
        epoch: Input[int],
        trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Train for epoch: {epoch}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

    with dsl.ParallelFor(items=[6, 12, 24, 48], parallelism=4) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')

With the error in a dag driver pod:

F1004 15:35:39.425156 18 main.go:79] KFP driver: driver.DAG(pipelineName=tutorial-data-passing, runID=45604c64-db2d-4936-8c4b-39fda3827564, task="for-loop-4", component="comp-for-loop-4", dagExecutionID=164, componentSpec) failed: failed to resolve inputs: failed to resolve input artifact pipelinechannel--preprocess-output_model with spec task_output_artifact:{producer_task:"preprocess" output_artifact_key:"output_model"}: failed to get executions in DAG(executionID=164): two tasks have the same task name "for-loop-2", id1=166 id2=168

Seems like there are multiple executions writing the same task name.
This pipeline worked on the code in the master branch; can you take another look?

@HumairAK
Copy link
Collaborator

HumairAK commented Oct 4, 2024

I'm thinking we should turn some of these pipelines into integration tests to ensure we're catching these cases.

@gmfrasca force-pushed the v2-loop-parallelism branch from e74b9ce to 1362de6 on October 19, 2024 01:48
@rimolive (Member) commented Nov 19, 2024

/lgtm

After following Greg's comment in #10798 (comment), I had a successful pipeline run using his example code.

cc @HumairAK @gregsheremeta @chensun

@google-oss-prow google-oss-prow bot added the lgtm label Nov 19, 2024
@hbelmiro (Contributor):

@gmfrasca is there any pending work for this PR?

@HumairAK (Collaborator):

/ok-to-test


Approvals successfully granted for pending runs.

@github-actions bot added the ci-passed (All CI tests on a pull request have passed) label Nov 29, 2024
@gregsheremeta (Contributor):

tested it out side-by-side with master. Makes sense, works, code lgtm.

Can you squash it and rebase it and then I'll tag it?

Thanks!

- Passthrough ParentDagID rather than DriverExecutionID to iterator such
  that iteration item correctly detects dependentTasks.
- Remove depends from iterator DAG as it is already handled by
  root-level task
- Update Iterator template names/nomenclature for clarity
- Update tests accordingly

Signed-off-by: Giulio Frasca <[email protected]>
- Removes the Driver pod from the Iterator abstraction-layer template
  as it confuses MLMD and is purely an Argo implementation detail
- Drivers still used on the Component and Iteration-item templates

Signed-off-by: Giulio Frasca <[email protected]>
@gmfrasca force-pushed the v2-loop-parallelism branch from 1362de6 to f13c0c2 on December 3, 2024 23:05
@google-oss-prow google-oss-prow bot removed the lgtm label Dec 3, 2024
@HumairAK (Collaborator) commented Dec 3, 2024

/lgtm
/approve

yesssss


[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: HumairAK

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow google-oss-prow bot merged commit b7d8c97 into kubeflow:master Dec 3, 2024
16 checks passed
Labels: approved, ci-passed, lgtm, ok-to-test, size/XL
Merging this pull request may close: feat(backend): Support loop parallelism (#8718)
7 participants