Skip to content

Commit

Permalink
Switch test_views_home.py from parsing files directly to using fixtur…
Browse files Browse the repository at this point in the history
…es (#44788)

Although this test and UI will go away before release, the direct use of
`DagFileProcessor` to update/create things in the database is going to stop
shortly once DAG parsing is switched over to use Task SDK in the subprocess,
so to make that PR easier to review this change is being made now.
  • Loading branch information
ashb authored Dec 9, 2024
1 parent ee07d63 commit cf4f2ca
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 54 deletions.
88 changes: 34 additions & 54 deletions tests/www/views/test_views_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import markupsafe
import pytest

from airflow.dag_processing.processor import DagFileProcessor
from airflow.models.errors import ParseImportError
from airflow.security import permissions
from airflow.utils.state import State
from airflow.www.utils import UIAlert
Expand All @@ -37,7 +37,7 @@
client_with_login,
)

pytestmark = pytest.mark.db_test
pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]


def clean_db():
Expand Down Expand Up @@ -203,81 +203,58 @@ def client_single_dag_edit(app, user_single_dag_edit):
TEST_TAGS = ["example", "test", "team", "group"]


def _process_file(file_path):
dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory="/tmp", log=mock.MagicMock())
dag_file_processor.process_file(file_path, [])


@pytest.fixture
def _working_dags(tmp_path):
dag_contents_template = "from airflow import DAG\ndag = DAG('{}', schedule=None, tags=['{}'])"
def _working_dags(dag_maker):
for dag_id, tag in zip(TEST_FILTER_DAG_IDS, TEST_TAGS):
path = tmp_path / f"{dag_id}.py"
path.write_text(dag_contents_template.format(dag_id, tag))
_process_file(path)
with dag_maker(dag_id=dag_id, fileloc=f"/{dag_id}.py", tags=[tag]):
# We need to enter+exit the dag maker context for it to create the dag
pass


@pytest.fixture
def _working_dags_with_read_perm(tmp_path):
dag_contents_template = "from airflow import DAG\ndag = DAG('{}', schedule=None, tags=['{}'])"
dag_contents_template_with_read_perm = (
"from airflow import DAG\ndag = DAG('{}', schedule=None, tags=['{}'], "
"access_control={{'role_single_dag':{{'can_read'}}}}) "
)
def _working_dags_with_read_perm(dag_maker):
for dag_id, tag in zip(TEST_FILTER_DAG_IDS, TEST_TAGS):
path = tmp_path / f"{dag_id}.py"
if dag_id == "filter_test_1":
path.write_text(dag_contents_template_with_read_perm.format(dag_id, tag))
access_control = {"role_single_dag": {"can_read"}}
else:
path.write_text(dag_contents_template.format(dag_id, tag))
_process_file(path)
access_control = None

with dag_maker(dag_id=dag_id, fileloc=f"/{dag_id}.py", tags=[tag], access_control=access_control):
pass


@pytest.fixture
def _working_dags_with_edit_perm(tmp_path):
dag_contents_template = "from airflow import DAG\ndag = DAG('{}', schedule=None, tags=['{}'])"
dag_contents_template_with_read_perm = (
"from airflow import DAG\ndag = DAG('{}', schedule=None, tags=['{}'], "
"access_control={{'role_single_dag':{{'can_edit'}}}}) "
)
def _working_dags_with_edit_perm(dag_maker):
for dag_id, tag in zip(TEST_FILTER_DAG_IDS, TEST_TAGS):
path = tmp_path / f"{dag_id}.py"
if dag_id == "filter_test_1":
path.write_text(dag_contents_template_with_read_perm.format(dag_id, tag))
access_control = {"role_single_dag": {"can_edit"}}
else:
path.write_text(dag_contents_template.format(dag_id, tag))
_process_file(path)
access_control = None


@pytest.fixture
def _broken_dags(tmp_path, _working_dags):
for dag_id in TEST_FILTER_DAG_IDS:
path = tmp_path / f"{dag_id}.py"
path.write_text("airflow DAG")
_process_file(path)
with dag_maker(dag_id=dag_id, fileloc=f"/{dag_id}.py", tags=[tag], access_control=access_control):
pass


@pytest.fixture
def _broken_dags_with_read_perm(tmp_path, _working_dags_with_read_perm):
def _broken_dags(session):
from airflow.models.errors import ParseImportError

for dag_id in TEST_FILTER_DAG_IDS:
path = tmp_path / f"{dag_id}.py"
path.write_text("airflow DAG")
_process_file(path)
session.add(ParseImportError(filename=f"/{dag_id}.py", stacktrace="Some Error\nTraceback:\n"))
session.commit()


@pytest.fixture
def _broken_dags_after_working(tmp_path):
def _broken_dags_after_working(dag_maker, session):
# First create and process a DAG file that works
path = tmp_path / "all_in_one.py"
contents = "from airflow import DAG\n"
for i, dag_id in enumerate(TEST_FILTER_DAG_IDS):
contents += f"dag{i} = DAG('{dag_id}', schedule=None)\n"
path.write_text(contents)
_process_file(path)
path = "/all_in_one.py"
for dag_id in TEST_FILTER_DAG_IDS:
with dag_maker(dag_id=dag_id, fileloc=path, session=session):
pass

contents += "foobar()"
path.write_text(contents)
_process_file(path)
# Then create an import error against that file
session.add(ParseImportError(filename=path, stacktrace="Some Error\nTraceback:\n"))
session.commit()


def test_home_filter_tags(_working_dags, admin_client):
Expand All @@ -289,6 +266,7 @@ def test_home_filter_tags(_working_dags, admin_client):
assert flask.session[FILTER_TAGS_COOKIE] is None


@pytest.mark.usefixtures("_broken_dags", "_working_dags")
def test_home_importerrors(_broken_dags, user_client):
# Users with "can read on DAGs" gets all DAG import errors
resp = user_client.get("home", follow_redirects=True)
Expand All @@ -297,6 +275,7 @@ def test_home_importerrors(_broken_dags, user_client):
check_content_in_response(f"/{dag_id}.py", resp)


@pytest.mark.usefixtures("_broken_dags", "_working_dags")
def test_home_no_importerrors_perm(_broken_dags, client_no_importerror):
# Users without "can read on import errors" don't see any import errors
resp = client_no_importerror.get("home", follow_redirects=True)
Expand All @@ -315,7 +294,8 @@ def test_home_no_importerrors_perm(_broken_dags, client_no_importerror):
"home?lastrun=all_states",
],
)
def test_home_importerrors_filtered_singledag_user(_broken_dags_with_read_perm, client_single_dag, page):
@pytest.mark.usefixtures("_working_dags_with_read_perm", "_broken_dags")
def test_home_importerrors_filtered_singledag_user(client_single_dag, page):
# Users that can only see certain DAGs get a filtered list of import errors
resp = client_single_dag.get(page, follow_redirects=True)
check_content_in_response("Import Errors", resp)
Expand Down
6 changes: 6 additions & 0 deletions tests_common/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,12 @@ def __exit__(self, type, value, traceback):

dag.clear(session=self.session)
dag.sync_to_db(processor_subdir=self.processor_subdir, session=self.session)

if dag.access_control:
from airflow.www.security_appless import ApplessAirflowSecurityManager

security_manager = ApplessAirflowSecurityManager(session=self.session)
security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control)
self.dag_model = self.session.get(DagModel, dag.dag_id)

if self.want_serialized:
Expand Down

0 comments on commit cf4f2ca

Please sign in to comment.