Skip to content

Commit

Permalink
[Issue #3500] Handle opportunity attachments when opp deleted or is published (#3503)
Browse files Browse the repository at this point in the history

## Summary
Fixes #3500

### Time to review: __10 mins__

## Changes proposed
Handle the following two cases during the transformation process
* Opportunity deleted - clean up attachments from s3
* Opportunity stops being a draft - move all the attachments to the
other s3 bucket

## Context for reviewers
Mostly just some additional file utils for moving files to handle the
above scenarios. Only noteworthy callout is that there isn't really a
concept of "moving" a file on s3, it's just a copy+delete.
  • Loading branch information
chouinar authored Jan 16, 2025
1 parent d86aa3f commit 4ebfef7
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
import logging
from typing import Tuple
from typing import Tuple, cast

import src.data_migration.transformation.transform_constants as transform_constants
import src.data_migration.transformation.transform_util as transform_util
from src.adapters.aws import S3Config
from src.data_migration.transformation.subtask.abstract_transform_subtask import (
AbstractTransformSubTask,
)
from src.db.models.opportunity_models import Opportunity
from src.db.models.staging.opportunity import Topportunity
from src.services.opportunity_attachments import attachment_util
from src.task.task import Task
from src.util import file_util

logger = logging.getLogger(__name__)


class TransformOpportunity(AbstractTransformSubTask):

def __init__(self, task: Task, s3_config: S3Config | None = None):
super().__init__(task)

if s3_config is None:
s3_config = S3Config()

self.s3_config = s3_config

def transform_records(self) -> None:
# Fetch all opportunities that were modified
# Alongside that, grab the existing opportunity record
Expand Down Expand Up @@ -53,11 +66,18 @@ def process_opportunity(
extra,
)

# Cleanup the attachments from s3
if target_opportunity is not None:
for attachment in target_opportunity.opportunity_attachments:
file_util.delete_file(attachment.file_location)

else:
# To avoid incrementing metrics for records we fail to transform, record
# here whether it's an insert/update and we'll increment after transforming
is_insert = target_opportunity is None

was_draft = target_opportunity.is_draft if target_opportunity else None

logger.info("Transforming and upserting opportunity", extra=extra)
transformed_opportunity = transform_util.transform_opportunity(
source_opportunity, target_opportunity
Expand All @@ -76,5 +96,23 @@ def process_opportunity(
)
self.db_session.merge(transformed_opportunity)

# If an opportunity went from being a draft to not a draft (published)
# then we need to move all of its attachments to the public bucket
# from the draft s3 bucket.
if was_draft and transformed_opportunity.is_draft is False:
for attachment in cast(Opportunity, target_opportunity).opportunity_attachments:
# Determine the new path
file_name = attachment_util.adjust_legacy_file_name(attachment.file_name)
s3_path = attachment_util.get_s3_attachment_path(
file_name,
attachment.attachment_id,
transformed_opportunity,
self.s3_config,
)

# Move the file
file_util.move_file(attachment.file_location, s3_path)
attachment.file_location = s3_path

logger.info("Processed opportunity", extra=extra)
source_opportunity.transformed_at = self.transform_time
47 changes: 46 additions & 1 deletion api/src/util/file_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import shutil
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
Expand Down Expand Up @@ -95,7 +96,28 @@ def get_file_length_bytes(path: str) -> int:
return file_stats.st_size


def delete_file(path: str) -> None:
def copy_file(source_path: str | Path, destination_path: str | Path) -> None:
    """Copy a file from one location to another.

    Both paths must live on the same kind of storage: either both on S3
    or both on local disk. Transferring between disk and S3 is a
    download/upload, which this method deliberately does not do.

    :param source_path: File to copy (s3:// URL or local path)
    :param destination_path: Where to copy it to (same storage kind)
    :raises Exception: If one path is on S3 and the other is on disk
    """
    is_source_s3 = is_s3_path(source_path)
    is_dest_s3 = is_s3_path(destination_path)

    # This isn't a download or upload method
    # Don't allow "copying" between mismatched locations
    if is_source_s3 != is_dest_s3:
        raise Exception("Cannot download/upload between disk and S3 using this method")

    if is_source_s3:
        s3_client = get_s3_client()

        # Avoid shadowing the parameters with the split bucket/key parts
        source_bucket, source_key = split_s3_url(source_path)
        dest_bucket, dest_key = split_s3_url(destination_path)

        s3_client.copy({"Bucket": source_bucket, "Key": source_key}, dest_bucket, dest_key)
    else:
        # Ensure the destination directory exists. Guard against an empty
        # dirname (a bare filename destination) - os.makedirs("") raises
        # FileNotFoundError.
        dest_dir = os.path.dirname(destination_path)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)
        shutil.copy2(source_path, destination_path)


def delete_file(path: str | Path) -> None:
"""Delete a file from s3 or local disk"""
if is_s3_path(path):
bucket, s3_path = split_s3_url(path)
Expand All @@ -106,6 +128,23 @@ def delete_file(path: str) -> None:
os.remove(path)


def move_file(source_path: str | Path, destination_path: str | Path) -> None:
    """Move a file from one location to another.

    Both paths must live on the same kind of storage: either both on S3
    or both on local disk. Note that S3 has no native "move" operation,
    so an S3 move is implemented as copy + delete.

    :param source_path: File to move (s3:// URL or local path)
    :param destination_path: Where to move it to (same storage kind)
    :raises Exception: If one path is on S3 and the other is on disk
    """
    is_source_s3 = is_s3_path(source_path)
    is_dest_s3 = is_s3_path(destination_path)

    # This isn't a download or upload method
    # Don't allow "moving" between mismatched locations
    if is_source_s3 != is_dest_s3:
        raise Exception("Cannot download/upload between disk and S3 using this method")

    if is_source_s3:
        # S3 has no move primitive - emulate it with a copy followed by a delete
        copy_file(source_path, destination_path)
        delete_file(source_path)
    else:
        # os.renames would prune now-empty parent directories of the source
        # (a surprising side effect) and os.rename fails when the paths are
        # on different filesystems. Create the destination directories and
        # use shutil.move, which handles the cross-filesystem case.
        dest_dir = os.path.dirname(destination_path)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)
        shutil.move(str(source_path), str(destination_path))


def file_exists(path: str | Path) -> bool:
"""Get whether a file exists or not"""
if is_s3_path(path):
Expand All @@ -121,3 +160,9 @@ def file_exists(path: str | Path) -> bool:

# Local file system
return Path(path).exists()


def read_file(path: str | Path, mode: str = "r", encoding: str | None = None) -> str:
    """Read a file (local or S3) and return its entire contents as one string."""
    with open_stream(path, mode, encoding) as stream:
        contents = stream.read()
    return contents
9 changes: 9 additions & 0 deletions api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import src.auth.login_gov_jwt_auth as login_gov_jwt_auth
import tests.src.db.models.factories as factories
from src.adapters import search
from src.adapters.aws import S3Config
from src.adapters.oauth.login_gov.mock_login_gov_oauth_client import MockLoginGovOauthClient
from src.auth.api_jwt_auth import create_jwt_for_user
from src.constants.schema import Schemas
Expand Down Expand Up @@ -394,6 +395,14 @@ def other_mock_s3_bucket(other_mock_s3_bucket_resource):
yield other_mock_s3_bucket_resource.name


@pytest.fixture
def s3_config(mock_s3_bucket, other_mock_s3_bucket):
    # Shared S3 configuration pointing the public and draft file buckets
    # at the two mocked S3 buckets created by the bucket fixtures.
    bucket_urls = {
        "PUBLIC_FILES_BUCKET": f"s3://{mock_s3_bucket}",
        "DRAFT_FILES_BUCKET": f"s3://{other_mock_s3_bucket}",
    }
    return S3Config(**bucket_urls)


####################
# Class-based testing
####################
Expand Down
1 change: 1 addition & 0 deletions api/tests/src/data_migration/transformation/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def setup_opportunity(
if create_existing:
f.OpportunityFactory.create(
opportunity_id=source_opportunity.opportunity_id,
opportunity_attachments=[],
# set created_at/updated_at to an earlier time so its clear
# when they were last updated
timestamps_in_past=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

import src.data_migration.transformation.transform_constants as transform_constants
from src.data_migration.transformation.subtask.transform_opportunity import TransformOpportunity
from src.services.opportunity_attachments import attachment_util
from src.util import file_util
from tests.src.data_migration.transformation.conftest import (
BaseTransformTestClass,
setup_opportunity,
validate_opportunity,
)
from tests.src.db.models.factories import OpportunityAttachmentFactory, OpportunityFactory


class TestTransformOpportunity(BaseTransformTestClass):
@pytest.fixture()
def transform_opportunity(self, transform_oracle_data_task, truncate_staging_tables):
return TransformOpportunity(transform_oracle_data_task)
def transform_opportunity(self, transform_oracle_data_task, truncate_staging_tables, s3_config):
return TransformOpportunity(transform_oracle_data_task, s3_config)

def test_process_opportunities(self, db_session, transform_opportunity):
ordinary_delete = setup_opportunity(
Expand Down Expand Up @@ -97,3 +100,73 @@ def test_process_opportunity_invalid_category(self, db_session, transform_opport
transform_opportunity.process_opportunity(insert_that_will_fail, None)

validate_opportunity(db_session, insert_that_will_fail, expect_in_db=False)

def test_process_opportunity_delete_with_attachments(
self, db_session, transform_opportunity, s3_config
):

source_opportunity = setup_opportunity(create_existing=False, is_delete=True)

target_opportunity = OpportunityFactory.create(
opportunity_id=source_opportunity.opportunity_id, opportunity_attachments=[]
)

attachments = []
for i in range(10):
s3_path = attachment_util.get_s3_attachment_path(
f"my_file{i}.txt", i, target_opportunity, s3_config
)

with file_util.open_stream(s3_path, "w") as outfile:
outfile.write(f"This is the {i}th file")

attachment = OpportunityAttachmentFactory.create(
opportunity=target_opportunity, file_location=s3_path
)
attachments.append(attachment)

transform_opportunity.process_opportunity(source_opportunity, target_opportunity)

validate_opportunity(db_session, source_opportunity, expect_in_db=False)

# Verify all of the files were deleted
for attachment in attachments:
assert file_util.file_exists(attachment.file_location) is False

def test_process_opportunity_update_to_non_draft_with_attachments(
self, db_session, transform_opportunity, s3_config
):

source_opportunity = setup_opportunity(
create_existing=False, source_values={"is_draft": "N"}
)

target_opportunity = OpportunityFactory.create(
opportunity_id=source_opportunity.opportunity_id,
is_draft=True,
opportunity_attachments=[],
)

attachments = []
for i in range(10):
s3_path = attachment_util.get_s3_attachment_path(
f"my_file{i}.txt", i, target_opportunity, s3_config
)
assert s3_path.startswith(s3_config.draft_files_bucket_path) is True

with file_util.open_stream(s3_path, "w") as outfile:
outfile.write(f"This is the {i}th file")

attachment = OpportunityAttachmentFactory.create(
opportunity=target_opportunity, file_location=s3_path
)
attachments.append(attachment)

transform_opportunity.process_opportunity(source_opportunity, target_opportunity)

validate_opportunity(db_session, source_opportunity)

# Verify all of the files were moved to the public bucket
for attachment in attachments:
assert attachment.file_location.startswith(s3_config.public_files_bucket_path) is True
assert file_util.file_exists(attachment.file_location) is True
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pytest

import tests.src.db.models.factories as f
from src.adapters.aws import S3Config
from src.data_migration.transformation import transform_constants
from src.data_migration.transformation.subtask.transform_opportunity_attachment import (
TransformOpportunityAttachment,
Expand All @@ -17,13 +16,6 @@

class TestTransformOpportunitySummary(BaseTransformTestClass):

@pytest.fixture
def s3_config(self, mock_s3_bucket, other_mock_s3_bucket):
return S3Config(
PUBLIC_FILES_BUCKET=f"s3://{mock_s3_bucket}",
DRAFT_FILES_BUCKET=f"s3://{other_mock_s3_bucket}",
)

@pytest.fixture()
def transform_opportunity_attachment(self, transform_oracle_data_task, s3_config):
return TransformOpportunityAttachment(transform_oracle_data_task, s3_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ def truncate_all_staging_tables(self, db_session):

    @pytest.fixture()
    def transform_oracle_data_task(
        self,
        db_session,
        enable_factory_create,
        truncate_opportunities,
        truncate_all_staging_tables,
        s3_config,
    ) -> TransformOracleDataTask:
        # The factory/truncate/s3_config fixtures are requested purely for
        # their setup side effects; only db_session is passed to the task.
        return TransformOracleDataTask(db_session)

Expand Down Expand Up @@ -466,7 +471,10 @@ def test_delete_opportunity_with_deleted_children(self, db_session, transform_or
# but we'll still have delete events for the others - this verifies how we handle that.

existing_opportunity = f.OpportunityFactory(
no_current_summary=True, opportunity_assistance_listings=[], agency_code="AGENCYXYZ"
no_current_summary=True,
opportunity_assistance_listings=[],
agency_code="AGENCYXYZ",
opportunity_attachments=[],
)
opportunity = f.StagingTopportunityFactory(
opportunity_id=existing_opportunity.opportunity_id, cfdas=[], is_deleted=True
Expand Down
Loading

0 comments on commit 4ebfef7

Please sign in to comment.