Staging database restore DAG: base restore #2099

Merged May 23, 2023 (24 commits)

Commits
c6f5a07 Add AWS_CONN_ID to constants.py (AetherUnbound, May 11, 2023)
05191e5 Add first few tasks, DAG skeleton (AetherUnbound, May 12, 2023)
5f9c762 Extract AWS_RDS_CONN_ID, pull available check into sensor (AetherUnbound, May 12, 2023)
f3dbd38 Make a submodule constants file (AetherUnbound, May 12, 2023)
b8502fd Add check staging utility function (AetherUnbound, May 12, 2023)
4bd03c8 Add step to get staging DB details (AetherUnbound, May 12, 2023)
62078ba Use a format string for the identifier names (AetherUnbound, May 12, 2023)
1cdb3c9 Add staging restore step (AetherUnbound, May 12, 2023)
8ddf498 Add the await creation step (AetherUnbound, May 12, 2023)
56af8ca Add rename instance function, move constants, notify outage (AetherUnbound, May 12, 2023)
5efd289 Move rename into task group, add rename staging to old (AetherUnbound, May 12, 2023)
5731465 Add more retries to sensor after rename (AetherUnbound, May 12, 2023)
939dcc1 Employ a shortcircuit operator instead of custom skip (AetherUnbound, May 12, 2023)
4eba5bc Add failure handling on rename, add old db deletion after run (AetherUnbound, May 12, 2023)
6fe70ad Improve slack messaging (AetherUnbound, May 12, 2023)
ca0459e Add test_utils, rename ensure_staging function (AetherUnbound, May 12, 2023)
6d6ed95 Allow RDS hook to be overridden (AetherUnbound, May 12, 2023)
82d0fa7 Add tests for major functionality (AetherUnbound, May 13, 2023)
4dcbf06 Update types (AetherUnbound, May 13, 2023)
e373836 Fix initial slack message (AetherUnbound, May 13, 2023)
1a99bf7 Add instructions about local testing (AetherUnbound, May 13, 2023)
71069cf Generate DAG docs (AetherUnbound, May 13, 2023)
a08ea01 Fix spelling (AetherUnbound, May 23, 2023)
a8e4289 Update exception message (AetherUnbound, May 23, 2023)
21 changes: 21 additions & 0 deletions catalog/DAGs.md
@@ -44,6 +44,7 @@ The following are DAGs grouped by their primary tag:
| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` |
| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` |
| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` |
| [`staging_database_restore`](#staging_database_restore) | `@monthly` |

## Maintenance

@@ -133,6 +134,7 @@ The following is documentation associated with each DAG (where available):
1. [`science_museum_workflow`](#science_museum_workflow)
1. [`smithsonian_workflow`](#smithsonian_workflow)
1. [`smk_workflow`](#smk_workflow)
1. [`staging_database_restore`](#staging_database_restore)
1. [`stocksnap_workflow`](#stocksnap_workflow)
1. [`wikimedia_commons_workflow`](#wikimedia_commons_workflow)
1. [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow)
@@ -704,6 +706,25 @@ Output: TSV file containing the media metadata.

Notes: https://www.smk.dk/en/article/smk-api/

## `staging_database_restore`

### Update the staging database

This DAG is responsible for updating the staging database using the most recent
snapshot of the production database.

For a full explanation of the DAG, see the implementation plan description:
https://docs.openverse.org/projects/proposals/search_relevancy_sandbox/20230406-implementation_plan_update_staging_database.html#dag

This DAG will default to using the standard AWS connection ID for the RDS
operations. For local testing, you can set two environment variables to have the
RDS operations run using a different hook (see the sketch below):

- `AWS_RDS_CONN_ID`: The Airflow connection ID to use for RDS operations (e.g.
`aws_rds`)
- `AIRFLOW_CONN_<ID>`: The connection string to use for RDS operations (per the
above example, it might be `AIRFLOW_CONN_AWS_RDS`)
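
For example (a sketch only: the URI follows the Airflow AWS provider's `aws://`
connection format, and the credential values here are placeholders, not real
settings from this repository):

```python
# Hypothetical local-testing setup; set these before Airflow starts,
# or export the equivalent shell variables. Values are placeholders.
import os

os.environ["AWS_RDS_CONN_ID"] = "aws_rds"
os.environ["AIRFLOW_CONN_AWS_RDS"] = (
    "aws://AKIAEXAMPLE:example-secret-key@/?region_name=us-east-1"
)
```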

## `stocksnap_workflow`

Content Provider: StockSnap
1 change: 1 addition & 0 deletions catalog/dags/common/constants.py
@@ -31,3 +31,4 @@

POSTGRES_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing")
OPENLEDGER_API_CONN_ID = os.getenv("OPENLEDGER_API_CONN_ID", "postgres_openledger_api")
AWS_CONN_ID = os.getenv("AWS_CONN_ID", "aws_conn_id")
19 changes: 19 additions & 0 deletions catalog/dags/database/staging_database_restore/constants.py
@@ -0,0 +1,19 @@
import os

from common.constants import AWS_CONN_ID


_ID_FORMAT = "{}-openverse-db"

DAG_ID = "staging_database_restore"
PROD_IDENTIFIER = _ID_FORMAT.format("prod")
STAGING_IDENTIFIER = _ID_FORMAT.format("dev")
TEMP_IDENTIFIER = _ID_FORMAT.format("dev-next")
OLD_IDENTIFIER = _ID_FORMAT.format("dev-old")

SAFE_TO_MUTATE = {STAGING_IDENTIFIER, TEMP_IDENTIFIER, OLD_IDENTIFIER}

SKIP_VARIABLE = "SKIP_STAGING_DATABASE_RESTORE"
AWS_RDS_CONN_ID = os.environ.get("AWS_RDS_CONN_ID", AWS_CONN_ID)
SLACK_USERNAME = "Staging Database Restore"
SLACK_ICON = ":database-pink:"
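
For reference, the format string above expands to these identifiers (derived
directly from the code, shown here as assertions):

```python
# Evaluated values of the constants above:
assert PROD_IDENTIFIER == "prod-openverse-db"
assert STAGING_IDENTIFIER == "dev-openverse-db"
assert TEMP_IDENTIFIER == "dev-next-openverse-db"
assert OLD_IDENTIFIER == "dev-old-openverse-db"
```
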
@@ -0,0 +1,208 @@
import logging
from datetime import timedelta
from pprint import pformat

from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.rds import RdsHook
from airflow.providers.amazon.aws.sensors.rds import RdsDbSensor
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

from common import slack
from database.staging_database_restore import constants
from database.staging_database_restore.utils import (
    ensure_mutate_allowed,
    setup_rds_hook,
)
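
The `setup_rds_hook` decorator comes from the submodule's `utils.py`, which is
not part of this diff. Based on how it is used below (every decorated task
accepts an optional `rds_hook` argument that tests can override), a plausible
sketch of what it does might be:

```python
# Hypothetical sketch of setup_rds_hook; the real implementation lives in
# database/staging_database_restore/utils.py and is not shown in this diff.
# RdsHook and constants are already imported at the top of this module.
import functools


def setup_rds_hook(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Inject a hook built from the configurable connection ID unless the
        # caller (e.g. a test) supplied one explicitly.
        if kwargs.get("rds_hook") is None:
            kwargs["rds_hook"] = RdsHook(aws_conn_id=constants.AWS_RDS_CONN_ID)
        return func(*args, **kwargs)

    return wrapper
```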


REQUIRED_DB_INFO = {
"MultiAZ",
"AvailabilityZone",
"VpcSecurityGroups",
"DBSubnetGroup",
"PubliclyAccessible",
"DBInstanceClass",
"AllocatedStorage",
}


log = logging.getLogger(__name__)


@task.short_circuit

Collaborator: Very cool, TIL!

AetherUnbound (Author): Stumbled across this one by accident while looking for something else, couldn't help but use it!

def skip_restore(should_skip: bool = False) -> bool:

Collaborator: Why does this take should_skip as an argument? Will a value ever be passed this way?

"""
Determine whether to skip the restore process.
Can be overridden by setting the `SKIP_STAGING_DATABASE_RESTORE` Airflow Variable
to `true`.
Should return `True` to have the DAG continue, and `False` to have it skipped.
https://docs.astronomer.io/learn/airflow-branch-operator#taskshort_circuit-shortcircuitoperator
"""
    should_continue = not (
        should_skip
        or Variable.get(
            constants.SKIP_VARIABLE, default_var=False, deserialize_json=True
        )
    )
    if not should_continue:
        notify_slack.function(

Collaborator: Neat!

"The staging database restore has been skipped. "
f"(Set the `{constants.SKIP_VARIABLE}` Airflow Variable to `false`"
"to disable this behavior.)"
)
return should_continue
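
As an aside for readers who, like the reviewer above, haven't seen it before:
`@task.short_circuit` wraps Airflow's `ShortCircuitOperator`, which skips all
downstream tasks when the decorated callable returns a falsy value. A minimal,
standalone sketch (not part of this PR; DAG and task names are invented):

```python
import pendulum
from airflow.decorators import dag, task


@task.short_circuit
def gate(value: int) -> bool:
    # A falsy return value marks every downstream task as skipped.
    return value > 0


@task
def downstream() -> None:
    print("Runs only when gate() returned a truthy value")


@dag(start_date=pendulum.datetime(2023, 5, 1), schedule=None)
def short_circuit_demo():
    gate(1) >> downstream()


short_circuit_demo()
```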


@task
@setup_rds_hook
def get_latest_prod_snapshot(rds_hook: RdsHook = None) -> str:
"""
Get the latest automated snapshot for the production database.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/describe_db_snapshots.html
Status is checked using a sensor in a later step, in case a snapshot creation is
currently in progress.
"""
# Get snapshots
snapshots = rds_hook.conn.describe_db_snapshots(
DBInstanceIdentifier=constants.PROD_IDENTIFIER,
SnapshotType="automated",
).get("DBSnapshots", [])
# Sort by descending creation time
snapshots = sorted(
snapshots,
key=lambda x: x["SnapshotCreateTime"],
reverse=True,
)
if not snapshots:
raise ValueError(f"No snapshots found for {constants.PROD_IDENTIFIER}")
latest_snapshot = snapshots[0]
log.info(f"Latest snapshot: {latest_snapshot}")
return latest_snapshot["DBSnapshotIdentifier"]
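
To make the sorting step concrete, here is an invented, abridged example of the
response shape `describe_db_snapshots` returns (field names follow the boto3
docs linked above; the values are illustrative, not real infrastructure):

```python
from datetime import datetime, timezone

# Abridged, invented example of a describe_db_snapshots response; the task
# sorts DBSnapshots by SnapshotCreateTime and returns the newest identifier.
example_response = {
    "DBSnapshots": [
        {
            "DBSnapshotIdentifier": "rds:prod-openverse-db-2023-05-12-07-11",
            "DBInstanceIdentifier": "prod-openverse-db",
            "SnapshotCreateTime": datetime(2023, 5, 12, 7, 11, tzinfo=timezone.utc),
            "SnapshotType": "automated",
            "Status": "available",
        }
    ]
}
```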


@task
@setup_rds_hook
def get_staging_db_details(rds_hook: RdsHook = None) -> dict:
"""
Retrieve the details of the staging database. Only some details are required (and
others are actually sensitive) so filter down to only what we need.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/describe_db_instances.html
"""
# Get staging DB details
instances = rds_hook.conn.describe_db_instances(
DBInstanceIdentifier=constants.STAGING_IDENTIFIER,
).get("DBInstances", [])
if not instances:
raise ValueError(f"No staging DB found for {constants.STAGING_IDENTIFIER}")
staging_db = instances[0]
# While it might be tempting to log this information, it contains sensitive
# values. Instead, we'll select only the information we need, then log that.

Collaborator: Is filtering out sensitive values only important for logging? I.e., could we restrict logging to REQUIRED_DB_INFO but use everything to create the new DB? Would that provide any extra resilience to configuration drift, or can we expect that REQUIRED_DB_INFO will never need to change?

AetherUnbound (Author), May 23, 2023: I think there are a few reasons why the current approach might be best:

- Easy prevention of accidentally logging secrets
- REQUIRED_DB_INFO isn't likely to change, at least for the features we care about
- Some values returned from the details API have to have their shape changed before they can be fed into the create-from-snapshot API (see the lines below on subnet group name and VPC security group IDs). There are dozens of values returned from the former API, so trying to manage the appropriate transformations on all of them seems unnecessary at this point IMO.

    staging_db = {
        key: value for key, value in staging_db.items() if key in REQUIRED_DB_INFO
    }
    # Pull the DBSubnetGroup name out of the DBSubnetGroup object
    staging_db["DBSubnetGroupName"] = staging_db.pop("DBSubnetGroup")[
        "DBSubnetGroupName"
    ]
    # Pull the VPC IDs out of the VpcSecurityGroups objects
    staging_db["VpcSecurityGroupIds"] = [
        vpc["VpcSecurityGroupId"] for vpc in staging_db.pop("VpcSecurityGroups")
    ]
    log.info(f"Staging DB config: \n{pformat(staging_db)}")
    return staging_db
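
An invented before/after example of the filtering and reshaping above
(abridged; a real `describe_db_instances` record carries dozens more fields):

```python
# Invented input record (abridged) ...
raw_details = {
    "DBInstanceClass": "db.m5.xlarge",
    "AllocatedStorage": 100,
    "MultiAZ": False,
    "AvailabilityZone": "us-east-1a",
    "PubliclyAccessible": False,
    "DBSubnetGroup": {"DBSubnetGroupName": "dev-subnet-group"},
    "VpcSecurityGroups": [{"VpcSecurityGroupId": "sg-0123abcd", "Status": "active"}],
    "MasterUsername": "admin",  # dropped: not in REQUIRED_DB_INFO
}
# ... and the config the task would return for the restore call:
expected_config = {
    "DBInstanceClass": "db.m5.xlarge",
    "AllocatedStorage": 100,
    "MultiAZ": False,
    "AvailabilityZone": "us-east-1a",
    "PubliclyAccessible": False,
    "DBSubnetGroupName": "dev-subnet-group",
    "VpcSecurityGroupIds": ["sg-0123abcd"],
}
```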


@task
@setup_rds_hook
def restore_staging_from_snapshot(
    latest_snapshot: str, staging_config: dict, rds_hook: RdsHook = None
) -> None:
    """
    Restore the staging database from the latest snapshot.
    Augment the restore operation with the existing details determined from
    a previous step.
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/restore_db_instance_from_db_snapshot.html
    """
    log.info(
        f"Creating a new {constants.TEMP_IDENTIFIER} instance from {latest_snapshot} "
        f"with: \n{pformat(staging_config)}"
    )
    rds_hook.conn.restore_db_instance_from_db_snapshot(
        DBInstanceIdentifier=constants.TEMP_IDENTIFIER,
        DBSnapshotIdentifier=latest_snapshot,
        **staging_config,
    )


@task
@setup_rds_hook
def rename_db_instance(source: str, target: str, rds_hook: RdsHook = None) -> None:
"""
Rename a database instance.
This can only be run on instances where mutation is allowed.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/modify_db_instance.html
"""
log.info("Checking input values")
ensure_mutate_allowed(source)
ensure_mutate_allowed(target)
log.info(f"Renaming {source} to {target}")
rds_hook.conn.modify_db_instance(
DBInstanceIdentifier=source,
NewDBInstanceIdentifier=target,
ApplyImmediately=True,
)


@task
def notify_slack(text: str) -> None:
    slack.send_message(
        text,
        username=constants.SLACK_USERNAME,
        icon_emoji=constants.SLACK_ICON,
        dag_id=constants.DAG_ID,
    )


def make_rds_sensor(task_id: str, db_identifier: str, retries: int = 0) -> RdsDbSensor:
    return RdsDbSensor(
        task_id=task_id,
        db_identifier=db_identifier,
        target_statuses=["available"],
        aws_conn_id=constants.AWS_RDS_CONN_ID,
        mode="reschedule",
        timeout=60 * 60,  # 1 hour
        retries=retries,
        retry_delay=timedelta(minutes=1),
    )
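
A hypothetical use of the helper (the task ID here is invented); `reschedule`
mode releases the worker slot between pokes rather than holding it for the
whole wait:

```python
# Hypothetical usage of make_rds_sensor: wait until the staging instance
# reports "available" before mutating it.
ensure_staging_available = make_rds_sensor(
    task_id="ensure_staging_db_available",
    db_identifier=constants.STAGING_IDENTIFIER,
)
```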


def make_rename_task_group(
    source: str,
    target: str,
    trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS,
) -> TaskGroup:
    """
    Create a task group which includes both a rename operation and a sensor that
    waits for the renamed database to be ready. The sensor needs retries because
    the database may not yet be available when the first check runs after the
    rename.
    """
    source_name = source.removesuffix("-openverse-db")
    target_name = target.removesuffix("-openverse-db")
    with TaskGroup(group_id=f"rename_{source_name}_to_{target_name}") as rename_group:
        rename = rename_db_instance.override(
            task_id=f"rename_{source_name}_to_{target_name}",
            trigger_rule=trigger_rule,
        )(
            source=source,
            target=target,
        )
        await_rename = make_rds_sensor(
            task_id=f"await_{target_name}",
            db_identifier=target,
            retries=2,
        )
        rename >> await_rename

    return rename_group
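
A hypothetical wiring of this helper, mirroring the commit messages above
("add rename staging to old", "move rename into task group"): park the current
staging instance under the old identifier, then promote the freshly restored
temp instance. This is a sketch, not the DAG's actual task graph:

```python
# Hypothetical sketch of how the DAG might chain the rename groups.
rename_staging_to_old = make_rename_task_group(
    constants.STAGING_IDENTIFIER, constants.OLD_IDENTIFIER
)
rename_temp_to_staging = make_rename_task_group(
    constants.TEMP_IDENTIFIER, constants.STAGING_IDENTIFIER
)
rename_staging_to_old >> rename_temp_to_staging
```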