[backend] Argo workflow validation fails for kfp v2 pipeline with downstream tasks depending on tasks in dsl.ParallelFor (using dsl.Collected) #8830

Closed
kakaxilyp opened this issue Feb 9, 2023 · 10 comments

Comments

@kakaxilyp

Environment

  • How did you deploy Kubeflow Pipelines (KFP)?
    Full Kubeflow deployment on GKE.

  • KFP version:
    2.0.0-beta.0

  • KFP SDK version:
    2.0.0-beta.11

Steps to reproduce

# pipeline.py

from kfp import compiler, dsl

from components import components as test_components


@dsl.pipeline(
    name='Churn model prediction pipeline',
    description='Churn model prediction pipeline'
)
def bug_repro_pipeline(
        loop_items: list = ['1', '2'],
) -> None:
    with dsl.ParallelFor(
            name='predict-loop',
            items=loop_items,
            parallelism=2,
    ) as item:
        task_in_loop = test_components.op_in_loop(str_input_p1=item)

    test_components.downstream_op(list_input_p1=dsl.Collected(task_in_loop.output))


if __name__ == '__main__':
    compiler.Compiler().compile(bug_repro_pipeline, 'bug_repro_pipeline.yaml')

# components/components.py

from typing import List

from kfp import dsl

BASE_IMAGE = 'python:3.8'
TARGET_IMAGE = 'pipeline-bug-repro:v0.1'


@dsl.component(
    base_image=BASE_IMAGE,
    target_image=TARGET_IMAGE,
)
def op_in_loop(str_input_p1: str) -> str:
    return str_input_p1


@dsl.component(
    base_image=BASE_IMAGE,
    target_image=TARGET_IMAGE,
)
def downstream_op(list_input_p1: List[str]) -> str:
    return ' '.join(list_input_p1)

  1. Compile the pipeline defined in pipeline.py.
  2. Upload the compiled pipeline to KFP through the Kubeflow central dashboard.
  3. Try to create a run of the uploaded pipeline through the Kubeflow central dashboard.
  4. Got the following error message: {"error":"Failed to create a new v1beta1 run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-1","code":13,"message":"Failed to create a new v1beta1 run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-1","details":[{"@type":"type.googleapis.com/google.rpc.Status","code":13,"message":"Internal Server Error"}]}
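The error body in step 4 is JSON, with the same text repeated in the "error" and "message" fields. A minimal sketch for pulling out the status code and the last clause of the message is shown below. The payload is copied from the report; the variable names are only illustrative, and splitting on the final ": " is an assumption about this particular message, not a documented format.

```python
import json

# Response body copied verbatim from step 4 of the report.
response_body = (
    '{"error":"Failed to create a new v1beta1 run: InternalServerError: '
    'Failed to validate workflow for (): templates.entrypoint.tasks.root '
    'templates.root sorting failed: invalid dependency for-loop-1",'
    '"code":13,'
    '"message":"Failed to create a new v1beta1 run: InternalServerError: '
    'Failed to validate workflow for (): templates.entrypoint.tasks.root '
    'templates.root sorting failed: invalid dependency for-loop-1",'
    '"details":[{"@type":"type.googleapis.com/google.rpc.Status",'
    '"code":13,"message":"Internal Server Error"}]}'
)

payload = json.loads(response_body)

# Assumption: the most specific clause is whatever follows the last ": ".
root_cause = payload["message"].rsplit(": ", 1)[-1]

print(payload["code"])
print(root_cause)
```

The last clause names the dependency that the workflow validation rejected, which is the part of the message that matters for the rest of this thread.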

Expected result

Can start a run of the specified pipeline.

Materials and Reference


Impacted by this bug? Give it a 👍.

@kakaxilyp
Author

I'm not sure why an -iterations suffix is added to the loop task name here, but in my Kubeflow deployment, simply removing the suffix makes the compiled Argo workflow pass validation.
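To illustrate why a name mismatch would produce "invalid dependency for-loop-1", here is a minimal sketch of a dependency-name check on a DAG. It is a simplified stand-in, not Argo's validation code. The task layout is hypothetical: it assumes the loop task is emitted as for-loop-1-iterations while the downstream task still lists for-loop-1 as its dependency, which is a reading of the error message and the comment above rather than something confirmed in this thread.

```python
from typing import Dict, List


def find_invalid_dependencies(tasks: Dict[str, List[str]]) -> List[str]:
    """Return dependency names that do not match any task name in the DAG."""
    known = set(tasks)
    invalid: List[str] = []
    for deps in tasks.values():
        for dep in deps:
            if dep not in known and dep not in invalid:
                invalid.append(dep)
    return invalid


# Hypothetical layout: the loop task carries the suffix, but the
# downstream task depends on the unsuffixed name.
with_suffix = {
    "for-loop-1-iterations": [],
    "downstream-op": ["for-loop-1"],
}

# Same layout with the suffix removed, so the names line up.
without_suffix = {
    "for-loop-1": [],
    "downstream-op": ["for-loop-1"],
}

print(find_invalid_dependencies(with_suffix))
print(find_invalid_dependencies(without_suffix))
```

Under these assumptions the first layout reports for-loop-1 as unresolved and the second reports nothing, which matches the observation that removing the suffix lets validation pass.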

@b4sus
Contributor

b4sus commented May 10, 2023

Tested with beta.2, still happening.

@mbellphd

I've encountered the issue in 2.0.0-rc.2 as well.

@github-actions

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions bot added the lifecycle/stale label Sep 19, 2023

@zhuwq0

zhuwq0 commented Sep 20, 2023

This happened in my case using 2.0.1 when running the test example here: https://github.com/kubeflow/pipelines/blob/adb86777a0c8bf8c28bb0cee1d936daf70d9a59f/sdk/python/test_data/pipelines/parallelfor_fan_in/artifacts_simple.py

github-actions bot removed the lifecycle/stale label Sep 20, 2023

@chensun
Member

chensun commented Sep 28, 2023

Apologies for the confusion, but dsl.Collected is not yet supported on the KFP backend. This is called out in the documentation: https://www.kubeflow.org/docs/components/pipelines/v2/pipelines/control-flow/#parallel-looping-dslparallelfor.

chensun closed this as completed Sep 28, 2023

@zhuwq0

zhuwq0 commented Sep 28, 2023

I have a quick follow-up: if dsl.Collected is not supported in KFP, how can I add steps after the ParallelFor step? It seems that adding .after() does not work now.

@ivanbondyrev

Hi! Any updates on this?

If dsl.Collected is not supported in KFP, how can I add steps after the ParallelFor step? It seems that adding .after() does not work now.

@ausalimov

Bumping this as well.

@papagala
Contributor

Yes, sorry to bother you @chensun, but there seems to be no workaround at all. There is another issue that was marked as P1 in triage and has been closed, but it still does not work:

#10050
