From d35c72d7eefd8275d116284cf123b6277d5e27c2 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Wed, 3 Jul 2024 14:17:15 +0200 Subject: [PATCH 01/13] Fixed load_collection parameters for S1, improved S1 catalogue query on CDSE and added CDSE_STAGING backend option in fetching --- src/openeo_gfmap/fetching/generic.py | 10 ++++++++++ src/openeo_gfmap/fetching/s1.py | 4 +--- src/openeo_gfmap/manager/job_splitters.py | 2 +- src/openeo_gfmap/utils/catalogue.py | 22 ++++++++++++++++++++-- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/openeo_gfmap/fetching/generic.py b/src/openeo_gfmap/fetching/generic.py index f81744c..4113aee 100644 --- a/src/openeo_gfmap/fetching/generic.py +++ b/src/openeo_gfmap/fetching/generic.py @@ -117,6 +117,10 @@ def generic_default_processor(cube: openeo.DataCube, **params): "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), }, + Backend.CDSE_STAGING: { + "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), + "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), + }, Backend.FED: { "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), @@ -135,6 +139,12 @@ def generic_default_processor(cube: openeo.DataCube, **params): _get_generic_processor, collection_name="COPERNICUS_30" ), }, + Backend.CDSE_STAGING: { + "fetch": partial(_get_generic_fetcher, collection_name="COPERNICUS_30"), + "preprocessor": partial( + _get_generic_processor, collection_name="COPERNICUS_30" + ), + }, Backend.FED: { "fetch": partial(_get_generic_fetcher, collection_name="COPERNICUS_30"), "preprocessor": partial( diff --git a/src/openeo_gfmap/fetching/s1.py b/src/openeo_gfmap/fetching/s1.py index 6081d40..97d6fc2 100644 --- a/src/openeo_gfmap/fetching/s1.py +++ b/src/openeo_gfmap/fetching/s1.py @@ -67,8 +67,6 @@ def s1_grd_fetch_default( """ bands = convert_band_names(bands, BASE_SENTINEL1_GRD_MAPPING) - load_collection_parameters = params.get("load_collection", {}) - cube = _load_collection( connection, bands, @@ -76,7 +74,7 @@ def s1_grd_fetch_default( spatial_extent, temporal_extent, fetch_type, - **load_collection_parameters, + **params, ) if fetch_type is not FetchType.POINT and isinstance(spatial_extent, GeoJSON): diff --git a/src/openeo_gfmap/manager/job_splitters.py b/src/openeo_gfmap/manager/job_splitters.py index 6a8381c..7827e13 100644 --- a/src/openeo_gfmap/manager/job_splitters.py +++ b/src/openeo_gfmap/manager/job_splitters.py @@ -61,7 +61,7 @@ def split_job_s2grid( raise ValueError("The GeoDataFrame must contain a CRS") polygons = polygons.to_crs(epsg=4326) - if polygons.geometry.geom_type[0] != "Point": + if polygons.geometry.iloc[0].geom_type[0] != "Point": polygons["geometry"] = polygons.geometry.centroid # Dataset containing all the S2 tiles, find the nearest S2 tile for each point diff --git a/src/openeo_gfmap/utils/catalogue.py b/src/openeo_gfmap/utils/catalogue.py index 8199de1..6ee20e5 100644 --- a/src/openeo_gfmap/utils/catalogue.py +++ b/src/openeo_gfmap/utils/catalogue.py @@ -1,9 +1,12 @@ """Functionalities to interract with product catalogues.""" +from typing import Optional + import requests from geojson import GeoJSON from pyproj.crs import CRS from rasterio.warp import transform_bounds +from requests import adapters from shapely import unary_union from shapely.geometry import box, shape @@ -15,6 +18,20 @@ TemporalContext, ) 
+request_sessions: Optional[requests.Session] = None + + +def _request_session() -> requests.Session: + global request_sessions + + if request_sessions is None: + request_sessions = requests.Session() + retries = adapters.Retry( + total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504] + ) + request_sessions.mount("https://", adapters.HTTPAdapter(max_retries=retries)) + return request_sessions + class UncoveredS1Exception(Exception): """Exception raised when there is no product available to fully cover spatially a given @@ -48,13 +65,14 @@ def _query_cdse_catalogue( url = ( f"https://catalogue.dataspace.copernicus.eu/resto/api/collections/" f"{collection}/search.json?box={minx},{miny},{maxx},{maxy}" - f"&sortParam=startDate&maxRecords=100" + f"&sortParam=startDate&maxRecords=1000&polarisation=VV%26VH" f"&dataset=ESA-DATASET&startDate={start_date}&completionDate={end_date}" ) for key, value in additional_parameters.items(): url += f"&{key}={value}" - response = requests.get(url) + session = _request_session() + response = session.get(url, timeout=60) if response.status_code != 200: raise Exception( From 680499ee4ee98a6c9d941685e00ef57aafffee7e Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 4 Jul 2024 15:20:09 +0200 Subject: [PATCH 02/13] Fixed polarization issue in the load_collection tests --- src/openeo_gfmap/manager/job_splitters.py | 3 +-- tests/test_openeo_gfmap/test_s1_fetchers.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/openeo_gfmap/manager/job_splitters.py b/src/openeo_gfmap/manager/job_splitters.py index 7827e13..417864b 100644 --- a/src/openeo_gfmap/manager/job_splitters.py +++ b/src/openeo_gfmap/manager/job_splitters.py @@ -61,8 +61,7 @@ def split_job_s2grid( raise ValueError("The GeoDataFrame must contain a CRS") polygons = polygons.to_crs(epsg=4326) - if polygons.geometry.iloc[0].geom_type[0] != "Point": - polygons["geometry"] = polygons.geometry.centroid + polygons["geometry"] = polygons.geometry.centroid # Dataset containing all the S2 tiles, find the nearest S2 tile for each point s2_grid = load_s2_grid() diff --git a/tests/test_openeo_gfmap/test_s1_fetchers.py b/tests/test_openeo_gfmap/test_s1_fetchers.py index c36c099..70ee4b9 100644 --- a/tests/test_openeo_gfmap/test_s1_fetchers.py +++ b/tests/test_openeo_gfmap/test_s1_fetchers.py @@ -49,7 +49,7 @@ def sentinel1_grd( "elevation_model": "COPERNICUS_30", "coefficient": "gamma0-ellipsoid", "load_collection": { - "polarization": lambda polar: (polar == "VV") or (polar == "VH"), + "polarization": lambda polar: polar == "VV&VH", }, } From 48bbcc30c0b98a3332b0c6f1f6c83364fe46c374 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Fri, 5 Jul 2024 08:55:30 +0200 Subject: [PATCH 03/13] Fixed test & fixed S2 job splitting by adding a valid_cdse column --- src/openeo_gfmap/manager/job_splitters.py | 9 +++++++-- tests/test_openeo_gfmap/test_s1_fetchers.py | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/openeo_gfmap/manager/job_splitters.py b/src/openeo_gfmap/manager/job_splitters.py index 417864b..9a795fb 100644 --- a/src/openeo_gfmap/manager/job_splitters.py +++ b/src/openeo_gfmap/manager/job_splitters.py @@ -2,6 +2,7 @@ form of a GeoDataFrames. 
""" +from functools import lru_cache from pathlib import Path from typing import List @@ -12,16 +13,17 @@ from openeo_gfmap.manager import _log +@lru_cache(maxsize=1) def load_s2_grid() -> gpd.GeoDataFrame: """Returns a geo data frame from the S2 grid.""" # Builds the path where the geodataframe should be - gdf_path = Path.home() / ".openeo-gfmap" / "s2grid_bounds.geojson" + gdf_path = Path.home() / ".openeo-gfmap" / "s2grid_bounds_v2.geojson" if not gdf_path.exists(): _log.info("S2 grid not found, downloading it from artifactory.") # Downloads the file from the artifactory URL gdf_path.parent.mkdir(exist_ok=True) response = requests.get( - "https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/s2grid_bounds.geojson", + "https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/s2grid_bounds_v2.geojson", timeout=180, # 3mins ) with open(gdf_path, "wb") as f: @@ -67,6 +69,9 @@ def split_job_s2grid( s2_grid = load_s2_grid() s2_grid["geometry"] = s2_grid.geometry.centroid + # Filter tiles on CDSE availability + s2_grid = s2_grid[s2_grid.cdse_valid] + polygons = gpd.sjoin_nearest(polygons, s2_grid[["tile", "geometry"]]).drop( columns=["index_right"] ) diff --git a/tests/test_openeo_gfmap/test_s1_fetchers.py b/tests/test_openeo_gfmap/test_s1_fetchers.py index 70ee4b9..adafb42 100644 --- a/tests/test_openeo_gfmap/test_s1_fetchers.py +++ b/tests/test_openeo_gfmap/test_s1_fetchers.py @@ -152,7 +152,7 @@ def sentinel1_grd_point_based( "elevation_model": "COPERNICUS_30", "coefficient": "gamma0-ellipsoid", "load_collection": { - "polarization": lambda polar: (polar == "VV") or (polar == "VH"), + "polarization": lambda polar: polar == "VV&VH", }, } extractor = build_sentinel1_grd_extractor( From cf0c6aa939f47900a4bf02c56260abb068d662ba Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Tue, 16 Jul 2024 11:34:16 +0200 Subject: [PATCH 04/13] Improved job manager by persisting the STAC catalogue within a binary file, and covered some edge cases --- src/openeo_gfmap/manager/job_manager.py | 115 +++++++++++++++++------- 1 file changed, 85 insertions(+), 30 deletions(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index b8248e6..e6dc9a5 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -1,9 +1,11 @@ import json +import pickle import threading from concurrent.futures import ThreadPoolExecutor from enum import Enum from functools import partial from pathlib import Path +from threading import Lock from typing import Callable, Optional, Union import pandas as pd @@ -16,7 +18,7 @@ from openeo_gfmap.stac import constants # Lock to use when writing to the STAC collection -_stac_lock = threading.Lock() +_stac_lock = Lock() def done_callback(future, df, idx): @@ -27,6 +29,8 @@ def done_callback(future, df, idx): df.loc[idx, "status"] = "finished" elif current_status == "postprocessing-error": df.loc[idx, "status"] = "error" + elif current_status == "running": + df.loc[idx, "status"] = "running" else: raise ValueError( f"Invalid status {current_status} for job {df.loc[idx, 'id']} for done_callback!" 
@@ -53,11 +57,11 @@ def __init__( post_job_action: Optional[Callable] = None, poll_sleep: int = 5, n_threads: int = 1, - post_job_params: dict = {}, resume_postproc: bool = True, # If we need to check for post-job actions that crashed restart_failed: bool = False, # If we need to restart failed jobs ): self._output_dir = output_dir + self._catalogue_cache = output_dir / "catalogue_cache.bin" self.stac = stac self.collection_id = collection_id @@ -74,7 +78,6 @@ def __init__( self._output_path_gen = output_path_generator self._post_job_action = post_job_action - self._post_job_params = post_job_params # Monkey patching the _normalize_df method to ensure we have no modification on the # geometry column @@ -85,7 +88,14 @@ def __init__( def _normalize_stac(self): default_collection_path = self._output_dir / "stac/collection.json" - if self.stac is not None: + if self._catalogue_cache.exists(): + _log.info( + "Loading the STAC collection from the persisted binary file: %s.", + self._catalogue_cache, + ) + with open(self._catalogue_cache, "rb") as file: + root_collection = pickle.load(file) + elif self.stac is not None: _log.info( f"Reloading the STAC collection from the provided path: {self.stac}." ) @@ -152,7 +162,7 @@ def _resume_postjob_actions(self, df: pd.DataFrame): _log.info( f"Resuming postprocessing of job {row.id}, queueing on_job_finished..." ) - future = self._executor.submit(self.on_job_done, job, row) + future = self._executor.submit(self.on_job_done, job, row, _stac_lock) future.add_done_callback(partial(done_callback, df=df, idx=idx)) else: _log.info( @@ -206,11 +216,17 @@ def _update_statuses(self, df: pd.DataFrame): f"Job {job.job_id} finished successfully, queueing on_job_done..." ) job_status = "postprocessing" - future = self._executor.submit(self.on_job_done, job, row) + future = self._executor.submit(self.on_job_done, job, row, _stac_lock) # Future will setup the status to finished when the job is done future.add_done_callback(partial(done_callback, df=df, idx=idx)) self._futures.append(future) - df.loc[idx, "costs"] = job_metadata["costs"] + if "costs" in job_metadata: + df.loc[idx, "costs"] = job_metadata["costs"] + else: + _log.warning( + "Costs not found in job %s metadata. Costs will be set to 'None'.", + job.job_id, + ) # Case in which it failed if (df.loc[idx, "status"] != "error") and ( @@ -260,10 +276,11 @@ def on_job_error(self, job: BatchJob, row: pd.Series): f"Couldn't find any error logs. Please check the error manually on job ID: {job.job_id}." ) - def on_job_done(self, job: BatchJob, row: pd.Series): + def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock): """Method called when a job finishes successfully. It will first download the results of the job and then call the `post_job_action` method. """ + job_products = {} for idx, asset in enumerate(job.get_results().get_assets()): try: @@ -281,7 +298,10 @@ def on_job_done(self, job: BatchJob, row: pd.Series): ) except Exception as e: _log.exception( - f"Error downloading asset {asset.name} from job {job.job_id}", e + "Error downloading asset %s from job %s:\n%s", + asset.name, + job.job_id, + e, ) raise e @@ -302,45 +322,65 @@ def on_job_done(self, job: BatchJob, row: pd.Series): asset.href = str( asset_path ) # Update the asset href to the output location set by the output_path_generator - # item.id = f"{job.job_id}_{item.id}" + # Add the item to the the current job items. 
job_items.append(item) - _log.info(f"Parsed item {item.id} from job {job.job_id}") + _log.info("Parsed item %s from job %s", item.id, job.job_id) except Exception as e: _log.exception( - f"Error failed to add item {item.id} from job {job.job_id} to STAC collection", + "Error failed to add item %s from job %s to STAC collection:\n%s", + item.id, + job.job_id, e, ) - raise e # _post_job_action returns an updated list of stac items. Post job action can therefore # update the stac items and access their products through the HREF. It is also the # reponsible of adding the appropriate metadata/assets to the items. if self._post_job_action is not None: - _log.debug(f"Calling post job action for job {job.job_id}...") - job_items = self._post_job_action(job_items, row, self._post_job_params) - - _log.info(f"Adding {len(job_items)} items to the STAC collection...") - - with _stac_lock: # Take the STAC lock to avoid concurrence issues - # Filters the job items to only keep the ones that are not already in the collection - existing_ids = [item.id for item in self._root_collection.get_all_items()] - job_items = [item for item in job_items if item.id not in existing_ids] + _log.debug("Calling post job action for job %s...", job.job_id) + job_items = self._post_job_action(job_items, row) - self._root_collection.add_items(job_items) - _log.info(f"Added {len(job_items)} items to the STAC collection.") + _log.info("Adding %s items to the STAC collection...", len(job_items)) - _log.info(f"Writing STAC collection for {job.job_id} to file...") + with lock: # Take the STAC lock to avoid concurrence issues try: - self._write_stac() + _log.info("Thread %s entered the STAC lock.", threading.get_ident()) + # Filters the job items to only keep the ones that are not already in the collection + existing_ids = [ + item.id for item in self._root_collection.get_all_items() + ] + job_items = [item for item in job_items if item.id not in existing_ids] + + validated_items = [] + # Validate the items + for item in job_items: + try: + item.validate() + validated_items.append(item) + except Exception as e: + _log.warning( + "Couldn't validate item %s from job %s, ignoring:\n%s", + item.id, + job.job_id, + e, + ) + + self._root_collection.add_items(validated_items) + _log.info( + "Added %s items to the STAC collection.", len(validated_items) + ) + + self._persist_stac() except Exception as e: _log.exception( - f"Error writing STAC collection for job {job.job_id} to file.", e + "Error adding items to the STAC collection for job %s:\n%s ", + job.job_id, + str(e), ) raise e - _log.info(f"Wrote STAC collection for {job.job_id} to file.") - _log.info(f"Job {job.job_id} and post job action finished successfully.") + _log.info("Job %s and post job action finished successfully.", job.job_id) def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: """Ensure we have the required columns and the expected type for the geometry column. @@ -401,7 +441,7 @@ def run_jobs( The file to track the results of the jobs. 
""" # Starts the thread pool to work on the on_job_done and on_job_error methods - _log.info(f"Starting ThreadPoolExecutor with {self._n_threads} workers.") + _log.info("Starting ThreadPoolExecutor with %s workers.", self._n_threads) with ThreadPoolExecutor(max_workers=self._n_threads) as executor: _log.info("Creating and running jobs.") self._executor = executor @@ -412,6 +452,15 @@ def run_jobs( self._wait_queued_actions() _log.info("Exiting ThreadPoolExecutor.") self._executor = None + _log.info( + "Finished running jobs, saving persisted STAC collection to final .json collection." + ) + self._write_stac() + _log.info( + "Saved STAC catalogue to JSON format, deleting the persisted binary file." + ) + self._catalogue_cache.unlink() + _log.info("All tasks finished!") def _write_stac(self): """Writes the STAC collection to the output directory.""" @@ -428,6 +477,12 @@ def _write_stac(self): self._root_collection.normalize_hrefs(str(root_path)) self._root_collection.save(catalog_type=CatalogType.SELF_CONTAINED) + def _persist_stac(self): + """Persists the STAC collection by saving it into a binary file.""" + _log.info("Persisting STAC collection to temp file %s.", self._catalogue_cache) + with open(self._catalogue_cache, "wb") as file: + pickle.dump(self._root_collection, file) + def setup_stac( self, constellation: Optional[str] = None, From 8b0d1917b9884ec806a8d7519a7950eff9cb1777 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Wed, 17 Jul 2024 11:55:12 +0200 Subject: [PATCH 05/13] Added 1 retry on failing on_job_done as they are often commited by openeo connectivity issues --- src/openeo_gfmap/manager/job_manager.py | 67 ++++++++++++++++++------- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index bb50333..92069de 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -21,10 +21,16 @@ _stac_lock = Lock() -def done_callback(future, df, idx): - """Sets the status of the job to the given status when the future is done.""" +def done_callback(future, df, idx, executor, retry=True): + """Sets the status of the job to the given status when the future is done. + + If an exception occurred, then tries to retry the post-job action once if + it wasn't done already. If this is the second time the post-job action + fails, then the job is set to error. + """ current_status = df.loc[idx, "status"] - if not future.exception(): + exception = future.exception() + if exception is None: if current_status == "postprocessing": df.loc[idx, "status"] = "finished" elif current_status == "postprocessing-error": @@ -35,6 +41,22 @@ def done_callback(future, df, idx): raise ValueError( f"Invalid status {current_status} for job {df.loc[idx, 'id']} for done_callback!" 
) + else: # There is an exception that occured + _log.error( + "An exception occured in the postprocessing for job %s:\n%s", + df.loc[idx, 'id'], + exception + ) + if retry: + _log.info("Retrying the postprocessing for job %s...", df.loc[idx, 'id']) + if current_status == "postprocessing": + future = executor.submit(df.loc[idx, "on_job_done"], df.loc[idx, "job"], df.loc[idx], _stac_lock) + elif current_status == "postprocessing-error": + future = executor.submit(df.loc[idx, "on_job_error"], df.loc[idx, "job"], df.loc[idx]) + future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=executor, retry=False)) + else: + _log.info("Post-job action was already retried once, setting job %s to error.", df.loc[idx, 'id']) + df.loc[idx, "status"] = "error" class PostJobStatus(Enum): @@ -97,12 +119,14 @@ def _normalize_stac(self): root_collection = pickle.load(file) elif self.stac is not None: _log.info( - f"Reloading the STAC collection from the provided path: {self.stac}." + "Reloading the STAC collection from the provided path: %s.", + self.stac ) root_collection = pystac.read_file(str(self.stac)) elif default_collection_path.exists(): _log.info( - f"Reload the STAC collection from the default path: {default_collection_path}." + "Reload the STAC collection from the default path: %s.", + default_collection_path ) self.stac = default_collection_path root_collection = pystac.read_file(str(self.stac)) @@ -160,16 +184,18 @@ def _resume_postjob_actions(self, df: pd.DataFrame): job = connection.job(row.id) if row.status == "postprocessing": _log.info( - f"Resuming postprocessing of job {row.id}, queueing on_job_finished..." + "Resuming postprocessing of job %s, queueing on_job_finished...", + row.id ) future = self._executor.submit(self.on_job_done, job, row, _stac_lock) - future.add_done_callback(partial(done_callback, df=df, idx=idx)) + future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=self._executor)) else: _log.info( - f"Resuming postprocessing of job {row.id}, queueing on_job_error..." + "Resuming postprocessing of job %s, queueing on_job_error...", + row.id ) future = self._executor.submit(self.on_job_error, job, row) - future.add_done_callback(partial(done_callback, df=df, idx=idx)) + future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=self._executor)) self._futures.append(future) def _restart_failed_jobs(self, df: pd.DataFrame): @@ -177,7 +203,9 @@ def _restart_failed_jobs(self, df: pd.DataFrame): failed_tasks = df[df.status.isin(["error", "start_failed"])] not_started_tasks = df[df.status == "not_started"] _log.info( - f"Resetting {len(failed_tasks)} failed jobs to 'not_started'. {len(not_started_tasks)} jobs are already 'not_started'." + "Resetting %s failed jobs to 'not_started'. %s jobs are already 'not_started'.", + len(failed_tasks), + len(not_started_tasks) ) for idx, _ in failed_tasks.iterrows(): df.loc[idx, "status"] = "not_started" @@ -213,12 +241,13 @@ def _update_statuses(self, df: pd.DataFrame): job_metadata["status"] == "finished" ): _log.info( - f"Job {job.job_id} finished successfully, queueing on_job_done..." 
+ "Job %s finished successfully, queueing on_job_done...", + job.job_id ) job_status = "postprocessing" future = self._executor.submit(self.on_job_done, job, row, _stac_lock) # Future will setup the status to finished when the job is done - future.add_done_callback(partial(done_callback, df=df, idx=idx)) + future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=self._executor)) self._futures.append(future) if "costs" in job_metadata: df.loc[idx, "costs"] = job_metadata["costs"] @@ -233,12 +262,13 @@ def _update_statuses(self, df: pd.DataFrame): job_metadata["status"] == "error" ): _log.info( - f"Job {job.job_id} finished with error, queueing on_job_error..." + "Job %s finished with error, queueing on_job_error...", + job.job_id, ) job_status = "postprocessing-error" future = self._executor.submit(self.on_job_error, job, row) # Future will setup the status to error when the job is done - future.add_done_callback(partial(done_callback, df=df, idx=idx)) + future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=self._executor)) self._futures.append(future) df.loc[idx, "costs"] = job_metadata["costs"] @@ -285,7 +315,7 @@ def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock): for idx, asset in enumerate(job.get_results().get_assets()): try: _log.debug( - f"Generating output path for asset {asset.name} from job {job.job_id}..." + "Generating output path for asset %s from job %s...", asset.name, job.job_id ) output_path = self._output_path_gen(self._output_dir, idx, row) # Make the output path @@ -294,7 +324,10 @@ def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock): # Add to the list of downloaded products job_products[f"{job.job_id}_{asset.name}"] = [output_path] _log.debug( - f"Downloaded {asset.name} from job {job.job_id} -> {output_path}" + "Downloaded %s from job %s -> %s", + asset.name, + job.job_id, + output_path, ) except Exception as e: _log.exception( @@ -406,7 +439,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: } df = df.assign(**new_columns) - _log.debug(f"Normalizing dataframe. Columns: {df.columns}") + _log.debug("Normalizing dataframe. Columns: %s", df.columns) return df From d39896b091c5158ccf52b280230c864e965c6bd1 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 18 Jul 2024 12:49:49 +0200 Subject: [PATCH 06/13] Added dynamic max parallel jobs on Job Manager --- src/openeo_gfmap/manager/job_manager.py | 128 +++++++++++++++++++----- 1 file changed, 101 insertions(+), 27 deletions(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 92069de..70ee517 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -2,14 +2,16 @@ import pickle import threading from concurrent.futures import ThreadPoolExecutor +from datetime import datetime from enum import Enum from functools import partial from pathlib import Path from threading import Lock -from typing import Callable, Optional, Union +from typing import Callable, NamedTuple, Optional, Union import pandas as pd import pystac +from openeo import Connection from openeo.extra.job_management import MultiBackendJobManager from openeo.rest.job import BatchJob from pystac import CatalogType @@ -23,7 +25,7 @@ def done_callback(future, df, idx, executor, retry=True): """Sets the status of the job to the given status when the future is done. - + If an exception occurred, then tries to retry the post-job action once if it wasn't done already. 
If this is the second time the post-job action fails, then the job is set to error. @@ -44,18 +46,30 @@ def done_callback(future, df, idx, executor, retry=True): else: # There is an exception that occured _log.error( "An exception occured in the postprocessing for job %s:\n%s", - df.loc[idx, 'id'], - exception + df.loc[idx, "id"], + exception, ) if retry: - _log.info("Retrying the postprocessing for job %s...", df.loc[idx, 'id']) + _log.info("Retrying the postprocessing for job %s...", df.loc[idx, "id"]) if current_status == "postprocessing": - future = executor.submit(df.loc[idx, "on_job_done"], df.loc[idx, "job"], df.loc[idx], _stac_lock) + future = executor.submit( + df.loc[idx, "on_job_done"], + df.loc[idx, "job"], + df.loc[idx], + _stac_lock, + ) elif current_status == "postprocessing-error": - future = executor.submit(df.loc[idx, "on_job_error"], df.loc[idx, "job"], df.loc[idx]) - future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=executor, retry=False)) + future = executor.submit( + df.loc[idx, "on_job_error"], df.loc[idx, "job"], df.loc[idx] + ) + future.add_done_callback( + partial(done_callback, df=df, idx=idx, executor=executor, retry=False) + ) else: - _log.info("Post-job action was already retried once, setting job %s to error.", df.loc[idx, 'id']) + _log.info( + "Post-job action was already retried once, setting job %s to error.", + df.loc[idx, "id"], + ) df.loc[idx, "status"] = "error" @@ -81,6 +95,9 @@ def __init__( n_threads: int = 1, resume_postproc: bool = True, # If we need to check for post-job actions that crashed restart_failed: bool = False, # If we need to restart failed jobs + dynamic_max_jobs: bool = True, # If we need to dynamically change the maximum number of parallel jobs + max_jobs_worktime: bool = 10, # Maximum number of jobs to run in a given time + max_jobs: int = 20, # Maximum number of jobs to run at the same time ): self._output_dir = output_dir self._catalogue_cache = output_dir / "catalogue_cache.bin" @@ -108,6 +125,59 @@ def __init__( self._root_collection = self._normalize_stac() + # Add a property that calculates the number of maximum concurrent jobs + # dinamically depending on the time + self._dynamic_max_jobs = dynamic_max_jobs + self._max_jobs_worktime = max_jobs_worktime + self._max_jobs = max_jobs + + def add_backend( + self, + name: str, + connection: Connection | Callable[[], Connection], + parallel_jobs: Optional[int] = 2, + dynamic_max_jobs: bool = True, + min_jobs: Optional[int] = None, + max_jobs: Optional[int] = None, + ): + if not dynamic_max_jobs: + if parallel_jobs is None: + raise ValueError( + "When dynamic_max_jobs is set to False, parallel_jobs must be provided." + ) + return super().add_backend(name, connection, parallel_jobs) + + if min_jobs is None or max_jobs is None: + raise ValueError( + "When dynamic_max_jobs is set to True, min_jobs and max_jobs must be provided." 
+ ) + + if isinstance(connection, Connection): + c = connection + connection = lambda: c # noqa: E731 + assert callable(connection) + + # Create a new NamedTuple to store the dynamic backend properties + class _DynamicBackend(NamedTuple): + get_connection: Callable[[], Connection] + + @property + def parallel_jobs(self) -> int: + current_time = datetime.now() + + # Limiting working hours + start_worktime_hour = 7 + end_worktime_hour = 20 + + if ( + current_time.hour < start_worktime_hour + or current_time.hour > end_worktime_hour + ): + return min_jobs + return max_jobs + + self.backends[name] = _DynamicBackend(get_connection=connection) + def _normalize_stac(self): default_collection_path = self._output_dir / "stac/collection.json" if self._catalogue_cache.exists(): @@ -119,14 +189,13 @@ def _normalize_stac(self): root_collection = pickle.load(file) elif self.stac is not None: _log.info( - "Reloading the STAC collection from the provided path: %s.", - self.stac + "Reloading the STAC collection from the provided path: %s.", self.stac ) root_collection = pystac.read_file(str(self.stac)) elif default_collection_path.exists(): _log.info( "Reload the STAC collection from the default path: %s.", - default_collection_path + default_collection_path, ) self.stac = default_collection_path root_collection = pystac.read_file(str(self.stac)) @@ -185,17 +254,21 @@ def _resume_postjob_actions(self, df: pd.DataFrame): if row.status == "postprocessing": _log.info( "Resuming postprocessing of job %s, queueing on_job_finished...", - row.id + row.id, ) future = self._executor.submit(self.on_job_done, job, row, _stac_lock) - future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=self._executor)) + future.add_done_callback( + partial(done_callback, df=df, idx=idx, executor=self._executor) + ) else: _log.info( "Resuming postprocessing of job %s, queueing on_job_error...", - row.id + row.id, ) future = self._executor.submit(self.on_job_error, job, row) - future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=self._executor)) + future.add_done_callback( + partial(done_callback, df=df, idx=idx, executor=self._executor) + ) self._futures.append(future) def _restart_failed_jobs(self, df: pd.DataFrame): @@ -205,7 +278,7 @@ def _restart_failed_jobs(self, df: pd.DataFrame): _log.info( "Resetting %s failed jobs to 'not_started'. 
%s jobs are already 'not_started'.", len(failed_tasks), - len(not_started_tasks) + len(not_started_tasks), ) for idx, _ in failed_tasks.iterrows(): df.loc[idx, "status"] = "not_started" @@ -241,13 +314,14 @@ def _update_statuses(self, df: pd.DataFrame): job_metadata["status"] == "finished" ): _log.info( - "Job %s finished successfully, queueing on_job_done...", - job.job_id + "Job %s finished successfully, queueing on_job_done...", job.job_id ) job_status = "postprocessing" future = self._executor.submit(self.on_job_done, job, row, _stac_lock) # Future will setup the status to finished when the job is done - future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=self._executor)) + future.add_done_callback( + partial(done_callback, df=df, idx=idx, executor=self._executor) + ) self._futures.append(future) if "costs" in job_metadata: df.loc[idx, "costs"] = job_metadata["costs"] @@ -268,7 +342,9 @@ def _update_statuses(self, df: pd.DataFrame): job_status = "postprocessing-error" future = self._executor.submit(self.on_job_error, job, row) # Future will setup the status to error when the job is done - future.add_done_callback(partial(done_callback, df=df, idx=idx, executor=self._executor)) + future.add_done_callback( + partial(done_callback, df=df, idx=idx, executor=self._executor) + ) self._futures.append(future) df.loc[idx, "costs"] = job_metadata["costs"] @@ -315,7 +391,9 @@ def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock): for idx, asset in enumerate(job.get_results().get_assets()): try: _log.debug( - "Generating output path for asset %s from job %s...", asset.name, job.job_id + "Generating output path for asset %s from job %s...", + asset.name, + job.job_id, ) output_path = self._output_path_gen(self._output_dir, idx, row) # Make the output path @@ -489,11 +567,7 @@ def run_jobs( "Finished running jobs, saving persisted STAC collection to final .json collection." ) self._write_stac() - _log.info( - "Saved STAC catalogue to JSON format, deleting the persisted binary file." 
- ) - self._catalogue_cache.unlink() - _log.info("All tasks finished!") + _log.info("Saved STAC catalogue to JSON format, all tasks finished!") def _write_stac(self): """Writes the STAC collection to the output directory.""" From b0e0bec3fc7cd7d5456297d92e5a00219e1df30c Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 18 Jul 2024 13:33:56 +0200 Subject: [PATCH 07/13] fix bug in dynamic parallel jobs --- src/openeo_gfmap/manager/job_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 70ee517..1a57e93 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -166,12 +166,12 @@ def parallel_jobs(self) -> int: current_time = datetime.now() # Limiting working hours - start_worktime_hour = 7 + start_worktime_hour = 8 end_worktime_hour = 20 if ( - current_time.hour < start_worktime_hour - or current_time.hour > end_worktime_hour + current_time.hour >= start_worktime_hour + and current_time.hour < end_worktime_hour ): return min_jobs return max_jobs From ac73af81ddc48c19ed3be3c708175fd53f10e44c Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 18 Jul 2024 13:44:32 +0200 Subject: [PATCH 08/13] Removed problematic type hint --- src/openeo_gfmap/manager/job_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 1a57e93..3e8035f 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -134,7 +134,7 @@ def __init__( def add_backend( self, name: str, - connection: Connection | Callable[[], Connection], + connection, parallel_jobs: Optional[int] = 2, dynamic_max_jobs: bool = True, min_jobs: Optional[int] = None, From 1346845fdb54e5b69f6e3c5de7d048ca55333f02 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Fri, 19 Jul 2024 12:18:10 +0200 Subject: [PATCH 09/13] Changed job retry strategy --- src/openeo_gfmap/manager/job_manager.py | 75 +++++++++++++++---------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 3e8035f..4967b91 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -1,6 +1,7 @@ import json import pickle import threading +import time from concurrent.futures import ThreadPoolExecutor from datetime import datetime from enum import Enum @@ -23,7 +24,24 @@ _stac_lock = Lock() -def done_callback(future, df, idx, executor, retry=True): +def retry_on_exception(max_retries, delay=30): + def decorator(func): + def wrapper(*args, **kwargs): + latest_exception = None + for _ in range(max_retries): + try: + return func(*args, **kwargs) + except Exception as e: + time.sleep(delay) + latest_exception = e + raise latest_exception + + return wrapper + + return decorator + + +def done_callback(future, df, idx): """Sets the status of the job to the given status when the future is done. If an exception occurred, then tries to retry the post-job action once if @@ -43,34 +61,13 @@ def done_callback(future, df, idx, executor, retry=True): raise ValueError( f"Invalid status {current_status} for job {df.loc[idx, 'id']} for done_callback!" 
) - else: # There is an exception that occured - _log.error( - "An exception occured in the postprocessing for job %s:\n%s", + else: + _log.exception( + "Exception occurred in post-job future for job %s:\n%s", df.loc[idx, "id"], exception, ) - if retry: - _log.info("Retrying the postprocessing for job %s...", df.loc[idx, "id"]) - if current_status == "postprocessing": - future = executor.submit( - df.loc[idx, "on_job_done"], - df.loc[idx, "job"], - df.loc[idx], - _stac_lock, - ) - elif current_status == "postprocessing-error": - future = executor.submit( - df.loc[idx, "on_job_error"], df.loc[idx, "job"], df.loc[idx] - ) - future.add_done_callback( - partial(done_callback, df=df, idx=idx, executor=executor, retry=False) - ) - else: - _log.info( - "Post-job action was already retried once, setting job %s to error.", - df.loc[idx, "id"], - ) - df.loc[idx, "status"] = "error" + df.loc[idx, "status"] = "error" class PostJobStatus(Enum): @@ -258,7 +255,11 @@ def _resume_postjob_actions(self, df: pd.DataFrame): ) future = self._executor.submit(self.on_job_done, job, row, _stac_lock) future.add_done_callback( - partial(done_callback, df=df, idx=idx, executor=self._executor) + partial( + done_callback, + df=df, + idx=idx, + ) ) else: _log.info( @@ -267,7 +268,11 @@ def _resume_postjob_actions(self, df: pd.DataFrame): ) future = self._executor.submit(self.on_job_error, job, row) future.add_done_callback( - partial(done_callback, df=df, idx=idx, executor=self._executor) + partial( + done_callback, + df=df, + idx=idx, + ) ) self._futures.append(future) @@ -320,7 +325,11 @@ def _update_statuses(self, df: pd.DataFrame): future = self._executor.submit(self.on_job_done, job, row, _stac_lock) # Future will setup the status to finished when the job is done future.add_done_callback( - partial(done_callback, df=df, idx=idx, executor=self._executor) + partial( + done_callback, + df=df, + idx=idx, + ) ) self._futures.append(future) if "costs" in job_metadata: @@ -343,7 +352,11 @@ def _update_statuses(self, df: pd.DataFrame): future = self._executor.submit(self.on_job_error, job, row) # Future will setup the status to error when the job is done future.add_done_callback( - partial(done_callback, df=df, idx=idx, executor=self._executor) + partial( + done_callback, + df=df, + idx=idx, + ) ) self._futures.append(future) df.loc[idx, "costs"] = job_metadata["costs"] @@ -353,6 +366,7 @@ def _update_statuses(self, df: pd.DataFrame): # Clear the futures that are done and raise their potential exceptions if they occurred. self._clear_queued_actions() + @retry_on_exception(max_retries=2, delay=30) def on_job_error(self, job: BatchJob, row: pd.Series): """Method called when a job finishes with an error. @@ -382,6 +396,7 @@ def on_job_error(self, job: BatchJob, row: pd.Series): f"Couldn't find any error logs. Please check the error manually on job ID: {job.job_id}." ) + @retry_on_exception(max_retries=2, delay=30) def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock): """Method called when a job finishes successfully. It will first download the results of the job and then call the `post_job_action` method. 
From 96dac921fd2ffebd8e655fc5b80fd00a38ff6baf Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Wed, 24 Jul 2024 14:09:07 +0200 Subject: [PATCH 10/13] removed validation of openeo STAC items --- src/openeo_gfmap/manager/job_manager.py | 34 +++++++++++++------------ 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 4967b91..e300a86 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -334,6 +334,8 @@ def _update_statuses(self, df: pd.DataFrame): self._futures.append(future) if "costs" in job_metadata: df.loc[idx, "costs"] = job_metadata["costs"] + df.loc[idx, "memory"] = job_metadata["usage"].get("max_executor_memory", {}).get("value", None) + else: _log.warning( "Costs not found in job %s metadata. Costs will be set to 'None'.", @@ -478,23 +480,23 @@ def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock): ] job_items = [item for item in job_items if item.id not in existing_ids] - validated_items = [] - # Validate the items - for item in job_items: - try: - item.validate() - validated_items.append(item) - except Exception as e: - _log.warning( - "Couldn't validate item %s from job %s, ignoring:\n%s", - item.id, - job.job_id, - e, - ) - - self._root_collection.add_items(validated_items) + # validated_items = [] + # # Validate the items + # for item in job_items: + # try: + # item.validate() + # validated_items.append(item) + # except Exception as e: + # _log.warning( + # "Couldn't validate item %s from job %s, ignoring:\n%s", + # item.id, + # job.job_id, + # e, + # ) + self._root_collection.add_items(job_items) + # self._root_collection.add_items(validated_items) _log.info( - "Added %s items to the STAC collection.", len(validated_items) + "Added %s items to the STAC collection.", len(job_items) ) self._persist_stac() From a7966d18cb0fbd6f5f2b3f95fba1a626e082b2cb Mon Sep 17 00:00:00 2001 From: World Cereal Date: Thu, 8 Aug 2024 14:44:42 +0200 Subject: [PATCH 11/13] Added check on costs metadata to avoid ValueError --- src/openeo_gfmap/manager/job_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index e300a86..61dec95 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -361,6 +361,7 @@ def _update_statuses(self, df: pd.DataFrame): ) ) self._futures.append(future) + if "costs" in job_metadata: df.loc[idx, "costs"] = job_metadata["costs"] df.loc[idx, "status"] = job_status From 49c1a2d0b1ea8c56081948e06a61482aede67afd Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 8 Aug 2024 14:49:26 +0200 Subject: [PATCH 12/13] Set-up dynamic jobs to false by default --- src/openeo_gfmap/manager/job_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 4967b91..3b120fd 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -133,7 +133,7 @@ def add_backend( name: str, connection, parallel_jobs: Optional[int] = 2, - dynamic_max_jobs: bool = True, + dynamic_max_jobs: bool = False, min_jobs: Optional[int] = None, max_jobs: Optional[int] = None, ): From 66272db6291cb1f1c1a7f7c8ccd3f43c2a94aed6 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 8 Aug 2024 14:54:46 +0200 Subject: [PATCH 13/13] Lint fixes --- 
src/openeo_gfmap/manager/job_manager.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 7d7a5c3..f3046fb 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -335,14 +335,18 @@ def _update_statuses(self, df: pd.DataFrame): if "costs" in job_metadata: df.loc[idx, "costs"] = job_metadata["costs"] - + else: _log.warning( "Costs not found in job %s metadata. Costs will be set to 'None'.", job.job_id, ) - df.loc[idx, "memory"] = job_metadata["usage"].get("max_executor_memory", {}).get("value", None) + df.loc[idx, "memory"] = ( + job_metadata["usage"] + .get("max_executor_memory", {}) + .get("value", None) + ) df.loc[idx, "cpu"] = ( job_metadata["usage"].get("cpu", {}).get("value", None) ) @@ -504,9 +508,7 @@ def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock): # ) self._root_collection.add_items(job_items) # self._root_collection.add_items(validated_items) - _log.info( - "Added %s items to the STAC collection.", len(job_items) - ) + _log.info("Added %s items to the STAC collection.", len(job_items)) self._persist_stac() except Exception as e:
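
The catalogue hardening in [PATCH 01/13] (a cached `requests.Session` with a mounted retry adapter and an explicit timeout) can be reproduced outside the library. The sketch below mirrors that pattern only; `query_cdse`, the example collection name, bounding box and dates are placeholders invented for the illustration, while the retry settings and base URL copy the values used in the patch.

```python
# Illustrative sketch of the retried-session pattern from [PATCH 01/13].
# `query_cdse` and the example arguments below are not part of openeo-gfmap.
from typing import Optional

import requests
from requests import adapters

_session: Optional[requests.Session] = None


def _request_session() -> requests.Session:
    """Return a module-level session that retries transient 5xx responses."""
    global _session
    if _session is None:
        _session = requests.Session()
        retries = adapters.Retry(
            total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504]
        )
        _session.mount("https://", adapters.HTTPAdapter(max_retries=retries))
    return _session


def query_cdse(collection: str, bbox: tuple, start: str, end: str) -> dict:
    """Query the CDSE resto catalogue for products intersecting `bbox`."""
    minx, miny, maxx, maxy = bbox
    url = (
        "https://catalogue.dataspace.copernicus.eu/resto/api/collections/"
        f"{collection}/search.json?box={minx},{miny},{maxx},{maxy}"
        f"&sortParam=startDate&maxRecords=1000"
        f"&dataset=ESA-DATASET&startDate={start}&completionDate={end}"
    )
    response = _request_session().get(url, timeout=60)
    response.raise_for_status()
    return response.json()


if __name__ == "__main__":
    products = query_cdse(
        "Sentinel1", (5.0, 51.2, 5.1, 51.3), "2020-01-01", "2020-01-31"
    )
    print(len(products.get("features", [])))
```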
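
The STAC bookkeeping added in [PATCH 04/13] boils down to pickling the in-memory `pystac.Collection` under a lock after each batch of items and reloading it from the binary cache on start-up. A minimal stand-alone sketch of that idea follows; the collection id, dummy extent and cache path are placeholders, not values from the patch.

```python
# Minimal sketch of the pickle-based STAC persistence used by the job manager.
# Collection id, extent and cache path below are placeholder values.
import pickle
from pathlib import Path
from threading import Lock

import pystac

_stac_lock = Lock()
CACHE = Path("catalogue_cache.bin")


def load_or_create_collection() -> pystac.Collection:
    """Reload the pickled collection if a cache exists, otherwise start a new one."""
    if CACHE.exists():
        with open(CACHE, "rb") as f:
            return pickle.load(f)
    return pystac.Collection(
        id="gfmap-results",
        description="Placeholder collection for the sketch.",
        extent=pystac.Extent(
            pystac.SpatialExtent([[-180.0, -90.0, 180.0, 90.0]]),
            pystac.TemporalExtent([[None, None]]),
        ),
    )


def add_items(collection: pystac.Collection, items: list) -> None:
    """Add only unseen items, then persist the collection to the binary cache."""
    with _stac_lock:  # serialize concurrent post-job threads
        existing = {item.id for item in collection.get_all_items()}
        collection.add_items([item for item in items if item.id not in existing])
        with open(CACHE, "wb") as f:
            pickle.dump(collection, f)
```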
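
The post-processing retry introduced in [PATCH 09/13] and the time-windowed concurrency limit from [PATCH 06/13] (with the working-hours fix from [PATCH 07/13]) can likewise be exercised with standard-library code only. The sketch below mirrors that logic; `DynamicBackend`, `flaky_download`, the example thresholds and the one-second delay are assumptions made for the demonstration, not part of the patched module.

```python
# Stand-alone sketch of the retry decorator and the time-windowed job limit.
# `flaky_download` and the DynamicBackend arguments are demo values only.
import time
from datetime import datetime
from typing import Callable, NamedTuple


def retry_on_exception(max_retries: int, delay: int = 30):
    """Retry the wrapped callable up to `max_retries` times, sleeping `delay` seconds between attempts."""

    def decorator(func):
        def wrapper(*args, **kwargs):
            latest_exception = None
            for _ in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    latest_exception = e
                    time.sleep(delay)
            raise latest_exception

        return wrapper

    return decorator


class DynamicBackend(NamedTuple):
    """Backend holder whose concurrency limit depends on the time of day."""

    get_connection: Callable
    min_jobs: int
    max_jobs: int

    @property
    def parallel_jobs(self) -> int:
        # Throttle to `min_jobs` during working hours (08:00-20:00) and
        # run at `max_jobs` outside of them, as in the patched logic.
        hour = datetime.now().hour
        return self.min_jobs if 8 <= hour < 20 else self.max_jobs


_attempts = {"count": 0}


@retry_on_exception(max_retries=2, delay=1)
def flaky_download() -> str:
    """Stand-in for a post-job download that fails on its first attempt."""
    _attempts["count"] += 1
    if _attempts["count"] == 1:
        raise ConnectionError("transient openEO connectivity issue")
    return "ok"


if __name__ == "__main__":
    backend = DynamicBackend(get_connection=lambda: None, min_jobs=2, max_jobs=20)
    print("parallel jobs right now:", backend.parallel_jobs)
    print("download result:", flaky_download())
```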