Skip to content

Commit

Permalink
Converted harmony adapter to operate on STAC catalog (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankinspace authored Aug 20, 2024
1 parent d37bd4a commit 72bc1ea
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 243 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ jobs:
-Dsonar.tests=tests/
-Dsonar.projectName=${{ github.repository }}
-Dsonar.projectVersion=${{ env.software_version }}
-Dsonar.python.version=3.9,3.10
-Dsonar.python.version=3.10,3.11
- name: Run Snyk as a blocking step
uses: snyk/actions/python-3.10@master
env:
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- [issues/25](https://github.com/podaac/net2cog/issues/25): Converted harmony adapter to operate on STAC catalog

## [0.3.0]
### Changed
Expand Down
141 changes: 86 additions & 55 deletions net2cog/netcdf_convert_harmony.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@
Implementation of harmony-service-lib that invokes the netcdf converter.
"""
import argparse
import json
import os
import pathlib
import shutil
import tempfile

import harmony
import pystac
from harmony.exceptions import HarmonyException
from pystac import Asset

from net2cog import netcdf_convert

Expand All @@ -29,66 +35,91 @@ def __init__(self, message):
super().__init__(message)

self.data_dir = os.getenv(DATA_DIRECTORY_ENV, '/home/dockeruser/data')
self.job_data_dir = os.path.join(self.data_dir, message.requestId)
pathlib.Path(self.data_dir).mkdir(parents=True, exist_ok=True)

# Create temp directory
pathlib.Path(self.job_data_dir).mkdir(parents=True, exist_ok=True)
self.job_data_dir = tempfile.mkdtemp(prefix=message.requestId, dir=self.data_dir)

def invoke(self):
"""Run the service on the message contained in `self.message`.
Fetches data, runs the service, puts the result in a file,
calls back to Harmony, and cleans up after itself.
def process_item(self, item: pystac.Item, source: harmony.message.Source) -> pystac.Item:
    """
    Performs net2cog on input STAC Item's data, returning
    an output STAC item

    The item's first asset carrying the ``data`` role is downloaded into the
    job directory, converted with ``netcdf_convert.netcdf_converter``, and the
    first generated COG is staged to ``self.message.stagingLocation``. The
    returned item is a clone of the input whose only asset is the staged COG
    (key ``visual``).

    Parameters
    ----------
    item : pystac.Item
        the item that should be coggified
    source : harmony.message.Source
        the input source defining the item

    Returns
    -------
    pystac.Item
        a STAC item describing the output

    Raises
    ------
    HarmonyException
        if the item has no ``data`` asset, more than one variable is
        requested, the converter produces no COG, or any other error occurs
        while processing (the original error is chained as ``__cause__``).
    """
    result = item.clone()
    result.assets = {}
    output_dir = self.job_data_dir

    self.logger.info('Input item %s', json.dumps(item.to_dict()))
    try:
        # The ``finally`` clause below deletes the job directory, so make sure
        # it exists before using it.
        # NOTE(review): presumably the base adapter calls process_item once per
        # catalog item on the same instance -- confirm against harmony-service-lib.
        pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

        # Get the data file
        asset = next((candidate for candidate in item.assets.values()
                      if 'data' in (candidate.roles or [])), None)
        if asset is None:
            raise HarmonyException(f'No asset with the "data" role found on STAC item {item.id}')
        self.logger.info('Downloading %s to %s', asset.href, output_dir)
        input_filename = harmony.adapter.util.download(asset.href,
                                                       output_dir,
                                                       logger=self.logger,
                                                       access_token=self.message.accessToken,
                                                       cfg=self.config)

        # Generate output filename
        output_filename, output_file_ext = os.path.splitext(
            harmony.adapter.util.generate_output_filename(input_filename, ext='tif'))
        output_filename = f'{output_filename}_converted{output_file_ext}'

        # Determine variables that need processing
        self.logger.info('Generating COG(s) for %s output will be saved to %s', input_filename, output_filename)
        var_list = source.process('variables')
        if not isinstance(var_list, list):
            var_list = [var_list]
        if len(var_list) > 1:
            raise HarmonyException(
                'net2cog harmony adapter currently only supports processing one variable at a time. '
                'Please specify a single variable in your Harmony request.')
        var_list = [var.name for var in var_list]
        self.logger.info('Processing variables %s', var_list)

        # Run the netcdf converter for the complete netcdf granule
        cogs_generated = netcdf_convert.netcdf_converter(pathlib.Path(input_filename),
                                                         pathlib.Path(output_dir).joinpath(output_filename),
                                                         var_list=var_list)
        # Only the first COG is staged; fail loudly if the converter produced none
        # instead of passing a placeholder on to the staging step.
        cog_generated = next(iter(cogs_generated), None)
        if cog_generated is None:
            raise HarmonyException(f'net2cog did not generate any COG for {input_filename}')

        # Stage the output file with a conventional filename
        self.logger.info('Generated COG %s', cog_generated)
        staged_filename = os.path.basename(cog_generated)
        url = harmony.adapter.util.stage(cog_generated,
                                         staged_filename,
                                         pystac.MediaType.COG,
                                         location=self.message.stagingLocation,
                                         logger=self.logger,
                                         cfg=self.config)
        self.logger.info('Staged %s to %s', cog_generated, url)

        # Update the STAC record
        result.assets['visual'] = Asset(url, title=staged_filename, media_type=pystac.MediaType.COG,
                                        roles=['visual'])

        # Return the STAC record
        self.logger.info('Processed item %s', json.dumps(result.to_dict()))
        return result
    except HarmonyException:
        # Already carries a deliberate, user-facing message; do not relabel it
        # as an "uncaught" error.
        raise
    except Exception as uncaught_exception:
        raise HarmonyException('Uncaught error in net2cog. '
                               'Notify net2cog service provider. '
                               f'Message: {uncaught_exception}') from uncaught_exception
    finally:
        # Clean up any intermediate resources. Errors are ignored so a failed
        # cleanup cannot replace the exception (or result) from the work above.
        shutil.rmtree(self.job_data_dir, ignore_errors=True)


def main():
Expand All @@ -100,7 +131,7 @@ def main():
None
"""
parser = argparse.ArgumentParser(prog='podaac-netcdf-converter',
parser = argparse.ArgumentParser(prog='net2cog_harmony',
description='Run the netcdf converter service')
harmony.setup_cli(parser)
args = parser.parse_args()
Expand Down
Loading

0 comments on commit 72bc1ea

Please sign in to comment.