Rename execution_date to logical_date across codebase #43902

Open
sunank200 wants to merge 4 commits into main from rename-execution-date-to-logical-date

Conversation

sunank200
Collaborator

@sunank200 sunank200 commented Nov 11, 2024

Motivation

This PR renames execution_date to logical_date across the codebase. The shift towards logical_date helps move away from the limitations of execution_date, particularly with dynamic DAG runs and cases where multiple runs occur at the same time.

Key Changes

  • Replaced all instances of execution_date with logical_date, including in models.
  • Updated references in database models, templates, and functions.
  • Removed unique constraints on execution_date in the database to allow multiple DAG runs with the same logical time.

How execution_date and logical_date Work

  1. Logical date is equivalent to execution date: The two are just different names for the same value.
  2. Timetable controls logical date: The logical date can be set to any value, not necessarily tied to the data interval's start or end.
  3. Schedules dictate behavior: For value-based schedules (like cron), the logical date is set by the timetable class used.
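
The second point above can be illustrated with a toy model. This is only a sketch in plain Python and does not use Airflow's timetable API; `RunInfo`, `daily_run_start_aligned`, and `daily_run_end_aligned` are hypothetical names invented for the example. It shows two schedules that produce the same data interval but choose different logical dates.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class RunInfo:
    """Hypothetical stand-in for what a timetable decides about one run."""

    data_interval_start: datetime
    data_interval_end: datetime
    logical_date: datetime


def daily_run_start_aligned(day: datetime) -> RunInfo:
    """Toy timetable: logical date equals the start of the data interval."""
    end = day + timedelta(days=1)
    return RunInfo(day, end, logical_date=day)


def daily_run_end_aligned(day: datetime) -> RunInfo:
    """Toy timetable: logical date equals the end of the data interval."""
    end = day + timedelta(days=1)
    return RunInfo(day, end, logical_date=end)


day = datetime(2024, 11, 11, tzinfo=timezone.utc)
start_aligned = daily_run_start_aligned(day)
end_aligned = daily_run_end_aligned(day)
```

Both runs cover the same interval, so code that partitions data by `data_interval_start`/`data_interval_end` behaves the same under either schedule, while code keyed on the logical date does not.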

Transitioning from execution_date

  • Airflow 3.0 will fully remove execution_date.
  • For uniquely identifying runs:
    • Use run_id for human-readable identifiers.
    • Use data_interval_start/data_interval_end for data partitioning in the future.
    • id (auto-incremented) is recommended for ordering DAG runs.
  • If your existing logic relies on execution_date, switch to using data_interval_start or data_interval_end for identifying time ranges.
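
A minimal sketch of what that migration could look like in user code, assuming the task context behaves like a mapping with the keys named above. The helpers `get_logical_date` and `partition_label` are hypothetical and not part of Airflow; the fallback to `execution_date` is only for code that must also run where the old key is the only one present.

```python
from datetime import datetime, timezone
from typing import Any, Mapping


def get_logical_date(context: Mapping[str, Any]) -> Any:
    """Return the run's logical date, preferring the new key name."""
    if "logical_date" in context:
        return context["logical_date"]
    # Assumed fallback for environments that only expose the old key.
    return context["execution_date"]


def partition_label(context: Mapping[str, Any]) -> str:
    """Build a partition label from the data interval, not execution_date."""
    start = context["data_interval_start"]
    end = context["data_interval_end"]
    return f"{start:%Y-%m-%d}_{end:%Y-%m-%d}"


# Example context with illustrative values only.
example_context = {
    "logical_date": datetime(2024, 11, 11, tzinfo=timezone.utc),
    "data_interval_start": datetime(2024, 11, 11, tzinfo=timezone.utc),
    "data_interval_end": datetime(2024, 11, 12, tzinfo=timezone.utc),
}
label = partition_label(example_context)
```

Keeping the time-range logic on the data interval means it does not depend on which name the logical date is exposed under.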

Testing

  • Updated unit tests to reflect the changes.


@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:CLI area:Scheduler including HA (high availability) scheduler provider:cncf-kubernetes Kubernetes provider related issues labels Nov 11, 2024
@sunank200 sunank200 added legacy api Whether legacy API changes should be allowed in PR legacy ui Whether legacy UI change should be allowed in PR labels Nov 11, 2024
@sunank200 sunank200 force-pushed the rename-execution-date-to-logical-date branch 9 times, most recently from ea36b6c to 65f9b8e Compare November 12, 2024 02:54
@uranusjr
Member

This needs a news fragment. Rewording the above summary would work fine. See existing files in newsfragments (the significant ones) for some examples.

@uranusjr
Member

uranusjr commented Nov 12, 2024

For templates, replace {{ ds }} with {{ data_interval_start | ds }}.

This should probably use logical_date for max compatibility. Also should mention ts (same rewrite).

Oh wait, we’re not removing those in this PR yet, so this should not be mentioned at all.

@@ -410,7 +410,7 @@ def string_lower_type(val):
# trigger_dag
ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
ARG_CONF = Arg(("-c", "--conf"), help="JSON string that gets pickled into the DagRun's conf attribute")
-ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
+ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The logical date of the DAG", type=parsedate)
Member

We should also rename this flag in a future PR.

airflow/models/taskreschedule.py (resolved)
airflow/models/xcom.py (resolved)
airflow/utils/context.py (outdated, resolved)
airflow/utils/context.pyi (outdated, resolved)
@sunank200 sunank200 force-pushed the rename-execution-date-to-logical-date branch 10 times, most recently from 5184a34 to 12bc242 Compare November 13, 2024 17:58
@sunank200 sunank200 force-pushed the rename-execution-date-to-logical-date branch from 12bc242 to 9897584 Compare November 13, 2024 20:41
Member

This also needs to mention that some keys in the context have been removed.

@@ -451,7 +426,6 @@ def context_copy_partial(source: Context, keys: Container[str]) -> Context:
:meta private:
"""
new = Context({k: v for k, v in source._context.items() if k in keys})
-new._deprecation_replacements = source._deprecation_replacements.copy()
Member

This line should not be removed.

Comment on lines +274 to +276
logical_date = (
dag_run.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else dag_run.execution_date.isoformat()
)
Member

Suggested change
-logical_date = (
-    dag_run.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else dag_run.execution_date.isoformat()
-)
+logical_date = dag_run.logical_date.isoformat()

dag_run.logical_date is available on Airflow 2.

Comment on lines +48 to +49
AIRFLOW_VERSION = Version(airflow_version)
AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")
Member

Not needed anymore?

Comment on lines +47 to +48
AIRFLOW_VERSION = Version(airflow_version)
AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")
Member

Not needed?

Comment on lines +316 to +318
logical_date = (
dag_run.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else dag_run.execution_date.isoformat()
)
Member

Suggested change
-logical_date = (
-    dag_run.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else dag_run.execution_date.isoformat()
-)
+logical_date = dag_run.logical_date.isoformat()

Comment on lines +320 to 342
return (
log_id_template.format(
dag_id=ti.dag_id,
task_id=ti.task_id,
run_id=getattr(ti, "run_id", ""),
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
logical_date=logical_date,
try_number=try_number,
map_index=getattr(ti, "map_index", ""),
)
if AIRFLOW_V_3_0_PLUS
else log_id_template.format(
dag_id=ti.dag_id,
task_id=ti.task_id,
run_id=getattr(ti, "run_id", ""),
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
execution_date=logical_date,
try_number=try_number,
map_index=getattr(ti, "map_index", ""),
)
)
Member

Suggested change
-return (
-    log_id_template.format(
-        dag_id=ti.dag_id,
-        task_id=ti.task_id,
-        run_id=getattr(ti, "run_id", ""),
-        data_interval_start=data_interval_start,
-        data_interval_end=data_interval_end,
-        logical_date=logical_date,
-        try_number=try_number,
-        map_index=getattr(ti, "map_index", ""),
-    )
-    if AIRFLOW_V_3_0_PLUS
-    else log_id_template.format(
-        dag_id=ti.dag_id,
-        task_id=ti.task_id,
-        run_id=getattr(ti, "run_id", ""),
-        data_interval_start=data_interval_start,
-        data_interval_end=data_interval_end,
-        execution_date=logical_date,
-        try_number=try_number,
-        map_index=getattr(ti, "map_index", ""),
-    )
-)
+return log_id_template.format(
+    dag_id=ti.dag_id,
+    task_id=ti.task_id,
+    run_id=getattr(ti, "run_id", ""),
+    data_interval_start=data_interval_start,
+    data_interval_end=data_interval_end,
+    execution_date=logical_date,  # Airflow 2 compatibility.
+    logical_date=logical_date,
+    try_number=try_number,
+    map_index=ti.map_index,
+)
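
The suggestion works without a version check because Python's `str.format` ignores keyword arguments that the template does not reference. A small sketch of that behaviour follows; the two template strings and the `render_log_id` helper are illustrative assumptions, not the provider's actual defaults or code.

```python
def render_log_id(
    template: str, *, dag_id: str, task_id: str, logical_date: str, try_number: int
) -> str:
    """Render a log ID, passing the date under both the old and new names."""
    return template.format(
        dag_id=dag_id,
        task_id=task_id,
        execution_date=logical_date,  # consumed only by old-style templates
        logical_date=logical_date,  # consumed only by new-style templates
        try_number=try_number,
    )


# Hypothetical templates: one written against the old name, one against the new.
OLD_STYLE_TEMPLATE = "{dag_id}-{task_id}-{execution_date}-{try_number}"
NEW_STYLE_TEMPLATE = "{dag_id}-{task_id}-{logical_date}-{try_number}"

fields = dict(
    dag_id="etl",
    task_id="load",
    logical_date="2024-11-11T00:00:00+00:00",
    try_number=1,
)
old_id = render_log_id(OLD_STYLE_TEMPLATE, **fields)
new_id = render_log_id(NEW_STYLE_TEMPLATE, **fields)
```

Each template picks up whichever name it uses and the unused keyword is silently dropped, so a single call covers both cases.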

 task_info = {
     "dag_id": context_var["dag_id"],
     "task_id": context_var["task_id"],
-    "execution_date": context_var["execution_date"],
+    date_key: context_var["logical_date"] if AIRFLOW_V_3_0_PLUS else context_var["execution_date"],
Member

Suggested change
-date_key: context_var["logical_date"] if AIRFLOW_V_3_0_PLUS else context_var["execution_date"],
+date_key: context_var["logical_date"],

@@ -310,7 +309,7 @@ def get_tasks_to_skip():
self.skip(
dag_run=dag_run,
tasks=to_skip,
-execution_date=cast("DateTime", dag_run.execution_date),  # type: ignore[call-arg]
+execution_date=cast("DateTime", dag_run.execution_date),  # type: ignore[call-arg, union-attr]
Member

Suggested change
-execution_date=cast("DateTime", dag_run.execution_date),  # type: ignore[call-arg, union-attr]
+execution_date=cast("DateTime", dag_run.logical_date),  # type: ignore[call-arg, union-attr]

"prev_data_interval_end_success",
"prev_data_interval_start_success",
"prev_execution_date",
"prev_execution_date_success",
"prev_start_date_success",
"prev_end_date_success",
}
Member

These cannot be deleted; the operator will not work correctly on Airflow 2. We can add comments noting that these should be deleted when we drop Airflow 2 support on the provider.

 task_info = {
     "dag_id": context_var["dag_id"],
     "task_id": context_var["task_id"],
-    "execution_date": context_var["execution_date"],
+    date_key: context_var["logical_date"] if AIRFLOW_V_3_0_PLUS else context_var["execution_date"],
Member

Suggested change
-date_key: context_var["logical_date"] if AIRFLOW_V_3_0_PLUS else context_var["execution_date"],
+date_key: context_var["logical_date"],

Comment on lines +33 to +34
AIRFLOW_VERSION = Version(airflow_version)
AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")
Member

Not needed?

5 participants