From 1928f72b52a6f676accd535f862a34271c9dd7fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simen=20Heggest=C3=B8yl?= Date: Mon, 23 Sep 2024 14:12:15 +0200 Subject: [PATCH] Add command for creating pipelines Add a new command `datasets create-pipeline` for creating pipelines for existing datasets. --- CHANGELOG.md | 2 + okdata/cli/commands/datasets/datasets.py | 9 ++- okdata/cli/commands/datasets/questions.py | 14 +++- okdata/cli/commands/datasets/wizards.py | 80 ++++++++++++++--------- 4 files changed, 71 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c365f51..d67c6ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## ?.?.? - Unreleased +* New command `datasets create-pipeline` for creating pipelines for existing + datasets. * More robust dataset/version/edition URI parsing. ## 4.2.0 - 2024-06-18 diff --git a/okdata/cli/commands/datasets/datasets.py b/okdata/cli/commands/datasets/datasets.py index 2718bd1..245d4bd 100644 --- a/okdata/cli/commands/datasets/datasets.py +++ b/okdata/cli/commands/datasets/datasets.py @@ -6,7 +6,10 @@ from requests.exceptions import HTTPError from okdata.cli.command import BaseCommand, BASE_COMMAND_OPTIONS -from okdata.cli.commands.datasets.wizards import DatasetCreateWizard +from okdata.cli.commands.datasets.wizards import ( + DatasetCreateWizard, + PipelineCreateWizard, +) from okdata.cli.io import read_json, resolve_output_filepath from okdata.cli.output import create_output @@ -22,6 +25,7 @@ class DatasetsCommand(BaseCommand): okdata datasets create-version [options] okdata datasets create-edition [] [options] okdata datasets create-distribution [ ] [options] + okdata datasets create-pipeline [options] Examples: okdata datasets ls @@ -32,6 +36,7 @@ class DatasetsCommand(BaseCommand): okdata datasets ls my-dataset/1/20240101T102030 --format=json okdata datasets create --file=dataset.json okdata datasets cp /tmp/file.csv ds:my-dataset-id + okdata datasets create-pipeline my-dataset Options:{BASE_COMMAND_OPTIONS} --file= # Use this file for configuration or upload @@ -60,6 +65,8 @@ def handler(self): self.create_edition() elif self.cmd("create-distribution"): self.create_distribution() + elif self.cmd("create-pipeline"): + PipelineCreateWizard(self, self.arg("datasetid")).start() else: self.help() diff --git a/okdata/cli/commands/datasets/questions.py b/okdata/cli/commands/datasets/questions.py index 15ce791..e6a170b 100644 --- a/okdata/cli/commands/datasets/questions.py +++ b/okdata/cli/commands/datasets/questions.py @@ -26,7 +26,7 @@ ] -def qs_create(): +def qs_create_dataset(): return [ { **required_style, @@ -114,3 +114,15 @@ def qs_create(): "when": lambda x: x["sourceType"] in pipeline_choices, }, ] + + +def qs_create_pipeline(): + return [ + { + **required_style, + "type": "select", + "name": "pipeline", + "message": "Prosessering", + "choices": pipeline_choices["file"], + }, + ] diff --git a/okdata/cli/commands/datasets/wizards.py b/okdata/cli/commands/datasets/wizards.py index 21f0a97..5ff6bf3 100644 --- a/okdata/cli/commands/datasets/wizards.py +++ b/okdata/cli/commands/datasets/wizards.py @@ -2,10 +2,41 @@ from okdata.sdk.pipelines.client import PipelineApiClient from okdata.cli.command import confirm_to_continue -from okdata.cli.commands.datasets.questions import qs_create +from okdata.cli.commands.datasets.questions import qs_create_dataset, qs_create_pipeline from okdata.cli.commands.wizard import run_questionnaire +def _pipeline_config(pipeline_processor_id, dataset_id, version): + return { + "pipelineProcessorId": pipeline_processor_id, + "id": dataset_id, + "datasetUri": f"output/{dataset_id}/{version}", + } + + +def _pipeline_input_config(pipeline_id, dataset_id, version): + return { + "pipelineInstanceId": pipeline_id, + "datasetUri": f"input/{dataset_id}/{version}", + "stage": "raw", + } + + +def _create_pipeline(command, env, dataset_id, pipeline): + command.print("Creating pipeline...") + pipeline_client = PipelineApiClient(env=env) + pipeline_config = _pipeline_config(pipeline, dataset_id, "1") + pipeline_id = pipeline_client.create_pipeline_instance(pipeline_config) + pipeline_id = pipeline_id.strip('"') # What's up with these? + command.print(f"Created pipeline with ID: {pipeline_id}") + + command.print("Creating pipeline input...") + pipeline_input_config = _pipeline_input_config(pipeline_id, dataset_id, "1") + pipeline_input_id = pipeline_client.create_pipeline_input(pipeline_input_config) + pipeline_input_id = pipeline_input_id.strip('"') # What's up with these? + command.print(f"Created pipeline input with ID: {pipeline_input_id}") + + class DatasetCreateWizard: """Wizard for the `datasets create` command. @@ -39,23 +70,9 @@ def dataset_config(self, choices): return config - def pipeline_config(self, pipeline_processor_id, dataset_id, version): - return { - "pipelineProcessorId": pipeline_processor_id, - "id": dataset_id, - "datasetUri": f"output/{dataset_id}/{version}", - } - - def pipeline_input_config(self, pipeline_id, dataset_id, version): - return { - "pipelineInstanceId": pipeline_id, - "datasetUri": f"input/{dataset_id}/{version}", - "stage": "raw", - } - def start(self): env = self.command.opt("env") - choices = run_questionnaire(*qs_create()) + choices = run_questionnaire(*qs_create_dataset()) confirm_to_continue( "Will create a new dataset '{}'.{}".format( @@ -76,22 +93,7 @@ def start(self): self.command.print(f"Created dataset with ID: {dataset_id}") if choices.get("pipeline"): - self.command.print("Creating pipeline...") - pipeline_client = PipelineApiClient(env=env) - pipeline_config = self.pipeline_config(choices["pipeline"], dataset_id, "1") - pipeline_id = pipeline_client.create_pipeline_instance(pipeline_config) - pipeline_id = pipeline_id.strip('"') # What's up with these? - self.command.print(f"Created pipeline with ID: {pipeline_id}") - - self.command.print("Creating pipeline input...") - pipeline_input_config = self.pipeline_input_config( - pipeline_id, dataset_id, "1" - ) - pipeline_input_id = pipeline_client.create_pipeline_input( - pipeline_input_config - ) - pipeline_input_id = pipeline_input_id.strip('"') # What's up with these? - self.command.print(f"Created pipeline input with ID: {pipeline_input_id}") + _create_pipeline(self.command, env, dataset_id, choices["pipeline"]) if choices["sourceType"] == "file": self.command.print( @@ -102,3 +104,17 @@ def start(self): ) else: self.command.print("Done!") + + +class PipelineCreateWizard: + """Wizard for the `datasets create-pipeline` command.""" + + def __init__(self, command, dataset_id): + self.command = command + self.dataset_id = dataset_id + + def start(self): + choices = run_questionnaire(*qs_create_pipeline()) + _create_pipeline( + self.command, self.command.opt("env"), self.dataset_id, choices["pipeline"] + )