Bug in chunked parameters (#143)
* fix: minor bugs with on failure behaviours

* fix: minor bugs with on failure behaviours

* fix: adding secrets to sdk

* docs: adding more examples

* docs: still working through

* fix: map variable and parameters
vijayvammi authored Apr 23, 2024
1 parent 4884a46 commit 0a780fa
Showing 15 changed files with 102 additions and 39 deletions.
2 changes: 1 addition & 1 deletion examples/Dockerfile.39
@@ -28,4 +28,4 @@ WORKDIR /app
RUN poetry config repositories.FPHO https://files.pythonhosted.org \
&& poetry config certificates.FPHO.cert false

RUN poetry install --all-extras --without dev --without tutorial
RUN poetry install --all-extras --without dev,docs,binary,tutorial,perf,release && poetry cache clear --all .
24 changes: 17 additions & 7 deletions examples/README.md
@@ -17,6 +17,8 @@ Please use this as an index to find a specific example.
- [scripts.py](./01-tasks/scripts.py), [scripts.yaml](./01-tasks/scripts.yaml): uses shell scripts as tasks
The stdout/stderr of all scripts are captured and stored in the catalog.

---


The above examples showcase executable units of the pipeline.
The next section has examples on stitching these tasks together for complex operations.
@@ -30,23 +32,31 @@ The next section has examples on stitching these tasks together for complex operations.
- [on_failure_success.py](./02-sequential/on_failure_succeed.py), [on_failure_success.yaml](./02-sequential/on_failure_succeed.yaml): On failure of a step, take a different route


---

The above examples show stitching complex operations of the pipeline.
The next section has examples on communicating between tasks during execution.

- 03: Examples of passing parameters between tasks of a pipeline.

Guidelines:
The table below summarizes the input/output types of the different task types. For example, notebooks can only take JSON-serializable
parameters as input but can return json/pydantic/object values. Any Python object that can be serialized using "dill" can be used.

- Python functions can get/set simple Python data types, pydantic models, and objects marked as pickled. Some of the
simple data types can also be marked as a metric.
| | Input | Output |
| -------- | :---------------------: | :----------------------: |
| python | json,pydantic, object | json, pydantic, object |
| notebook | json | json, pydantic, object |
| shell | json | json |


- [static_parameters_python.py](./03-parameters/static_parameters_python.py), [static_parameters_python.yaml](./03-parameters/static_parameters_python.yaml): A pipeline to show the access of static or known parameters by python tasks.

- [static_parameters_non_python.py](./03-parameters/static_parameters_non_python.py), [static_parameters_non_python.yaml](./03-parameters/static_parameters_non_python.yaml): A pipeline to show the access of static or known parameters by non-python tasks.

- [passing_parameters_python.py](./03-parameters/passing_parameters_python.py), [passing_parameters_python.yaml](./03-parameters/passing_parameters_python.yaml): shows the mechanism of passing parameters (simple python datatypes, "dillable" objects, pydantic models) and registering metrics between python tasks.
- [passing_parameters_python.py](./03-parameters/passing_parameters_python.py), [passing_parameters_python.yaml](./03-parameters/passing_parameters_python.yaml): shows the mechanism of passing parameters (simple python datatypes, objects, pydantic models) and registering metrics between python tasks.

- [passing_parameters_notebook.py](./03-parameters/passing_parameters_notebook.py), [passing_parameters_notebook.yaml](./03-parameters/passing_parameters_notebook.yaml): shows the mechanism of passing parameters between notebook tasks. Please note that
we cannot inject pydantic models or objects into the notebook.

- [passing_parameters_notebook.py](./03-parameters/passing_parameters_notebook.py), [passing_parameters_notebook.yaml](./03-parameters/passing_parameters_notebook.yaml): shows the mechanism of passing parameters (simple python datatypes, "dillable" objects, pydantic models) and registering metrics between tasks. runnable can "get" object
parameters from notebooks but cannot inject them into notebooks.
- [passing_parameters_shell.py](./03-parameters/passing_parameters_shell.py), [passing_parameters_shell.yaml](./03-parameters/passing_parameters_shell.yaml): shows the mechanism of passing parameters between shell tasks. Please note that
we cannot inject pydantic models or objects into shells.
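To make the mechanics above concrete, here is a minimal sketch of passing a parameter between two Python tasks, following the `PythonTask`/`Pipeline` API used throughout these examples; the task and parameter names are hypothetical.

```python
from runnable import Pipeline, PythonTask

def produce() -> int:
    # the returned value is registered as the parameter "total"
    return 10

def consume(total: int):
    # downstream tasks receive the parameter by argument name
    print(total)

producer = PythonTask(name="produce", function=produce, returns=["total"])
consumer = PythonTask(name="consume", function=consume, terminate_with_success=True)

pipeline = Pipeline(steps=[producer, consumer], add_terminal_nodes=True)
pipeline.execute()
```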
7 changes: 4 additions & 3 deletions examples/concepts/map.py
@@ -35,11 +35,11 @@ def process_chunk(stride: int, start_index: int):
The files between the start_index and the start_index + stride
are processed per chunk.
"""
print("stride", stride, type(stride))
print("start_index", start_index, type(start_index))
for i in range(start_index, start_index + stride, stride):
pass

return stride * start_index


def main():
"""
@@ -56,6 +56,7 @@ def main():
execute = PythonTask(
name="execute",
function=process_chunk,
returns=["me"],
terminate_with_success=True,
)

@@ -76,7 +77,7 @@ def main():

pipeline = Pipeline(steps=[generate, iterate_and_execute], add_terminal_nodes=True)

_ = pipeline.execute(configuration_file="examples/configs/fs-catalog-run_log.yaml")
_ = pipeline.execute(configuration_file="examples/configs/fs-catalog-chunked_run_log.yaml")

return pipeline

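The functional change in this file is the `returns=["me"]` declaration; a sketch of why it matters, assuming the behaviour shown elsewhere in this commit (undeclared return values are simply dropped):

```python
from runnable import PythonTask

def process_chunk(stride: int, start_index: int):
    return stride * start_index

# Without `returns`, the value returned by process_chunk is discarded.
# Declaring it registers a parameter named "me" for every map branch,
# which the map step's fan-in can later reduce.
execute = PythonTask(
    name="execute",
    function=process_chunk,
    returns=["me"],
    terminate_with_success=True,
)
```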
4 changes: 2 additions & 2 deletions examples/configs/argo-config-full.yaml
@@ -4,7 +4,7 @@ executor:
image: $argo_docker_image
max_workflow_duration_in_seconds: 86400 # Apply to spec
node_selector:
parallelism: 0 #apply to spec
parallelism: 1 #apply to spec
service_account_name: pipeline-runner
resources:
limits:
@@ -45,4 +45,4 @@ run_log_store: # (5)
catalog: # (5)
type: file-system
config:
log_folder: /mnt/catalog # (6)
catalog_location: /mnt/catalog # (6)
12 changes: 7 additions & 5 deletions examples/configs/argo-config.yaml
@@ -1,10 +1,10 @@
executor:
type: "argo" # (1)
config:
image: runnable:demo # (2)
image: harbor.csis.astrazeneca.net/mlops/runnable:latest # (2)
service_account_name: default-editor
persistent_volumes: # (3)
- name: runnable-volume
- name: magnus-volume
mount_path: /mnt

run_log_store: # (4)
@@ -13,7 +13,9 @@ run_log_store: # (4)
log_folder: /mnt/run_log_store

catalog:
type: do-nothing
type: file-system
config:
catalog_location: /mnt/catalog

secrets:
type: do-nothing
# secrets:
# type: do-nothing
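For reference, running a pipeline against this Argo configuration follows the same pattern used in the map example above; a one-line sketch, assuming `pipeline` has already been built:

```python
# point an already-built pipeline at the Argo executor config
_ = pipeline.execute(configuration_file="examples/configs/argo-config.yaml")
```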
13 changes: 8 additions & 5 deletions examples/tutorials/mnist/hyper_parameter_tuning.py
@@ -47,7 +47,8 @@ def convert_to_categorically(y_train: np.ndarray, y_test: np.ndarray, num_classes: int):
return y_train, y_test


def build_model(train_params: TrainParams, hp: List[int], num_classes: int):
def build_model(train_params: TrainParams, hpt_id: int, hpt: List[List[int]], num_classes: int):
hp = hpt[hpt_id]
hp_id = "_".join(map(str, hp))
print(hp_id)

@@ -81,7 +82,8 @@ def build_model(train_params: TrainParams, hp: List[int], num_classes: int):
model.save(f"model{hp_id}.keras")


def train_model(x_train: np.ndarray, y_train: np.ndarray, train_params: TrainParams, hp: List[int]):
def train_model(x_train: np.ndarray, y_train: np.ndarray, hpt_id: int, train_params: TrainParams, hpt: List[List[int]]):
hp = hpt[hpt_id]
hp_id = "_".join(map(str, hp))
model = keras.models.load_model(f"model{hp_id}.keras")
model.compile(loss=train_params.loss, optimizer=train_params.optimizer, metrics=train_params.metrics)
@@ -97,7 +99,8 @@ def train_model(x_train: np.ndarray, y_train: np.ndarray, train_params: TrainParams, hp: List[int]):
model.save(f"trained_model{hp_id}.keras")


def evaluate_model(x_test: np.ndarray, y_test: np.ndarray, hp: List[int]):
def evaluate_model(x_test: np.ndarray, y_test: np.ndarray, hpt: List[List[int]], hpt_id: int):
hp = hpt[hpt_id]
hp_id = "_".join(map(str, hp))
trained_model = keras.models.load_model(f"trained_model{hp_id}.keras")

@@ -161,8 +164,8 @@ def main():
hpt_step = Map(
name="hpt",
branch=train_pipeline,
iterate_on="hpt",
iterate_as="hp",
iterate_on="hpt_ids",
iterate_as="hpt_id",
reducer="lambda *x: max(x, key=lambda x: x[1])",
terminate_with_success=True,
)
5 changes: 5 additions & 0 deletions examples/tutorials/mnist/parameters.yaml
@@ -23,6 +23,11 @@ baseline_params:

metrics: ['accuracy']

hpt_ids:
- 0
- 1


hpt:
- [16, 32]
- [16, 64]
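The indirection introduced above deserves a note: the map step now iterates over the integer ids in `hpt_ids`, and each task indexes into the full `hpt` list itself, presumably because the iteration value is embedded into parameter names during fan-out and so must stay a simple scalar. A minimal sketch, with values mirroring `parameters.yaml`:

```python
from typing import List

hpt: List[List[int]] = [[16, 32], [16, 64]]  # hyperparameter candidates
hpt_ids: List[int] = [0, 1]                  # what the map step iterates over

def lookup(hpt_id: int, hpt: List[List[int]]) -> str:
    hp = hpt[hpt_id]               # e.g. [16, 32] for hpt_id == 0
    return "_".join(map(str, hp))  # "16_32", used to name saved models

assert lookup(0, hpt) == "16_32"
assert lookup(1, hpt) == "16_64"
```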
2 changes: 1 addition & 1 deletion poetry.lock


4 changes: 4 additions & 0 deletions pyproject.toml
@@ -53,6 +53,10 @@ matplotlib = "^3.8.3"
[tool.poetry.group.release.dependencies]
python-semantic-release = "^9.4.2"


[tool.poetry.group.examples.dependencies]
pandas = "^2.2.2"

[tool.poetry.extras]
docker = ['docker']
notebook = ['ploomber-engine']
6 changes: 4 additions & 2 deletions runnable/datastore.py
@@ -312,8 +312,10 @@ def get_summary(self) -> Dict[str, Any]:
summary["Catalog Location"] = _context.catalog_handler.get_summary()
summary["Full Run log present at: "] = _context.run_log_store.get_summary()

summary["Final Parameters"] = {p: v.description for p, v in self.parameters.items()}
summary["Collected metrics"] = {p: v.description for p, v in self.parameters.items() if v.kind == "metric"}
run_log = _context.run_log_store.get_run_log_by_id(run_id=_context.run_id, full=True)

summary["Final Parameters"] = {p: v.description for p, v in run_log.parameters.items()}
summary["Collected metrics"] = {p: v.description for p, v in run_log.parameters.items() if v.kind == "metric"}

return summary

2 changes: 1 addition & 1 deletion runnable/extensions/catalog/file_system/implementation.py
@@ -226,7 +226,7 @@ def sync_between_runs(self, previous_run_id: str, run_id: str):
for cataloged_file in cataloged_files:
if str(cataloged_file).endswith("execution.log"):
continue
print(cataloged_file.name)

if cataloged_file.is_file():
shutil.copy(cataloged_file, run_catalog / cataloged_file.name)
else:
8 changes: 4 additions & 4 deletions runnable/extensions/nodes.py
@@ -505,7 +505,7 @@ def fan_out(self, map_variable: TypeMapVariable = None, **kwargs):
for _, v in map_variable.items():
for branch_return in self.branch_returns:
param_name, param_type = branch_return
raw_parameters[f"{param_name}_{v}"] = param_type.copy()
raw_parameters[f"{v}_{param_name}"] = param_type.copy()
else:
for branch_return in self.branch_returns:
param_name, param_type = branch_return
@@ -606,9 +606,9 @@ def fan_in(self, map_variable: TypeMapVariable = None, **kwargs):
param_name, _ = branch_return
to_reduce = []
for iter_variable in iterate_on:
to_reduce.append(params[f"{param_name}_{iter_variable}"].get_value())
to_reduce.append(params[f"{iter_variable}_{param_name}"].get_value())

param_name = f"{param_name}_{v}"
param_name = f"{v}_{param_name}"
params[param_name].value = reducer_f(to_reduce)
params[param_name].reduced = True
else:
@@ -617,7 +617,7 @@ def fan_in(self, map_variable: TypeMapVariable = None, **kwargs):

to_reduce = []
for iter_variable in iterate_on:
to_reduce.append(params[f"{param_name}_{iter_variable}"].get_value())
to_reduce.append(params[f"{iter_variable}_{param_name}"].get_value())

params[param_name].value = reducer_f(*to_reduce)
params[param_name].reduced = True
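The substance of the fix: per-branch copies of a return value are now keyed `{iteration_value}_{param_name}` rather than `{param_name}_{iteration_value}`, consistently in both fan-out and fan-in. A toy sketch of the convention, using hypothetical iteration values and plain strings in place of Parameter objects:

```python
params = {}
branch_returns = ["me"]              # names returned by the mapped branch
iterate_on = ["chunk_a", "chunk_b"]  # hypothetical iteration values

# fan-out: seed one prefixed placeholder per (iteration value, return name)
for v in iterate_on:
    for param_name in branch_returns:
        params[f"{v}_{param_name}"] = f"value from branch {v}"

# fan-in: gather the prefixed copies and reduce them into the bare name
for param_name in branch_returns:
    to_reduce = [params[f"{v}_{param_name}"] for v in iterate_on]
    params[param_name] = to_reduce  # a real reducer would fold these values

print(sorted(params))  # ['chunk_a_me', 'chunk_b_me', 'me']
```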
@@ -35,10 +35,10 @@ def get_matches(self, run_id: str, name: str, multiple_allowed: bool = False) ->
name (str): The suffix of the file name to check in the run log store.
"""
log_folder = self.log_folder_with_run_id(run_id=run_id)

sub_name = Template(name).safe_substitute({"creation_time": ""})

matches = list(log_folder.glob(f"{sub_name}*"))

if matches:
if not multiple_allowed:
if len(matches) > 1:
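The `safe_substitute` call above blanks the `${creation_time}` placeholder so the remaining prefix can drive a glob; a small sketch, where the naming-pattern string is an assumption for illustration:

```python
from string import Template

name = "StepLog-train-${creation_time}"  # hypothetical naming pattern
sub_name = Template(name).safe_substitute({"creation_time": ""})
print(sub_name)  # "StepLog-train-"
# matches = list(log_folder.glob(f"{sub_name}*")) would then pick up any
# files whose names start with the prefix, whatever their timestamp.
```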
26 changes: 22 additions & 4 deletions runnable/extensions/run_log_store/generic_chunked.py
@@ -7,7 +7,16 @@
from typing import Any, Dict, Optional, Sequence, Union

from runnable import defaults, exceptions
from runnable.datastore import BaseRunLogStore, BranchLog, RunLog, StepLog
from runnable.datastore import (
BaseRunLogStore,
BranchLog,
JsonParameter,
MetricParameter,
ObjectParameter,
Parameter,
RunLog,
StepLog,
)

logger = logging.getLogger(defaults.LOGGER_NAME)

@@ -164,7 +173,9 @@ def retrieve(self, run_id: str, log_type: LogTypes, name: str = "", multiple_allowed: bool = False):
raise Exception(f"Name is required during retrieval for {log_type}")

naming_pattern = self.naming_pattern(log_type=log_type, name=name)

matches = self.get_matches(run_id=run_id, name=naming_pattern, multiple_allowed=multiple_allowed)

if matches:
if not multiple_allowed:
contents = self._retrieve(name=matches) # type: ignore
@@ -370,10 +381,17 @@ def get_parameters(self, run_id: str, **kwargs) -> dict:
Raises:
RunLogNotFoundError: If the run log for run_id is not found in the datastore
"""
parameters = {}
parameters: Dict[str, Parameter] = {}
try:
parameters_list = self.retrieve(run_id=run_id, log_type=self.LogTypes.PARAMETER, multiple_allowed=True)
parameters = {key: value for param in parameters_list for key, value in param.items()}
for param in parameters_list:
for key, value in param.items():
if value["kind"] == "json":
parameters[key] = JsonParameter(**value)
if value["kind"] == "metric":
parameters[key] = MetricParameter(**value)
if value["kind"] == "object":
parameters[key] = ObjectParameter(**value)
except EntityNotFoundError:
# No parameters are set
pass
@@ -401,7 +419,7 @@ def set_parameters(self, run_id: str, parameters: dict, **kwargs):
self.store(
run_id=run_id,
log_type=self.LogTypes.PARAMETER,
contents={key: value},
contents={key: value.model_dump(by_alias=True)},
name=key,
)

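Taken together, the two changes round-trip parameters through the chunked store: `set_parameters` persists each one as a plain dict via `model_dump(by_alias=True)`, and `get_parameters` rebuilds the typed object by dispatching on `kind`. A minimal sketch, assuming `JsonParameter` accepts the fields its dump produces (as the `**value` construction above implies); the sample value is hypothetical:

```python
from runnable.datastore import JsonParameter, MetricParameter, ObjectParameter

# persist: the parameter is stored as a plain dict, including its "kind"
stored = JsonParameter(kind="json", value=42).model_dump(by_alias=True)

# retrieve: rebuild the typed parameter, as get_parameters now does
kind_map = {"json": JsonParameter, "metric": MetricParameter, "object": ObjectParameter}
restored = kind_map[stored["kind"]](**stored)
assert restored.value == 42
```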
24 changes: 21 additions & 3 deletions runnable/tasks.py
@@ -1,4 +1,5 @@
import contextlib
import copy
import importlib
import io
import json
@@ -99,6 +100,20 @@ def execute_command(
"""
raise NotImplementedError()

def _diff_parameters(
self, parameters_in: Dict[str, Parameter], context_params: Dict[str, Parameter]
) -> Dict[str, Parameter]:
diff: Dict[str, Parameter] = {}
for param_name, param in context_params.items():
if param_name in parameters_in:
if parameters_in[param_name] != param:
diff[param_name] = param
continue

diff[param_name] = param

return diff

@contextlib.contextmanager
def expose_secrets(self):
"""Context manager to expose secrets to the execution.
@@ -128,7 +143,7 @@ def execution_context(self, map_variable: TypeMapVariable = None, allow_complex:
if param.reduced is False:
context_param = param_name
for _, v in map_variable.items(): # type: ignore
context_param = f"{context_param}_{v}"
context_param = f"{v}_{context_param}"

if context_param in params:
params[param_name].value = params[context_param].value
@@ -147,6 +162,8 @@ def execution_context(self, map_variable: TypeMapVariable = None, allow_complex:

log_file = open(log_file_name, "w")

parameters_in = copy.deepcopy(params)

f = io.StringIO()
try:
with contextlib.redirect_stdout(f):
@@ -168,7 +185,8 @@ def execution_context(self, map_variable: TypeMapVariable = None, allow_complex:

# Update parameters
# This should only update the parameters that are changed at the root level.
self._context.run_log_store.set_parameters(parameters=params, run_id=self._context.run_id)
diff_parameters = self._diff_parameters(parameters_in=parameters_in, context_params=params)
self._context.run_log_store.set_parameters(parameters=diff_parameters, run_id=self._context.run_id)


def task_return_to_parameter(task_return: TaskReturns, value: Any) -> Parameter:
@@ -259,7 +277,7 @@ def execute_command(
param_name = task_return.name
if map_variable:
for _, v in map_variable.items():
param_name = f"{param_name}_{v}"
param_name = f"{v}_{param_name}"

output_parameters[param_name] = output_parameter

Expand Down
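A quick sketch of what `_diff_parameters` achieves, assuming `JsonParameter` as the parameter type and hypothetical names: only parameters that are new or whose value changed during the step are written back to the run log store.

```python
from runnable.datastore import JsonParameter

before = {
    "x": JsonParameter(kind="json", value=1),
    "y": JsonParameter(kind="json", value=2),
}
after = {
    "x": JsonParameter(kind="json", value=1),  # unchanged
    "y": JsonParameter(kind="json", value=3),  # updated by the task
    "z": JsonParameter(kind="json", value=4),  # newly created
}

diff = {}
for name, param in after.items():
    if name in before and before[name] == param:
        continue  # unchanged at the root level, skip persisting
    diff[name] = param

print(sorted(diff))  # ['y', 'z']
```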
