Explicitly include Filter Data step in ingestion server removal IP (#4524)

* Explicitly include Filter Data step in ingestion server removal IP

* Move footnote content into the primary document
AetherUnbound authored Jun 21, 2024
1 parent 417ba9c commit 2befb89
Showing 1 changed file with 69 additions and 6 deletions.
@@ -38,11 +38,15 @@ steps, which will be discussed separately in this IP:
entire contents of the upstream media table are copied into a new temp table
in the API database. This temp table will later replace the main media table
in the API.
- **Clean Data**: Data clean up which includes removing denylisted tags and
cleaning URLs. This step will become unnecessary when the
- **Clean Data**: Data cleanup, which includes cleaning URLs. This step will
become unnecessary when the
[Catalog Data Cleaning project](https://github.com/WordPress/openverse/issues/430)
is completed, which moves all data cleaning steps upstream into the initial
provider ingestion process.
- **Filter Data**: Separate from the cleanup step (but currently included as
part of it). Filter out denylisted tags, as well as machine-generated tags that
fall below our current confidence threshold. This will _not_ be a part of the
data cleaning project and must be carried forward.
- **Create Index**: Create a new Elasticsearch index, matching the configuration
of the existing media index.
- **Distributed Reindex**: Convert each record from the new temp table to the
@@ -267,8 +271,8 @@ developed as separate DAGs alongside the current ones.
1. Create a new data refresh factory to generate new data refresh DAGs for each
media type for staging and production, with the
`<environment>_<media>_data_refresh` DAG id. For simplicity of code review,
these initial DAGs should only perform the `Copy Data`, and `Create Index`
steps, which we will perform entirely in Airflow.
these initial DAGs should only perform the `Copy Data`, `Filter Data`, and
`Create Index` steps, which we will perform entirely in Airflow.
1. Create a new `catalog-indexer-worker` in the Catalog, and build the new
indexer worker image locally.
1. Add the distributed reindexing step to the DAG (excludes infrastructure
@@ -333,14 +337,35 @@ already implemented in the current data refresh and can simply be copied:
We will include new tasks to perform the initial few steps of the ingestion
server's work:

- Copy Data: this should be a TaskGroup that will have multiple tasks for
- `Copy Data`: this should be a TaskGroup that will have multiple tasks for
creating the FDW from the upstream DB to the downstream DB, running the
copy_data query, and so on. It should fully replace the implementation of
[`refresh_api_table`](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/ingestion_server/ingestion_server/ingest.py#L248)
in the ingestion server. All steps in this section are SQL queries that can be
implemented using the existing
[PostgresHook and PGExecuteQueryOperator](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/catalog/dags/common/sql.py).
- Create Index: we can use our existing
- `Filter Data`: initially, this should be a single Python task which exactly
mirrors the behavior of the
[`clean_image_data` function of `cleanup.py`](https://github.com/WordPress/openverse/blob/47fe5df0e9b8ad3dba06021f4cd4af9139977644/ingestion_server/ingestion_server/cleanup.py#L295)
(only applying the tag-specific steps) on the ingestion server[^3]. The
easiest way to do this would be to port the ingestion server's logic for this
step directly into a single Airflow task. The steps for this
task are as follows (see [Alternatives](#filtering-approach) for possible
future directions):
  1. Get a batch of records from the database using `CLEANUP_BUFFER_SIZE`
  2. Divide the batch up into `multiprocessing.cpu_count()` subbatches
  3. Split the filtering up into separate workers using multiprocessing
  4. On each process
     1. Create a new DB connection & cursor per worker
     2. Iterate through each record
        1. Remove tags below confidence level
        2. Remove tags that need to be filtered (denylisted, machine-generated
           filter list, provider, etc.)
        3. Only surface the record if it needs to be changed
        4. Update each record one by one with a single `UPDATE`
     3. Commit cursor and close connection
  5. Repeat steps 1-4 until all batches are consumed
- `Create Index`: we can use our existing
[Elasticsearch tasks](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/catalog/dags/common/elasticsearch.py#L82)
to create the new Elasticsearch index with the index suffix generated in the
previous task.
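
The per-record part of the `Filter Data` steps above can be sketched roughly as
follows. This is a minimal illustration and not the ingestion server's actual
code: the threshold value, the denylist entries, the tag field names (`name`,
`accuracy`), and all function names are assumptions made for the example.
Database access and the multiprocessing pool are omitted so that the filtering
logic stands on its own.

```python
from typing import Any, Dict, List, Optional, Tuple

# Assumed values for illustration only; the real threshold and denylist are
# defined in the ingestion server's cleanup.py and may differ.
CONFIDENCE_THRESHOLD = 0.9
DENYLIST = {"no person", "squareformat"}

Tag = Dict[str, Any]


def filter_tags(tags: Optional[List[Tag]]) -> Optional[List[Tag]]:
    """Return the filtered tag list, or None when the record needs no update."""
    if not tags:
        return None
    kept = []
    for tag in tags:
        accuracy = tag.get("accuracy")
        # Drop machine-generated tags below the confidence threshold.
        if accuracy is not None and accuracy < CONFIDENCE_THRESHOLD:
            continue
        # Drop denylisted tags (case-insensitive match on the tag name).
        if str(tag.get("name", "")).lower() in DENYLIST:
            continue
        kept.append(tag)
    # Only surface the record if something was actually removed.
    return kept if len(kept) != len(tags) else None


def split_batch(batch: List[Any], workers: int) -> List[List[Any]]:
    """Divide a batch into at most `workers` roughly equal sub-batches."""
    size = max(1, -(-len(batch) // workers))  # ceiling division
    return [batch[i : i + size] for i in range(0, len(batch), size)]


def rows_to_update(
    sub_batch: List[Tuple[str, List[Tag]]]
) -> List[Tuple[str, List[Tag]]]:
    """Collect (identifier, new_tags) pairs for records that must change."""
    updates = []
    for identifier, tags in sub_batch:
        new_tags = filter_tags(tags)
        if new_tags is not None:
            updates.append((identifier, new_tags))
    return updates
```

In a worker process, each pair returned by `rows_to_update` would correspond to
one `UPDATE` statement issued through that worker's own connection and cursor.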
@@ -557,6 +582,8 @@ No new tools or packages are required.

<!-- Describe any alternatives considered and why they were not chosen or recommended. -->

### ECS approach

The alternative options of using an ECS approach or performing the reindex
entirely in Airflow are discussed at length in the
[Approach to the Distributed Reindex](#approach-to-the-distributed-reindex)
@@ -567,6 +594,37 @@ using EC2 operators to start and stop the instances as needed. However, more
infrastructure work is required in this approach, and we would require
deployments whenever there are code changes in the indexer workers.

### Filtering approach

There are a number of ways to accomplish the data filtering, including several
ways to improve the approach mentioned.

The Airflow scheduler container has access to 4 cores, which is the same as the
ingestion server where this step was originally running. At present, it takes
about 8 hours for all cleanup steps, but that includes the URL cleaning, which is
certainly more time-intensive than the tag filtering since it makes outbound
requests. Running the tag filtering on Airflow should not impact any of the
other running tasks or saturate the instance.

There are a few ways this process could be improved, but none of them are
required _at this moment_. We can follow up after this project is complete to
assess what further optimizations might be necessary at this step. Some
potential suggestions for that time:

- Instead of a single `UPDATE` query for each affected record, we could insert
records from each subbatch into a temporary table. Then the base table could be
updated with an `UPDATE ... FROM` in bulk. Since the indices haven't been
applied to the base table yet, this should be fairly performant.
- Instead of using multiprocessing, we could pre-define the batches and run the
filtering chunks on a set of mapped tasks. The multiprocessing has the benefit
of iterating over a cursor on the database rather than having to manage the
record ranges explicitly, but this would allow further parallelization and
task management.
- The indexer workers themselves could be expanded to run on larger chunks of
the database for this filtering. This would likely require the most work as it
would involve expanding the indexer workers' API to handle this additional
task.
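
To make the first two suggestions more concrete, here is a rough sketch of what
they might involve. It assumes PostgreSQL syntax, a staging table named
`temp_filtered_tags`, `identifier` and `tags` columns, and an integer `id`
column that can be used to define ranges. All of these names are hypothetical
and chosen only for illustration.

```python
from typing import Iterator, Tuple

# Hypothetical staging table name, used only for this example.
TEMP_TABLE = "temp_filtered_tags"

# Staging table that each sub-batch would insert its changed rows into.
CREATE_TEMP_SQL = (
    f"CREATE TEMP TABLE {TEMP_TABLE} (identifier uuid PRIMARY KEY, tags jsonb);"
)


def bulk_update_sql(base_table: str) -> str:
    """Build one UPDATE ... FROM statement that applies every staged row."""
    return (
        f"UPDATE {base_table} AS base "
        f"SET tags = staged.tags "
        f"FROM {TEMP_TABLE} AS staged "
        f"WHERE base.identifier = staged.identifier;"
    )


def batch_ranges(
    min_id: int, max_id: int, batch_size: int
) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) id ranges that cover [min_id, max_id]."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    start = min_id
    while start <= max_id:
        end = min(start + batch_size - 1, max_id)
        yield (start, end)
        start = end + 1
```

For example, `list(batch_ranges(1, 10, 4))` gives `[(1, 4), (5, 8), (9, 10)]`.
Each range could then be handed to its own mapped task (for instance through
Airflow's dynamic task mapping), with every task filtering only the rows in its
range.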

## Blockers

<!-- What hard blockers exist that prevent further work on this project? -->
@@ -609,3 +667,8 @@ monitor the first production data refreshes closely.

[^2]:
https://towardsdatascience.com/using-apache-airflow-dockeroperator-with-docker-compose-57d0217c8219

[^3]:
See #4456 for further context on this. The filtering is a _necessary_ step
of the data refresh we need to carry forward even after removing the other
cleanup steps.
