Skip to content

Commit

Permalink
Merge pull request #773 from revit13/super131
Browse files Browse the repository at this point in the history
Modify superpipeline params type.
  • Loading branch information
roytman authored Nov 7, 2024
2 parents a4e4f3d + f7c0548 commit 0c52678
Show file tree
Hide file tree
Showing 16 changed files with 45 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
REPOROOT=../../
REPOROOT=../../../
# Use make help, to see the available rules
include ${REPOROOT}/.make.defaults

Expand All @@ -16,11 +16,11 @@ else
endif

.PHONY: workflow-build
setup:
workflow-build:
ifeq ($(KFPv2), 1)
echo "Skipping build as KFPv2 is defined"
else
$(MAKE) -C ray/kfp_v1 setup
$(MAKE) -C ray/kfp_v1 workflow-build
endif

.PHONY: workflow-test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
REPOROOT=${CURDIR}/../../../..
REPOROOT=${CURDIR}/../../../../../
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.workflows

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
# empty comment to triigger pre-commit
# Components
# For every sub workflow we need a separate components, that knows about this subworkflow.
component_spec_path = "../../../kfp_ray_components/"
component_spec_path = "../../../../../kfp/kfp_ray_components/"
run_code_to_parquet_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_code_quality_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_malware_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_license_check_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_license_select_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_header_cleanser_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_proglang_select_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand All @@ -35,7 +35,7 @@
proglang_select_image = "quay.io/dataprep1/data-prep-kit/proglang_select-ray:latest"
code_quality_image = "quay.io/dataprep1/data-prep-kit/code_quality-ray:latest"
malware_image = "quay.io/dataprep1/data-prep-kit/malware-ray:latest"
license_check_image = "quay.io/dataprep1/data-prep-kit/license_check-ray:latest"
license_select_image = "quay.io/dataprep1/data-prep-kit/license_select-ray:latest"
header_cleanser_image = "quay.io/dataprep1/data-prep-kit/header-cleanser-ray:latest"
doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id-ray:latest"
ededup_image = "quay.io/dataprep1/data-prep-kit/ededup-ray:latest"
Expand All @@ -50,27 +50,27 @@
)
def sample_code_ray_orchestrator(
# the super pipeline parameters
p1_orch_code_to_parquet_name: str = "code_2_parquet_wf",
p1_orch_code_to_parquet_name: str = "code2parquet_wf",
p1_orch_code_quality_name: str = "code_quality_wf",
p1_orch_malware_name: str = "malware_wf",
p1_orch_license_check_name: str = "license_check_wf",
p1_orch_license_select_name: str = "license_select_wf",
p1_orch_header_cleanser_name: str = "header_cleanser_wf",
p1_orch_proglang_select_name: str = "proglang_select_wf",
p1_orch_doc_id_name: str = "doc_id_wf",
p1_orch_exact_dedup_name: str = "ededup_wf",
p1_orch_fuzzy_dedup_name: str = "fdedup_wf",
p1_orch_tokenization_wf_name: str = "tokenization_wf",
p2_pipeline_runtime_pipeline_id: str = "pipeline_id",
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": ""},
p2_pipeline_ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""},
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_input_parent_path: str = "test/code2parquet/output/",
p2_pipeline_input_parent_path: str = "test/code2parquet/input/",
p2_pipeline_output_parent_path: str = "test/super/output/",
p2_pipeline_parent_path_suffix: str = "",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_data_s3_access_secret: str = "s3-secret",
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
p2_pipeline_runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
p2_pipeline_runtime_actor_options: dict = {'num_cpus': 0.7},
p2_pipeline_data_max_files: int = -1,
p2_pipeline_data_num_samples: int = -1,
# code to parquet step parameters
Expand Down Expand Up @@ -186,16 +186,16 @@ def sample_code_ray_orchestrator(
+ malware_image
+ '"}}',
# license check step parameters
p10_name: str = "license_check",
p10_name: str = "license_select",
p10_skip: bool = False,
p10_lc_license_column_name: str = "license",
p10_lc_licenses_file: str = "test/license_check/sample_approved_licenses.json",
p10_lc_licenses_file: str = "test/license_select/sample_approved_licenses.json",
# orchestrator
# overriding parameters
p10_overriding_params: str = '{"ray_worker_options": {"image": "'
+ license_check_image
+ license_select_image
+ '"}, "ray_head_options": {"image": "'
+ license_check_image
+ license_select_image
+ '"}}',
# header cleanser step parameters
p11_name: str = "header_cleanser",
Expand Down Expand Up @@ -246,7 +246,7 @@ def _set_component(op: dsl.BaseOp, displaied_name: str, prev_op: dsl.BaseOp = No
op.after(prev_op)

# code to parquet deduplication
code_to_parquet = run_exact_dedup_op(
code_to_parquet = run_code_to_parquet_op(
name=p1_orch_code_to_parquet_name,
prefix="p3_",
params=args,
Expand Down Expand Up @@ -298,25 +298,25 @@ def _set_component(op: dsl.BaseOp, displaied_name: str, prev_op: dsl.BaseOp = No
_set_component(malware, "malware", code_quality)

# license check
license_check = run_license_check_op(
name=p1_orch_license_check_name, prefix="p10_", params=args, host=orch_host, input_folder=malware.output
license_select = run_license_select_op(
name=p1_orch_license_select_name, prefix="p10_", params=args, host=orch_host, input_folder=malware.output
)
_set_component(license_check, "license_check", malware)
_set_component(license_select, "license_select", malware)

# header cleanser
header_cleanser = run_header_cleanser_op(
name=p1_orch_header_cleanser_name,
prefix="p11_",
params=args,
host=orch_host,
input_folder=license_check.output,
input_folder=license_select.output,
)
_set_component(header_cleanser, "header_cleanser", license_check)
_set_component(header_cleanser, "header_cleanser", license_select)

# tokenization
tokenization = run_tokenization_op(
name=p1_orch_tokenization_wf_name,
prefix="p10_",
prefix="p12_",
params=args,
host=orch_host,
input_folder=header_cleanser.output,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp_support.workflow_support.runtime_utils import ONE_WEEK_SEC

from workflow_support.compile_utils import ONE_WEEK_SEC

# Components
# path to kfp component specifications files
component_spec_path = "../../../kfp_ray_components/"
component_spec_path = "../../../../../kfp/kfp_ray_components/"
# For every sub workflow we need a separate components, that knows about this subworkflow.
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_exact_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand All @@ -39,16 +38,16 @@ def sample_ray_orchestrator(
p1_orch_exact_dedup_name: str = "ededup_wf",
p1_orch_fuzzy_dedup_name: str = "fdedup_wf",
p2_pipeline_runtime_pipeline_id: str = "pipeline_id",
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": ""},
p2_pipeline_ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""},
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_input_parent_path: str = "test/doc_id/input/",
p2_pipeline_output_parent_path: str = "test/super/output/",
p2_pipeline_parent_path_suffix: str = "",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_data_s3_access_secret: str = "s3-secret",
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
p2_pipeline_runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
p2_pipeline_runtime_actor_options: dict = {'num_cpus': 0.7},
# data access.
p2_pipeline_data_max_files: int = -1,
p2_pipeline_data_num_samples: int = -1,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Another useful feature of the KFP v2 is the `Json` editor for the `dict` type in

### How to compile the superpipeline
```
cd kfp/superworkflows/ray/kfp_v2/
cd examples/kfp/superworkflows/ray/kfp_v2/
make clean
export KFPv2=1
export PYTHONPATH=../../../../transforms
Expand Down
4 changes: 2 additions & 2 deletions kfp/doc/multi_transform_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ The sections that follow display two super pipelines as examples:

### Dedups super pipeline <a name = "dedups"></a>

This pipeline combines several transforms, `doc_id`, `ededup`, and `fdedup`, can be found in [superworkflow_dedups_sample_wf.py](../superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py).
This pipeline combines several transforms, `doc_id`, `ededup`, and `fdedup`, can be found in [superworkflow_dedups_sample_wf.py](../../examples/kfp/superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py).

![super pipeline](super_pipeline.png)

The input parameters of the super pipelines are described in this [section](#super-pipeline-Input-Parameters).

### Programming languages super pipeline <a name = "code"></a>

This pipeline combines transforms for programming languages data preprocessing: `ededup`, `doc_id`, `fdedup`, `proglang_select`, `code_quality`, `malware` and `tokenization`. It can be found in [superworkflow_code_wf.py](../superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py).
This pipeline combines transforms for programming languages data preprocessing: `ededup`, `doc_id`, `fdedup`, `proglang_select`, `code_quality`, `malware` and `tokenization`. It can be found in [superworkflow_code_wf.py](../../examples/kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py).

![super pipeline](super-code-pipeline.png)

Expand Down
6 changes: 1 addition & 5 deletions kfp/kfp_ray_components/src/subworkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def _skip_task(prms) -> bool:
input_folder = input_folder.replace('"', "'")
output_folder = output_folder.replace('"', "'")
data_s3_config = {"input_folder": input_folder, "output_folder": output_folder}
prm["data_s3_config"] = data_s3_config
prm["data_s3_config"] = ParamsUtils.convert_to_ast(data_s3_config)
# Check if to skip preprocessing
if _skip_task(prm):
print("skipped preprocess step")
Expand All @@ -150,10 +150,6 @@ def _skip_task(prms) -> bool:

_remove_unused_params(prm)

for key, value in prm.items():
if isinstance(value, dict):
prm[key] = ParamsUtils.convert_to_ast(value)

print(f"start pipeline {name} with parameters {prm}")

utils = PipelinesUtils(host="http://ml-pipeline:8888")
Expand Down
2 changes: 1 addition & 1 deletion scripts/check-workflows.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ if [ ! -d transforms ]; then
echo Please run this script from the top of the repository
exit 1
fi
KFP_BLACK_LIST="doc_chunk pdf2parquet pii_redactor text_encoder license_select repo_level_ordering"
KFP_BLACK_LIST="doc_chunk pdf2parquet pii_redactor text_encoder license_select repo_level_ordering header_cleanser"
while [ $# -ne 0 ]; do
case $1 in
-show-kfp-black-list) echo $KFP_BLACK_LIST; exit 0;
Expand Down
35 changes: 9 additions & 26 deletions transforms/code/repo_level_ordering/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,41 +48,24 @@ load-image::

.PHONY: workflow-venv
workflow-venv:
@is_blacklisted=$$(cd $(REPOROOT); bash scripts/check-workflows.sh -show-kfp-black-list | grep $(TRANSFORM_NAME)); \
if [ -z "$$is_blacklisted" ]; \
then \
echo $(MAKE) -C kfp_ray $@ ; \
else \
echo "Skipping KFP workflow: Transform is blacklisted " ; \
if [ -e kfp_ray ]; then \
$(MAKE) -C kfp_ray workflow-venv; \
fi

.PHONY: workflow-test
workflow-test:
@is_blacklisted=$$(cd $(REPOROOT); bash scripts/check-workflows.sh -show-kfp-black-list | grep $(TRANSFORM_NAME)); \
if [ -z "$$is_blacklisted" ]; \
then \
echo $(MAKE) -C kfp_ray $@ ; \
else \
echo "Skipping KFP workflow: Transform is blacklisted " ; \
if [ -e kfp_ray ]; then \
$(MAKE) -C kfp_ray workflow-test; \
fi

.PHONY: workflow-upload
workflow-upload:
@is_blacklisted=$$(cd $(REPOROOT); bash scripts/check-workflows.sh -show-kfp-black-list | grep $(TRANSFORM_NAME)); \
if [ -z "$$is_blacklisted" ]; \
then \
echo $(MAKE) -C kfp_ray $@ ; \
else \
echo "Skipping KFP workflow: Transform is blacklisted " ; \
if [ -e kfp_ray ]; then \
$(MAKE) -C kfp_ray workflow-upload; \
fi

.PHONY: workflow-build
workflow-build:
is_blacklisted=$$(cd $(REPOROOT); bash scripts/check-workflows.sh -show-kfp-black-list | grep $(TRANSFORM_NAME)); \
if [ -z "$$is_blacklisted" ]; \
then \
echo $(MAKE) -C kfp_ray $@ ; \
else \
echo "Skipping KFP workflow: Transform is blacklisted " ; \
if [ -e kfp_ray ]; then \
$(MAKE) -C kfp_ray workflow-build; \
fi

0 comments on commit 0c52678

Please sign in to comment.