Skip to content

Commit

Permalink
Clarify language to 'staging', simplify math
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Feb 20, 2024
1 parent c80eb1c commit 5a053be
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ def get_destination_index_config(source_config: dict, destination_index_name: st


@task
def get_production_source_counts(source_index: str, es_host: str):
def get_staging_source_counts(source_index: str, es_host: str):
"""
Get the count of records per source for the given media type in the
production source index.
staging source index.
"""
es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn

Expand All @@ -75,7 +75,7 @@ def get_production_source_counts(source_index: str, es_host: str):

@task
def get_proportional_source_count_kwargs(
production_source_counts: dict[str, int], percentage_of_prod: int
staging_source_counts: dict[str, int], percentage_of_prod: int
):
"""
Return a list of kwargs for each mapped task to reindex the
Expand All @@ -85,33 +85,14 @@ def get_proportional_source_count_kwargs(
* `max_docs`: The count of records for this source needed in the new
index in order for the source to make up the same
proportion of the new index as it does in the
production unfiltered media index.
source index.
* `query`: An elasticsearch query that will be used to restrict
the reindexing task to records from this source.
"""
# Given the record counts for each source in production, determine
# what proportion of the production total each source represents
production_total = sum(production_source_counts.values())
production_source_proportions = {
source: production_count / production_total
for source, production_count in production_source_counts.items()
}

# The total number of records in the proportional subset index. Note that
# the final count of the subset may be different if the proportions do not
# divide evenly into the desired total, because we must round to the nearest
# integer.
# For example, if the desired index size is 1,000 records but each of the
# sources must represent 1/3 of the index, we will round the count for each
# source to 333 and have a final index with only 999 records.
subset_total = round(production_total * percentage_of_prod)

# Return a list of kwargs that will be passed to the mapped reindex
# tasks, for reindexing each source into the new index.
return [
{
"max_docs": round(subset_total * source_proportion),
"max_docs": round(staging_total * percentage_of_prod),
"query": {"bool": {"filter": [{"term": {"source": source}}]}},
}
for source, source_proportion in production_source_proportions.items()
for source, staging_total in staging_source_counts.items()
]
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
default=None,
type=["string", "null"],
description=(
"Optionally, the existing production Elasticsearch index"
"Optionally, the existing staging Elasticsearch index"
" to use as the basis for the new index. If not provided,"
" the index aliased to `<media_type>-filtered` will be used."
),
Expand Down Expand Up @@ -142,14 +142,14 @@ def create_proportional_by_source_staging_index():

new_index = es.create_index(index_config=destination_index_config, es_host=es_host)

production_source_counts = create_index.get_production_source_counts(
staging_source_counts = create_index.get_staging_source_counts(
source_index=source_index_name, es_host=es_host
)

desired_source_counts = create_index.get_proportional_source_count_kwargs.override(
task_id="get_desired_source_counts"
)(
production_source_counts=production_source_counts,
staging_source_counts=staging_source_counts,
percentage_of_prod="{{ params.percentage_of_prod }}",
)

Expand Down Expand Up @@ -180,8 +180,8 @@ def create_proportional_by_source_staging_index():
# Setup additional dependencies
prevent_concurrency >> es_host
es_host >> [source_index_name, destination_index_name, destination_alias]
production_source_counts >> desired_source_counts
new_index >> production_source_counts
staging_source_counts >> desired_source_counts
new_index >> staging_source_counts
reindex >> point_alias >> notify_completion


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


@pytest.mark.parametrize(
"production_source_counts, percentage_of_prod, expected_results",
"staging_source_counts, percentage_of_prod, expected_results",
[
(
{"jamendo": 10_000, "freesound": 20_000, "wikimedia_audio": 10_000},
Expand Down Expand Up @@ -69,27 +69,27 @@
0.5,
[
{
# Note that each source gets 1_667 records (because it is
# rounded up), for a total of 5_001 records in the new index.
"max_docs": 1_667,
# Note that each source gets 1_666 records (its share is rounded to an
# integer with `round`, which rounds exact halves to the nearest even
# integer), for a total of 4_998 records in the new index.
"max_docs": 1_666,
"query": {"bool": {"filter": [{"term": {"source": "flickr"}}]}},
},
{
"max_docs": 1_667,
"max_docs": 1_666,
"query": {"bool": {"filter": [{"term": {"source": "stocksnap"}}]}},
},
{
"max_docs": 1_667,
"max_docs": 1_666,
"query": {"bool": {"filter": [{"term": {"source": "smk"}}]}},
},
],
),
],
)
def test_get_proportional_source_count_kwargs(
production_source_counts, percentage_of_prod, expected_results
staging_source_counts, percentage_of_prod, expected_results
):
actual_results = get_proportional_source_count_kwargs.function(
production_source_counts, percentage_of_prod
staging_source_counts, percentage_of_prod
)
assert actual_results == expected_results

0 comments on commit 5a053be

Please sign in to comment.