feat(backend): implement subdag output resolution #11196
Conversation
Force-pushed from af3c3e1 to 1e62d0d.
Happy to jump on a call if synchronous questions/feedback would be easier. Although concise, these changes are quite convoluted.
We just pushed up a commit that implements support for multiple layers of nested subdags (i.e. subdags of subdags). We validated that it behaves as expected with the following example code:

```python
from kfp import dsl
from kfp.client import Client


@dsl.component
def small_comp() -> str:
    return "privet"


@dsl.component
def large_comp(input: str):
    print("input :", input)


@dsl.pipeline
def small_matroushka_doll() -> str:
    task = small_comp()
    task.set_caching_options(False)
    return task.output


@dsl.pipeline
def medium_matroushka_doll() -> str:
    dag_task = small_matroushka_doll()
    dag_task.set_caching_options(False)
    return dag_task.output


@dsl.pipeline
def large_matroushka_doll():
    dag_task = medium_matroushka_doll()
    task = large_comp(input=dag_task.output)
    task.set_caching_options(False)
    dag_task.set_caching_options(False)


if __name__ == "__main__":
    client = Client()
    run = client.create_run_from_pipeline_func(
        pipeline_func=large_matroushka_doll,
        enable_caching=False,
    )
```

PS. I hate matroushka dolls, they're so full of themselves.
So this PR handles subdag output parameters but not subdag output artifacts. We're going to add some logic to handle the latter as well since the problems are similar.
Force-pushed from 1cb4db8 to a0a7b7b.
We just added and validated support for output artifacts as well, which addresses #10041. Here's a screenshot from a pipeline with nested DAGs and output artifacts that executed successfully. Here's the example code:

```python
from kfp import dsl
from kfp.client import Client
from kfp.compiler import Compiler


@dsl.component
def inner_comp(dataset: dsl.Output[dsl.Dataset]):
    with open(dataset.path, "w") as f:
        f.write("foobar")


@dsl.component
def outer_comp(input: dsl.Dataset):
    print("input: ", input)


@dsl.pipeline
def inner_pipeline() -> dsl.Dataset:
    inner_comp_task = inner_comp()
    inner_comp_task.set_caching_options(False)
    return inner_comp_task.output


@dsl.pipeline
def outer_pipeline():
    inner_pipeline_task = inner_pipeline()
    outer_comp_task = outer_comp(input=inner_pipeline_task.output)
    outer_comp_task.set_caching_options(False)


if __name__ == "__main__":
    # Compiler().compile(outer_pipeline, "ignore/subdag_artifacts.yaml")
    client = Client()
    run = client.create_run_from_pipeline_func(
        pipeline_func=outer_pipeline,
        enable_caching=False,
    )
```

There's still a lot more work to be done in terms of testing, decomposition, making the code more consistent and DRY, etc., but it works and it did not work before, so hooray for progress.
Force-pushed from 58bed92 to ee7f6c9.
Just pushed up a commit that decomposes the graph traversal logic to improve readability, reduce complexity, and make granular testing easier. The next order of business is multiple outputs and NamedTuples.
Hey folks, love that you are doing this, amazing stuff!!
I just had a skim and left some quick thoughts. I see that it's still WIP, so apologies if the comments are premature. I haven't had a chance to try it out yet.
The approach does make sense to me. Given that we are just writing spec data as execution properties, I think it makes sense to do it in the driver, since we already have this info at pipeline creation.
Huge shout out to @boarder7395 for adding support for multiple artifacts and parameters. We merged his commit into this PR. We also added extensive large tests to the ….

We'll address your remaining comments shortly, @HumairAK.
Hey @HumairAK, we managed to address most of your suggestions, which required us to add a few more handlings, mainly the case for uses of ….

Another piece is that for OneOf we would need to be able to grab the status of the OneOf branches that get created. It seems that they are registered as DAGs in the backend, so we need the sub-DAGs to have their statuses updated, which is a known issue. I drafted a mini solution for this: after a task is successfully published, the launcher attempts to check the statuses of the tasks associated with the current DAG context and updates the DAG's status accordingly. Granted, there is the case where the last task could be a DAGExecution, which won't trigger the update, but we set that aside for now to tackle later.

We also took the OneOf example from the SDK, modified it a bit, and tossed it into the subdag test scenarios. Some screenshots of (some of) the subDAG statuses changing 🎉
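For illustration, here's a minimal sketch of that post-publish DAG status check. The `TaskState` type and `dagStatusFromTasks` helper are hypothetical stand-ins, not the actual launcher or MLMD client API; the real code would first read the sibling executions registered under the current DAG context.

```go
package main

import "fmt"

// TaskState is a simplified stand-in for the execution state the launcher
// would read back for each task registered under the current DAG context.
type TaskState string

const (
	StateComplete TaskState = "COMPLETE"
	StateFailed   TaskState = "FAILED"
	StateRunning  TaskState = "RUNNING"
)

// dagStatusFromTasks derives a status for the enclosing DAG from the states
// of its tasks: any failure fails the DAG, any still-running task means the
// DAG should not be updated yet, and all-complete marks the DAG complete.
func dagStatusFromTasks(tasks []TaskState) (TaskState, bool) {
	for _, t := range tasks {
		switch t {
		case StateFailed:
			return StateFailed, true
		case StateRunning:
			return "", false
		}
	}
	return StateComplete, true
}

func main() {
	if status, ok := dagStatusFromTasks([]TaskState{StateComplete, StateComplete}); ok {
		fmt.Printf("update DAG execution to %s\n", status)
	}
}
```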
Sure thing. This PR was actually able to address both edge cases introduced by the collisions; we just think that handling the collisions directly would probably make the code cleaner and easier to understand.

The first edge case surfaces as a result of ….

The second edge case is extremely unlikely. Suppose you have three DAGs: A, B, and C. A calls B and B calls C. B and C share a component. Because the shared component task is in two separate DAGs, the task name is not incremented by the IR compiler, so it winds up being identical and stored with the same task_name value in MLMD. The issue surfaces when a component in A needs the output of DAG B, which comes from a producer subtask. Initially, we tried calling GetExecutionsInDag without a filter (to get tasks from all DAGs in the specified context), but that inevitably raised the collision error because of the two tasks with the same name. It's an extremely unlikely edge case, but we circumvented it using recursive DFS, overwriting higher-level matches in the nested DAG with lower-level matches. After making this change, all tests passed.

We think that avoiding collisions with unique keys is a better long-term approach but were concerned about making the PR any bigger than it already is. Prefixing with the DAG name is a great idea! Another approach that just occurred to me, though it's a client-side fix rather than a backend fix, is updating the IR compiler to avoid duplicate task names across layers of nested DAGs, much like the compiler already does within a given DAG.

I hope that's clear. It's a rare, unlikely, and difficult-to-explain edge case.
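To make that traversal concrete, here's a toy sketch of the recursive DFS just described. The `dagNode` type and `findProducer` function are illustrative stand-ins, simplified from what the driver actually does against MLMD executions.

```go
package main

import "fmt"

// dagNode is a toy stand-in for a DAG execution and the task executions
// registered under it; the real driver operates on MLMD executions.
type dagNode struct {
	tasks    map[string]string // task name -> producer execution ID
	children []*dagNode        // nested sub-DAGs
}

// findProducer looks for the task name in the current DAG, then recurses
// into nested DAGs and lets matches found deeper in the hierarchy overwrite
// shallower ones, so two same-named tasks at different DAG levels no longer
// raise a collision.
func findProducer(dag *dagNode, taskName string) (string, bool) {
	id, found := dag.tasks[taskName]
	for _, child := range dag.children {
		if childID, ok := findProducer(child, taskName); ok {
			id, found = childID, true
		}
	}
	return id, found
}

func main() {
	// A calls B, B calls C; B and C both register a task named "shared".
	c := &dagNode{tasks: map[string]string{"shared": "execution-C"}}
	b := &dagNode{tasks: map[string]string{"shared": "execution-B"}, children: []*dagNode{c}}
	fmt.Println(findProducer(b, "shared")) // execution-C true
}
```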
This did not seem to be the case in our testing. Maybe it's a separate bug. Let me find some example code.
Given this pipeline, which is part of the SDK execution tests workflow, if you run the following query on the …:

```sql
select * from ExecutionProperty where name = "task_name";
```

Note how there are ….

These are from a single run.
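Purely as a hypothetical illustration of the unique-key idea mentioned earlier (prefixing the stored task name with its parent DAG name), which this PR does not implement:

```go
package main

import "fmt"

// uniqueTaskKey sketches the "prefix with the DAG name" idea for avoiding
// task_name collisions in MLMD. The key format is illustrative only; a real
// change would need to stay backward compatible with existing runs.
func uniqueTaskKey(parentDAG, taskName string) string {
	return fmt.Sprintf("%s/%s", parentDAG, taskName)
}

func main() {
	// The same component task name used in two different DAGs no longer
	// collides once it is namespaced by its parent DAG.
	fmt.Println(uniqueTaskKey("comp-inner-pipeline", "print-op1")) // comp-inner-pipeline/print-op1
	fmt.Println(uniqueTaskKey("root", "print-op1"))                // root/print-op1
}
```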
If this is unintended …. Note from today's community call: make sure to tag @gmfrasca in the issues.
Signed-off-by: zazulam <[email protected]>
I see. I don't know off the top of my head if this MLMD behavior is problematic or just fine. I was thinking about the IR, where ….
I definitely need to read the code to see what fix you're proposing, but I think a client-side fix won't be sufficient: for the ParallelFor case, we don't always know the exact number of tasks at compilation time.
```go
@@ -148,6 +148,17 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
		}
	}
	glog.Infof("publish success.")
	// At the end of the current task, we check the statuses of all tasks in the current DAG and update the DAG's status accordingly.
	// TODO: If there's a pipeline whose only components are DAGs, this launcher logic will never run and as a result the dag status will never be updated. We need to implement a mechanism to handle this edge case.
```
> If there's a pipeline whose only components are DAGs

I can't imagine a valid case for this, and I vaguely recall the compiler checks that a pipeline must have concrete tasks.
If that's the case then I'll reword it, as the screenshots of the OneOf scenario above show that there are some instances of DAGs that don't seem to get updated by this functionality. There is probably some logic that can be used to address those edge cases.
backend/src/v2/cmd/driver/main.go (outdated)

```go
@@ -85,6 +85,8 @@ func init() {
	flag.Set("logtostderr", "true")
	// Change the WARNING to INFO level for debugging.
	flag.Set("stderrthreshold", "WARNING")
	// Enable V(4) logging level for more verbose debugging.
	// flag.Set("v", "4")
```
remove?
backend/src/v2/driver/driver.go (outdated)

```go
// resolveUpstreamParametersConfig is just a config struct used to store the
// input parameters of the resolveUpstreamParameters function.
type resolveUpstreamParametersConfig struct {
```
Do you see opportunities to consolidate and merge this with resolveUpstreamArtifacts...? I tend to think the logic should be very similar between the two?
Great question! We wondered the same thing (and even tried it out) but the resolution logic is so different that it results in almost no deduplication at the cost of a massive, convoluted function full of nested conditionals.
That being said, your comment inspired us to consolidate the two config structs that are passed to the two functions. Thanks!
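For a rough sense of what that consolidation could look like, here's a minimal sketch with hypothetical field names; the actual structs in driver.go differ.

```go
package main

// resolveUpstreamConfig is an illustrative sketch of a consolidated config
// struct; the field names are assumptions, not the real driver types.
type resolveUpstreamConfig struct {
	taskName     string // task whose upstream output is being resolved
	outputKey    string // output parameter or artifact key on the producer
	dagContextID int64  // context under which the producer tasks are registered
}

// The two resolvers keep their separate (and quite different) logic but can
// share the same config type.
func resolveUpstreamParameters(cfg resolveUpstreamConfig) error { return nil }

func resolveUpstreamArtifacts(cfg resolveUpstreamConfig) error { return nil }

func main() {
	cfg := resolveUpstreamConfig{taskName: "print-op1", outputKey: "Output", dagContextID: 42}
	_ = resolveUpstreamParameters(cfg)
	_ = resolveUpstreamArtifacts(cfg)
}
```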
```python
@@ -0,0 +1,8 @@
from subdagio import artifact
```
why do we need an __init__.py file in the sample folder?
This was so that we can isolate/group the tests used for subdag outputs into a module. We can move the tests up a level and remove the subdagio module if that is the preferred approach.
```python
Compiler().compile(
    pipeline_func=crust,
    package_path=f"{__file__.removesuffix('.py')}.yaml")
client = Client()
```
nit: It's a bit unconventional to include submission code in the execution body; I'm not sure users would intend to run it remotely.
The rationale behind this was that it improves the developer experience by making it really easy to kick off a single test run when troubleshooting, without having to manipulate the contents of sample_test.py. I don't think there's any other context in which these modules are run as main. Let us know if that makes sense / sounds reasonable. If not, we can remove it.
My thoughts on the ParallelFor tasks are that for ….

As for the issue with the same task used in multiple DAGs of a pipeline, that is where a solution is needed. Here is the corresponding IR from this pipeline:

```yaml
components:
  comp-inner-pipeline:
    dag:
      outputs:
        artifacts:
          data:
            artifactSelectors:
            - outputArtifactKey: data
              producerSubtask: print-op2
        parameters:
          msg:
            valueFromParameter:
              outputParameterKey: Output
              producerSubtask: print-op1
      tasks:
        print-op1:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-print-op1-2
          inputs:
            parameters:
              msg:
                componentInputParameter: msg
          taskInfo:
            name: print-op1
        print-op2:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-print-op2
          dependentTasks:
          - print-op1
          inputs:
            parameters:
              msg:
                taskOutputParameter:
                  outputParameterKey: Output
                  producerTask: print-op1
          taskInfo:
            name: print-op2
    inputDefinitions:
      parameters:
        msg:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        data:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
      parameters:
        msg:
          parameterType: STRING
  comp-print-op1:
    executorLabel: exec-print-op1
    inputDefinitions:
      parameters:
        msg:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-print-op1-2:
    executorLabel: exec-print-op1-2
    inputDefinitions:
      parameters:
        msg:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-print-op2:
    executorLabel: exec-print-op2
    inputDefinitions:
      parameters:
        msg:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        data:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
...
root:
  dag:
    outputs:
      artifacts:
        Output:
          artifactSelectors:
          - outputArtifactKey: data
            producerSubtask: inner-pipeline
    tasks:
      inner-pipeline:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-inner-pipeline
        inputs:
          parameters:
            msg:
              runtimeValue:
                constant: world
        taskInfo:
          name: inner-pipeline
      print-op1:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-print-op1
        inputs:
          parameters:
            msg:
              componentInputParameter: msg
        taskInfo:
          name: print-op1
  inputDefinitions:
    parameters:
      msg:
        defaultValue: Hello
        isOptional: true
        parameterType: STRING
  outputDefinitions:
    artifacts:
      Output:
        artifactType:
          schemaTitle: system.Artifact
          schemaVersion: 0.0.1
schemaVersion: 2.1.0
sdkVersion: kfp-2.9.0
```

The issue here is that although the IR handles multiple instances of ….
Signed-off-by: droctothorpe <[email protected]> Co-authored-by: zazulam <[email protected]>
@chensun Thank you for the review! We know it's not exactly the easiest PR to review (in terms of scale, complexity, and convolution). Hopefully, we can further decompose …. We resolved or responded to all notes. Standing by for further instructions. Thanks!
/lgtm
```python
@@ -2006,13 +2008,18 @@ def convert_pipeline_outputs_to_dict(
        output name to PipelineChannel."""
    if pipeline_outputs is None:
        return {}
    elif isinstance(pipeline_outputs, dict):
        # This condition is required to support pipelines that return NamedTuples.
```
Is this not covered by isinstance(..., tuple)?
Good call out. This comment was not clear enough. I just pushed up a commit that expands on it. Some of the nested pipeline tests with namedtuples that we added to the subdagio directory were failing until we added this handling to the IR compiler.
Here's a specific example. If you try to compile IR from this file, it raises the following error:

```
return component_factory.create_component_from_func(
Traceback (most recent call last):
  File "/Users/alex/code/pipelines/validation.py", line 21, in <module>
    def core() -> NamedTuple('outputs', val1=str, val2=str): # type: ignore
  File "/Users/alex/code/pipelines/sdk/python/kfp/dsl/pipeline_context.py", line 71, in pipeline
    return component_factory.create_graph_component_from_func(
  File "/Users/alex/code/pipelines/sdk/python/kfp/dsl/component_factory.py", line 708, in create_graph_component_from_func
    return graph_component.GraphComponent(
  File "/Users/alex/code/pipelines/sdk/python/kfp/dsl/graph_component.py", line 71, in __init__
    pipeline_spec, platform_spec = builder.create_pipeline_spec(
  File "/Users/alex/code/pipelines/sdk/python/kfp/compiler/pipeline_spec_builder.py", line 1915, in create_pipeline_spec
    pipeline_outputs_dict = convert_pipeline_outputs_to_dict(pipeline_outputs)
  File "/Users/alex/code/pipelines/sdk/python/kfp/compiler/pipeline_spec_builder.py", line 2022, in convert_pipeline_outputs_to_dict
    raise ValueError(f'Got unknown pipeline output: {pipeline_outputs}')
ValueError: Got unknown pipeline output: {'val1': {{channel:task=core-comp;name=val1;type=String;}}, 'val2': {{channel:task=core-comp;name=val2;type=String;}}}
```
Adding the dict conditional resolved this error. Shout out to @zazulam for identifying / correcting it.
Signed-off-by: droctothorpe <[email protected]>
/lgtm
/approve
Thanks!
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: chensun. The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files: ….

Approvers can indicate their approval by writing ….
Great work folks!!
Description of your changes:

This PR:
- … resolveInputs in driver.go.
- … driver.go, paving the way for small tests.

Checklist: