Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add command for creating pipelines #231

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## ?.?.? - Unreleased

* New command `datasets create-pipeline` for creating pipelines for existing
datasets.
* More robust dataset/version/edition URI parsing.

## 4.2.0 - 2024-06-18
Expand Down
9 changes: 8 additions & 1 deletion okdata/cli/commands/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from requests.exceptions import HTTPError

from okdata.cli.command import BaseCommand, BASE_COMMAND_OPTIONS
from okdata.cli.commands.datasets.wizards import DatasetCreateWizard
from okdata.cli.commands.datasets.wizards import (
DatasetCreateWizard,
PipelineCreateWizard,
)
from okdata.cli.io import read_json, resolve_output_filepath
from okdata.cli.output import create_output

Expand All @@ -22,6 +25,7 @@ class DatasetsCommand(BaseCommand):
okdata datasets create-version <datasetid> [options]
okdata datasets create-edition <datasetid> [<versionid>] [options]
okdata datasets create-distribution <datasetid> [<versionid> <editionid>] [options]
okdata datasets create-pipeline <datasetid> [options]

Examples:
okdata datasets ls
Expand All @@ -32,6 +36,7 @@ class DatasetsCommand(BaseCommand):
okdata datasets ls my-dataset/1/20240101T102030 --format=json
okdata datasets create --file=dataset.json
okdata datasets cp /tmp/file.csv ds:my-dataset-id
okdata datasets create-pipeline my-dataset

Options:{BASE_COMMAND_OPTIONS}
--file=<file> # Use this file for configuration or upload
Expand Down Expand Up @@ -60,6 +65,8 @@ def handler(self):
self.create_edition()
elif self.cmd("create-distribution"):
self.create_distribution()
elif self.cmd("create-pipeline"):
PipelineCreateWizard(self, self.arg("datasetid")).start()
else:
self.help()

Expand Down
14 changes: 13 additions & 1 deletion okdata/cli/commands/datasets/questions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
]


def qs_create():
def qs_create_dataset():
return [
{
**required_style,
Expand Down Expand Up @@ -114,3 +114,15 @@ def qs_create():
"when": lambda x: x["sourceType"] in pipeline_choices,
},
]


def qs_create_pipeline():
    """Return the questions asked when creating a pipeline.

    The list holds a single select question named "pipeline" whose choices
    are taken from the "file" entry of `pipeline_choices`.
    """
    pipeline_question = dict(required_style)
    pipeline_question.update(
        {
            "type": "select",
            "name": "pipeline",
            "message": "Prosessering",
            "choices": pipeline_choices["file"],
        }
    )
    return [pipeline_question]
80 changes: 48 additions & 32 deletions okdata/cli/commands/datasets/wizards.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,41 @@
from okdata.sdk.pipelines.client import PipelineApiClient

from okdata.cli.command import confirm_to_continue
from okdata.cli.commands.datasets.questions import qs_create
from okdata.cli.commands.datasets.questions import qs_create_dataset, qs_create_pipeline
from okdata.cli.commands.wizard import run_questionnaire


def _pipeline_config(pipeline_processor_id, dataset_id, version):
return {
"pipelineProcessorId": pipeline_processor_id,
"id": dataset_id,
"datasetUri": f"output/{dataset_id}/{version}",
}


def _pipeline_input_config(pipeline_id, dataset_id, version):
return {
"pipelineInstanceId": pipeline_id,
"datasetUri": f"input/{dataset_id}/{version}",
"stage": "raw",
}


def _create_pipeline(command, env, dataset_id, pipeline, version="1"):
    """Create a pipeline instance and a matching pipeline input.

    Progress and the resulting IDs are reported through `command.print`.

    Args:
        command: Command object whose `print` method is used for output.
        env: Environment handed to `PipelineApiClient`.
        dataset_id: ID of the dataset the pipeline is created for.
        pipeline: Pipeline processor ID to instantiate.
        version: Dataset version used in both the output and the input
            dataset URIs. Defaults to "1", the value that was previously
            hard-coded in both places.
    """
    command.print("Creating pipeline...")
    pipeline_client = PipelineApiClient(env=env)
    pipeline_config = _pipeline_config(pipeline, dataset_id, version)
    pipeline_id = pipeline_client.create_pipeline_instance(pipeline_config)
    # NOTE(review): the returned ID apparently arrives wrapped in literal
    # double quotes, hence the strip; the cause is unknown -- TODO confirm.
    pipeline_id = pipeline_id.strip('"')
    command.print(f"Created pipeline with ID: {pipeline_id}")

    command.print("Creating pipeline input...")
    pipeline_input_config = _pipeline_input_config(pipeline_id, dataset_id, version)
    pipeline_input_id = pipeline_client.create_pipeline_input(pipeline_input_config)
    # Same quote stripping as for the pipeline ID above.
    pipeline_input_id = pipeline_input_id.strip('"')
    command.print(f"Created pipeline input with ID: {pipeline_input_id}")


class DatasetCreateWizard:
"""Wizard for the `datasets create` command.

Expand Down Expand Up @@ -39,23 +70,9 @@ def dataset_config(self, choices):

return config

def pipeline_config(self, pipeline_processor_id, dataset_id, version):
return {
"pipelineProcessorId": pipeline_processor_id,
"id": dataset_id,
"datasetUri": f"output/{dataset_id}/{version}",
}

def pipeline_input_config(self, pipeline_id, dataset_id, version):
return {
"pipelineInstanceId": pipeline_id,
"datasetUri": f"input/{dataset_id}/{version}",
"stage": "raw",
}

def start(self):
env = self.command.opt("env")
choices = run_questionnaire(*qs_create())
choices = run_questionnaire(*qs_create_dataset())

confirm_to_continue(
"Will create a new dataset '{}'.{}".format(
Expand All @@ -76,22 +93,7 @@ def start(self):
self.command.print(f"Created dataset with ID: {dataset_id}")

if choices.get("pipeline"):
self.command.print("Creating pipeline...")
pipeline_client = PipelineApiClient(env=env)
pipeline_config = self.pipeline_config(choices["pipeline"], dataset_id, "1")
pipeline_id = pipeline_client.create_pipeline_instance(pipeline_config)
pipeline_id = pipeline_id.strip('"') # What's up with these?
self.command.print(f"Created pipeline with ID: {pipeline_id}")

self.command.print("Creating pipeline input...")
pipeline_input_config = self.pipeline_input_config(
pipeline_id, dataset_id, "1"
)
pipeline_input_id = pipeline_client.create_pipeline_input(
pipeline_input_config
)
pipeline_input_id = pipeline_input_id.strip('"') # What's up with these?
self.command.print(f"Created pipeline input with ID: {pipeline_input_id}")
_create_pipeline(self.command, env, dataset_id, choices["pipeline"])

if choices["sourceType"] == "file":
self.command.print(
Expand All @@ -102,3 +104,17 @@ def start(self):
)
else:
self.command.print("Done!")


class PipelineCreateWizard:
    """Interactive wizard behind the `datasets create-pipeline` command."""

    def __init__(self, command, dataset_id):
        # Keep the invoking command (for options and output) and the target
        # dataset around until `start` is called.
        self.dataset_id = dataset_id
        self.command = command

    def start(self):
        """Ask which pipeline to use and create it for the dataset."""
        questions = qs_create_pipeline()
        answers = run_questionnaire(*questions)
        env = self.command.opt("env")
        _create_pipeline(self.command, env, self.dataset_id, answers["pipeline"])
Loading