Commit

fix pytests
sliu008 committed Oct 25, 2023
1 parent f292533 commit f8d79b6
Showing 6 changed files with 27 additions and 21 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -14,7 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Updated jupyter notebook
 - Update notebook test to use python code directly instead of using jupyter notebook
 - Updated python libraries
-
+- Update history json to have url in history
 ### Deprecated
 ### Removed
 ### Fixed
2 changes: 1 addition & 1 deletion podaac/merger/harmony/service.py
@@ -93,7 +93,7 @@ def process_catalog(self, catalog: Catalog):
 self.logger.info('Finished granule downloads')
 
 output_path = Path(temp_dir).joinpath(filename).resolve()
-merge_netcdf_files(input_files, output_path, logger=self.logger)
+merge_netcdf_files(input_files, output_path, granule_urls, logger=self.logger)
 staged_url = self._stage(str(output_path), filename, NETCDF4_MIME)
 
 # -- Output to STAC catalog --
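The granule_urls list passed here is built earlier in process_catalog, outside the lines shown above. As a rough, hypothetical sketch of where such a list could come from (this is not code from this commit), the URLs might be read off the incoming STAC items:

# Hypothetical sketch only; the real process_catalog may collect the URLs differently.
# Assumes pystac Items whose source granule is exposed as a 'data' asset.
from pystac import Catalog


def collect_granule_urls(catalog: Catalog) -> list:
    """Return the source URL of each granule referenced by the catalog."""
    return [item.assets['data'].href for item in catalog.get_all_items()]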
4 changes: 2 additions & 2 deletions podaac/merger/merge.py
@@ -24,7 +24,7 @@ def is_file_empty(parent_group):
 return True
 
 
-def merge_netcdf_files(original_input_files, output_file, logger=getLogger(__name__), perf_stats=None, process_count=None): # pylint: disable=too-many-locals
+def merge_netcdf_files(original_input_files, output_file, granule_urls, logger=getLogger(__name__), perf_stats=None, process_count=None): # pylint: disable=too-many-locals
 """
 Main entrypoint to merge implementation. Merges n >= 2 granules together as a single
 granule. Named in reference to original Java implementation.
@@ -64,7 +64,7 @@ def merge_netcdf_files(original_input_files, output_file, logger=getLogger(__nam
 if is_empty is False:
 input_files.append(file)
 
-preprocess = run_preprocess(input_files, process_count)
+preprocess = run_preprocess(input_files, process_count, granule_urls)
 group_list = preprocess['group_list']
 max_dims = preprocess['max_dims']
 var_info = preprocess['var_info']
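With this change, granule_urls is a new required positional parameter of merge_netcdf_files, which is why the CLI, the Harmony service, and the tests below all had to be updated. A minimal usage sketch under placeholder paths (an empty list is acceptable when the source URLs are unknown, as in merge_cli.py below):

from pathlib import Path

from podaac.merger.merge import merge_netcdf_files

# Placeholder inputs for illustration only.
input_files = list(Path('/tmp/granules').iterdir())
output_file = Path('/tmp/merged.nc')
granule_urls = []  # pass the source URLs here so they end up in history_json

merge_netcdf_files(input_files, output_file, granule_urls, process_count=1)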
3 changes: 2 additions & 1 deletion podaac/merger/merge_cli.py
@@ -37,7 +37,8 @@ def main():
 logging.basicConfig(level=logging.DEBUG)
 
 input_files = list(Path(args.data_dir).resolve().iterdir())
-merge_netcdf_files(input_files, args.output_path, process_count=args.cores)
+granule_urls = []
+merge_netcdf_files(input_files, args.output_path, granule_urls, process_count=args.cores)
 
 
 if __name__ == '__main__':
21 changes: 10 additions & 11 deletions podaac/merger/preprocess_worker.py
@@ -1,7 +1,6 @@
"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""

import json
import os
import queue
from copy import deepcopy
from datetime import datetime, timezone
@@ -15,7 +14,7 @@
 from podaac.merger.variable_info import VariableInfo
 
 
-def run_preprocess(file_list, process_count):
+def run_preprocess(file_list, process_count, granule_urls):
 """
 Automagically run preprocessing in an optimized mode determined by the environment
@@ -28,9 +27,9 @@ def run_preprocess(file_list, process_count):
 """
 
 if process_count == 1:
-return _run_single_core(file_list)
+return _run_single_core(file_list, granule_urls)
 
-return _run_multi_core(file_list, process_count)
+return _run_multi_core(file_list, process_count, granule_urls)
 
 
 def merge_max_dims(merged_max_dims, subset_max_dims):
@@ -76,7 +75,7 @@ def merge_metadata(merged_metadata, subset_metadata):
 merged_attrs[attr_name] = False # mark as inconsistent
 
 
-def construct_history(input_files):
+def construct_history(input_files, granule_urls):
 """
 Construct history JSON entry for this concatenation operation
 https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42
@@ -91,10 +90,10 @@ def construct_history(input_files):
 dict
 History JSON constructed for this concat operation
 """
-base_names = list(map(os.path.basename, input_files))
 
 history_json = {
 "date_time": datetime.now(tz=timezone.utc).isoformat(),
-"derived_from": base_names,
+"derived_from": granule_urls,
 "program": 'concise',
 "version": importlib_metadata.distribution('podaac-concise').version,
 "parameters": f'input_files={input_files}',
@@ -124,7 +123,7 @@ def retrieve_history(dataset):
 return json.loads(history_json)
 
 
-def _run_single_core(file_list):
+def _run_single_core(file_list, granule_urls):
 """
 Run the granule preprocessing in the current thread/single-core mode
@@ -153,7 +152,7 @@ def _run_single_core(file_list):
 
 group_list.sort() # Ensure insertion order doesn't matter between granules
 
-history_json.append(construct_history(file_list))
+history_json.append(construct_history(file_list, granule_urls))
 group_metadata[group_list[0]]['history_json'] = json.dumps(
 history_json,
 default=str
@@ -168,7 +167,7 @@ def _run_single_core(file_list):
 }
 
 
-def _run_multi_core(file_list, process_count):
+def _run_multi_core(file_list, process_count, granule_urls):
 """
 Run the granule preprocessing in multi-core mode. This method spins up
 the number of processes defined by process_count which process granules
@@ -248,7 +247,7 @@ def _run_multi_core(file_list, process_count):
 # Merge history_json entries from input files
 history_json.extend(result['history_json'])
 
-history_json.append(construct_history(file_list))
+history_json.append(construct_history(file_list, granule_urls))
 group_metadata[group_list[0]]['history_json'] = json.dumps(
 history_json,
 default=str
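For reference, the entry written by construct_history now looks roughly like the sketch below; every value is a placeholder, but the keys match the hunks above and the assertions in the tests that follow. The practical effect of this commit is that derived_from carries the granule URLs instead of the local file basenames.

# Illustrative shape only; values are placeholders, not real output.
example_history_entry = {
    "date_time": "2023-10-25T00:00:00+00:00",
    "derived_from": [  # now the granule URLs rather than os.path.basename results
        "https://archive.podaac.earthdata.nasa.gov/.../granule_1.nc",
        "https://archive.podaac.earthdata.nasa.gov/.../granule_2.nc",
    ],
    "program": "concise",
    "version": "x.y.z",
    "parameters": "input_files=[...]",
    "program_ref": "https://cmr.earthdata.nasa.gov:443/search/concepts/S2153799015-POCLOUD",
}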
16 changes: 11 additions & 5 deletions tests/test_merge.py
@@ -130,8 +130,9 @@ def run_verification(self, data_dir, output_name, process_count=None):
 output_path = self.__output_path.joinpath(output_name)
 data_path = self.__test_data_path.joinpath(data_dir)
 input_files = list(data_path.iterdir())
+granule_urls = []
 
-merge.merge_netcdf_files(input_files, output_path, process_count=process_count)
+merge.merge_netcdf_files(input_files, output_path, granule_urls, process_count=process_count)
 
 merged_dataset = nc.Dataset(output_path)
 file_map = self.verify_files(merged_dataset, input_files, data_dir)
@@ -146,8 +147,9 @@ def run_java_verification(self, output_name, process_count=None):
 
 java_path = self.__test_data_path.joinpath('java_results', 'merged-ASCATA-L2-25km-Lat-90.0_90.0-Lon-180.0_180.0.subset.nc')
 python_path = self.__output_path.joinpath(output_name)
+granule_urls = []
 
-merge.merge_netcdf_files(input_files, python_path, process_count=process_count)
+merge.merge_netcdf_files(input_files, python_path, granule_urls, process_count=process_count)
 
 java_dataset = nc.Dataset(java_path)
 python_dataset = nc.Dataset(python_path)
@@ -207,7 +209,7 @@ def assert_valid_history(merged_dataset, input_files):
 history_json = json.loads(merged_dataset.getncattr('history_json'))[-1]
 assert 'date_time' in history_json
 assert history_json.get('program') == 'concise'
-assert history_json.get('derived_from') == input_files
+assert history_json.get('derived_from') == [] # list of granule urls
 assert history_json.get('version') == importlib_metadata.distribution('podaac-concise').version
 assert 'input_files=' in history_json.get('parameters')
 assert history_json.get('program_ref') == 'https://cmr.earthdata.nasa.gov:443/search/concepts/S2153799015-POCLOUD'
@@ -217,6 +219,7 @@ def assert_valid_history(merged_dataset, input_files):
 merge.merge_netcdf_files(
 original_input_files=input_files,
 output_file=self.__output_path.joinpath(output_name_single),
+granule_urls=[],
 process_count=1
 )
 merged_dataset = nc.Dataset(self.__output_path.joinpath(output_name_single))
@@ -228,6 +231,7 @@ def assert_valid_history(merged_dataset, input_files):
 merge.merge_netcdf_files(
 original_input_files=input_files,
 output_file=self.__output_path.joinpath(output_name_multi),
+granule_urls=[],
 process_count=2
 )
 merged_dataset = nc.Dataset(self.__output_path.joinpath(output_name_multi))
@@ -243,6 +247,7 @@ def assert_valid_history(merged_dataset, input_files):
 merge.merge_netcdf_files(
 original_input_files=input_files,
 output_file=self.__output_path.joinpath(output_name_single),
+granule_urls=[],
 process_count=1
 )
 merged_dataset = nc.Dataset(self.__output_path.joinpath(output_name_single))
@@ -274,14 +279,15 @@ def test_mismatched_vars(self):
 }
 
 # Test single process merge
-merge.merge_netcdf_files(input_files, output_path_single, process_count=1)
+granule_urls = []
+merge.merge_netcdf_files(input_files, output_path_single, granule_urls, process_count=1)
 dataset = nc.Dataset(output_path_single)
 actual_vars = set(dataset.variables.keys())
 actual_vars.remove('subset_files')
 assert actual_vars == expected_vars
 
 # Test multi-process merge
-merge.merge_netcdf_files(input_files, output_path_multi, process_count=2)
+merge.merge_netcdf_files(input_files, output_path_multi, granule_urls, process_count=2)
 dataset = nc.Dataset(output_path_multi)
 actual_vars = set(dataset.variables.keys())
 actual_vars.remove('subset_files')
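The updated tests pass an empty granule_urls list, so derived_from is asserted to be []. A hypothetical follow-on check (not part of this commit) that exercises non-empty URLs could reuse the same pattern as run_verification above; the data directory and URLs below are placeholders:

import json
from pathlib import Path

import netCDF4 as nc

from podaac.merger import merge


def check_derived_from_urls(data_dir, output_path):
    """Hypothetical check: URLs passed in should come back out of history_json."""
    input_files = list(Path(data_dir).resolve().iterdir())
    granule_urls = [f'https://example.com/{f.name}' for f in input_files]

    merge.merge_netcdf_files(input_files, output_path, granule_urls, process_count=1)

    history_json = json.loads(nc.Dataset(output_path).getncattr('history_json'))[-1]
    assert history_json.get('derived_from') == granule_urls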
