Skip to content

Commit

Permalink
uv bump and use on airflow (#25766)
Browse files Browse the repository at this point in the history
* bump `uv` pin
* enable `uv` for `airflow` `tox` setups
* remove need to import `dagster_airflow_tests` by directly using
`pytest.mark`
* pin `pendulum` next to existing "temporary" `airflow` pin to < 2.8
  • Loading branch information
alangenfeld authored Nov 6, 2024
1 parent 0d1953b commit 30c52d7
Show file tree
Hide file tree
Showing 19 changed files with 48 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .buildkite/dagster-buildkite/dagster_buildkite/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class GroupStep(TypedDict):
BuildkiteLeafStep = Union[CommandStep, TriggerStep, WaitStep]
BuildkiteTopLevelStep = Union[CommandStep, GroupStep]

UV_PIN = "uv==0.4.8"
UV_PIN = "uv==0.4.30"


def is_command_step(step: BuildkiteStep) -> TypeGuard[CommandStep]:
Expand Down
4 changes: 1 addition & 3 deletions examples/with_airflow/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ passenv =
CI_*
COVERALLS_REPO_TOKEN
BUILDKITE*
; uv has trouble with resolve
; https://buildkite.com/dagster/dagster-dagster/builds/76502#018dd221-e24c-40c2-a459-693bdb456f8f
; install_command = uv pip install {opts} {packages}
install_command = uv pip install {opts} {packages}
deps =
-e ../../python_modules/dagster[test]
-e ../../python_modules/dagster-pipes
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import os
import tempfile

import pytest
from airflow.models import DagBag, Variable
from dagster_airflow import make_dagster_job_from_airflow_dag, make_ephemeral_airflow_db_resource

from dagster_airflow_tests.marks import requires_local_db

DAG_RUN_CONF_DAG = """
from airflow import models
Expand All @@ -30,7 +29,7 @@ def test_function(**kwargs):
"""


@requires_local_db
@pytest.mark.requires_local_db
def test_dag_run_conf_local() -> None:
with tempfile.TemporaryDirectory() as dags_path:
with open(os.path.join(dags_path, "dag.py"), "wb") as f:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
from dagster._check import CheckError
from dagster_airflow import load_assets_from_airflow_dag, make_ephemeral_airflow_db_resource

from dagster_airflow_tests.marks import requires_local_db

ASSET_DAG = """
from airflow import models
Expand Down Expand Up @@ -39,7 +37,7 @@


@pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1")
@requires_local_db
@pytest.mark.requires_local_db
def test_load_assets_from_airflow_dag():
with tempfile.TemporaryDirectory(suffix="assets") as tmpdir_path:
with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f:
Expand Down Expand Up @@ -82,7 +80,7 @@ def new_upstream_asset():


@pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1")
@requires_local_db
@pytest.mark.requires_local_db
def test_load_assets_from_airflow_dag_multiple_tasks_per_asset():
with tempfile.TemporaryDirectory(suffix="assets") as tmpdir_path:
with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from dagster import AssetKey, asset, materialize
from dagster_airflow import load_assets_from_airflow_dag, make_ephemeral_airflow_db_resource

from dagster_airflow_tests.marks import requires_local_db

ASSET_DAG = """
from airflow import models
Expand Down Expand Up @@ -39,7 +37,7 @@


@pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2")
@requires_local_db
@pytest.mark.requires_local_db
def test_load_assets_from_airflow_dag():
with tempfile.TemporaryDirectory(suffix="assets") as tmpdir_path:
with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
)

from dagster_airflow_tests.airflow_utils import test_make_from_dagbag_inputs
from dagster_airflow_tests.marks import requires_local_db


@pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1")
Expand Down Expand Up @@ -104,7 +103,7 @@ def test_make_definitions(
"expected_job_names, exclude_from_execution_tests",
test_airflow_example_dags_inputs,
)
@requires_local_db
@pytest.mark.requires_local_db
def test_airflow_example_dags(
expected_job_names,
exclude_from_execution_tests,
Expand Down Expand Up @@ -147,7 +146,7 @@ def test_airflow_example_dags(


@pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1")
@requires_local_db
@pytest.mark.requires_local_db
def test_retry_conversion():
with tempfile.TemporaryDirectory(suffix="retries") as tmpdir_path:
with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
)

from dagster_airflow_tests.airflow_utils import test_make_from_dagbag_inputs_airflow_2
from dagster_airflow_tests.marks import requires_local_db, requires_no_db


@pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2")
@pytest.mark.parametrize(
"path_and_content_tuples, fn_arg_path, expected_job_names",
test_make_from_dagbag_inputs_airflow_2,
)
@requires_no_db
@pytest.mark.requires_no_db
def test_make_definition(
path_and_content_tuples,
fn_arg_path,
Expand Down Expand Up @@ -94,7 +93,7 @@ def get_examples_airflow_repo_params():
"job_name, exclude_from_execution_tests",
get_examples_airflow_repo_params(),
)
@requires_local_db
@pytest.mark.requires_local_db
def test_airflow_example_dags(
airflow_examples_repo,
job_name,
Expand Down Expand Up @@ -132,7 +131,7 @@ def test_airflow_example_dags(


@pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2")
@requires_local_db
@pytest.mark.requires_local_db
def test_retry_conversion():
with tempfile.TemporaryDirectory(suffix="retries") as tmpdir_path:
with open(os.path.join(tmpdir_path, "dag.py"), "wb") as f:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
from airflow.models import Connection, TaskInstance
from dagster_airflow import DagsterCloudOperator

from dagster_airflow_tests.marks import requires_local_db

if airflow_version >= "2.0.0":
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
Expand All @@ -35,7 +33,7 @@
)


@requires_local_db
@pytest.mark.requires_local_db
class TestDagsterOperator(unittest.TestCase):
@mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.launch_run", return_value="run_id")
@mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.wait_for_run")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
from airflow import __version__ as airflow_version
from airflow.models.dag import DAG
from airflow.operators.dummy_operator import DummyOperator # type: ignore
Expand All @@ -13,15 +14,13 @@
from dagster._serdes import serialize_pp
from dagster_airflow.dagster_job_factory import make_dagster_job_from_airflow_dag

from dagster_airflow_tests.marks import requires_no_db

default_args = {
"owner": "dagster",
"start_date": days_ago(1),
}


@requires_no_db
@pytest.mark.requires_no_db
def test_one_task_dag(snapshot):
if airflow_version >= "2.0.0":
dag = DAG(
Expand All @@ -47,7 +46,7 @@ def test_one_task_dag(snapshot):
)


@requires_no_db
@pytest.mark.requires_no_db
def test_two_task_dag_no_dep(snapshot):
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down Expand Up @@ -77,7 +76,7 @@ def test_two_task_dag_no_dep(snapshot):
)


@requires_no_db
@pytest.mark.requires_no_db
def test_two_task_dag_with_dep(snapshot):
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down Expand Up @@ -109,7 +108,7 @@ def test_two_task_dag_with_dep(snapshot):
)


@requires_no_db
@pytest.mark.requires_no_db
def test_diamond_task_dag(snapshot):
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down Expand Up @@ -151,7 +150,7 @@ def test_diamond_task_dag(snapshot):
)


@requires_no_db
@pytest.mark.requires_no_db
def test_multi_root_dag(snapshot):
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down Expand Up @@ -193,7 +192,7 @@ def test_multi_root_dag(snapshot):
)


@requires_no_db
@pytest.mark.requires_no_db
def test_multi_leaf_dag(snapshot):
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down Expand Up @@ -234,7 +233,7 @@ def test_multi_leaf_dag(snapshot):
)


@requires_no_db
@pytest.mark.requires_no_db
def test_complex_dag(snapshot):
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down Expand Up @@ -489,7 +488,7 @@ def test_complex_dag(snapshot):
)


@requires_no_db
@pytest.mark.requires_no_db
def test_one_task_dag_to_job():
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from airflow.models import Connection
from dagster_airflow import make_dagster_definitions_from_airflow_dags_path

from dagster_airflow_tests.marks import requires_local_db

LOAD_CONNECTION_DAG_FILE_AIRFLOW_2_CONTENTS = """
import pendulum
from airflow import DAG
Expand Down Expand Up @@ -48,7 +46,7 @@


@pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2")
@requires_local_db
@pytest.mark.requires_local_db
class TestConnectionsAirflow2(unittest.TestCase):
@mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.launch_run", return_value="run_id")
@mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.wait_for_run")
Expand Down Expand Up @@ -111,7 +109,7 @@ def test_ingest_airflow_dags_with_connections(self, launch_run, wait_for_run):


@pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1")
@requires_local_db
@pytest.mark.requires_local_db
class TestConnectionsAirflow1(unittest.TestCase):
@mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.launch_run", return_value="run_id")
@mock.patch("dagster_airflow.hooks.dagster_hook.DagsterHook.wait_for_run")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
)

from dagster_airflow_tests.airflow_utils import test_make_from_dagbag_inputs
from dagster_airflow_tests.marks import requires_local_db


@pytest.mark.skipif(airflow_version >= "2.0.0", reason="requires airflow 1")
Expand Down Expand Up @@ -100,7 +99,7 @@ def test_make_repo(
"expected_job_names, exclude_from_execution_tests",
test_airflow_example_dags_inputs,
)
@requires_local_db
@pytest.mark.requires_local_db
def test_airflow_example_dags(
expected_job_names,
exclude_from_execution_tests,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@
)

from dagster_airflow_tests.airflow_utils import test_make_from_dagbag_inputs_airflow_2
from dagster_airflow_tests.marks import requires_local_db, requires_no_db


@pytest.mark.skipif(airflow_version < "2.0.0", reason="requires airflow 2")
@pytest.mark.parametrize(
"path_and_content_tuples, fn_arg_path, expected_job_names",
test_make_from_dagbag_inputs_airflow_2,
)
@requires_no_db
@pytest.mark.requires_no_db
def test_make_repo(
path_and_content_tuples,
fn_arg_path,
Expand Down Expand Up @@ -93,7 +92,7 @@ def get_examples_airflow_repo_params():
"job_name, exclude_from_execution_tests",
get_examples_airflow_repo_params(),
)
@requires_local_db
@pytest.mark.requires_local_db
def test_airflow_example_dags(
airflow_examples_repo,
job_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
from unittest import mock

import pytest

# We ignore type errors in several places because we are importing in such a way as to be
# compatible with both versions 1.x and 2.x of airflow. This means importing from places that are
# not the blessed API of the latest version, which raises pyright "not exported" errors.
Expand All @@ -24,8 +26,6 @@
from dagster._time import get_current_datetime
from dagster_airflow import make_dagster_job_from_airflow_dag

from dagster_airflow_tests.marks import requires_no_db

default_args = {
"owner": "dagster",
"start_date": days_ago(1),
Expand All @@ -35,7 +35,7 @@
# Airflow DAG ids and Task ids allow a larger valid character set (alphanumeric characters,
# dashes, dots and underscores) than Dagster's naming conventions (alphanumeric characters,
# underscores), so Dagster will strip invalid characters and replace with '_'
@requires_no_db
@pytest.mark.requires_no_db
def test_normalize_name():
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down Expand Up @@ -67,7 +67,7 @@ def test_normalize_name():


# Test names with 250 characters, Airflow's max allowed length
@requires_no_db
@pytest.mark.requires_no_db
def test_long_name():
dag_name = "dag-with.dot-dash-lo00ong" * 10
if airflow_version >= "2.0.0":
Expand Down Expand Up @@ -107,7 +107,7 @@ def test_long_name():
)


@requires_no_db
@pytest.mark.requires_no_db
def test_one_task_dag():
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down Expand Up @@ -138,7 +138,7 @@ def normalize_file_content(s):
return "\n".join([line for line in s.replace(os.linesep, "\n").split("\n") if line])


@requires_no_db
@pytest.mark.requires_no_db
def test_template_task_dag(tmpdir):
if airflow_version >= "2.0.0":
dag = DAG(
Expand Down Expand Up @@ -234,7 +234,7 @@ def intercept_spark_submit(*_args, **_kwargs):
return m


@requires_no_db
@pytest.mark.requires_no_db
@mock.patch("subprocess.Popen", side_effect=intercept_spark_submit)
def test_spark_dag(mock_subproc_popen):
# Hack to get around having a Connection
Expand Down
Loading

0 comments on commit 30c52d7

Please sign in to comment.