Skip to content

Commit

Permalink
Create table and integration into poll_status (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshgarde authored Oct 19, 2023
1 parent 0d61692 commit d79ce59
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 11 deletions.
33 changes: 31 additions & 2 deletions podaac/swodlr_ingest_to_sds/poll_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@
from datetime import datetime
from copy import deepcopy
import logging
from podaac.swodlr_ingest_to_sds.utils import mozart_client, ingest_table
import re
from podaac.swodlr_ingest_to_sds.utils import (
mozart_client, ingest_table, available_tiles_table
)


SUCCESS_STATUSES = {'job-completed'}
FAIL_STATUSES = {'job-failed', 'job-offline', 'job-deduped'}
PRODUCT_REGEX = re.compile(
r'_(?P<product>PIXC(Vec)?)_(?P<cycle>\d{3})_(?P<pass>\d{3})_(?P<tile>\d{3}(R|L))_' # noqa: E501
)


def lambda_handler(event, _context):
Expand Down Expand Up @@ -62,9 +69,31 @@ def lambda_handler(event, _context):
new_event['jobs'].remove(item)
elif status in SUCCESS_STATUSES:
logging.info('Job id: %s; status: %s', job_id, status)
new_event['jobs'].remove(item)

# Insert into available tiles table
cpt = _extract_cpt(granule_id)
if cpt is not None:
tile_id = f'{cpt["product"]},{cpt["cycle"]},{cpt["pass"]},{cpt["tile"]}' # pylint: disable=line-too-long # noqa: E501
available_tiles_table.put_item(
Item={'tile_id': tile_id}
)

new_event['jobs'].remove(item) # Remove from queue
# Otello raises very generic exceptions
except Exception: # pylint: disable=broad-except
logging.exception('Failed to get status: %s', job_id)

return new_event


def _extract_cpt(granule_id):
parsed_id = PRODUCT_REGEX.match(granule_id)
if parsed_id is None:
return None

return {
'product': parsed_id.group('product'),
'cycle': int(parsed_id.group('cycle')),
'pass': int(parsed_id.group('pass')),
'tile': int(parsed_id.group('tile'))
}
15 changes: 15 additions & 0 deletions podaac/swodlr_ingest_to_sds/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,24 @@ def ingest_table(self):

return self._ingest_table

@property
def available_tiles_table(self):
'''
Lazily creates a DynamoDB table resource
'''
if not hasattr(self, '_available_tiles_table'):
dynamodb = boto3.resource('dynamodb')
# pylint: disable=attribute-defined-outside-init
self._available_tiles_table = dynamodb.Table(
self.get_param('available_tiles_table_name')
)

return self._available_tiles_table


# Silence the linters
ingest_table: Table
available_tiles_table: Table
mozart_client: Mozart
get_param: Callable[[str], str]

Expand Down
4 changes: 4 additions & 0 deletions terraform/database_dynamodb.tf
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@
data "aws_dynamodb_table" "ingest" {
name = "${local.app_prefix}-ingest"
}

data "aws_dynamodb_table" "avalible_tiles" {
name = "${local.app_prefix}-avalible-tiles"
}
26 changes: 17 additions & 9 deletions terraform/lambdas.tf
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,16 @@ resource "aws_iam_role" "lambda" {
]
Effect = "Allow"
Resource = data.aws_dynamodb_table.ingest.arn
},
{
Sid = ""
Action = [
"dynamodb:BatchWriteItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem"
]
Effect = "Allow"
Resource = data.aws_dynamodb_table.avalible_tiles.arn
}
]
})
Expand All @@ -178,55 +188,53 @@ resource "aws_iam_role" "lambda" {
resource "aws_ssm_parameter" "sds_pcm_release_tag" {
name = "${local.service_path}/sds_pcm_release_tag"
type = "String"
overwrite = true
value = var.sds_pcm_release_tag
}

resource "aws_ssm_parameter" "sds_host" {
name = "${local.service_path}/sds_host"
type = "String"
overwrite = true
value = var.sds_host
}

resource "aws_ssm_parameter" "sds_username" {
name = "${local.service_path}/sds_username"
type = "String"
overwrite = true
value = var.sds_username
}

resource "aws_ssm_parameter" "sds_password" {
name = "${local.service_path}/sds_password"
type = "SecureString"
overwrite = true
value = var.sds_password
}

resource "aws_ssm_parameter" "sds_ca_cert" {
name = "${local.service_path}/sds_ca_cert"
type = "SecureString"
overwrite = true
value = local.sds_ca_cert
}

resource "aws_ssm_parameter" "stepfunction_arn" {
name = "${local.service_path}/stepfunction_arn"
type = "String"
overwrite = true
value = aws_sfn_state_machine.ingest_to_sds.arn
}

resource "aws_ssm_parameter" "ingest_queue_url" {
name = "${local.service_path}/ingest_queue_url"
type = "String"
overwrite = true
value = data.aws_sqs_queue.ingest.id
}

resource "aws_ssm_parameter" "ingest_table_name" {
name = "${local.service_path}/ingest_table_name"
type = "String"
overwrite = true
value = data.aws_dynamodb_table.ingest.name
}
}

resource "aws_ssm_parameter" "available_tiles_table_name" {
name = "${local.service_path}/available_tiles_table_name"
type = "String"
value = data.aws_dynamodb_table.avalible_tiles.name
}

0 comments on commit d79ce59

Please sign in to comment.