-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2847 from cisagov/rjm/2564-fixtures
#2564: Revise fixtures to include portfolios - [RJM]
- Loading branch information
Showing
10 changed files
with
858 additions
and
320 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
from datetime import timedelta | ||
from django.utils import timezone | ||
import logging | ||
import random | ||
from faker import Faker | ||
from django.db import transaction | ||
|
||
from registrar.fixtures.fixtures_requests import DomainRequestFixture | ||
from registrar.fixtures.fixtures_users import UserFixture | ||
from registrar.models import User, DomainRequest | ||
from registrar.models.domain import Domain | ||
|
||
fake = Faker() | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DomainFixture(DomainRequestFixture): | ||
"""Create two domains and permissions on them for each user. | ||
One domain will have a past expiration date. | ||
Depends on fixtures_requests. | ||
Make sure this class' `load` method is called from `handle` | ||
in management/commands/load.py, then use `./manage.py load` | ||
to run this code. | ||
""" | ||
|
||
@classmethod | ||
def load(cls): | ||
# Lumped under .atomic to ensure we don't make redundant DB calls. | ||
# This bundles them all together, and then saves it in a single call. | ||
with transaction.atomic(): | ||
try: | ||
# Get the usernames of users created in the UserFixture | ||
created_usernames = [user_data["username"] for user_data in UserFixture.ADMINS + UserFixture.STAFF] | ||
|
||
# Filter users to only include those created by the fixture | ||
users = list(User.objects.filter(username__in=created_usernames)) | ||
except Exception as e: | ||
logger.warning(e) | ||
return | ||
|
||
# Approve each user associated with `in review` status domains | ||
cls._approve_domain_requests(users) | ||
|
||
@staticmethod | ||
def _generate_fake_expiration_date(days_in_future=365): | ||
"""Generates a fake expiration date between 1 and 365 days in the future.""" | ||
current_date = timezone.now().date() # nosec | ||
return current_date + timedelta(days=random.randint(1, days_in_future)) # nosec | ||
|
||
@staticmethod | ||
def _generate_fake_expiration_date_in_past(): | ||
"""Generates a fake expiration date up to 365 days in the past.""" | ||
current_date = timezone.now().date() # nosec | ||
return current_date + timedelta(days=random.randint(-365, -1)) # nosec | ||
|
||
@classmethod | ||
def _approve_request(cls, domain_request, users): | ||
"""Helper function to approve a domain request.""" | ||
if not domain_request: | ||
return None | ||
|
||
if domain_request.investigator is None: | ||
# Assign random investigator if not already assigned | ||
domain_request.investigator = random.choice(users) # nosec | ||
|
||
# Approve the domain request | ||
domain_request.approve(send_email=False) | ||
|
||
return domain_request | ||
|
||
@classmethod | ||
def _approve_domain_requests(cls, users): | ||
"""Approves one current and one expired request per user.""" | ||
domain_requests_to_update = [] | ||
expired_requests = [] | ||
|
||
for user in users: | ||
# Get the latest and second-to-last domain requests | ||
domain_requests = DomainRequest.objects.filter( | ||
creator=user, status=DomainRequest.DomainRequestStatus.IN_REVIEW | ||
).order_by("-id")[:2] | ||
|
||
# Latest domain request | ||
domain_request = domain_requests[0] if domain_requests else None | ||
# Second-to-last domain request (expired) | ||
domain_request_expired = domain_requests[1] if len(domain_requests) > 1 else None | ||
|
||
# Approve the current domain request | ||
if domain_request: | ||
cls._approve_request(domain_request, users) | ||
domain_requests_to_update.append(domain_request) | ||
|
||
# Approve the expired domain request | ||
if domain_request_expired: | ||
cls._approve_request(domain_request_expired, users) | ||
domain_requests_to_update.append(domain_request_expired) | ||
expired_requests.append(domain_request_expired) | ||
|
||
# Perform bulk update for the domain requests | ||
cls._bulk_update_requests(domain_requests_to_update) | ||
|
||
# Retrieve all domains associated with the domain requests | ||
domains_to_update = Domain.objects.filter(domain_info__domain_request__in=domain_requests_to_update) | ||
|
||
# Loop through and update expiration dates for domains | ||
for domain in domains_to_update: | ||
domain_request = domain.domain_info.domain_request | ||
|
||
# Set the expiration date based on whether the request is expired | ||
if domain_request in expired_requests: | ||
domain.expiration_date = cls._generate_fake_expiration_date_in_past() | ||
else: | ||
domain.expiration_date = cls._generate_fake_expiration_date() | ||
|
||
# Perform bulk update for the domains | ||
cls._bulk_update_domains(domains_to_update) | ||
|
||
@classmethod | ||
def _bulk_update_requests(cls, domain_requests_to_update): | ||
"""Bulk update domain requests.""" | ||
if domain_requests_to_update: | ||
try: | ||
DomainRequest.objects.bulk_update(domain_requests_to_update, ["status", "investigator"]) | ||
logger.info(f"Successfully updated {len(domain_requests_to_update)} requests.") | ||
except Exception as e: | ||
logger.error(f"Unexpected error during requests bulk update: {e}") | ||
|
||
@classmethod | ||
def _bulk_update_domains(cls, domains_to_update): | ||
"""Bulk update domains with expiration dates.""" | ||
if domains_to_update: | ||
try: | ||
Domain.objects.bulk_update(domains_to_update, ["expiration_date"]) | ||
logger.info(f"Successfully updated {len(domains_to_update)} domains.") | ||
except Exception as e: | ||
logger.error(f"Unexpected error during domains bulk update: {e}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
import logging | ||
import random | ||
from faker import Faker | ||
from django.db import transaction | ||
|
||
from registrar.models import User, DomainRequest, FederalAgency | ||
from registrar.models.portfolio import Portfolio | ||
from registrar.models.senior_official import SeniorOfficial | ||
|
||
|
||
fake = Faker() | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class PortfolioFixture: | ||
""" | ||
Creates 2 pre-defined portfolios with the infrastructure to add more. | ||
Make sure this class' `load` method is called from `handle` | ||
in management/commands/load.py, then use `./manage.py load` | ||
to run this code. | ||
""" | ||
|
||
PORTFOLIOS = [ | ||
{ | ||
"organization_name": "Hotel California", | ||
}, | ||
{ | ||
"organization_name": "Wish You Were Here", | ||
}, | ||
] | ||
|
||
@classmethod | ||
def fake_so(cls): | ||
return { | ||
"first_name": fake.first_name(), | ||
"last_name": fake.last_name(), | ||
"title": fake.job(), | ||
"email": fake.ascii_safe_email(), | ||
"phone": "201-555-5555", | ||
} | ||
|
||
@classmethod | ||
def _set_non_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict): | ||
"""Helper method used by `load`.""" | ||
portfolio.organization_type = ( | ||
portfolio_dict["organization_type"] | ||
if "organization_type" in portfolio_dict | ||
else DomainRequest.OrganizationChoices.FEDERAL | ||
) | ||
portfolio.notes = portfolio_dict["notes"] if "notes" in portfolio_dict else None | ||
portfolio.address_line1 = ( | ||
portfolio_dict["address_line1"] if "address_line1" in portfolio_dict else fake.street_address() | ||
) | ||
portfolio.address_line2 = portfolio_dict["address_line2"] if "address_line2" in portfolio_dict else None | ||
portfolio.city = portfolio_dict["city"] if "city" in portfolio_dict else fake.city() | ||
portfolio.state_territory = ( | ||
portfolio_dict["state_territory"] if "state_territory" in portfolio_dict else fake.state_abbr() | ||
) | ||
portfolio.zipcode = portfolio_dict["zipcode"] if "zipcode" in portfolio_dict else fake.postalcode() | ||
portfolio.urbanization = portfolio_dict["urbanization"] if "urbanization" in portfolio_dict else None | ||
portfolio.security_contact_email = ( | ||
portfolio_dict["security_contact_email"] if "security_contact_email" in portfolio_dict else fake.email() | ||
) | ||
|
||
@classmethod | ||
def _set_foreign_key_fields(cls, portfolio: Portfolio, portfolio_dict: dict, user: User): | ||
"""Helper method used by `load`.""" | ||
if not portfolio.senior_official: | ||
if portfolio_dict.get("senior_official") is not None: | ||
portfolio.senior_official, _ = SeniorOfficial.objects.get_or_create(**portfolio_dict["senior_official"]) | ||
else: | ||
portfolio.senior_official = SeniorOfficial.objects.create(**cls.fake_so()) | ||
|
||
if not portfolio.federal_agency: | ||
if portfolio_dict.get("federal_agency") is not None: | ||
portfolio.federal_agency, _ = FederalAgency.objects.get_or_create(name=portfolio_dict["federal_agency"]) | ||
else: | ||
federal_agencies = FederalAgency.objects.all() | ||
# Random choice of agency for selects, used as placeholders for testing. | ||
portfolio.federal_agency = random.choice(federal_agencies) # nosec | ||
|
||
@classmethod | ||
def load(cls): | ||
"""Creates portfolios.""" | ||
logger.info("Going to load %s portfolios" % len(cls.PORTFOLIOS)) | ||
|
||
# Lumped under .atomic to ensure we don't make redundant DB calls. | ||
# This bundles them all together, and then saves it in a single call. | ||
with transaction.atomic(): | ||
try: | ||
user = User.objects.all().last() | ||
except Exception as e: | ||
logger.warning(e) | ||
return | ||
|
||
portfolios_to_create = [] | ||
for portfolio_data in cls.PORTFOLIOS: | ||
organization_name = portfolio_data["organization_name"] | ||
|
||
# Check if portfolio with the organization name already exists | ||
if Portfolio.objects.filter(organization_name=organization_name).exists(): | ||
logger.info( | ||
f"Portfolio with organization name '{organization_name}' already exists, skipping creation." | ||
) | ||
continue | ||
|
||
try: | ||
portfolio = Portfolio( | ||
creator=user, | ||
organization_name=portfolio_data["organization_name"], | ||
) | ||
cls._set_non_foreign_key_fields(portfolio, portfolio_data) | ||
cls._set_foreign_key_fields(portfolio, portfolio_data, user) | ||
portfolios_to_create.append(portfolio) | ||
except Exception as e: | ||
logger.warning(e) | ||
|
||
# Bulk create domain requests | ||
if len(portfolios_to_create) > 0: | ||
try: | ||
Portfolio.objects.bulk_create(portfolios_to_create) | ||
logger.info(f"Successfully created {len(portfolios_to_create)} portfolios") | ||
except Exception as e: | ||
logger.warning(f"Error bulk creating portfolios: {e}") |
Oops, something went wrong.