Merge pull request #2389 from ASFHyP3/multi-burst-sandbox
Multi-burst functionality
jtherrmann authored Oct 1, 2024
2 parents 1d4e856 + 3e2789a commit cbf6106
Showing 13 changed files with 940 additions and 191 deletions.
85 changes: 85 additions & 0 deletions .github/workflows/deploy-multi-burst-sandbox.yml
@@ -0,0 +1,85 @@
name: Deploy Multi-Burst Sandbox Stack to AWS

on:
  push:
    branches:
      - multi-burst-sandbox

concurrency: ${{ github.workflow }}-${{ github.ref }}

jobs:
  deploy:
    runs-on: ubuntu-latest

    strategy:
      fail-fast: false
      matrix:
        include:
          - environment: hyp3-multi-burst-sandbox
            domain: hyp3-multi-burst-sandbox.asf.alaska.edu
            template_bucket: cf-templates-1hz9ldhhl4ahu-us-west-2
            image_tag: test
            product_lifetime_in_days: 14
            default_credits_per_user: 0
            default_application_status: APPROVED
            cost_profile: DEFAULT
            deploy_ref: refs/heads/multi-burst-sandbox
            job_files: >-
              job_spec/INSAR_ISCE_BURST.yml
              job_spec/INSAR_ISCE_MULTI_BURST.yml
              job_spec/AUTORIFT.yml
              job_spec/RTC_GAMMA.yml
              job_spec/WATER_MAP.yml
              job_spec/WATER_MAP_EQ.yml
            instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge
            default_max_vcpus: 640
            expanded_max_vcpus: 640
            required_surplus: 0
            security_environment: ASF
            ami_id: /aws/service/ecs/optimized-ami/amazon-linux-2023/recommended/image_id
            distribution_url: ''

    environment:
      name: ${{ matrix.environment }}
      url: https://${{ matrix.domain }}

    steps:
      - uses: actions/checkout@v4

      - uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.V2_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.V2_AWS_SECRET_ACCESS_KEY }}
          aws-session-token: ${{ secrets.V2_AWS_SESSION_TOKEN }}
          aws-region: ${{ secrets.AWS_REGION }}

      - uses: actions/setup-python@v5
        with:
          python-version: 3.9

      - uses: ./.github/actions/deploy-hyp3
        with:
          TEMPLATE_BUCKET: ${{ matrix.template_bucket }}
          STACK_NAME: ${{ matrix.environment }}
          DOMAIN_NAME: ${{ matrix.domain }}
          API_NAME: ${{ matrix.environment }}
          CERTIFICATE_ARN: ${{ secrets.CERTIFICATE_ARN }}
          IMAGE_TAG: ${{ matrix.image_tag }}
          PRODUCT_LIFETIME: ${{ matrix.product_lifetime_in_days }}
          VPC_ID: ${{ secrets.VPC_ID }}
          SUBNET_IDS: ${{ secrets.SUBNET_IDS }}
          SECRET_ARN: ${{ secrets.SECRET_ARN }}
          CLOUDFORMATION_ROLE_ARN: ${{ secrets.CLOUDFORMATION_ROLE_ARN }}
          DEFAULT_CREDITS_PER_USER: ${{ matrix.default_credits_per_user }}
          DEFAULT_APPLICATION_STATUS: ${{ matrix.default_application_status }}
          COST_PROFILE: ${{ matrix.cost_profile }}
          JOB_FILES: ${{ matrix.job_files }}
          DEFAULT_MAX_VCPUS: ${{ matrix.default_max_vcpus }}
          EXPANDED_MAX_VCPUS: ${{ matrix.expanded_max_vcpus }}
          MONTHLY_BUDGET: ${{ secrets.MONTHLY_BUDGET }}
          REQUIRED_SURPLUS: ${{ matrix.required_surplus }}
          ORIGIN_ACCESS_IDENTITY_ID: ${{ secrets.ORIGIN_ACCESS_IDENTITY_ID }}
          SECURITY_ENVIRONMENT: ${{ matrix.security_environment }}
          AMI_ID: ${{ matrix.ami_id }}
          INSTANCE_TYPES: ${{ matrix.instance_types }}
          DISTRIBUTION_URL: ${{ matrix.distribution_url }}
          AUTH_PUBLIC_KEY: ${{ secrets.AUTH_PUBLIC_KEY }}
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [7.10.0]

### Added
- Added a new `INSAR_ISCE_MULTI_BURST` job type for running multi-burst InSAR. Currently, this job type is restricted to a special `hyp3-multi-burst-sandbox` deployment for HyP3 operators. However, this is an important step toward eventually making multi-burst InSAR available for general users.

### Changed
- Job validator functions now accept two parameters: the job dictionary and the granule metadata.
- Granule metadata validation now supports `reference` and `secondary` job parameters in addition to the existing `granules` parameter.
- Burst InSAR validators now support multi-burst jobs.
- Replaced the step function's `INSPECT_MEMORY_REQUIREMENTS` step with a new `SET_BATCH_OVERRIDES` step, which calls a Lambda function to dynamically calculate [Batch container overrides](https://docs.aws.amazon.com/batch/latest/APIReference/API_ContainerOverrides.html) based on job type and parameters.
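The `SET_BATCH_OVERRIDES` entry above describes a Lambda that computes Batch container overrides from a job's type and parameters. A minimal sketch of that idea — the job-type name comes from this PR, but the memory heuristic, parameter names, and return shape here are illustrative assumptions, not the deployed implementation:

```python
# Hypothetical sketch of a set-batch-overrides style Lambda handler.
# The memory figures and override shape are assumptions for illustration.
def lambda_handler(event: dict, _context=None) -> dict:
    job_type = event['job_type']
    parameters = event.get('job_parameters', {})
    if job_type == 'INSAR_ISCE_MULTI_BURST':
        # Assumed heuristic: scale memory with the number of reference bursts.
        n_bursts = len(parameters.get('reference', []))
        memory_mb = 8000 * max(n_bursts, 1)
        return {
            'ResourceRequirements': [
                {'type': 'MEMORY', 'value': str(memory_mb)},
            ]
        }
    # No overrides for other job types.
    return {}
```

The step function would merge the returned structure into the Batch `containerOverrides` when submitting the job.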

## [7.9.3]
### Fixed
- Added missing `cloudformation:DeleteStack` permission to the CloudFormation deployment role in `ASF-deployment-ci-cf.yml`.
5 changes: 3 additions & 2 deletions Makefile
@@ -2,14 +2,15 @@ API = ${PWD}/apps/api/src
 CHECK_PROCESSING_TIME = ${PWD}/apps/check-processing-time/src
 GET_FILES = ${PWD}/apps/get-files/src
 HANDLE_BATCH_EVENT = ${PWD}/apps/handle-batch-event/src
+SET_BATCH_OVERRIDES = ${PWD}/apps/set-batch-overrides/src
 SCALE_CLUSTER = ${PWD}/apps/scale-cluster/src
 START_EXECUTION_MANAGER = ${PWD}/apps/start-execution-manager/src
 START_EXECUTION_WORKER = ${PWD}/apps/start-execution-worker/src
 DISABLE_PRIVATE_DNS = ${PWD}/apps/disable-private-dns/src
 UPDATE_DB = ${PWD}/apps/update-db/src
 UPLOAD_LOG = ${PWD}/apps/upload-log/src
 DYNAMO = ${PWD}/lib/dynamo
-export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}
+export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}


build: render
@@ -44,7 +45,7 @@ render:
 static: flake8 openapi-validate cfn-lint
 
 flake8:
-	flake8 --ignore=E731 --max-line-length=120 --import-order-style=pycharm --statistics --application-import-names hyp3_api,get_files,handle_batch_event,check_processing_time,start_execution_manager,start_execution_worker,disable_private_dns,update_db,upload_log,dynamo,lambda_logging,scale_cluster apps tests lib
+	flake8 --ignore=E731 --max-line-length=120 --import-order-style=pycharm --statistics --application-import-names hyp3_api,get_files,handle_batch_event,set_batch_overrides,check_processing_time,start_execution_manager,start_execution_worker,disable_private_dns,update_db,upload_log,dynamo,lambda_logging,scale_cluster apps tests lib
 
 openapi-validate: render
 	openapi-spec-validator apps/api/src/hyp3_api/api-spec/openapi-spec.yml
7 changes: 6 additions & 1 deletion apps/api/src/hyp3_api/util.py
@@ -9,7 +9,12 @@ class TokenDeserializeError(Exception):


 def get_granules(jobs: list[dict]) -> set[str]:
-    return {granule for job in jobs for granule in job['job_parameters'].get('granules', [])}
+    return {
+        granule
+        for key in ['granules', 'reference', 'secondary']
+        for job in jobs
+        for granule in job['job_parameters'].get(key, [])
+    }
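The rewritten `get_granules` now collects names from all three job parameters instead of only `granules`. The comprehension from the diff can be exercised standalone (the granule names below are placeholders):

```python
def get_granules(jobs: list[dict]) -> set[str]:
    # Collect every granule name across the granules/reference/secondary parameters.
    return {
        granule
        for key in ['granules', 'reference', 'secondary']
        for job in jobs
        for granule in job['job_parameters'].get(key, [])
    }

jobs = [
    {'job_parameters': {'granules': ['S1_A', 'S1_B']}},
    {'job_parameters': {'reference': ['S1_C'], 'secondary': ['S1_D']}},
]
print(sorted(get_granules(jobs)))  # -> ['S1_A', 'S1_B', 'S1_C', 'S1_D']
```

Returning a set also deduplicates granules shared between jobs, so each name is fetched from CMR only once.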


def serialize(payload: dict):
58 changes: 44 additions & 14 deletions apps/api/src/hyp3_api/validation.py
@@ -1,6 +1,7 @@
 import json
 import os
 import sys
+from copy import deepcopy
 from pathlib import Path
 
 import requests
@@ -70,31 +71,47 @@ def check_granules_exist(granules, granule_metadata):
         raise GranuleValidationError(f'Some requested scenes could not be found: {", ".join(not_found_granules)}')


-def check_dem_coverage(granule_metadata):
+def check_dem_coverage(_, granule_metadata):
     bad_granules = [g['name'] for g in granule_metadata if not has_sufficient_coverage(g['polygon'])]
     if bad_granules:
         raise GranuleValidationError(f'Some requested scenes do not have DEM coverage: {", ".join(bad_granules)}')


-def check_same_burst_ids(granule_metadata):
-    ref_burst_id, sec_burst_id = [granule['name'].split('_')[1] for granule in granule_metadata]
-    if ref_burst_id != sec_burst_id:
-        raise GranuleValidationError(
-            f'The requested scenes do not have the same burst ID: {ref_burst_id} and {sec_burst_id}'
-        )
+def check_same_burst_ids(job, _):
+    refs = job['job_parameters']['reference']
+    secs = job['job_parameters']['secondary']
+    ref_ids = ['_'.join(ref.split('_')[1:3]) for ref in refs]
+    sec_ids = ['_'.join(sec.split('_')[1:3]) for sec in secs]
+    if len(ref_ids) != len(sec_ids):
+        raise GranuleValidationError(
+            f'Number of reference and secondary scenes must match, got: '
+            f'{len(ref_ids)} references and {len(sec_ids)} secondaries'
+        )
+    for i in range(len(ref_ids)):
+        if ref_ids[i] != sec_ids[i]:
+            raise GranuleValidationError(
+                f'Burst IDs do not match for {refs[i]} and {secs[i]}.'
+            )
+    if len(set(ref_ids)) != len(ref_ids):
+        duplicate_pair_id = next(ref_id for ref_id in ref_ids if ref_ids.count(ref_id) > 1)
+        raise GranuleValidationError(
+            f'The requested scenes have more than 1 pair with the following burst ID: {duplicate_pair_id}.'
+        )
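The burst ID compared by `check_same_burst_ids` is fields 1 and 2 of the granule name joined with an underscore. A standalone illustration — the granule names here are invented examples following the Sentinel-1 burst naming pattern:

```python
def burst_id(granule: str) -> str:
    # Fields 1 and 2 of a burst granule name form its burst ID, e.g. '136231_IW2'.
    return '_'.join(granule.split('_')[1:3])

# Invented reference/secondary names sharing a burst ID but differing in acquisition time.
ref = 'S1_136231_IW2_20200604T022312_VV_7C85-BURST'
sec = 'S1_136231_IW2_20200616T022313_VV_5D11-BURST'
assert burst_id(ref) == burst_id(sec) == '136231_IW2'
```

Pairing by position (reference[i] with secondary[i]) is what lets the multi-burst validator give a precise error naming the mismatched pair.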


-def check_valid_polarizations(granule_metadata):
-    ref_polarization, sec_polarization = [granule['name'].split('_')[4] for granule in granule_metadata]
-    if ref_polarization != sec_polarization:
-        raise GranuleValidationError(
-            f'The requested scenes do not have the same polarization: {ref_polarization} and {sec_polarization}'
-        )
-    if ref_polarization != 'VV' and ref_polarization != 'HH':
-        raise GranuleValidationError(f'Only VV and HH polarizations are currently supported, got: {ref_polarization}')
+def check_valid_polarizations(job, _):
+    polarizations = set(granule.split('_')[4] for granule in get_granules([job]))
+    if len(polarizations) > 1:
+        raise GranuleValidationError(
+            f'The requested scenes need to have the same polarization, got: {", ".join(polarizations)}'
+        )
+    if not polarizations.issubset({'VV', 'HH'}):
+        raise GranuleValidationError(
+            f'Only VV and HH polarizations are currently supported, got: {polarizations.pop()}'
+        )
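The polarization check derives field 4 of each granule name and requires a single value from {VV, HH}. A simplified, self-contained version of the same logic, using `ValueError` in place of the project's `GranuleValidationError` and invented granule names:

```python
def check_polarizations(granules: list[str]) -> None:
    # Field 4 of a burst granule name is its polarization.
    polarizations = {granule.split('_')[4] for granule in granules}
    if len(polarizations) > 1:
        raise ValueError(f'mixed polarizations: {sorted(polarizations)}')
    if not polarizations.issubset({'VV', 'HH'}):
        raise ValueError(f'unsupported polarization: {polarizations.pop()}')

# A consistent VV pair passes without raising.
check_polarizations([
    'S1_136231_IW2_20200604T022312_VV_7C85-BURST',
    'S1_136231_IW2_20200616T022313_VV_5D11-BURST',
])
```

Building the set from all granules in the job is what generalizes the old two-scene comparison to any number of bursts.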


-def check_not_antimeridian(granule_metadata):
+def check_not_antimeridian(_, granule_metadata):
     for granule in granule_metadata:
         bbox = granule['polygon'].bounds
         if abs(bbox[0] - bbox[2]) > 180.0 and bbox[0] * bbox[2] < 0.0:
@@ -119,7 +136,20 @@ def get_multipolygon_from_geojson(input_file):
     return MultiPolygon(polygons)


-def validate_jobs(jobs):
+# TODO https://github.com/ASFHyP3/hyp3/issues/2442 remove this function after two burst types are merged
+def convert_single_burst_jobs(jobs: list[dict]) -> list[dict]:
+    jobs = deepcopy(jobs)
+    for job in jobs:
+        if job['job_type'] == 'INSAR_ISCE_BURST':
+            job_parameters = job['job_parameters']
+            ref, sec = job_parameters.pop('granules')
+            job_parameters['reference'], job_parameters['secondary'] = [ref], [sec]
+    return jobs
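The `convert_single_burst_jobs` shim normalizes legacy `INSAR_ISCE_BURST` jobs into the multi-burst `reference`/`secondary` shape so one set of validators covers both job types. It can be tried in isolation (placeholder granule names):

```python
from copy import deepcopy

def convert_single_burst_jobs(jobs: list[dict]) -> list[dict]:
    # Rewrite the two-element granules list as single-element reference/secondary lists.
    jobs = deepcopy(jobs)
    for job in jobs:
        if job['job_type'] == 'INSAR_ISCE_BURST':
            job_parameters = job['job_parameters']
            ref, sec = job_parameters.pop('granules')
            job_parameters['reference'], job_parameters['secondary'] = [ref], [sec]
    return jobs

job = {'job_type': 'INSAR_ISCE_BURST', 'job_parameters': {'granules': ['REF', 'SEC']}}
converted = convert_single_burst_jobs([job])[0]
print(converted['job_parameters'])  # -> {'reference': ['REF'], 'secondary': ['SEC']}
```

The `deepcopy` keeps the caller's job dictionaries untouched, so the conversion only affects the validation pass.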


+def validate_jobs(jobs: list[dict]) -> None:
+    jobs = convert_single_burst_jobs(jobs)
+
     granules = get_granules(jobs)
     granule_metadata = get_cmr_metadata(granules)

@@ -129,4 +159,4 @@ def validate_jobs(jobs):
             job_granule_metadata = [granule for granule in granule_metadata if granule['name'] in get_granules([job])]
             module = sys.modules[__name__]
             validator = getattr(module, validator_name)
-            validator(job_granule_metadata)
+            validator(job, job_granule_metadata)
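`validate_jobs` resolves each validator function by name via `getattr` on the current module, then calls it with the new two-argument signature (the job dictionary plus that job's granule metadata). Here is the dispatch pattern in isolation, with a toy validator standing in for the real ones:

```python
import sys

def check_example(job, granule_metadata):
    # Toy stand-in for a real validator; the real ones raise GranuleValidationError.
    if not granule_metadata:
        raise ValueError('no granule metadata for job')

def run_validator(validator_name: str, job: dict, granule_metadata: list) -> None:
    # Same lookup-by-name pattern as validate_jobs: resolve the function from
    # this module's namespace, then call it with the (job, metadata) signature.
    module = sys.modules[__name__]
    validator = getattr(module, validator_name)
    validator(job, granule_metadata)

run_validator('check_example', {'job_type': 'X'}, [{'name': 'GRANULE'}])
```

Because every validator now shares one signature, validators that only need one of the two inputs simply name the unused parameter `_`, as `check_dem_coverage` and `check_same_burst_ids` do above.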
76 changes: 76 additions & 0 deletions apps/set-batch-overrides/set-batch-overrides-cf.yml.j2
@@ -0,0 +1,76 @@
AWSTemplateFormatVersion: 2010-09-09

{% if security_environment == 'EDC' %}
Parameters:

  SecurityGroupId:
    Type: String

  SubnetIds:
    Type: CommaDelimitedList
{% endif %}

Outputs:

  LambdaArn:
    Value: !GetAtt Lambda.Arn

Resources:

  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${Lambda}"
      RetentionInDays: 90

  Role:
    Type: {{ 'Custom::JplRole' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::Role' }}
    Properties:
      {% if security_environment in ('JPL', 'JPL-public') %}
      ServiceToken: !ImportValue Custom::JplRole::ServiceToken
      Path: /account-managed/hyp3/
      {% endif %}
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          Action: sts:AssumeRole
          Principal:
            Service: lambda.amazonaws.com
          Effect: Allow
      ManagedPolicyArns:
        - !Ref Policy
        {% if security_environment == 'EDC' %}
        - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
        {% endif %}

  Policy:
    Type: {{ 'Custom::JplPolicy' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::ManagedPolicy' }}
    Properties:
      {% if security_environment in ('JPL', 'JPL-public') %}
      ServiceToken: !ImportValue Custom::JplPolicy::ServiceToken
      Path: /account-managed/hyp3/
      {% endif %}
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - logs:CreateLogStream
              - logs:PutLogEvents
            Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*"

  Lambda:
    Type: AWS::Lambda::Function
    Properties:
      Code: src/
      Handler: set_batch_overrides.lambda_handler
      MemorySize: 128
      Role: !GetAtt Role.Arn
      Runtime: python3.9
      Timeout: 30
      {% if security_environment == 'EDC' %}
      VpcConfig:
        SecurityGroupIds:
          - !Ref SecurityGroupId
        SubnetIds: !Ref SubnetIds
      {% endif %}
