From 8b98f0024f2532be0b82371970f07225255741c0 Mon Sep 17 00:00:00 2001 From: Staci Mullins <63313398+stacimc@users.noreply.github.com> Date: Fri, 10 Nov 2023 10:58:11 -0800 Subject: [PATCH] Create filtered index before promoting primary index during data refresh (#3303) * Create the filtered index before promoting the primary index during the data refresh * Use XCOMs instead of TaskFlow to get index name Unfortunately if we don't do this, the task depdendency graph looks very messy, as a dependency line is drawn from generate_index_suffix to every descendant task that uses it. * Delete the correct index * Update docs * Prevent retries on the concurrency check * Pull methods out, move comments for clarity --- catalog/dags/common/ingestion_server.py | 7 + .../data_refresh/create_filtered_index.py | 141 ++++++++++++++ .../data_refresh/create_filtered_index_dag.py | 173 +++++------------- catalog/dags/data_refresh/dag_factory.py | 21 --- .../data_refresh/data_refresh_task_factory.py | 39 +++- documentation/catalog/reference/DAGs.md | 76 ++++---- 6 files changed, 265 insertions(+), 192 deletions(-) create mode 100644 catalog/dags/data_refresh/create_filtered_index.py diff --git a/catalog/dags/common/ingestion_server.py b/catalog/dags/common/ingestion_server.py index 3ae2d9709cc..3848724f452 100644 --- a/catalog/dags/common/ingestion_server.py +++ b/catalog/dags/common/ingestion_server.py @@ -1,8 +1,10 @@ import logging import os +import uuid from datetime import timedelta from urllib.parse import urlparse +from airflow.decorators import task from airflow.exceptions import AirflowSkipException from airflow.providers.http.operators.http import SimpleHttpOperator from airflow.providers.http.sensors.http import HttpSensor @@ -86,6 +88,11 @@ def response_check_index_readiness_check(response: Response) -> bool: return hits >= THRESHOLD_RESULT_COUNT +@task +def generate_index_suffix(default_suffix: str | None = None) -> str: + return default_suffix or uuid.uuid4().hex + + def get_current_index(target_alias: str) -> SimpleHttpOperator: return SimpleHttpOperator( task_id="get_current_index", diff --git a/catalog/dags/data_refresh/create_filtered_index.py b/catalog/dags/data_refresh/create_filtered_index.py new file mode 100644 index 00000000000..f4fd3d9270f --- /dev/null +++ b/catalog/dags/data_refresh/create_filtered_index.py @@ -0,0 +1,141 @@ +""" +# Create filtered index TaskGroup factory + +This module contains factory functions used to generate the Airflow tasks for +creating filtered indices, used by the data refreshes of each media type and the +Create Filtered Index DAGs. + +Filtered index creation is handled by the ingestion server. The TaskGroups generated +by the ``create_filtered_index_creation_task_groups`` function in this module are +responsible for creating and promoting filtered indices. The ``create_filtered_index`` +TaskGroup trigges the ingestion server action to create and populate the filtered +index for a given media type, and awaits the completion of the index creation. The +``promote_filtered_index`` TaskGroup awaits healthy results from the newly created +index, and then points the filtered index alias for the media type to the new index, +finally deleting the old, now unused filtered index. These TaskGroups are used in +the data refresh DAGs to execute the filtered index steps. +""" +from datetime import timedelta + +from airflow.operators.empty import EmptyOperator +from airflow.utils.task_group import TaskGroup +from airflow.utils.trigger_rule import TriggerRule + +from common import ingestion_server +from common.constants import XCOM_PULL_TEMPLATE +from data_refresh.data_refresh_types import DataRefresh + + +def create_and_populate_filtered_index( + media_type: str, + data_refresh: DataRefresh, + origin_index_suffix: str | None, + destination_index_suffix: str | None, +): + create_payload = {} + if origin_index_suffix: + create_payload["origin_index_suffix"] = origin_index_suffix + if destination_index_suffix: + create_payload["destination_index_suffix"] = destination_index_suffix + + return ingestion_server.trigger_and_wait_for_task( + action="CREATE_AND_POPULATE_FILTERED_INDEX", + model=media_type, + data=create_payload or None, + timeout=data_refresh.create_filtered_index_timeout, + ) + + +def point_alias( + media_type: str, target_alias: str, destination_index_suffix: str +) -> TaskGroup: + point_alias_payload = { + "alias": target_alias, + "index_suffix": f"{destination_index_suffix}-filtered", + } + + with TaskGroup(group_id="point_alias") as point_alias_group: + ingestion_server.trigger_and_wait_for_task( + action="POINT_ALIAS", + model=media_type, + data=point_alias_payload, + timeout=timedelta(hours=12), # matches the ingestion server's wait time + ) + return point_alias_group + + +def create_filtered_index_creation_task_groups( + data_refresh: DataRefresh, + origin_index_suffix: str | None, + destination_index_suffix: str | None, +) -> tuple[TaskGroup, TaskGroup]: + """ + Create the TaskGroups that performs filtered index creation and promotion for + the given DataRefresh. + """ + media_type = data_refresh.media_type + target_alias = f"{media_type}-filtered" + + with TaskGroup(group_id="create_filtered_index") as create_filtered_index_group: + # If a destination index suffix isn't provided, we need to generate + # one so that we know where to point the alias + final_destination_index_suffix = ( + ingestion_server.generate_index_suffix.override( + task_id="generate_filtered_index_suffix" + )(destination_index_suffix) + ) + + # Determine the current index. The current index retrieval has to happen prior + # to any of the index creation steps to ensure the appropriate index information + # is retrieved. + get_current_index_if_exists = ingestion_server.get_current_index(target_alias) + + # The current index retrieval step can be skipped if the index does not + # currently exist. The empty operator below works as a control flow management + # step to ensure the create step runs even if the current index retrieval step + # is skipped (the trigger rule would be tedious to percolate through all the + # helper functions to the index creation step itself). + continue_if_no_current_index = EmptyOperator( + task_id="continue_if_no_current_index", + trigger_rule=TriggerRule.NONE_FAILED, + ) + + do_create, await_create = create_and_populate_filtered_index( + media_type=media_type, + data_refresh=data_refresh, + origin_index_suffix=origin_index_suffix, + destination_index_suffix=final_destination_index_suffix, + ) + + get_current_index_if_exists >> continue_if_no_current_index >> do_create + do_create >> await_create + + with TaskGroup(group_id="promote_filtered_index") as promote_filtered_index_group: + # Await healthy results from the newly created elasticsearch index, then trigger + # and await the promotion of the index. + index_readiness_check = ingestion_server.index_readiness_check( + media_type=media_type, + index_suffix=f"{final_destination_index_suffix}-filtered", + ) + + do_point_alias = point_alias( + media_type=media_type, + target_alias=target_alias, + destination_index_suffix=final_destination_index_suffix, + ) + + delete_old_index = ingestion_server.trigger_task( + action="DELETE_INDEX", + model=data_refresh.media_type, + data={ + "index_suffix": XCOM_PULL_TEMPLATE.format( + get_current_index_if_exists.task_id, "return_value" + ), + }, + ) + + index_readiness_check >> do_point_alias + + [get_current_index_if_exists, do_point_alias] >> delete_old_index + + return create_filtered_index_group, promote_filtered_index_group diff --git a/catalog/dags/data_refresh/create_filtered_index_dag.py b/catalog/dags/data_refresh/create_filtered_index_dag.py index 19c6905c8e0..1cadcbd9158 100644 --- a/catalog/dags/data_refresh/create_filtered_index_dag.py +++ b/catalog/dags/data_refresh/create_filtered_index_dag.py @@ -5,21 +5,27 @@ using a factory function. Filtered index creation is handled by the ingestion server. The DAGs generated -by the ``build_create_filtered_index_dag`` function in this module are +by the ``create_filtered_index_creation_dag`` function in this module are responsible for triggering the ingestion server action to create and populate the filtered index for a given media type. The DAG awaits the completion of the filtered index creation and then points the filtered index alias for the -media type to the newly created index. +media type to the newly created index. They make use of the +``create_filtered_index_creation_task_groups`` factory, which is also used by the +data refreshes to perform the same functions. The purpose of these DAGs is to allow +the filtered index creation steps to be run in isolation from the data refresh. ## When this DAG runs -The DAGs generated in this module are triggered by the data refresh DAGs. -Maintaining this process separate from the data refresh DAGs, while still -triggering it there, allows us to run filtered index creation independently -of the full data refresh. This is primarily useful in two cases: for testing -changes to the filtered index creation; and for re-running filtered index -creation if an urgent change to the sensitive terms calls for an immediate -recreation of the filtered indexes. +The DAGs generated by the ``create_filtered_index_creation_dag`` can be used +to manually run the filtered index creation and promotion steps described above in +isolation from the rest of the data refresh. These DAGs also include checks to ensure +that race conditions with the data refresh DAGs are not encountered (see ``Race +conditions`` section below). + +The DAGs generated in this module are on a `None` schedule and are only triggered +manually. This is primarily useful in two cases: for testing changes to the filtered +index creation; and for re-running filtered index creation if an urgent change to the +sensitive terms calls for an immediate recreation of the filtered indexes. ## Race conditions @@ -43,28 +49,20 @@ This ensures that neither are depending on or modifying the origin indexes critical for the creation of the filtered indexes. - -Because the data refresh DAG triggers the filtered index creation DAG, we do -allow a ``force`` param to be passed to the DAGs generated by this module. -This parameter is only for use by the data refresh DAG and should not be -used when manually triggering the DAG unless you are absolutely certain -of what you are doing. """ -import uuid -from datetime import datetime, timedelta +from datetime import datetime from airflow import DAG from airflow.decorators import task from airflow.exceptions import AirflowSensorTimeout from airflow.models.param import Param -from airflow.operators.empty import EmptyOperator from airflow.sensors.external_task import ExternalTaskSensor -from airflow.utils.task_group import TaskGroup -from airflow.utils.trigger_rule import TriggerRule -from common import ingestion_server -from common.constants import DAG_DEFAULT_ARGS, XCOM_PULL_TEMPLATE +from common.constants import DAG_DEFAULT_ARGS from common.sensors.utils import get_most_recent_dag_run +from data_refresh.create_filtered_index import ( + create_filtered_index_creation_task_groups, +) from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefresh @@ -77,17 +75,17 @@ # this factory function, but it would require a redundant # call to the decorated function and doesn't look like it would # provide any additional value whatsoever. -def filtered_index_creation_dag_factory(data_refresh: DataRefresh): +def create_filtered_index_creation_dag(data_refresh: DataRefresh): + """ + Create a DAG for the given DataRefresh that performs filtered index + creation and promotion, preventing concurrency with the data refreshes. + """ media_type = data_refresh.media_type - target_alias = f"{media_type}-filtered" @task( task_id=f"prevent_concurrency_with_{media_type}_data_refresh", ) - def prevent_concurrency_with_data_refresh(force: bool, **context): - if force: - return - + def prevent_concurrency_with_data_refresh(**context): data_refresh_dag_id = f"{media_type}_data_refresh" wait_for_filtered_index_creation = ExternalTaskSensor( task_id="check_for_running_data_refresh", @@ -109,42 +107,6 @@ def prevent_concurrency_with_data_refresh(force: bool, **context): "Filtered index creation cannot start during a data refresh." ) - @task() - def generate_index_suffix(default_suffix: str): - return default_suffix or uuid.uuid4().hex - - def create_and_populate_filtered_index( - origin_index_suffix: str | None, - destination_index_suffix: str | None, - ): - create_payload = {} - if origin_index_suffix: - create_payload["origin_index_suffix"] = origin_index_suffix - if destination_index_suffix: - create_payload["destination_index_suffix"] = destination_index_suffix - - return ingestion_server.trigger_and_wait_for_task( - action="CREATE_AND_POPULATE_FILTERED_INDEX", - model=media_type, - data=create_payload or None, - timeout=data_refresh.create_filtered_index_timeout, - ) - - def point_alias(destination_index_suffix: str) -> TaskGroup: - point_alias_payload = { - "alias": target_alias, - "index_suffix": f"{destination_index_suffix}-filtered", - } - - with TaskGroup(group_id="point_alias") as point_alias_group: - ingestion_server.trigger_and_wait_for_task( - action="POINT_ALIAS", - model=media_type, - data=point_alias_payload, - timeout=timedelta(hours=12), # matches the ingestion server's wait time - ) - return point_alias_group - with DAG( dag_id=f"create_filtered_{media_type}_index", default_args=DAG_DEFAULT_ARGS, @@ -155,20 +117,6 @@ def point_alias(destination_index_suffix: str) -> TaskGroup: catchup=False, doc_md=__doc__, params={ - "force": Param( - default=False, - type="boolean", - description=( - "Bypass data refresh concurrency check. " - "Should only ever be used by the data_refresh " - "DAGs when triggering filtered index creation " - "at the end of a data refresh. This should not " - "be used when manually running this DAG " - "unless you're absolutely sure of what you're " - "doing. This check exists to prevent race " - "conditions and should not be ignored lightly." - ), - ), "origin_index_suffix": Param( default=None, type=["string", "null"], @@ -193,64 +141,25 @@ def point_alias(destination_index_suffix: str) -> TaskGroup: }, render_template_as_native_obj=True, ) as dag: - prevent_concurrency = prevent_concurrency_with_data_refresh( - force="{{ params.force }}", - ) - - # If a destination index suffix isn't provided, we need to generate - # one so that we know where to point the alias - destination_index_suffix = generate_index_suffix( - "{{ params.destination_index_suffix }}" - ) - - get_current_index_if_exists = ingestion_server.get_current_index(target_alias) - - do_create, await_create = create_and_populate_filtered_index( - origin_index_suffix="{{ params.origin_index_suffix }}", - destination_index_suffix=destination_index_suffix, - ) - - # Await healthy results from the newly created elasticsearch index. - index_readiness_check = ingestion_server.index_readiness_check( - media_type=media_type, - index_suffix=f"{destination_index_suffix}-filtered", - ) - - do_point_alias = point_alias(destination_index_suffix=destination_index_suffix) - - delete_old_index = ingestion_server.trigger_task( - action="DELETE_INDEX", - model=data_refresh.media_type, - data={ - "index_suffix": XCOM_PULL_TEMPLATE.format( - get_current_index_if_exists.task_id, "return_value" - ), - }, + prevent_concurrency = prevent_concurrency_with_data_refresh.override( + retries=0 + )() + + # Once the concurrency check has passed, actually create the filtered + # index. + ( + create_filtered_index, + promote_filtered_index, + ) = create_filtered_index_creation_task_groups( + data_refresh, + "{{ params.origin_index_suffix }}", + "{{ params.destination_index_suffix }}", ) - # Once concurrency has been checked against, determine the destination index - # suffix and get the current index. The current index retrieval has to happen - # prior to any of the index creation steps to ensure the appropriate index - # information is retrieved. - prevent_concurrency >> [get_current_index_if_exists, destination_index_suffix] - - # The current index retrieval step can be skipped if the index does not - # currently exist. The empty operator below works as a control flow management - # step to ensure the create step runs even if the current index retrieval step - # is skipped (the trigger rule would be tedious to percolate through all the - # helper functions to the index creation step itself). - continue_if_no_current_index = EmptyOperator( - task_id="continue_if_no_current_index", - trigger_rule=TriggerRule.NONE_FAILED, - ) - - get_current_index_if_exists >> continue_if_no_current_index >> do_create - await_create >> index_readiness_check >> do_point_alias - - [get_current_index_if_exists, do_point_alias] >> delete_old_index + prevent_concurrency >> create_filtered_index >> promote_filtered_index return dag for data_refresh in DATA_REFRESH_CONFIGS: - create_filtered_index_dag = filtered_index_creation_dag_factory(data_refresh) + create_filtered_index_dag = create_filtered_index_creation_dag(data_refresh) diff --git a/catalog/dags/data_refresh/dag_factory.py b/catalog/dags/data_refresh/dag_factory.py index f2fd382574e..88ed8075c2e 100644 --- a/catalog/dags/data_refresh/dag_factory.py +++ b/catalog/dags/data_refresh/dag_factory.py @@ -27,7 +27,6 @@ from airflow import DAG from airflow.operators.python import PythonOperator -from airflow.operators.trigger_dagrun import TriggerDagRunOperator from common.constants import ( DAG_DEFAULT_ARGS, @@ -128,29 +127,9 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequenc }, ) - index_suffix = XCOM_PULL_TEMPLATE.format( - data_refresh_group.get_child_by_label("generate_index_suffix").task_id, - "return_value", - ) - trigger_filtered_index_creation = TriggerDagRunOperator( - task_id=f"trigger_create_filtered_{data_refresh.media_type}_index", - trigger_dag_id=f"create_filtered_{data_refresh.media_type}_index", - conf={ - # Force to skip data refresh DAG concurrency check - # as the data refresh DAG will clearly already be running - # as it is triggering the filtered index creation DAG. - "force": True, - "origin_index_suffix": index_suffix, - # Match origin and destination suffixes so we can tell which - # filtered indexes were created as part of a data refresh. - "destination_index_suffix": index_suffix, - }, - ) - # Set up task dependencies before_record_count >> data_refresh_group data_refresh_group >> after_record_count >> report_counts - data_refresh_group >> trigger_filtered_index_creation return dag diff --git a/catalog/dags/data_refresh/data_refresh_task_factory.py b/catalog/dags/data_refresh/data_refresh_task_factory.py index e0ec759634d..32ce0497765 100644 --- a/catalog/dags/data_refresh/data_refresh_task_factory.py +++ b/catalog/dags/data_refresh/data_refresh_task_factory.py @@ -60,6 +60,9 @@ from common.constants import XCOM_PULL_TEMPLATE from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor from common.sensors.utils import get_most_recent_dag_run +from data_refresh.create_filtered_index import ( + create_filtered_index_creation_task_groups, +) from data_refresh.data_refresh_types import DataRefresh @@ -181,8 +184,33 @@ def create_data_refresh_task_group( ) tasks.append(index_readiness_check) + # Create the TaskGroups for creating and promoting the filtered index. The + # promotion task group will not be run until later. + ( + create_filtered_index, + promote_filtered_index, + ) = create_filtered_index_creation_task_groups( + data_refresh=data_refresh, + origin_index_suffix=XCOM_PULL_TEMPLATE.format( + generate_index_suffix.task_id, "return_value" + ), + # Match origin and destination suffixes so we can tell which + # filtered indexes were created as part of a data refresh. + destination_index_suffix=XCOM_PULL_TEMPLATE.format( + generate_index_suffix.task_id, "return_value" + ), + ) + + # Add the task group for triggering the filtered index creation and awaiting its + # completion, once `ingest_upstream` has finished creating the new media index + # but before it is promoted. This prevents the filtered index creation from + # running against an index that is already promoted in production. + tasks.append(create_filtered_index) + # Trigger the `promote` task on the ingestion server and await its completion. - # This task promotes the newly created API DB table and elasticsearch index. + # This task promotes the newly created API DB table and elasticsearch index. It + # does not include promotion of the filtered index, which must be promoted + # separately. with TaskGroup(group_id="promote") as promote_tasks: ingestion_server.trigger_and_wait_for_task( action="promote", @@ -209,12 +237,17 @@ def create_data_refresh_task_group( ) tasks.append(delete_old_index) + # Finally, promote the filtered index. + tasks.append(promote_filtered_index) + # ``tasks`` contains the following tasks and task groups: # wait_for_data_refresh # └─ get_current_index # └─ ingest_upstream (trigger_ingest_upstream + wait_for_ingest_upstream) - # └─ promote (trigger_promote + wait_for_promote) - # └─ delete_old_index + # └─ create_filtered_index + # └─ promote (trigger_promote + wait_for_promote) + # └─ delete_old_index + # └─ promote_filtered_index (including delete filtered index) chain(*tasks) return data_refresh_group diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md index 72a3b8f14b6..baefb4e4e1b 100644 --- a/documentation/catalog/reference/DAGs.md +++ b/documentation/catalog/reference/DAGs.md @@ -344,21 +344,29 @@ This module creates the filtered index creation DAGs for each media type using a factory function. Filtered index creation is handled by the ingestion server. The DAGs generated -by the `build_create_filtered_index_dag` function in this module are responsible -for triggering the ingestion server action to create and populate the filtered -index for a given media type. The DAG awaits the completion of the filtered -index creation and then points the filtered index alias for the media type to -the newly created index. +by the `create_filtered_index_creation_dag` function in this module are +responsible for triggering the ingestion server action to create and populate +the filtered index for a given media type. The DAG awaits the completion of the +filtered index creation and then points the filtered index alias for the media +type to the newly created index. They make use of the +`create_filtered_index_creation_task_groups` factory, which is also used by the +data refreshes to perform the same functions. The purpose of these DAGs is to +allow the filtered index creation steps to be run in isolation from the data +refresh. ##### When this DAG runs -The DAGs generated in this module are triggered by the data refresh DAGs. -Maintaining this process separate from the data refresh DAGs, while still -triggering it there, allows us to run filtered index creation independently of -the full data refresh. This is primarily useful in two cases: for testing -changes to the filtered index creation; and for re-running filtered index -creation if an urgent change to the sensitive terms calls for an immediate -recreation of the filtered indexes. +The DAGs generated by the `create_filtered_index_creation_dag` can be used to +manually run the filtered index creation and promotion steps described above in +isolation from the rest of the data refresh. These DAGs also include checks to +ensure that race conditions with the data refresh DAGs are not encountered (see +`Race conditions` section below). + +The DAGs generated in this module are on a `None` schedule and are only +triggered manually. This is primarily useful in two cases: for testing changes +to the filtered index creation; and for re-running filtered index creation if an +urgent change to the sensitive terms calls for an immediate recreation of the +filtered indexes. ##### Race conditions @@ -383,12 +391,6 @@ There are two mechanisms that prevent this from happening: This ensures that neither are depending on or modifying the origin indexes critical for the creation of the filtered indexes. -Because the data refresh DAG triggers the filtered index creation DAG, we do -allow a `force` param to be passed to the DAGs generated by this module. This -parameter is only for use by the data refresh DAG and should not be used when -manually triggering the DAG unless you are absolutely certain of what you are -doing. - ### `create_filtered_image_index` #### Create filtered index DAG factory @@ -397,21 +399,29 @@ This module creates the filtered index creation DAGs for each media type using a factory function. Filtered index creation is handled by the ingestion server. The DAGs generated -by the `build_create_filtered_index_dag` function in this module are responsible -for triggering the ingestion server action to create and populate the filtered -index for a given media type. The DAG awaits the completion of the filtered -index creation and then points the filtered index alias for the media type to -the newly created index. +by the `create_filtered_index_creation_dag` function in this module are +responsible for triggering the ingestion server action to create and populate +the filtered index for a given media type. The DAG awaits the completion of the +filtered index creation and then points the filtered index alias for the media +type to the newly created index. They make use of the +`create_filtered_index_creation_task_groups` factory, which is also used by the +data refreshes to perform the same functions. The purpose of these DAGs is to +allow the filtered index creation steps to be run in isolation from the data +refresh. ##### When this DAG runs -The DAGs generated in this module are triggered by the data refresh DAGs. -Maintaining this process separate from the data refresh DAGs, while still -triggering it there, allows us to run filtered index creation independently of -the full data refresh. This is primarily useful in two cases: for testing -changes to the filtered index creation; and for re-running filtered index -creation if an urgent change to the sensitive terms calls for an immediate -recreation of the filtered indexes. +The DAGs generated by the `create_filtered_index_creation_dag` can be used to +manually run the filtered index creation and promotion steps described above in +isolation from the rest of the data refresh. These DAGs also include checks to +ensure that race conditions with the data refresh DAGs are not encountered (see +`Race conditions` section below). + +The DAGs generated in this module are on a `None` schedule and are only +triggered manually. This is primarily useful in two cases: for testing changes +to the filtered index creation; and for re-running filtered index creation if an +urgent change to the sensitive terms calls for an immediate recreation of the +filtered indexes. ##### Race conditions @@ -436,12 +446,6 @@ There are two mechanisms that prevent this from happening: This ensures that neither are depending on or modifying the origin indexes critical for the creation of the filtered indexes. -Because the data refresh DAG triggers the filtered index creation DAG, we do -allow a `force` param to be passed to the DAGs generated by this module. This -parameter is only for use by the data refresh DAG and should not be used when -manually triggering the DAG unless you are absolutely certain of what you are -doing. - ### `europeana_workflow` Content Provider: Europeana