From 35ece080935dfee1bfc19ea7e9eebd3b3b76436f Mon Sep 17 00:00:00 2001 From: Rudi C Date: Fri, 16 Aug 2024 16:12:54 -0400 Subject: [PATCH] Fix deprecated client to work with kfp-server-api 2.0.1 The deprecated client is currently broken, as it refers to APIs that don't exist anymore. This change makes things less broken, but there might be more fixes needed. Basically: 1. Job -> RecurringRun 2. adapt code from current client that warns for FOO_job() and forwards to FOO_recurring_run() 3. adapt some code from create_recurring_run() 4. version the APIs: "Api" -> "V2beta1" Signed-off-by: Rudi C --- sdk/python/kfp/deprecated/_client.py | 240 ++++++++++++++++----------- 1 file changed, 147 insertions(+), 93 deletions(-) diff --git a/sdk/python/kfp/deprecated/_client.py b/sdk/python/kfp/deprecated/_client.py index 5d57dfe7825..6efbc9afe74 100644 --- a/sdk/python/kfp/deprecated/_client.py +++ b/sdk/python/kfp/deprecated/_client.py @@ -182,7 +182,7 @@ def __init__(self, header_value=self._context_setting.get( 'client_authentication_header_value')) _add_generated_apis(self, kfp_server_api, api_client) - self._job_api = kfp_server_api.api.job_service_api.JobServiceApi( + self._recurring_run_api = kfp_server_api.api.recurring_run_service_api.RecurringRunServiceApi( api_client) self._run_api = kfp_server_api.api.run_service_api.RunServiceApi( api_client) @@ -393,7 +393,7 @@ def set_user_namespace(self, namespace: str): with open(Client.LOCAL_KFP_CONTEXT, 'w') as f: json.dump(self._context_setting, f) - def get_kfp_healthz(self) -> kfp_server_api.ApiGetHealthzResponse: + def get_kfp_healthz(self) -> kfp_server_api.V2beta1GetHealthzResponse: """Gets healthz info of KFP deployment. Returns: @@ -431,7 +431,7 @@ def create_experiment( self, name: str, description: str = None, - namespace: str = None) -> kfp_server_api.ApiExperiment: + namespace: str = None) -> kfp_server_api.V2beta1Experiment: """Create a new experiment. Args: @@ -515,7 +515,7 @@ def list_experiments( sort_by: str = '', namespace: Optional[str] = None, filter: Optional[str] = None - ) -> kfp_server_api.ApiListExperimentsResponse: + ) -> kfp_server_api.V2beta1ListExperimentsResponse: """List experiments. Args: @@ -557,7 +557,7 @@ def list_experiments( def get_experiment(self, experiment_id=None, experiment_name=None, - namespace=None) -> kfp_server_api.ApiExperiment: + namespace=None) -> kfp_server_api.V2beta1Experiment: """Get details of an experiment. Either experiment_id or experiment_name is required @@ -685,7 +685,7 @@ def list_pipelines( page_size: int = 10, sort_by: str = '', filter: Optional[str] = None - ) -> kfp_server_api.ApiListPipelinesResponse: + ) -> kfp_server_api.V2beta1ListPipelinesResponse: """List pipelines. Args: @@ -728,7 +728,7 @@ def run_pipeline( pipeline_root: Optional[str] = None, enable_caching: Optional[str] = None, service_account: Optional[str] = None, - ) -> kfp_server_api.ApiRun: + ) -> kfp_server_api.V2beta1Run: """Run a specified pipeline. Args: @@ -771,7 +771,7 @@ def run_pipeline( version_id=version_id, enable_caching=enable_caching, ) - run_body = kfp_server_api.models.ApiRun( + run_body = kfp_server_api.models.V2beta1Run( pipeline_spec=job_config.spec, resource_references=job_config.resource_references, name=job_name, @@ -805,48 +805,52 @@ def create_recurring_run( enabled: bool = True, enable_caching: Optional[bool] = None, service_account: Optional[str] = None, - ) -> kfp_server_api.ApiJob: - """Create a recurring run. + ) -> kfp_server_api.V2beta1RecurringRun: + """Creates a recurring run. Args: - experiment_id: The string id of an experiment. - job_name: Name of the job. - description: An optional job description. - start_time: The RFC3339 time string of the time when to start the job. - end_time: The RFC3339 time string of the time when to end the job. - interval_second: Integer indicating the seconds between two recurring runs in for a periodic schedule. - cron_expression: A cron expression representing a set of times, using 6 space-separated fields, e.g. "0 0 9 ? * 2-6". - See `here `_ for details of the cron expression format. - max_concurrency: Integer indicating how many jobs can be run in parallel. - no_catchup: Whether the recurring run should catch up if behind schedule. - For example, if the recurring run is paused for a while and re-enabled - afterwards. If no_catchup=False, the scheduler will catch up on (backfill) each - missed interval. Otherwise, it only schedules the latest interval if more than one interval - is ready to be scheduled. - Usually, if your pipeline handles backfill internally, you should turn catchup - off to avoid duplicate backfill. (default: {False}) - pipeline_package_path: Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml). - params: A dictionary with key (string) as param name and value (string) as param value. - pipeline_id: The id of a pipeline. - version_id: The id of a pipeline version. - If both pipeline_id and version_id are specified, version_id will take precendence. - If only pipeline_id is specified, the default version of this pipeline is used to create the run. - enabled: A bool indicating whether the recurring run is enabled or disabled. - enable_caching: Optional. Whether or not to enable caching for the run. - This setting affects v2 compatible mode and v2 mode only. - If not set, defaults to the compile time settings, which are True for all - tasks by default, while users may specify different caching options for - individual tasks. - If set, the setting applies to all tasks in the pipeline -- overrides - the compile time settings. - service_account: Optional. Specifies which Kubernetes service account this - recurring run uses. - + experiment_id: ID of the experiment. + job_name: Name of the job. + description: Description of the job. + start_time: RFC3339 time string of the time when to start the + job. + end_time: RFC3339 time string of the time when to end the job. + interval_second: Integer indicating the seconds between two + recurring runs in for a periodic schedule. + cron_expression: Cron expression representing a set of times, + using 6 space-separated fields (e.g., ``'0 0 9 ? * 2-6'``). See `cron format + `_. + max_concurrency: Integer indicating how many jobs can be run in + parallel. + no_catchup: Whether the recurring run should catch up if behind + schedule. For example, if the recurring run is paused for a + while and re-enabled afterwards. If ``no_catchup=False``, the + scheduler will catch up on (backfill) each missed interval. + Otherwise, it only schedules the latest interval if more than + one interval is ready to be scheduled. Usually, if your pipeline + handles backfill internally, you should turn catchup off to + avoid duplicate backfill. + pipeline_package_path: Local path of the pipeline package (the + filename should end with one of the following .tar.gz, .tgz, + .zip, .json). + params: Arguments to the pipeline function provided as a dict. + pipeline_id: ID of a pipeline. + version_id: ID of a pipeline version. + If both ``pipeline_id`` and ``version_id`` are specified, ``version_id`` + will take precedence. + If only ``pipeline_id`` is specified, the default version of this + pipeline is used to create the run. + enabled: Whether to enable or disable the recurring run. + enable_caching: Whether or not to enable caching for the + run. If not set, defaults to the compile time settings, which + is ``True`` for all tasks by default, while users may specify + different caching options for individual tasks. If set, the + setting applies to all tasks in the pipeline (overrides the + compile time settings). + service_account: Specifies which Kubernetes service + account this recurring run uses. Returns: - A Job object. Most important field is id. - - Raises: - ValueError: If required parameters are not supplied. + ``V2beta1RecurringRun`` object. """ job_config = self._create_job_config( @@ -861,31 +865,37 @@ def create_recurring_run( if all([interval_second, cron_expression ]) or not any([interval_second, cron_expression]): raise ValueError( - 'Either interval_second or cron_expression is required') + 'Either interval_second or cron_expression is required.') if interval_second is not None: - trigger = kfp_server_api.models.ApiTrigger( - periodic_schedule=kfp_server_api.models.ApiPeriodicSchedule( + trigger = kfp_server_api.V2beta1Trigger( + periodic_schedule=kfp_server_api.V2beta1PeriodicSchedule( start_time=start_time, end_time=end_time, interval_second=interval_second)) if cron_expression is not None: - trigger = kfp_server_api.models.ApiTrigger( - cron_schedule=kfp_server_api.models.ApiCronSchedule( + trigger = kfp_server_api.V2beta1Trigger( + cron_schedule=kfp_server_api.V2beta1CronSchedule( start_time=start_time, end_time=end_time, cron=cron_expression)) - job_body = kfp_server_api.models.ApiJob( - enabled=enabled, - pipeline_spec=job_config.spec, - resource_references=job_config.resource_references, - name=job_name, + mode = kfp_server_api.RecurringRunMode.DISABLE + if enabled: + mode = kfp_server_api.RecurringRunMode.ENABLE + + job_body = kfp_server_api.V2beta1RecurringRun( + experiment_id=experiment_id, + mode=mode, + pipeline_spec=job_config.pipeline_spec, + pipeline_version_reference=job_config.pipeline_version_reference, + runtime_config=job_config.runtime_config, + display_name=job_name, description=description, no_catchup=no_catchup, trigger=trigger, max_concurrency=max_concurrency, service_account=service_account) - return self._job_api.create_job(body=job_body) + return self._recurring_run_api.create_recurring_run(body=job_body) def _create_job_config( self, @@ -1118,47 +1128,89 @@ def __repr__(self): ) return RunPipelineResult(self, run_info) - def delete_job(self, job_id: str): - """Deletes a job. + def delete_job(self, job_id: str) -> dict: + """Deletes a job (recurring run). Args: - job_id: id of the job. + job_id: ID of the job. Returns: - Object. If the method is called asynchronously, returns the request thread. + Empty dictionary. + """ + warnings.warn( + '`delete_job` is deprecated. Please use `delete_recurring_run` instead.' + f'\nReroute to calling `delete_recurring_run(recurring_run_id="{job_id}")`', + category=DeprecationWarning, + stacklevel=2) + return self.delete_recurring_run(recurring_run_id=job_id) - Raises: - kfp_server_api.ApiException: If the job is not found. + def delete_recurring_run(self, recurring_run_id: str) -> dict: + """Deletes a recurring run. + + Args: + recurring_run_id: ID of the recurring_run. + + Returns: + Empty dictionary. """ - return self._job_api.delete_job(id=job_id) + return self._recurring_run_api.delete_recurring_run( + recurring_run_id=recurring_run_id) - def disable_job(self, job_id: str): - """Disables a job. + def disable_job(self, job_id: str) -> dict: + """Disables a job (recurring run). Args: - job_id: id of the job. + job_id: ID of the job. Returns: - Object. If the method is called asynchronously, returns the request thread. + Empty dictionary. + """ + warnings.warn( + '`disable_job` is deprecated. Please use `disable_recurring_run` instead.' + f'\nReroute to calling `disable_recurring_run(recurring_run_id="{job_id}")`', + category=DeprecationWarning, + stacklevel=2) + return self.disable_recurring_run(recurring_run_id=job_id) - Raises: - ApiException: If the job is not found. + def disable_recurring_run(self, recurring_run_id: str) -> dict: + """Disables a recurring run. + + Args: + recurring_run_id: ID of the recurring_run. + + Returns: + Empty dictionary. """ - return self._job_api.disable_job(id=job_id) + return self._recurring_run_api.disable_recurring_run( + recurring_run_id=recurring_run_id) - def enable_job(self, job_id: str): - """Enables a job. + def enable_job(self, job_id: str) -> dict: + """Enables a job (recurring run). Args: - job_id: id of the job. + job_id: ID of the job. Returns: - Object. If the method is called asynchronously, returns the request thread. + Empty dictionary. + """ + warnings.warn( + '`enable_job` is deprecated. Please use `enable_recurring_run` instead.' + f'\nReroute to calling `enable_recurring_run(recurring_run_id="{job_id}")`', + category=DeprecationWarning, + stacklevel=2) + return self.enable_recurring_run(recurring_run_id=job_id) - Raises: - ApiException: If the job is not found. + def enable_recurring_run(self, recurring_run_id: str) -> dict: + """Enables a recurring run. + + Args: + recurring_run_id: ID of the recurring_run. + + Returns: + Empty dictionary. """ - return self._job_api.enable_job(id=job_id) + return self._recurring_run_api.enable_recurring_run( + recurring_run_id=recurring_run_id) def list_runs( self, @@ -1167,7 +1219,8 @@ def list_runs( sort_by: str = '', experiment_id: Optional[str] = None, namespace: Optional[str] = None, - filter: Optional[str] = None) -> kfp_server_api.ApiListRunsResponse: + filter: Optional[str] = None + ) -> kfp_server_api.V2beta1ListRunsResponse: """List runs, optionally can be filtered by experiment or namespace. Args: @@ -1229,7 +1282,8 @@ def list_recurring_runs( page_size: int = 10, sort_by: str = '', experiment_id: Optional[str] = None, - filter: Optional[str] = None) -> kfp_server_api.ApiListJobsResponse: + filter: Optional[str] = None + ) -> kfp_server_api.V2beta1ListRunsResponse: """List recurring runs. Args: @@ -1256,23 +1310,22 @@ def list_recurring_runs( A response object including a list of recurring_runs and next page token. """ if experiment_id is not None: - response = self._job_api.list_jobs( + response = self._recurring_run_api.list_recurring_runs( page_token=page_token, page_size=page_size, sort_by=sort_by, - resource_reference_key_type=kfp_server_api.models - .api_resource_type.ApiResourceType.EXPERIMENT, - resource_reference_key_id=experiment_id, + experiment_id=experiment_id, filter=filter) else: - response = self._job_api.list_jobs( + response = self._recurring_run_api.list_recurring_runs( page_token=page_token, page_size=page_size, sort_by=sort_by, filter=filter) return response - def get_recurring_run(self, job_id: str) -> kfp_server_api.ApiJob: + def get_recurring_run(self, + job_id: str) -> kfp_server_api.V2beta1RecurringRun: """Get recurring_run details. Args: @@ -1284,9 +1337,10 @@ def get_recurring_run(self, job_id: str) -> kfp_server_api.ApiJob: Raises: kfp_server_api.ApiException: If recurring_run is not found. """ - return self._job_api.get_job(id=job_id) + return self._recurring_run_api.get_recurring_run( + recurring_run_id=job_id) - def get_run(self, run_id: str) -> kfp_server_api.ApiRun: + def get_run(self, run_id: str) -> kfp_server_api.V2beta1Run: """Get run details. Args: @@ -1360,7 +1414,7 @@ def upload_pipeline( pipeline_package_path: str = None, pipeline_name: str = None, description: str = None, - ) -> kfp_server_api.ApiPipeline: + ) -> kfp_server_api.V2beta1Pipeline: """Uploads the pipeline to the Kubeflow Pipelines cluster. Args: @@ -1388,7 +1442,7 @@ def upload_pipeline_version( pipeline_id: Optional[str] = None, pipeline_name: Optional[str] = None, description: Optional[str] = None, - ) -> kfp_server_api.ApiPipelineVersion: + ) -> kfp_server_api.V2beta1PipelineVersion: """Uploads a new version of the pipeline to the Kubeflow Pipelines cluster. @@ -1439,7 +1493,7 @@ def upload_pipeline_version( IPython.display.display(IPython.display.HTML(html)) return response - def get_pipeline(self, pipeline_id: str) -> kfp_server_api.ApiPipeline: + def get_pipeline(self, pipeline_id: str) -> kfp_server_api.V2beta1Pipeline: """Get pipeline details. Args: @@ -1474,7 +1528,7 @@ def list_pipeline_versions( page_size: int = 10, sort_by: str = '', filter: Optional[str] = None - ) -> kfp_server_api.ApiListPipelineVersionsResponse: + ) -> kfp_server_api.V2beta1ListPipelineVersionsResponse: """Lists pipeline versions. Args: