From e3aa49568bf1e85b6c87b047af8c6ea9b9028450 Mon Sep 17 00:00:00 2001 From: Rahul Mahajan Date: Mon, 22 Jul 2024 16:22:38 -0400 Subject: [PATCH 1/3] replace check_netcdf.sh checker with ocean output at the next forecast hour or finishing of the forecast job for ocean prod --- workflow/rocoto/gefs_tasks.py | 10 ++++++---- workflow/rocoto/gfs_tasks.py | 18 ++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index cfd8fa7093..78fb67fe53 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -218,7 +218,7 @@ def _atmosoceaniceprod(self, component: str): 'history_file_tmpl': f'{self.cdump}.t@Hz.master.grb2f#fhr#'}, 'ocean': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL', - 'history_file_tmpl': f'{self.cdump}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr#.nc'}, + 'history_file_tmpl': f'{self.cdump}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr_next#.nc'}, 'ice': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_ICE_HISTORY_TMPL', 'history_file_tmpl': f'{self.cdump}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr#.nc'}} @@ -236,10 +236,9 @@ def _atmosoceaniceprod(self, component: str): if component in ['ocean']: dep_dict = {'type': 'data', 'data': data, 'age': 120} deps.append(rocoto.add_dependency(dep_dict)) - command = f"{self.HOMEgfs}/ush/check_netcdf.sh {history_path}/{history_file_tmpl}" - dep_dict = {'type': 'sh', 'command': command} + dep_dict = {'type': 'task', 'name': f'{self.cdump}fcst'} deps.append(rocoto.add_dependency(dep_dict)) - dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') + dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') elif component in ['ice']: command = f"{self.HOMEgfs}/ush/check_ice_netcdf.sh @Y @m @d @H #fhr# &ROTDIR; #member# {fhout_ice_gfs}" dep_dict = {'type': 'sh', 'command': command} @@ -271,6 +270,9 @@ def _atmosoceaniceprod(self, component: str): fhrs = self._get_forecast_hours('gefs', self._configs[config], component) fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} + if component in ['ocean']: + fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])] + fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next]) fhr_metatask_dict = {'task_name': f'{component}_prod_#member#', 'task_dict': task_dict, diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index c14c3e2898..6187f297aa 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1079,7 +1079,7 @@ def _atmosoceaniceprod(self, component: str): 'history_file_tmpl': f'{self.cdump}.t@Hz.master.grb2f#fhr#'}, 'ocean': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL', - 'history_file_tmpl': f'{self.cdump}.ocean.t@Hz.6hr_avg.f#fhr#.nc'}, + 'history_file_tmpl': f'{self.cdump}.ocean.t@Hz.6hr_avg.f#fhr_next#.nc'}, 'ice': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_ICE_HISTORY_TMPL', 'history_file_tmpl': f'{self.cdump}.ice.t@Hz.6hr_avg.f#fhr#.nc'}} @@ -1099,13 +1099,9 @@ def _atmosoceaniceprod(self, component: str): data = f'{history_path}/{history_file_tmpl}' dep_dict = {'type': 'data', 'data': data, 'age': 120} deps.append(rocoto.add_dependency(dep_dict)) - if component in ['ocean']: - command = f"{self.HOMEgfs}/ush/check_netcdf.sh {history_path}/{history_file_tmpl}" - dep_dict = {'type': 'sh', 'command': command} - deps.append(rocoto.add_dependency(dep_dict)) - dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') - else: - dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}fcst'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump resources = self.get_resource(component_dict['config']) @@ -1124,10 +1120,12 @@ def _atmosoceaniceprod(self, component: str): fhrs = self._get_forecast_hours(self.cdump, self._configs[config], component) fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} + if component in ['ocean']: + fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])] + fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next]) metatask_dict = {'task_name': f'{self.cdump}{component}_prod', 'task_dict': task_dict, - 'var_dict': fhr_var_dict - } + 'var_dict': fhr_var_dict} task = rocoto.create_task(metatask_dict) From 134de35b0ce0f164b71df45af40f060a6488b0f7 Mon Sep 17 00:00:00 2001 From: Rahul Mahajan Date: Tue, 23 Jul 2024 11:26:21 -0400 Subject: [PATCH 2/3] correct the error in dependency as pointed out by ES --- workflow/rocoto/gefs_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 78fb67fe53..1d23ecc0ff 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -236,7 +236,7 @@ def _atmosoceaniceprod(self, component: str): if component in ['ocean']: dep_dict = {'type': 'data', 'data': data, 'age': 120} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'task', 'name': f'{self.cdump}fcst'} + dep_dict = {'type': 'task', 'name': 'fcst_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') elif component in ['ice']: From 2a6c4e00e70b51ca89cae52c0c9d9802366d4e00 Mon Sep 17 00:00:00 2001 From: Rahul Mahajan Date: Thu, 1 Aug 2024 20:08:03 -0400 Subject: [PATCH 3/3] remove ush/check_netcdf.sh as it is not used anymore --- ush/check_netcdf.sh | 15 --------------- 1 file changed, 15 deletions(-) delete mode 100755 ush/check_netcdf.sh diff --git a/ush/check_netcdf.sh b/ush/check_netcdf.sh deleted file mode 100755 index 5f56a38aba..0000000000 --- a/ush/check_netcdf.sh +++ /dev/null @@ -1,15 +0,0 @@ -#! /usr/bin/env bash - -# shellcheck disable=SC2155,SC2312 -HOMEgfs=$(cd "$(dirname "$(readlink -f -n "${BASH_SOURCE[0]}" )" )/.." && pwd -P) -declare -rx HOMEgfs - -source "${HOMEgfs}/ush/load_fv3gfs_modules.sh" 1>/dev/null 2>&1 - -ncfile=${1?} - -(( $(ncdump "${ncfile}" 2> /dev/null | grep -Po '(?<=time = UNLIMITED ; // \()\d+(?= currently)' || echo 0) > 0 )) # redirect stdout and stderr to /dev/null to suppress output in cron -rc=$? -# If there is no error, rc=0, else rc!=0 - -exit "${rc}"