-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(components): Support dynamic machine type paramters in CustomTrainingJobOp #10883
Conversation
/retest |
1 similar comment
/retest |
@KevinGrantLee, can you please address required presubmit checks |
/assign @chensun |
…iningJobOp Signed-off-by: KevinGrantLee <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>
959fc6c
to
21d2bd8
Compare
Signed-off-by: KevinGrantLee <[email protected]>
/retest |
Signed-off-by: KevinGrantLee <[email protected]>
/retest |
|
||
@dsl.component | ||
def accelerator_count() -> int: | ||
# This can either be int or int string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this true? The type hint doesn't indicate the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I also tried compiling and submitting a pipeline with def accelerator_count() -> str:
returning '1'
and that pipeline succeeded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll remove the comment to avoid confusion because leaving return annotation as int and returning '1'
causes errors
worker_pool_specs=[{ | ||
'container_spec': { | ||
# doesn't need to be the container under test | ||
# just need an image within the VPC-SC perimeter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite follow this comment. Is it necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied this comment from another test pipeline, will remove.
'machine_spec': { | ||
'machine_type': machine_type_task.output, | ||
'accelerator_type': accelerator_type_task.output, | ||
'accelerator_count': accelerator_count_task.output, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you make a case where the dynamic value comes from pipeline input parameter? That would make sure we cover all the the dynamic value paths.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Tested compiling and submitting pipeline.
elif isinstance(data, list): | ||
return [recursive_replace(i, old_value, new_value) for i in data] | ||
else: | ||
if isinstance(data, pipeline_channel.PipelineChannel): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method seems explicitly replacing placeholder from one representation to another. It's not for replacing arbitrary value. So the method name should reflect it's purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although here I'm just using this method for replacing placeholders, it can be used for arbitrary values as well so I left the method name general.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed method to recursive_replace_placeholders
3: ['d'] | ||
}], | ||
'old_value': 'd', | ||
'new_value': 'dd', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old and new values doesn't seem to be testing the real case scenario?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That testcase was for some simple dummy values to verify behavior of recursive_replace()
, I can remove if you think its redundant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed testcase.
@@ -239,70 +327,18 @@ def build_task_spec_for_task( | |||
component_input_parameter) | |||
|
|||
elif isinstance(input_value, str): | |||
# Handle extra input due to string concat |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC, this chunk of code is only applicable for string typed inputs, why merging the code and expand it to other input types? Also it's a bit hard to read the diff between the deleted code and the extracted. Can you try make the changes in place without refactoring, and see if it's actually necessary to expand the logic to non-string typed inputs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found that this block of code could be reused for handling PipelineChannels inside of worker_pool_specs
in addition to handling string typed inputs. Instead of copying the ~50 lines of code, I thought it'd be better to refactor the logic as a separate function def replace_and_inject_placeholders()
.
So I could un-refactor and duplicate the logic; I do have a slight preference for this refactoring but can go either way. wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name of the extracted method isn't accurate--the code does more than placeholder manipulation but also component input expansion.
The branch logic now reads like this:
if isinstance(input_value, str):
# shared code
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant.string_value = input_value
elif isinstance(input_value, (int, float, bool, dict, list)):
if isinstance(input_value, (dict, list):
# shared code
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant.CopyFrom(
to_protobuf_value(input_value))
else:
raise
You can achieve the same goal, and even more code reuse, without extracting a shared method by:
if not isinstance(input_value, (str, dict, list, int, float, bool)):
raise
if isinstance(input_value, (str, dict, list)):
# shared code
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant.CopyFrom(
to_protobuf_value(input_value))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aside from the refactoring part, I wonder what's the case for dict and list? In case CustomTrainingJobOp
is used, what's the input_value
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If CustomTrainingJobOp
is used, then we pass in worker_pool_spec
into input_value
.
It looks like this with PipelineChannel objects
input_value = [{'container_spec': {'image_uri': 'gcr.io/ml-pipeline/google-cloud-pipeline-components:2.5.0', 'command': ['echo'], 'args': ['foo']}, 'machine_spec': {'machine_type': {{channel:task=machine-type;name=Output;type=String;}}, 'accelerator_type': {{channel:task=accelerator-type;name=Output;type=String;}}, 'accelerator_count': {{channel:task=accelerator-count;name=Output;type=Integer;}}}, 'replica_count': 1}]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation. So input_value
would be of type list
in this case. Including dict
in the same code path is just for future use cases not entirely necessary at this moment, right? I'm fine to include dict
now.
Signed-off-by: KevinGrantLee <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>
@chensun I verified that nested dags with dsl.Condition() and custom jobs with dynamic (task output and pipeline inputs) machine parameters compile and run successfully. |
Signed-off-by: KevinGrantLee <[email protected]>
@@ -239,70 +327,18 @@ def build_task_spec_for_task( | |||
component_input_parameter) | |||
|
|||
elif isinstance(input_value, str): | |||
# Handle extra input due to string concat |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name of the extracted method isn't accurate--the code does more than placeholder manipulation but also component input expansion.
The branch logic now reads like this:
if isinstance(input_value, str):
# shared code
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant.string_value = input_value
elif isinstance(input_value, (int, float, bool, dict, list)):
if isinstance(input_value, (dict, list):
# shared code
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant.CopyFrom(
to_protobuf_value(input_value))
else:
raise
You can achieve the same goal, and even more code reuse, without extracting a shared method by:
if not isinstance(input_value, (str, dict, list, int, float, bool)):
raise
if isinstance(input_value, (str, dict, list)):
# shared code
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant.CopyFrom(
to_protobuf_value(input_value))
"""Recursively replaces values in a nested dict/list object. | ||
|
||
This method is used to replace PipelineChannel objects with pipeine channel | ||
placeholders in a nested object like worker_pool_specs for custom jobs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pipeline channel placeholders -> input parameter placeholder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -239,70 +327,18 @@ def build_task_spec_for_task( | |||
component_input_parameter) | |||
|
|||
elif isinstance(input_value, str): | |||
# Handle extra input due to string concat |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aside from the refactoring part, I wonder what's the case for dict and list? In case CustomTrainingJobOp
is used, what's the input_value
here?
Can you add a test case? |
Signed-off-by: KevinGrantLee <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>
/retest |
additional_input_name].task_output_parameter.output_parameter_key = ( | ||
channel.name) | ||
elif isinstance(input_value, (str, int, float, bool, dict, list)): | ||
if isinstance(input_value, (str, dict, list)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can remove this if isinstance(input_value, (str, dict, list)):
check, extract_pipeline_channels_from_any
would return an empty list in case float, int, bool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it fine to remove the inner if check and expand the type annotations?
It would simplify the code but I'm not sure if it makes sense to update the type annotations for extract_pipeline_channels_from_any
since ints, floats, and bools can't contain pipeline channels.
payload: Union[PipelineChannel, str, list, tuple, dict] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you can update the payload annotation type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
if isinstance(input_value, str): | ||
input_value = input_value.replace( | ||
channel.pattern, additional_input_placeholder) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be covered by recursive_replace_placeholders
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The string case is not covered by recursive_replace_placeholders
as is, I would need to embed string.replace()
logic in recursive_replace_placeholders
if we wanted to get rid of this ifelse block.
I suppose question is if we want to expose this logic in pipeline_spec_builder:build_task_spec_for_task
or compiler_utils:recursive_repalce_placeholders
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. It's up to you whether you want to keep it as-is or remove the ifelse block. I don't have a strong preference.
I'm not sure how this is related to your question on exposing the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, kept as it.
Signed-off-by: KevinGrantLee <[email protected]>
@KevinGrantLee: The following test failed, say
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve
Thanks, @KevinGrantLee !
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: chensun The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
@KevinGrantLee is it possible to do the same for |
Hi @pthieu, this PR should also enable that usecase - I did some local tests, but can you confirm on your end? |
@KevinGrantLee think I'll need to wait for the next release as we use this in our CI/CD pipeline. Do you know what the release schedule is? I can do a test and confirm when it's out. |
I believe the same issue is occurring for the ModelBatchPredictOp @pthieu @KevinGrantLee , here is a minimal example:
producing the same error as your issue @pthieu : |
Hi @alexredplanet , I'm reasonably confident that this pr should also fix your case. Once the next kfp release is done, can you retry? |
Thanks @KevinGrantLee it did indeed fix that issue! |
@KevinGrantLee looks like getting the latest worked (at least no errors thrown), thanks Just waiting on a dependency requirement change on the google pipelines package:
|
@KevinGrantLee Error Message: TypeError: unsupported operand type(s) for +: 'PipelineParameterChannel' and 'str' Code Snippet:
|
Hi @senyuyouren , I think you need to cast the PipelineParameterChannel to str.
|
Hi @KevinGrantLee , In the pipeline, the value I passed through was modeled as str (modeled as)+date_time. In the first minio-training pipeline, the parameter value could be obtained normally. However, when the parameter value of modeled as cannot be obtained normally in the second minio-mode_name_test, do you know what the problem is
|
Description of your changes:
Enables setting the machine_type, accelerator_type, and accelerator_count dynamically from task outputs and pipeline inputs in CustomTrainingJobOp.
ex.
This PR also enables the following behavior:
Checklist: