Skip to content

Commit

Permalink
Merge branch 'issue150-advanced-splitting'
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Sep 20, 2024
2 parents f13edec + 1833cc9 commit 9b65520
Show file tree
Hide file tree
Showing 10 changed files with 1,765 additions and 159 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/

<!-- start changelog -->

## 0.39.0

- More advanced process-graph splitting for cross-backend execution: no longer limited to splitting off `load_collection` nodes, but able to cut deeper into the graph. ([#150](https://github.com/Open-EO/openeo-aggregator/issues/150))

## 0.38.0

- Add request timeout configs for listing user jobs (eu-cdse/openeo-cdse-infra#188)
Expand Down
7 changes: 5 additions & 2 deletions scripts/crossbackend-processing-poc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from openeo_aggregator.metadata import STAC_PROPERTY_FEDERATION_BACKENDS
from openeo_aggregator.partitionedjobs import PartitionedJob
from openeo_aggregator.partitionedjobs.crossbackend import (
CrossBackendSplitter,
CrossBackendJobSplitter,
LoadCollectionGraphSplitter,
run_partitioned_job,
)

Expand Down Expand Up @@ -62,7 +63,9 @@ def backend_for_collection(collection_id) -> str:
metadata = connection.describe_collection(collection_id)
return metadata["summaries"][STAC_PROPERTY_FEDERATION_BACKENDS][0]

splitter = CrossBackendSplitter(backend_for_collection=backend_for_collection, always_split=True)
splitter = CrossBackendJobSplitter(
graph_splitter=LoadCollectionGraphSplitter(backend_for_collection=backend_for_collection, always_split=True)
)
pjob: PartitionedJob = splitter.split({"process_graph": process_graph})
_log.info(f"Partitioned job: {pjob!r}")

Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from typing import Optional

__version__ = "0.38.0a1"
__version__ = "0.39.0a1"


def log_version_info(logger: Optional[logging.Logger] = None):
Expand Down
101 changes: 74 additions & 27 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
streaming_flask_response,
)
from openeo_aggregator.constants import (
CROSSBACKEND_GRAPH_SPLIT_METHOD,
JOB_OPTION_FORCE_BACKEND,
JOB_OPTION_SPLIT_STRATEGY,
JOB_OPTION_TILE_GRID,
Expand All @@ -100,7 +101,11 @@
single_backend_collection_post_processing,
)
from openeo_aggregator.partitionedjobs import PartitionedJob
from openeo_aggregator.partitionedjobs.crossbackend import CrossBackendSplitter
from openeo_aggregator.partitionedjobs.crossbackend import (
CrossBackendJobSplitter,
DeepGraphSplitter,
LoadCollectionGraphSplitter,
)
from openeo_aggregator.partitionedjobs.splitting import FlimsySplitter, TileGridSplitter
from openeo_aggregator.partitionedjobs.tracking import (
PartitionedJobConnection,
Expand Down Expand Up @@ -803,25 +808,34 @@ def create_job(
if "process_graph" not in process:
raise ProcessGraphMissingException()

# TODO: better, more generic/specific job_option(s)?
if job_options and (job_options.get(JOB_OPTION_SPLIT_STRATEGY) or job_options.get(JOB_OPTION_TILE_GRID)):
if job_options.get(JOB_OPTION_SPLIT_STRATEGY) == "crossbackend":
# TODO this is temporary feature flag to trigger "crossbackend" splitting
return self._create_crossbackend_job(
user_id=user_id,
process=process,
api_version=api_version,
metadata=metadata,
job_options=job_options,
)
else:
return self._create_partitioned_job(
user_id=user_id,
process=process,
api_version=api_version,
metadata=metadata,
job_options=job_options,
)
# Coverage of messy "split_strategy" job option
# Also see https://github.com/Open-EO/openeo-aggregator/issues/156
# TODO: more generic and future proof handling of split strategy related options?
split_strategy = (job_options or {}).get(JOB_OPTION_SPLIT_STRATEGY)
tile_grid = (job_options or {}).get(JOB_OPTION_TILE_GRID)
crossbackend_mode = split_strategy == "crossbackend" or (
isinstance(split_strategy, dict) and "crossbackend" in split_strategy
)
# TODO: the legacy job option "tile_grid" is quite generic and not very explicit
# about being a job splitting approach. Can we deprecate this in a way?
spatial_split_mode = tile_grid or split_strategy == "flimsy"

if crossbackend_mode:
return self._create_crossbackend_job(
user_id=user_id,
process=process,
api_version=api_version,
metadata=metadata,
job_options=job_options,
)
elif spatial_split_mode:
return self._create_partitioned_job(
user_id=user_id,
process=process,
api_version=api_version,
metadata=metadata,
job_options=job_options,
)
else:
return self._create_job_standard(
user_id=user_id,
Expand Down Expand Up @@ -936,14 +950,47 @@ def _create_crossbackend_job(
if not self.partitioned_job_tracker:
raise FeatureUnsupportedException(message="Partitioned job tracking is not supported")

def backend_for_collection(collection_id) -> str:
return self._catalog.get_backends_for_collection(cid=collection_id)[0]
split_strategy = (job_options or {}).get(JOB_OPTION_SPLIT_STRATEGY)
if split_strategy == "crossbackend":
# Legacy job option format
graph_split_method = CROSSBACKEND_GRAPH_SPLIT_METHOD.SIMPLE
elif isinstance(split_strategy, dict) and isinstance(split_strategy.get("crossbackend"), dict):
graph_split_method = split_strategy.get("crossbackend", {}).get(
"method", CROSSBACKEND_GRAPH_SPLIT_METHOD.SIMPLE
)
else:
raise ValueError(f"Invalid split strategy {split_strategy!r}")

splitter = CrossBackendSplitter(
backend_for_collection=backend_for_collection,
# TODO: job option for `always_split` feature?
always_split=True,
)
_log.info(f"_create_crossbackend_job: {graph_split_method=} from {split_strategy=}")
if graph_split_method == CROSSBACKEND_GRAPH_SPLIT_METHOD.SIMPLE:

def backend_for_collection(collection_id) -> str:
return self._catalog.get_backends_for_collection(cid=collection_id)[0]

graph_splitter = LoadCollectionGraphSplitter(
backend_for_collection=backend_for_collection,
# TODO: job option for `always_split` feature?
always_split=True,
)
elif graph_split_method == CROSSBACKEND_GRAPH_SPLIT_METHOD.DEEP:

def supporting_backends(node_id: str, node: dict) -> Union[List[str], None]:
# TODO: wider coverage checking process id availability
if node["process_id"] == "load_collection":
collection_id = node["arguments"]["id"]
return self._catalog.get_backends_for_collection(cid=collection_id)

graph_splitter = DeepGraphSplitter(
supporting_backends=supporting_backends,
primary_backend=split_strategy.get("crossbackend", {}).get("primary_backend"),
# TODO: instead of this hardcoded deny-list, build it based on backend metadata inspection?
# TODO: make a config for this?
split_deny_list={"aggregate_spatial", "load_geojson", "load_url"},
)
else:
raise ValueError(f"Invalid graph split strategy {graph_split_method!r}")

splitter = CrossBackendJobSplitter(graph_splitter=graph_splitter)

pjob_id = self.partitioned_job_tracker.create_crossbackend_pjob(
user_id=user_id, process=process, metadata=metadata, job_options=job_options, splitter=splitter
Expand Down
6 changes: 6 additions & 0 deletions src/openeo_aggregator/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@

# Experimental feature to force a certain upstream back-end through job options
JOB_OPTION_FORCE_BACKEND = "_agg_force_backend"


class CROSSBACKEND_GRAPH_SPLIT_METHOD:
    """
    Names of the graph split methods available for cross-backend job splitting.

    This is a plain class holding string constants rather than a real enum,
    so each member is an ordinary `str` and can be compared directly
    against a raw string value (e.g. one taken from job options).
    """

    # Poor-man's StrEnum
    # Method name used to request splitting on `load_collection` nodes only.
    SIMPLE = "simple"
    # Method name used to request the deeper graph splitting approach.
    DEEP = "deep"
Loading

0 comments on commit 9b65520

Please sign in to comment.