diff --git a/airflow/assets/configuration/spec.yaml b/airflow/assets/configuration/spec.yaml index a78029f9ec994..cc4b4141d0ba8 100644 --- a/airflow/assets/configuration/spec.yaml +++ b/airflow/assets/configuration/spec.yaml @@ -13,6 +13,12 @@ files: description: The URL used to connect to the Airflow instance (use the Airflow web server REST API endpoint). value: type: string + - name: collect_ongoing_duration + required: false + description: Collect ongoing duration metric for DAG task instances. + value: + type: boolean + example: true - template: instances/http - template: instances/default - template: logs diff --git a/airflow/changelog.d/19278.added b/airflow/changelog.d/19278.added new file mode 100644 index 0000000000000..45bf7d91d95ab --- /dev/null +++ b/airflow/changelog.d/19278.added @@ -0,0 +1 @@ +Use `start_date` instead of `execution_date` for ongoing duration metrics \ No newline at end of file diff --git a/airflow/datadog_checks/airflow/airflow.py b/airflow/datadog_checks/airflow/airflow.py index f9d3e8157fe02..9e1f082520d87 100644 --- a/airflow/datadog_checks/airflow/airflow.py +++ b/airflow/datadog_checks/airflow/airflow.py @@ -19,7 +19,7 @@ def __init__(self, name, init_config, instances): self._url = self.instance.get('url', '') self._tags = self.instance.get('tags', []) - + self._collect_ongoing_duration = self.instance.get('collect_ongoing_duration', True) # The Agent only makes one attempt to instantiate each AgentCheck so any errors occurring # in `__init__` are logged just once, making it difficult to spot. Therefore, we emit # potential configuration errors as part of the check run phase. @@ -51,7 +51,7 @@ def check(self, _): else: submit_metrics(resp, tags) # Only calculate task duration for stable API - if target_url is url_stable: + if target_url is url_stable and self._collect_ongoing_duration: task_instances = self._get_all_task_instances(url_stable_task_instances, tags) if task_instances: self._calculate_task_ongoing_duration(task_instances, tags) @@ -118,14 +118,14 @@ def _calculate_task_ongoing_duration(self, tasks, tags): dag_task_tags = copy(tags) task_id = task.get('task_id') dag_id = task.get('dag_id') - execution_date = task.get('execution_date') + start_date = task.get('start_date') # Add tags for each task dag_task_tags.append('dag_id:{}'.format(dag_id)) dag_task_tags.append('task_id:{}'.format(task_id)) # Calculate ongoing duration - ongoing_duration = get_timestamp() - datetime.fromisoformat((execution_date)).timestamp() + ongoing_duration = get_timestamp() - datetime.fromisoformat((start_date)).timestamp() self.gauge('airflow.dag.task.ongoing_duration', ongoing_duration, tags=dag_task_tags) def _parse_config(self): diff --git a/airflow/datadog_checks/airflow/config_models/defaults.py b/airflow/datadog_checks/airflow/config_models/defaults.py index 81b466607723e..c88e64022a89a 100644 --- a/airflow/datadog_checks/airflow/config_models/defaults.py +++ b/airflow/datadog_checks/airflow/config_models/defaults.py @@ -24,6 +24,10 @@ def instance_auth_type(): return 'basic' +def instance_collect_ongoing_duration(): + return True + + def instance_disable_generic_tags(): return False diff --git a/airflow/datadog_checks/airflow/config_models/instance.py b/airflow/datadog_checks/airflow/config_models/instance.py index e31c99d217e86..265191f70aa1e 100644 --- a/airflow/datadog_checks/airflow/config_models/instance.py +++ b/airflow/datadog_checks/airflow/config_models/instance.py @@ -60,6 +60,7 @@ class InstanceConfig(BaseModel): aws_host: Optional[str] = None aws_region: Optional[str] = None aws_service: Optional[str] = None + collect_ongoing_duration: Optional[bool] = None connect_timeout: Optional[float] = None disable_generic_tags: Optional[bool] = None empty_default_hostname: Optional[bool] = None diff --git a/airflow/datadog_checks/airflow/data/conf.yaml.example b/airflow/datadog_checks/airflow/data/conf.yaml.example index 260c385662bcd..31105f3bf816f 100644 --- a/airflow/datadog_checks/airflow/data/conf.yaml.example +++ b/airflow/datadog_checks/airflow/data/conf.yaml.example @@ -50,6 +50,11 @@ instances: # - url: + ## @param collect_ongoing_duration - boolean - optional - default: true + ## Collect ongoing duration metric for DAG task instances. + # + # collect_ongoing_duration: true + ## @param proxy - mapping - optional ## This overrides the `proxy` setting in `init_config`. ## diff --git a/airflow/tests/test_unit.py b/airflow/tests/test_unit.py index 0cab38355d9ac..033334be280d7 100644 --- a/airflow/tests/test_unit.py +++ b/airflow/tests/test_unit.py @@ -118,3 +118,44 @@ def test_dag_task_ongoing_duration(aggregator, task_instance): tags=['key:my-tag', 'url:http://localhost:8080', 'dag_id:tutorial', 'task_id:sleep'], count=1, ) + + +@pytest.mark.parametrize( + "collect_ongoing_duration, should_call_method", + [ + pytest.param( + True, + [ + mock.call( + 'http://localhost:8080/api/v1/dags/~/dagRuns/~/taskInstances?state=running', + ['url:http://localhost:8080', 'key:my-tag'], + ) + ], + id="collect", + ), + pytest.param( + False, + [], + id="don't collect", + ), + ], +) +def test_config_collect_ongoing_duration(collect_ongoing_duration, should_call_method): + instance = {**common.FULL_CONFIG['instances'][0], 'collect_ongoing_duration': collect_ongoing_duration} + check = AirflowCheck('airflow', common.FULL_CONFIG, [instance]) + + with mock.patch('datadog_checks.airflow.airflow.AirflowCheck._get_version', return_value='2.6.2'): + with mock.patch('datadog_checks.base.utils.http.requests') as req: + mock_resp = mock.MagicMock(status_code=200) + mock_resp.json.side_effect = [ + {'metadatabase': {'status': 'healthy'}, 'scheduler': {'status': 'healthy'}}, + ] + req.get.return_value = mock_resp + + with mock.patch( + 'datadog_checks.airflow.airflow.AirflowCheck._get_all_task_instances' + ) as mock_get_all_task_instances: + check.check(None) + + # Assert method calls + mock_get_all_task_instances.assert_has_calls(should_call_method, any_order=False)