diff --git a/.github/workflows/forcingprocessor.yml b/.github/workflows/forcingprocessor.yml
index 2438c3d52..cb50d0ffd 100644
--- a/.github/workflows/forcingprocessor.yml
+++ b/.github/workflows/forcingprocessor.yml
@@ -43,7 +43,7 @@ jobs:
     - name: Test with pytest
       run: |
         cd forcingprocessor
-        python -m pytest -vv --deselect="tests/test_forcingprocessor.py::test_google_cloud_storage" --deselect="tests/test_forcingprocessor.py::test_gcs" --deselect="tests/test_forcingprocessor.py::test_gs" --deselect="tests/test_forcingprocessor.py::test_ciroh_zarr" --deselect="tests/test_forcingprocessor.py::test_nomads_post_processed" --deselect="tests/test_forcingprocessor.py::test_retro_ciroh_zarr" --deselect="tests/test_forcingprocessor.py::test_noaa_nwm_pds_https_analysis_assim" --deselect="tests/test_hf2ds.py" --deselect="tests/test_plotter.py"
+        python -m pytest -vv --deselect="tests/test_forcingprocessor.py::test_google_cloud_storage" --deselect="tests/test_forcingprocessor.py::test_gcs" --deselect="tests/test_forcingprocessor.py::test_gs" --deselect="tests/test_forcingprocessor.py::test_ciroh_zarr" --deselect="tests/test_forcingprocessor.py::test_nomads_post_processed" --deselect="tests/test_forcingprocessor.py::test_retro_ciroh_zarr" --deselect="tests/test_hf2ds.py" --deselect="tests/test_plotter.py"
         python -m pytest -vv tests/test_hf2ds.py
         python -m pytest -vv tests/test_plotter.py
         python -m pytest -vv -k test_google_cloud_storage
diff --git a/.github/workflows/test_datastream_options.yml b/.github/workflows/test_datastream_options.yml
index 0f8a778a9..3c2131cb9 100644
--- a/.github/workflows/test_datastream_options.yml
+++ b/.github/workflows/test_datastream_options.yml
@@ -42,14 +42,12 @@ jobs:
 
     - name: Get geopackage from hfsubset
       run: |
-        # HFSUBSET IS BROKEN
-        # hfsubset -w medium_range -s nextgen -v 2.1.1 -l divides,flowlines,network,nexus,forcing-weights,flowpath-attributes,model-attributes -o palisade.gpkg -t hl "Gages-09106150"
-        curl -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/palisade.gpkg
-
+        hfsubset -w medium_range -s nextgen -v 2.1.1 -l divides,flowlines,network,nexus,forcing-weights,flowpath-attributes,model-attributes -o palisade.gpkg -t hl "Gages-09106150"
+        # curl -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/palisade.gpkg
 
     - name: Base test and NWM_RETRO_V3
       run: |
-        sudo rm -rf $(pwd)/data/datastream_test
+        sudo rm -rf $(pwd)/data/datastream_test
         ./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
 
     - name: Cache resource directory
@@ -60,6 +58,12 @@ jobs:
         cp -r ./data/cache/datastream-resources ./data/cache/datastream-resources-missing
         sudo rm -rf ./data/cache/datastream-resources-no-forcings/ngen-forcings
 
+    - name: NextGen forcings CLI option test
+      if: always()
+      run: |
+        sudo rm -rf $(pwd)/data/datastream_test
+        ./scripts/stream.sh -g $(pwd)/palisade.gpkg -R $(pwd)/data/cache/datastream-resources/config/realization_sloth_nom_cfe_pet.json -F $(pwd)/data/cache/datastream-resources/ngen-forcings/1_forcings.nc -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test
+
     - name: Resource directory test missing all
       if: always()
       run: |
        sudo rm -rf $(pwd)/data/datastream_test
@@ -101,11 +105,11 @@ jobs:
         TODAY=$(env TZ=US/Eastern date +'%Y%m%d')
         ./scripts/stream.sh -r ./data/cache/datastream-resources-no-forcings -s $TODAY"0100" -e $TODAY"0200" -C NOMADS -d $(pwd)/data/datastream_test
 
-    # - name: Test hfsubset options
-    #   if: always()
-    #   run: |
-    #     sudo rm -rf $(pwd)/data/datastream_test
-    #     ./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -I "Gages-09106150" -i hl -v 2.1.1 -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
+    - name: Test hfsubset options
+      if: always()
+      run: |
+        sudo rm -rf $(pwd)/data/datastream_test
+        ./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -I "Gages-09106150" -i hl -v 2.1.1 -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
 
     - name: S3 write out test
       if: always()
       run: |
@@ -140,10 +144,10 @@ jobs:
         ./scripts/stream.sh -s DAILY -C NWM_MEDIUM_RANGE -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
 
-    # - name: DAILY analysis assim extend today test
-    #   if: always()
-    #   run: |
-    #     sudo rm -rf $(pwd)/data/datastream_test
-    #     ./scripts/stream.sh -s DAILY -C NWM_ANALYSIS_ASSIM_EXTEND -e $(date -d '-2 day' '+%Y%m%d0000') -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
+    - name: DAILY analysis assim extend today test
+      if: always()
+      run: |
+        sudo rm -rf $(pwd)/data/datastream_test
+        ./scripts/stream.sh -s DAILY -C NWM_ANALYSIS_ASSIM_EXTEND -e $(date -d '-2 day' '+%Y%m%d0000') -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
-    
\ No newline at end of file
+    
diff --git a/forcingprocessor/src/forcingprocessor/processor.py b/forcingprocessor/src/forcingprocessor/processor.py
index 411797359..5f1206d5a 100644
--- a/forcingprocessor/src/forcingprocessor/processor.py
+++ b/forcingprocessor/src/forcingprocessor/processor.py
@@ -13,7 +13,7 @@
 from datetime import datetime
 import gzip
 import tarfile, tempfile
-from forcingprocessor.weights_hf2ds import hf2ds
+from forcingprocessor.weights_hf2ds import multiprocess_hf2ds
 from forcingprocessor.plot_forcings import plot_ngen_forcings
 from forcingprocessor.utils import get_window, log_time, convert_url2key, report_usage, nwm_variables, ngen_variables
 
@@ -784,7 +784,7 @@ def prep_ngen_data(conf):
     log_time("READWEIGHTS_START", log_file)
     if ii_verbose: print(f'Obtaining weights from geopackage(s)\n',flush=True)
     global weights_json
-    weights_json, jcatchment_dict = hf2ds(gpkg_files)
+    weights_json, jcatchment_dict = multiprocess_hf2ds(gpkg_files)
     ncatchments = len(weights_json)
     global x_min, x_max, y_min, y_max
     x_min, x_max, y_min, y_max = get_window(weights_json)
@@ -833,6 +833,12 @@ def prep_ngen_data(conf):
             # t_ax = t_ax
             # nwm_data=nwm_data[0][None,:]
             data_array, t_ax, nwm_data = multiprocess_data_extract(jnwm_files,nprocs,weights_json,fs)
+
+            if datetime.strptime(t_ax[0],'%Y-%m-%d %H:%M:%S') > datetime.strptime(t_ax[-1],'%Y-%m-%d %H:%M:%S'):
+                # Hack to ensure data is always written out with time moving forward.
+                t_ax=list(reversed(t_ax))
+                data_array = np.flip(data_array,axis=0)
+
             t_extract = time.perf_counter() - t0
             complexity = (nfiles_tot * ncatchments) / 10000
             score = complexity / t_extract
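The reordering guard added to `prep_ngen_data` above can be exercised on its own. A minimal standalone sketch — the toy array shape and timestamps are invented, this is not the forcingprocessor code itself — showing that reversing `t_ax` together with `np.flip(..., axis=0)` keeps each timestep paired with its original slice:

```python
# Standalone sketch of the time-reordering hack above; shape and timestamps are invented.
import numpy as np
from datetime import datetime

t_ax = ['2020-06-20 03:00:00', '2020-06-20 02:00:00', '2020-06-20 01:00:00']  # descending
data_array = np.arange(3 * 2 * 4).reshape(3, 2, 4)  # toy (time, variable, catchment) block

if datetime.strptime(t_ax[0], '%Y-%m-%d %H:%M:%S') > datetime.strptime(t_ax[-1], '%Y-%m-%d %H:%M:%S'):
    t_ax = list(reversed(t_ax))
    data_array = np.flip(data_array, axis=0)

assert t_ax[0] < t_ax[-1]          # time now moves forward
assert data_array[0, 0, 0] == 16   # the slice that belonged to the earliest timestep moved with it
```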
diff --git a/forcingprocessor/src/forcingprocessor/weights_hf2ds.py b/forcingprocessor/src/forcingprocessor/weights_hf2ds.py
index b47d91295..235e328ea 100644
--- a/forcingprocessor/src/forcingprocessor/weights_hf2ds.py
+++ b/forcingprocessor/src/forcingprocessor/weights_hf2ds.py
@@ -1,12 +1,43 @@
 import json
 import argparse, time
 import geopandas as gpd
-import boto3
 import re, os
 import concurrent.futures as cf
 import pandas as pd
 gpd.options.io_engine = "pyogrio"
 
+def multiprocess_hf2ds(files : list):
+    nprocs = min(len(files),os.cpu_count())
+    i = 0
+    k = 0
+    nfiles = len(files)
+    files_list = []
+    nper = nfiles // nprocs
+    nleft = nfiles - (nper * nprocs)
+    for i in range(nprocs):
+        k = nper + i + nleft
+        files_list.append(files[i:k])
+        i=k
+
+    weight_jsons = []
+    jcatchment_dicts = []
+    with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
+        for results in pool.map(
+            hf2ds,
+            files_list,
+        ):
+            weight_jsons.append(results[0])
+            jcatchment_dicts.append(results[1])
+
+    print(f'Processes have returned')
+    weight_json = {}
+    [weight_json.update(x) for x in weight_jsons]
+    jcatchment_dict = {}
+    [jcatchment_dict.update(x) for x in jcatchment_dicts]
+
+    return weight_json, jcatchment_dict
+
+
 def hf2ds(files : list):
     """
     Extracts the weights from a list of files
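For reviewers who want to poke at the new helper locally, a hypothetical usage sketch (the geopackage paths are placeholders; the function fans the files out over a process pool and merges each worker's weight and catchment dictionaries):

```python
# Hypothetical usage of the new multiprocess_hf2ds helper; the .gpkg paths are made up.
from forcingprocessor.weights_hf2ds import multiprocess_hf2ds

gpkg_files = ["vpu_01.gpkg", "vpu_02.gpkg", "vpu_03W.gpkg", "vpu_16.gpkg"]  # placeholder paths
weights_json, jcatchment_dict = multiprocess_hf2ds(gpkg_files)
print(f"{len(weights_json)} catchment weight entries from {len(gpkg_files)} geopackages")
```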
nwmurl_conf["runinput"] = 1 nwmurl_conf["urlbaseinput"] = 8 + nwmurl_conf["fcst_cycle"] = [0] generate_nwmfiles(nwmurl_conf) prep_ngen_data(conf) assert assert_file.exists() diff --git a/python_tools/src/python_tools/configure_datastream.py b/python_tools/src/python_tools/configure_datastream.py index 16a18e8ac..35405dde9 100644 --- a/python_tools/src/python_tools/configure_datastream.py +++ b/python_tools/src/python_tools/configure_datastream.py @@ -1,7 +1,6 @@ import argparse, json, os, copy, re -from datetime import datetime, timedelta +from datetime import datetime, timezone, timedelta from pathlib import Path -import pytz as tz import platform import psutil @@ -83,22 +82,25 @@ def create_conf_nwm(args): start = args.start_date end = args.end_date + fcst_cycle = 0 + if "DAILY" in start: if end == "": - start_dt = datetime.now(tz.timezone('US/Eastern')) + start_dt = datetime.now(timezone.utc) else: start_dt = datetime.strptime(end,'%Y%m%d%H%M') end_dt = start_dt + num_hrs= 24 else: start_dt = datetime.strptime(start,'%Y%m%d%H%M') end_dt = datetime.strptime(end,'%Y%m%d%H%M') + num_hrs = (end_dt - start_dt).seconds // 3600 start_dt = start_dt.replace(hour=1,minute=0,second=0,microsecond=0) - end_dt = end_dt.replace(hour=1,minute=0,second=0,microsecond=0) + end_dt = end_dt.replace(hour=1,minute=0,second=0,microsecond=0) start_str_real = start_dt.strftime('%Y-%m-%d %H:%M:%S') - end_str_real = end_dt.strftime('%Y-%m-%d %H:%M:%S') start_str_nwm = start_dt.strftime('%Y%m%d%H%M') - end_str_nwm = start_dt.strftime('%Y%m%d%H%M') + end_str_nwm = start_dt.strftime('%Y%m%d%H%M') if "RETRO" in args.forcing_source: if "V2" in args.forcing_source: @@ -115,8 +117,7 @@ def create_conf_nwm(args): "write_to_file" : True } else: - varinput = 5 - num_hrs = (end_dt - start_dt).seconds // 3600 + varinput = 5 if "HAWAII" in args.forcing_source: geoinput=2 @@ -146,12 +147,17 @@ def create_conf_nwm(args): runinput=6 num_hrs=28 dt=0 + fcst_cycle = 16 + start_dt = start_dt - timedelta(hours=12) + start_str_real = start_dt.strftime('%Y-%m-%d %H:%M:%S') else: + start_dt = start_dt - timedelta(hours=3) + start_str_real = start_dt.strftime('%Y-%m-%d %H:%M:%S') runinput=5 num_hrs=3 else: runinput=2 - num_hrs = 24 + num_hrs=24 nwm_conf = { "forcing_type" : "operational_archive", @@ -162,10 +168,13 @@ def create_conf_nwm(args): "geoinput" : geoinput, "meminput" : 0, "urlbaseinput" : urlbaseinput, - "fcst_cycle" : [0], + "fcst_cycle" : [fcst_cycle], "lead_time" : [x+dt for x in range(num_hrs)] } + end_str_real = start_dt + timedelta(hours=num_hrs-1) + end_str_real = end_str_real.strftime('%Y-%m-%d %H:%M:%S') + return nwm_conf, start_str_real, end_str_real def create_conf_fp(args): @@ -178,7 +187,7 @@ def create_conf_fp(args): output_file_type = ["netcdf"] if len(args.s3_bucket) > 0: if "DAILY" in args.start_date: - args.s3_prefix = re.sub(r"\DAILY",datetime.now(tz.timezone('US/Eastern')).strftime('%Y%m%d'),args.s3_prefix) + args.s3_prefix = re.sub(r"\DAILY",datetime.now(timezone.utc).strftime('%Y%m%d'),args.s3_prefix) output_path = f"s3://{args.s3_bucket}/{args.s3_prefix}" elif len(args.docker_mount) > 0: gpkg_file = [f"{args.docker_mount}/datastream-resources/config/{geo_base}"] @@ -225,6 +234,10 @@ def create_confs(args): nwm_conf = {} fp_conf = {} fp_conf['forcing'] = args.forcings + start_real = datetime.strptime(args.start_date,'%Y%m%d%H%M') + end_real = datetime.strptime(args.end_date,'%Y%m%d%H%M') + start_real = start_dt.strftime('%Y-%m-%d %H:%M:%S') + end_real = end_dt.strftime('%Y-%m-%d %H:%M:%S') elif 
diff --git a/python_tools/tests/test_configurer.py b/python_tools/tests/test_configurer.py
index c15df8de9..9e5f1f694 100644
--- a/python_tools/tests/test_configurer.py
+++ b/python_tools/tests/test_configurer.py
@@ -1,6 +1,5 @@
 import os, pytest, json
-from datetime import datetime
-import pytz as tz
+from datetime import datetime, timezone, timedelta
 from python_tools.configure_datastream import create_confs
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -110,7 +109,7 @@ def test_conf_daily():
     with open(REALIZATION_RUN,'r') as fp:
         data = json.load(fp)
         start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
-        assert start.day == datetime.now(tz.timezone('US/Eastern')).day
+        assert start.day == datetime.now(timezone.utc).day
 
     with open(CONF_NWM,'r') as fp:
         data = json.load(fp)
@@ -142,7 +141,10 @@ def test_conf_daily_short_range():
     with open(REALIZATION_RUN,'r') as fp:
         data = json.load(fp)
         start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
-        assert start.day == datetime.now(tz.timezone('US/Eastern')).day
+        end = datetime.strptime(data['time']['end_time'],"%Y-%m-%d %H:%M:%S")
+        assert start.day == datetime.now(timezone.utc).day
+        assert end.day == datetime.now(timezone.utc).day
+        assert end.hour == 18
 
     with open(CONF_NWM,'r') as fp:
         data = json.load(fp)
@@ -159,7 +161,7 @@ def test_conf_daily_medium_range():
     with open(REALIZATION_RUN,'r') as fp:
         data = json.load(fp)
         start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
-        assert start.day == datetime.now(tz.timezone('US/Eastern')).day
+        assert start.day == datetime.now(timezone.utc).day
 
     with open(CONF_NWM,'r') as fp:
         data = json.load(fp)
@@ -176,7 +178,10 @@ def test_conf_daily_noamds():
     with open(REALIZATION_RUN,'r') as fp:
         data = json.load(fp)
         start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
-        assert start.day == datetime.now(tz.timezone('US/Eastern')).day
+        end = datetime.strptime(data['time']['end_time'],"%Y-%m-%d %H:%M:%S")
+        assert start.day == datetime.now(timezone.utc).day
+        assert end.day == (datetime.now(timezone.utc)+timedelta(hours=240-1)).day
+        assert end.hour == 0
 
     with open(CONF_NWM,'r') as fp:
         data = json.load(fp)
@@ -193,7 +198,7 @@ def test_conf_daily_noamds_postprocessed():
     with open(REALIZATION_RUN,'r') as fp:
         data = json.load(fp)
         start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
-        assert start.day == datetime.now(tz.timezone('US/Eastern')).day
+        assert start.day == datetime.now(timezone.utc).day
 
     with open(CONF_NWM,'r') as fp:
         data = json.load(fp)
@@ -213,7 +218,11 @@ def test_conf_daily_assim_split_vpu_s3out():
     with open(REALIZATION_RUN,'r') as fp:
         data = json.load(fp)
         start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
-        assert start.day == datetime.now(tz.timezone('US/Eastern')).day
+        end = datetime.strptime(data['time']['end_time'],"%Y-%m-%d %H:%M:%S")
+        assert start.hour == 22
+        assert start.day == (datetime.now(timezone.utc)-timedelta(hours=24)).day
+        assert end.day == datetime.now(timezone.utc).day
+        assert end.hour == 00
 
     with open(CONF_NWM,'r') as fp:
         data = json.load(fp)
@@ -240,12 +249,17 @@ def test_conf_daily_assim_extend_split_vpu_s3out():
     with open(REALIZATION_RUN,'r') as fp:
         data = json.load(fp)
         start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
-        assert start.day == datetime.now(tz.timezone('US/Eastern')).day
+        end = datetime.strptime(data['time']['end_time'],"%Y-%m-%d %H:%M:%S")
+        assert start.hour == 13
+        assert start.day == (datetime.now(timezone.utc)-timedelta(hours=24)).day
+        assert end.day == datetime.now(timezone.utc).day
+        assert end.hour == 16
 
     with open(CONF_NWM,'r') as fp:
         data = json.load(fp)
-        assert data['urlbaseinput'] == 7
-        assert data['runinput'] == 6
+        assert data['urlbaseinput'] == 7
+        assert data['runinput'] == 6
+        assert data['fcst_cycle'][0] == 16
         assert len(data['lead_time'] ) == 28
 
     with open(CONF_FP,'r') as fp:
@@ -253,4 +267,27 @@
         assert len(data['forcing']['gpkg_file']) == 4
         assert data['storage']['output_path'].startswith("s3://ciroh-community-ngen-datastream/pytest")
 
+def test_conf_forcings_provided():
+    inputs.start_date = "202410300100"
+    inputs.end_date = "202410300400"
+    inputs.forcing_source = ""
+    inputs.forcings = "test_file.nc"
+    inputs.forcing_split_vpu = "01,02,03W,16"
+    inputs.s3_bucket = ""
+    inputs.s3_prefix = ""
+    create_confs(inputs)
+    check_paths()
+
+    with open(REALIZATION_RUN,'r') as fp:
+        data = json.load(fp)
+        start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
+        end = datetime.strptime(data['time']['end_time'],"%Y-%m-%d %H:%M:%S")
+        assert start.hour == 1
+        assert start.day == 30
+        assert end.hour == 4
+        assert end.day == 30
+
+
+
diff --git a/research_datastream/terraform/GETTING_STARTED.md b/research_datastream/terraform/GETTING_STARTED.md
index 514cbf597..f72757e72 100644
--- a/research_datastream/terraform/GETTING_STARTED.md
+++ b/research_datastream/terraform/GETTING_STARTED.md
@@ -87,7 +87,7 @@ Starting from execution_template_datastream. These options correspond directly t
     "subset_id" : "Gages-09106150",
     "hydrofabric_version" : "2.1.1",
     "s3_bucket" : "ngen_datstream",
-    "object_prefix" : "datastream_cloud_test"
+    "s3_prefix" : "datastream_cloud_test"
   }
 ```
 
@@ -100,7 +100,7 @@ Starting from execution_template_general_purpose. Make sure to wrap commands in
 ```
 
 ### Edit Run Options
-The state machine is capable of confirming a complete execution by checking for the existence output data in the form of an s3 object. Set booleans here. If `s3_bucket` and `object_prefix` are provided in `datastream_command_options`, `ngen-datastream` will create a `ngen-run.tar.gz` file that can be found at `s3://<s3_bucket>/<object_prefix>/ngen-run.tar.gz`
+The state machine is capable of confirming a complete execution by checking for the existence of output data in the form of an s3 object. Set booleans here. If `s3_bucket` and `s3_prefix` are provided in `datastream_command_options`, `ngen-datastream` will create a `ngen-run.tar.gz` file that can be found at `s3://<s3_bucket>/<s3_prefix>/ngen-run.tar.gz`
 ```
 "run_options":{
     "ii_delete_volume" : false,
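As a quick illustration of the renamed option (bucket and prefix values are made up), the output location described above is assembled the same way `create_conf_fp` builds `output_path`:

```python
# Illustrative only; bucket and prefix values are placeholders.
s3_bucket = "my-bucket"
s3_prefix = "test_directory"
output_path = f"s3://{s3_bucket}/{s3_prefix}"
print(f"{output_path}/ngen-run.tar.gz")   # s3://my-bucket/test_directory/ngen-run.tar.gz
```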
diff --git a/research_datastream/terraform/executions/execution_datastream_example.json b/research_datastream/terraform/executions/execution_datastream_example.json
index 471be3a49..773f7103d 100644
--- a/research_datastream/terraform/executions/execution_datastream_example.json
+++ b/research_datastream/terraform/executions/execution_datastream_example.json
@@ -9,14 +9,14 @@
     "subset_id" : "Gages-09106150",
     "hydrofabric_version" : "2.1.1",
     "s3_bucket" : "my-bucket",
-    "object_prefix" : "test_directory"
+    "s3_prefix" : "test_directory"
   },
   "run_options":{
     "ii_delete_volume" : true,
     "ii_check_s3" : true
   },
   "instance_parameters": {
-    "ImageId": "ami-05c2c89efbcce18a2",
+    "ImageId": "ami-062bdcbb454b8d833",
     "InstanceType": "t4g.xlarge",
     "KeyName": "made_up_key",
     "SecurityGroupIds": [
diff --git a/research_datastream/terraform/executions/execution_gp_example.json b/research_datastream/terraform/executions/execution_gp_example.json
index 802a5b927..205a0f2af 100644
--- a/research_datastream/terraform/executions/execution_gp_example.json
+++ b/research_datastream/terraform/executions/execution_gp_example.json
@@ -4,11 +4,11 @@
   ],
   "run_options":{
     "ii_delete_volume" : false,
-    "ii_check_s3" : true
+    "ii_check_s3" : false
   },
 
   "instance_parameters" : {
-    "ImageId" : "ami-0fd0b3e0199927973",
+    "ImageId" : "ami-062bdcbb454b8d833",
     "InstanceType" : "t4g.large",
     "KeyName" : "jlaser_west2",
     "SecurityGroupIds" : ["sg-04365a4248fe126bc"],
diff --git a/research_datastream/terraform/executions/execution_template_datastream.json b/research_datastream/terraform/executions/execution_template_datastream.json
index eae3055d1..5b5cc5e51 100644
--- a/research_datastream/terraform/executions/execution_template_datastream.json
+++ b/research_datastream/terraform/executions/execution_template_datastream.json
@@ -9,7 +9,7 @@
     "subset_id" : "",
     "hydrofabric_version" : "",
     "s3_bucket" : "",
-    "object_prefix" : ""
+    "s3_prefix" : ""
   },
   "run_options":{
     "ii_delete_volume" : true,
diff --git a/research_datastream/terraform/lambda_functions/checker/lambda_function.py b/research_datastream/terraform/lambda_functions/checker/lambda_function.py
index dbd0b9041..2957baa8a 100644
--- a/research_datastream/terraform/lambda_functions/checker/lambda_function.py
+++ b/research_datastream/terraform/lambda_functions/checker/lambda_function.py
@@ -1,30 +1,60 @@
 import boto3
 import time
+import re
+import datetime
+from datetime import timezone
 
 client_s3 = boto3.client('s3')
 
-def wait_for_object_existence(bucket_name,object_key):
+def wait_for_object_existence(bucket_name,prefix):
 
-    ii_obj_found = False
-    while not ii_obj_found:
-        try:
-            client_s3.head_object(Bucket=bucket_name, Key=object_key)
-            print(f"Key: '{object_key}' found!")
-            ii_obj_found = True
-        except:
+    while True:
+        response = client_s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
+        if 'Contents' in response:
+            print(f"Objects exist in bucket {bucket_name} at prefix {prefix}")
+            return
+        else:
             time.sleep(1)
-    if not ii_obj_found: raise Exception(f'{object_key} does not exist in {bucket_name} ')
 
 def lambda_handler(event, context):
 
+    bucket = None
+    prefix = None
     if event["run_options"]['ii_check_s3']:
-        if not 'datastream_command_options' in event: raise Exception(f'The lambda only knows how to check s3 object for datastream_command_options with s3_bucket and object_prefix set')
-        bucket = event['datastream_command_options']['s3_bucket']
-        obj_key = event['datastream_command_options']['object_prefix'] + '/ngen-run.tar.gz'
-        print(f'Checking if {obj_key} exists in {bucket}')
-        wait_for_object_existence(bucket, obj_key)
+        if not 'datastream_command_options' in event:
+            for jcmd in event["commands"]:
+                bucket_pattern = r"--s3_bucket[=\s']+([^\s']+)"
+                match = re.search(bucket_pattern, jcmd)
+                if match:
+                    bucket = match.group(1)
+                prefix_pattern = r"--s3_prefix[=\s']+([^\s']+)"
+                match = re.search(prefix_pattern, jcmd)
+                if match:
+                    prefix = match.group(1)
+        else:
+            bucket = event['datastream_command_options']['s3_bucket']
+            prefix = event['datastream_command_options']['s3_prefix']
+        if bucket is None or prefix is None:
+            raise Exception(f'User specified ii_check_s3, but no s3_bucket or s3_prefix was found in commands')
+        if "DAILY" in prefix:
+            prefix = re.sub(r"\DAILY",datetime.datetime.now(timezone.utc).strftime('%Y%m%d'),prefix)
+        print(f'Checking if any objects with prefix {prefix} exist in {bucket}')
+        wait_for_object_existence(bucket, prefix)
     else:
         print(f'No s3 object check was performed.')
     return event
+
+if __name__ == "__main__":
+
+    import argparse, json
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--execution', dest="execution", type=str, help="",default = None)
+    args = parser.parse_args()
+
+    with open(args.execution,'r') as fp:
+        execution = json.load(fp)
+
+    lambda_handler(execution,"")
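The checker now falls back to scraping the bucket and prefix out of the execution's `commands` list. A quick check of those regexes against a made-up command string (the flags shown are only an example of what a command might contain, not a documented CLI):

```python
# Exercising the extraction patterns from the checker lambda; the command string is invented.
import re

jcmd = "stream.sh -s DAILY --s3_bucket my-bucket --s3_prefix pytest/DAILY"  # hypothetical command
bucket = re.search(r"--s3_bucket[=\s']+([^\s']+)", jcmd).group(1)   # 'my-bucket'
prefix = re.search(r"--s3_prefix[=\s']+([^\s']+)", jcmd).group(1)   # 'pytest/DAILY'
print(bucket, prefix)
```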
diff --git a/research_datastream/terraform/lambda_functions/poller/lambda_function.py b/research_datastream/terraform/lambda_functions/poller/lambda_function.py
index 6a0058cf3..fdc7bcde6 100644
--- a/research_datastream/terraform/lambda_functions/poller/lambda_function.py
+++ b/research_datastream/terraform/lambda_functions/poller/lambda_function.py
@@ -19,6 +19,7 @@ def lambda_handler(event, context):
     """
     Generic Poller funcion
     """
+    t0 = time.perf_counter()
 
     global client_ssm, client_ec2
     client_ssm = boto3.client('ssm',region_name=event['region'])
@@ -29,7 +30,8 @@ def lambda_handler(event, context):
     output = get_command_result(command_id,instance_id)
 
     ii_pass = False
-    while not ii_pass:
+    ii_time = False
+    while not ii_pass and not ii_time:
         output = get_command_result(command_id,instance_id)
         if output['Status'] == 'Success':
             print(f'Command has succeeded!')
@@ -37,8 +39,11 @@ def lambda_handler(event, context):
             break
         elif output['Status'] == 'InProgress':
             ii_pass = False
-            print(f'Commands are still in progress. Waiting a minute and checking again')
-            time.sleep(10)
+            print(f'Commands are still in progress. Waiting 5 seconds and checking again')
+            if (time.perf_counter() - t0) > 850:
+                print(f'Cycling...')
+                ii_time = True
+            time.sleep(5)
         else:
             raise Exception(f'Command failed {output}')
diff --git a/research_datastream/terraform/lambda_functions/streamcommander/lambda_function.py b/research_datastream/terraform/lambda_functions/streamcommander/lambda_function.py
index 28a7696a0..609870a1e 100644
--- a/research_datastream/terraform/lambda_functions/streamcommander/lambda_function.py
+++ b/research_datastream/terraform/lambda_functions/streamcommander/lambda_function.py
@@ -38,7 +38,7 @@ def lambda_handler(event, context):
     event['commands'] = []
     if "s3_bucket" in ds_options:
         bucket = ds_options["s3_bucket"]
-        prefix = ds_options["object_prefix"]
+        prefix = ds_options["s3_prefix"]
         nprocs = ds_options["nprocs"]
         start = ds_options["start_time"]
         end = ds_options["end_time"]
diff --git a/research_datastream/terraform/test/execution_gp_arm_docker_buildNtester.json b/research_datastream/terraform/test/execution_gp_arm_docker_buildNtester.json
index 12cca692d..5a0372656 100644
--- a/research_datastream/terraform/test/execution_gp_arm_docker_buildNtester.json
+++ b/research_datastream/terraform/test/execution_gp_arm_docker_buildNtester.json
@@ -14,7 +14,7 @@
   },
 
   "instance_parameters" : {
-    "ImageId" : "ami-0e991a4647b49dd55",
+    "ImageId" : "ami-062bdcbb454b8d833",
     "InstanceType" : "t4g.large",
     "KeyName" : "actions_key_arm",
     "SecurityGroupIds" : ["sg-0fcbe0c6d6faa0117"],
diff --git a/research_datastream/terraform/test/execution_gp_test.json b/research_datastream/terraform/test/execution_gp_test.json
index 6a00aa07c..c8c62a22c 100644
--- a/research_datastream/terraform/test/execution_gp_test.json
+++ b/research_datastream/terraform/test/execution_gp_test.json
@@ -8,7 +8,7 @@
   },
 
   "instance_parameters" : {
-    "ImageId" : "ami-0e991a4647b49dd55",
+    "ImageId" : "ami-062bdcbb454b8d833",
     "InstanceType" : "t4g.nano",
     "KeyName" : "actions_key",
     "SecurityGroupIds" : ["sg-06f57f883e902d7bc"],
@@ -30,7 +30,7 @@
     {
       "DeviceName": "/dev/xvda",
       "Ebs": {
-        "VolumeSize": 16,
+        "VolumeSize": 32,
         "VolumeType": "gp3"
       }
     }
diff --git a/scripts/stream.sh b/scripts/stream.sh
index 1d53cc580..10ae84b3e 100755
--- a/scripts/stream.sh
+++ b/scripts/stream.sh
@@ -435,6 +435,10 @@ if [ ! -z $NGEN_FORCINGS ]; then
     echo "Using $NGEN_FORCINGS"
     FORCINGS_BASE=$(basename $NGEN_FORCINGS)
     get_file "$NGEN_FORCINGS" "$NGENRUN_FORCINGS"
+    if [ ! -e $DATASTREAM_RESOURCES_NGENFORCINGS ]; then
+        mkdir -p $DATASTREAM_RESOURCES_NGENFORCINGS
+    fi
+    get_file "$NGEN_FORCINGS" "$DATASTREAM_RESOURCES_NGENFORCINGS"
     log_time "GET_FORCINGS_END" $DATASTREAM_PROFILING
 else
     log_time "FORCINGPROCESSOR_START" $DATASTREAM_PROFILING
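The poller change above is essentially a time-budget pattern: stop polling after roughly 850 seconds, presumably so the function exits before its own timeout and the state machine can re-invoke it. A stripped-down sketch of that pattern, not the deployed lambda (`get_status` is a stand-in for `get_command_result`):

```python
# Minimal sketch of the poller's budget-and-cycle loop; the status source is hypothetical.
import time

def poll(get_status, budget_s=850, interval_s=5):
    t0 = time.perf_counter()
    while True:
        status = get_status()
        if status == 'Success':
            return True                       # command finished within this invocation
        if status != 'InProgress':
            raise Exception(f'Command failed {status}')
        if time.perf_counter() - t0 > budget_s:
            print('Cycling...')               # give up and let the next invocation resume polling
            return False
        time.sleep(interval_s)
```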