Skip to content

Commit

Permalink
[Issue #1746] Add transformations for assistance listing table (#1875)
Browse files Browse the repository at this point in the history
This is a follow-up to
#1794 - which it builds
upon.

## Summary
Fixes #1746

### Time to review: __10 mins__

## Changes proposed
Adds transformation logic for the assistance listing (formerly CFDA)
tables.

## Context for reviewers
The transformations are pretty uneventful, the only complexity is that
the legacy Oracle database doesn't have a foreign key between the
`TopportunityCfda` table and the `Topportunity` table and there are
~2300 orphaned cfda records that we wouldn't be able to import, so we
additionally need to validate that the opportunity exists when we try to
transform the data, and if not, we just mark it as "transformed" and do
nothing with it.

There is some basic work on relationships between the staging tables +
more factories for setting up the data which will be ongoing /
@jamesbursa is also looking into.

---------

Co-authored-by: nava-platform-bot <[email protected]>
  • Loading branch information
chouinar and nava-platform-bot authored May 3, 2024
1 parent 480f395 commit 4b73f7e
Show file tree
Hide file tree
Showing 6 changed files with 415 additions and 8 deletions.
135 changes: 131 additions & 4 deletions api/src/data_migration/transformation/transform_oracle_data_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from src.adapters import db
from src.constants.lookup_constants import OpportunityCategory
from src.db.models.base import ApiSchemaTable, TimestampMixin
from src.db.models.opportunity_models import Opportunity
from src.db.models.staging.opportunity import Topportunity
from src.db.models.opportunity_models import Opportunity, OpportunityAssistanceListing
from src.db.models.staging.opportunity import Topportunity, TopportunityCfda
from src.db.models.staging.staging_base import StagingBase, StagingParamMixin
from src.task.task import Task
from src.util import datetime_util
Expand All @@ -27,6 +27,7 @@ class Metrics(StrEnum):
TOTAL_RECORDS_DELETED = "total_records_deleted"
TOTAL_RECORDS_INSERTED = "total_records_inserted"
TOTAL_RECORDS_UPDATED = "total_records_updated"
TOTAL_RECORDS_ORPHANED = "total_records_orphaned"

TOTAL_ERROR_COUNT = "total_error_count"

Expand Down Expand Up @@ -68,6 +69,27 @@ def fetch(
),
)

def fetch_with_opportunity(
self, source_model: Type[S], destination_model: Type[D], join_clause: Sequence
) -> list[Tuple[S, D | None, Opportunity | None]]:
# Similar to the above fetch function, but also grabs an opportunity record
# Note that this requires your source_model to have an opportunity_id field defined.

return cast(
list[Tuple[S, D | None, Opportunity | None]],
self.db_session.execute(
select(source_model, destination_model, Opportunity)
.join(destination_model, *join_clause, isouter=True)
.join(
Opportunity,
source_model.opportunity_id == Opportunity.opportunity_id, # type: ignore[attr-defined]
isouter=True,
)
.where(source_model.transformed_at.is_(None))
.execution_options(yield_per=5000)
),
)

def process_opportunities(self) -> None:
# Fetch all opportunities that were modified
# Alongside that, grab the existing opportunity record
Expand Down Expand Up @@ -121,8 +143,87 @@ def process_opportunity(
source_opportunity.transformed_at = self.transform_time

def process_assistance_listings(self) -> None:
# TODO - https://github.com/HHS/simpler-grants-gov/issues/1746
pass
assistance_listings: list[
Tuple[TopportunityCfda, OpportunityAssistanceListing | None, Opportunity | None]
] = self.fetch_with_opportunity(
TopportunityCfda,
OpportunityAssistanceListing,
[
TopportunityCfda.opp_cfda_id
== OpportunityAssistanceListing.opportunity_assistance_listing_id
],
)

for (
source_assistance_listing,
target_assistance_listing,
opportunity,
) in assistance_listings:
try:
self.process_assistance_listing(
source_assistance_listing, target_assistance_listing, opportunity
)
except ValueError:
self.increment(self.Metrics.TOTAL_ERROR_COUNT)
logger.exception(
"Failed to process assistance listing",
extra={
"opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id
},
)

def process_assistance_listing(
self,
source_assistance_listing: TopportunityCfda,
target_assistance_listing: OpportunityAssistanceListing | None,
opportunity: Opportunity | None,
) -> None:
self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED)
extra = {
"opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id,
"opportunity_id": source_assistance_listing.opportunity_id,
}
logger.info("Processing assistance listing", extra=extra)

if opportunity is None:
# The Oracle system we're importing these from does not have a foreign key between
# the opportunity ID in the TOPPORTUNITY_CFDA table and the TOPPORTUNITY table.
# There are many (2306 as of writing) orphaned CFDA records, created between 2007 and 2011
# We don't want to continuously process these, so won't error for these, and will just
# mark them as transformed below.
self.increment(self.Metrics.TOTAL_RECORDS_ORPHANED)
logger.info(
"Assistance listing is orphaned and does not connect to any opportunity",
extra=extra,
)

elif source_assistance_listing.is_deleted:
logger.info("Deleting assistance listing", extra=extra)

if target_assistance_listing is None:
raise ValueError("Cannot delete assistance listing as it does not exist")

self.increment(self.Metrics.TOTAL_RECORDS_DELETED)
self.db_session.delete(target_assistance_listing)

else:
# To avoid incrementing metrics for records we fail to transform, record
# here whether it's an insert/update and we'll increment after transforming
is_insert = target_assistance_listing is None

logger.info("Transforming and upserting assistance listing", extra=extra)
transformed_assistance_listing = transform_assistance_listing(
source_assistance_listing, target_assistance_listing
)
self.db_session.merge(transformed_assistance_listing)

if is_insert:
self.increment(self.Metrics.TOTAL_RECORDS_INSERTED)
else:
self.increment(self.Metrics.TOTAL_RECORDS_UPDATED)

logger.info("Processed assistance listing", extra=extra)
source_assistance_listing.transformed_at = self.transform_time

def process_opportunity_summaries(self) -> None:
# TODO - https://github.com/HHS/simpler-grants-gov/issues/1747
Expand Down Expand Up @@ -187,6 +288,32 @@ def transform_opportunity_category(value: str | None) -> OpportunityCategory | N
return OPPORTUNITY_CATEGORY_MAP[value]


def transform_assistance_listing(
source_assistance_listing: TopportunityCfda,
existing_assistance_listing: OpportunityAssistanceListing | None,
) -> OpportunityAssistanceListing:
log_extra = {"opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id}

if existing_assistance_listing is None:
logger.info("Creating new assistance listing record", extra=log_extra)

# We always create a new assistance listing record here and merge it in the calling function
# this way if there is any error doing the transformation, we don't modify the existing one.
target_assistance_listing = OpportunityAssistanceListing(
opportunity_assistance_listing_id=source_assistance_listing.opp_cfda_id,
opportunity_id=source_assistance_listing.opportunity_id,
)

target_assistance_listing.assistance_listing_number = source_assistance_listing.cfdanumber
target_assistance_listing.program_title = source_assistance_listing.programtitle

transform_update_create_timestamp(
source_assistance_listing, target_assistance_listing, log_extra=log_extra
)

return target_assistance_listing


def convert_est_timestamp_to_utc(timestamp: datetime | None) -> datetime | None:
if timestamp is None:
return None
Expand Down
12 changes: 12 additions & 0 deletions api/src/db/models/staging/opportunity.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
from sqlalchemy.orm import Mapped, relationship

from src.db.legacy_mixin import opportunity_mixin
from src.db.models.staging.staging_base import StagingBase, StagingParamMixin


class Topportunity(StagingBase, opportunity_mixin.TopportunityMixin, StagingParamMixin):
__tablename__ = "topportunity"

cfdas: Mapped[list["TopportunityCfda"]] = relationship(
primaryjoin="Topportunity.opportunity_id == foreign(TopportunityCfda.opportunity_id)",
uselist=True,
)


class TopportunityCfda(StagingBase, opportunity_mixin.TopportunityCfdaMixin, StagingParamMixin):
__tablename__ = "topportunity_cfda"

opportunity: Mapped[Topportunity | None] = relationship(
primaryjoin="TopportunityCfda.opportunity_id == foreign(Topportunity.opportunity_id)",
uselist=False,
)
Loading

0 comments on commit 4b73f7e

Please sign in to comment.