Skip to content

Commit

Permalink
Remove RETRY_STRATEGY_TAG from the correct list of tags (#26566)
Browse files Browse the repository at this point in the history
Summary:
#26358 intended to remove this
tag from the list of tags not to copy over on a retried run, but instead
removed it from the list of tags that you can't set yourself in code.
Revert that change, apply it instead to the correct list of tags, and
add tests for both lists.

Test Plan: BK tests that were failing before

## Summary & Motivation

## How I Tested These Changes

## Changelog

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan authored Dec 18, 2024
1 parent f84bb2c commit 831c8fc
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
AUTO_RETRY_RUN_ID_TAG,
MAX_RETRIES_TAG,
PARENT_RUN_ID_TAG,
RESUME_RETRY_TAG,
RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
RETRY_STRATEGY_TAG,
ROOT_RUN_ID_TAG,
Expand Down Expand Up @@ -395,7 +396,15 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context)
assert len(instance.run_coordinator.queue()) == 0

# retries failure
run = create_run(instance, status=DagsterRunStatus.STARTED, tags={MAX_RETRIES_TAG: "2"})
run = create_run(
instance,
status=DagsterRunStatus.STARTED,
tags={
MAX_RETRIES_TAG: "2",
RESUME_RETRY_TAG: "true",
RETRY_STRATEGY_TAG: "ALL_STEPS",
},
)
dagster_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_FAILURE.value,
job_name="foo",
Expand Down Expand Up @@ -427,6 +436,10 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context)
run = instance.get_run_by_id(run.run_id)
assert run.tags.get(AUTO_RETRY_RUN_ID_TAG) == first_retry.run_id

# retry strategy is copied, "is_resume_retry" is not since the retry strategy is ALL_STEPS
assert RESUME_RETRY_TAG not in first_retry.tags
assert first_retry.tags.get(RETRY_STRATEGY_TAG) == "ALL_STEPS"

# doesn't retry again
list(
consume_new_runs_for_automatic_reexecution(
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/storage/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
USER_EDITABLE_SYSTEM_TAGS = [
PRIORITY_TAG,
MAX_RETRIES_TAG,
RESUME_RETRY_TAG,
RETRY_STRATEGY_TAG,
MAX_RUNTIME_SECONDS_TAG,
RUN_ISOLATION_TAG,
RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
Expand All @@ -115,7 +115,7 @@
*RUN_METRIC_TAGS,
RUN_FAILURE_REASON_TAG,
RETRY_NUMBER_TAG,
RETRY_STRATEGY_TAG,
RESUME_RETRY_TAG,
WILL_RETRY_TAG,
AUTO_RETRY_RUN_ID_TAG,
*BACKFILL_TAGS,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from dagster import job, op
import pytest
from dagster import ScheduleDefinition, job, op
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.storage.tags import RESUME_RETRY_TAG, RETRY_STRATEGY_TAG


def test_op_tags():
Expand Down Expand Up @@ -45,3 +48,22 @@ def no_tags_job():
noop_op()

assert no_tags_job.get_subset(op_selection=["noop_op"]).tags == {}


def test_user_editable_system_tags():
@op
def noop_op(_):
pass

@job
def noop_job():
noop_op()

ScheduleDefinition(
job=noop_job, cron_schedule="* * * * *", tags={RETRY_STRATEGY_TAG: "ALL_STEPS"}
)

with pytest.raises(
DagsterInvalidDefinitionError, match="Attempted to set tag with reserved system prefix"
):
ScheduleDefinition(job=noop_job, cron_schedule="* * * * *", tags={RESUME_RETRY_TAG: "true"})

0 comments on commit 831c8fc

Please sign in to comment.