diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9c9c8855..5c880a25 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,7 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Updated jupyter notebook
 - Update notebook test to use python code directly instead of using jupyter notebook
 - Updated python libraries
-
+- Update history json to have url in history
 ### Deprecated
 ### Removed
 ### Fixed
diff --git a/podaac/merger/harmony/service.py b/podaac/merger/harmony/service.py
index 0a053a59..9081f807 100644
--- a/podaac/merger/harmony/service.py
+++ b/podaac/merger/harmony/service.py
@@ -93,7 +93,7 @@ def process_catalog(self, catalog: Catalog):
             self.logger.info('Finished granule downloads')

             output_path = Path(temp_dir).joinpath(filename).resolve()
-            merge_netcdf_files(input_files, output_path, logger=self.logger)
+            merge_netcdf_files(input_files, output_path, granule_urls, logger=self.logger)
             staged_url = self._stage(str(output_path), filename, NETCDF4_MIME)

             # -- Output to STAC catalog --
diff --git a/podaac/merger/merge.py b/podaac/merger/merge.py
index f4e1a47d..c2cd9edc 100644
--- a/podaac/merger/merge.py
+++ b/podaac/merger/merge.py
@@ -24,7 +24,7 @@ def is_file_empty(parent_group):
     return True


-def merge_netcdf_files(original_input_files, output_file, logger=getLogger(__name__), perf_stats=None, process_count=None):  # pylint: disable=too-many-locals
+def merge_netcdf_files(original_input_files, output_file, granule_urls, logger=getLogger(__name__), perf_stats=None, process_count=None):  # pylint: disable=too-many-locals
     """
     Main entrypoint to merge implementation. Merges n >= 2 granules together
     as a single granule. Named in reference to original Java implementation.
@@ -64,7 +64,7 @@ def merge_netcdf_files(original_input_files, output_file, logger=getLogger(__nam
             if is_empty is False:
                 input_files.append(file)

-    preprocess = run_preprocess(input_files, process_count)
+    preprocess = run_preprocess(input_files, process_count, granule_urls)
     group_list = preprocess['group_list']
     max_dims = preprocess['max_dims']
     var_info = preprocess['var_info']
diff --git a/podaac/merger/merge_cli.py b/podaac/merger/merge_cli.py
index 2f55d26f..6efd2589 100644
--- a/podaac/merger/merge_cli.py
+++ b/podaac/merger/merge_cli.py
@@ -37,7 +37,8 @@ def main():
         logging.basicConfig(level=logging.DEBUG)

     input_files = list(Path(args.data_dir).resolve().iterdir())
-    merge_netcdf_files(input_files, args.output_path, process_count=args.cores)
+    granule_urls = []
+    merge_netcdf_files(input_files, args.output_path, granule_urls, process_count=args.cores)


 if __name__ == '__main__':
diff --git a/podaac/merger/preprocess_worker.py b/podaac/merger/preprocess_worker.py
index 85c8421e..8ff1c4fa 100644
--- a/podaac/merger/preprocess_worker.py
+++ b/podaac/merger/preprocess_worker.py
@@ -1,7 +1,6 @@
 """Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""

 import json
-import os
 import queue
 from copy import deepcopy
 from datetime import datetime, timezone
@@ -15,7 +14,7 @@
 from podaac.merger.variable_info import VariableInfo


-def run_preprocess(file_list, process_count):
+def run_preprocess(file_list, process_count, granule_urls):
     """
     Automagically run preprocessing in an optimized mode determined
     by the environment
@@ -28,9 +27,9 @@
     """

     if process_count == 1:
-        return _run_single_core(file_list)
+        return _run_single_core(file_list, granule_urls)

-    return _run_multi_core(file_list, process_count)
+    return _run_multi_core(file_list, process_count, granule_urls)


 def merge_max_dims(merged_max_dims, subset_max_dims):
@@ -76,7 +75,7 @@ def merge_metadata(merged_metadata, subset_metadata):
                 merged_attrs[attr_name] = False  # mark as inconsistent


-def construct_history(input_files):
+def construct_history(input_files, granule_urls):
     """
     Construct history JSON entry for this concatenation operation
     https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42
@@ -91,10 +90,10 @@ def construct_history(input_files):
     dict
         History JSON constructed for this concat operation
     """
-    base_names = list(map(os.path.basename, input_files))
+
     history_json = {
         "date_time": datetime.now(tz=timezone.utc).isoformat(),
-        "derived_from": base_names,
+        "derived_from": granule_urls,
         "program": 'concise',
         "version": importlib_metadata.distribution('podaac-concise').version,
         "parameters": f'input_files={input_files}',
@@ -124,7 +123,7 @@ def retrieve_history(dataset):
     return json.loads(history_json)


-def _run_single_core(file_list):
+def _run_single_core(file_list, granule_urls):
     """
     Run the granule preprocessing in the current thread/single-core mode

@@ -153,7 +152,7 @@

     group_list.sort()  # Ensure insertion order doesn't matter between granules

-    history_json.append(construct_history(file_list))
+    history_json.append(construct_history(file_list, granule_urls))
     group_metadata[group_list[0]]['history_json'] = json.dumps(
         history_json,
         default=str
@@ -168,7 +167,7 @@
     }


-def _run_multi_core(file_list, process_count):
+def _run_multi_core(file_list, process_count, granule_urls):
     """
     Run the granule preprocessing in multi-core mode. This method
     spins up the number of processes defined by process_count which process granules
@@ -248,7 +247,7 @@
         # Merge history_json entries from input files
         history_json.extend(result['history_json'])

-    history_json.append(construct_history(file_list))
+    history_json.append(construct_history(file_list, granule_urls))
     group_metadata[group_list[0]]['history_json'] = json.dumps(
         history_json,
         default=str
diff --git a/tests/test_merge.py b/tests/test_merge.py
index 02412350..fd0f1d73 100644
--- a/tests/test_merge.py
+++ b/tests/test_merge.py
@@ -130,8 +130,9 @@ def run_verification(self, data_dir, output_name, process_count=None):
         output_path = self.__output_path.joinpath(output_name)
         data_path = self.__test_data_path.joinpath(data_dir)
         input_files = list(data_path.iterdir())
+        granule_urls = []

-        merge.merge_netcdf_files(input_files, output_path, process_count=process_count)
+        merge.merge_netcdf_files(input_files, output_path, granule_urls, process_count=process_count)

         merged_dataset = nc.Dataset(output_path)
         file_map = self.verify_files(merged_dataset, input_files, data_dir)
@@ -146,8 +147,9 @@ def run_java_verification(self, output_name, process_count=None):

         java_path = self.__test_data_path.joinpath('java_results', 'merged-ASCATA-L2-25km-Lat-90.0_90.0-Lon-180.0_180.0.subset.nc')
         python_path = self.__output_path.joinpath(output_name)
+        granule_urls = []

-        merge.merge_netcdf_files(input_files, python_path, process_count=process_count)
+        merge.merge_netcdf_files(input_files, python_path, granule_urls, process_count=process_count)

         java_dataset = nc.Dataset(java_path)
         python_dataset = nc.Dataset(python_path)
@@ -207,7 +209,7 @@ def assert_valid_history(merged_dataset, input_files):
             history_json = json.loads(merged_dataset.getncattr('history_json'))[-1]
             assert 'date_time' in history_json
             assert history_json.get('program') == 'concise'
-            assert history_json.get('derived_from') == input_files
+            assert history_json.get('derived_from') == []  # list of granule urls
             assert history_json.get('version') == importlib_metadata.distribution('podaac-concise').version
             assert 'input_files=' in history_json.get('parameters')
             assert history_json.get('program_ref') == 'https://cmr.earthdata.nasa.gov:443/search/concepts/S2153799015-POCLOUD'
@@ -217,6 +219,7 @@
         merge.merge_netcdf_files(
             original_input_files=input_files,
             output_file=self.__output_path.joinpath(output_name_single),
+            granule_urls=[],
             process_count=1
         )
         merged_dataset = nc.Dataset(self.__output_path.joinpath(output_name_single))
@@ -228,6+231,7 @@
         merge.merge_netcdf_files(
             original_input_files=input_files,
             output_file=self.__output_path.joinpath(output_name_multi),
+            granule_urls=[],
             process_count=2
         )
         merged_dataset = nc.Dataset(self.__output_path.joinpath(output_name_multi))
@@ -243,6+247,7 @@
         merge.merge_netcdf_files(
             original_input_files=input_files,
             output_file=self.__output_path.joinpath(output_name_single),
+            granule_urls=[],
             process_count=1
         )
         merged_dataset = nc.Dataset(self.__output_path.joinpath(output_name_single))
@@ -274,14 +279,15 @@ def test_mismatched_vars(self):
         }

         # Test single process merge
-        merge.merge_netcdf_files(input_files, output_path_single, process_count=1)
+        granule_urls = []
+        merge.merge_netcdf_files(input_files, output_path_single, granule_urls, process_count=1)
         dataset = nc.Dataset(output_path_single)
         actual_vars = set(dataset.variables.keys())
         actual_vars.remove('subset_files')
         assert actual_vars == expected_vars

         # Test multi-process merge
-        merge.merge_netcdf_files(input_files, output_path_multi, process_count=2)
+        merge.merge_netcdf_files(input_files, output_path_multi, granule_urls, process_count=2)
         dataset = nc.Dataset(output_path_multi)
         actual_vars = set(dataset.variables.keys())
         actual_vars.remove('subset_files')
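Reviewer note: a minimal usage sketch, not part of the patch. After this change, `granule_urls` is a required third positional argument to `merge_netcdf_files`, and the list passed in is recorded verbatim as the `derived_from` field of the output's `history_json` attribute; callers with no URLs on hand (the CLI, the tests) pass an empty list. The directory, output paths, and URLs below are hypothetical, for illustration only.

```python
from pathlib import Path

from podaac.merger.merge import merge_netcdf_files

# Hypothetical local granules and their source URLs -- illustration only.
input_files = list(Path('granules').resolve().iterdir())
granule_urls = [
    'https://example.earthdata.nasa.gov/granules/granule1.nc',  # assumed URL
    'https://example.earthdata.nasa.gov/granules/granule2.nc',  # assumed URL
]

# New signature: granule_urls is the third positional argument and ends up
# as the 'derived_from' entry of the merged file's history_json.
merge_netcdf_files(input_files, 'merged.nc', granule_urls, process_count=2)

# Callers without URLs (e.g. merge_cli.main) now pass an empty list:
merge_netcdf_files(input_files, 'merged_cli.nc', [], process_count=1)
```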