diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..85d1cac --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,9 @@ +include README.md +include MANIFEST.in +include setup.cfg +include setup.py +recursive-include airflow_prometheus_exporter/ * + +recursive-exclude * __pycache__ +recursive-exclude * *.py[co] +recursive-exclude * .*.sw[a-z] diff --git a/README.md b/README.md index eb7122b..e364f3c 100644 --- a/README.md +++ b/README.md @@ -51,8 +51,9 @@ value of configurable parameter in xcom table xcom fields is deserialized as a dictionary and if key is found for a paticular task-id, the value is reported as a guage -add task / key combinations in config.yaml: -``` +Add task / key combinations in config.yaml: + +```bash xcom_params: - task_id: abc diff --git a/config.yaml b/airflow_prometheus_exporter/config.yaml similarity index 100% rename from config.yaml rename to airflow_prometheus_exporter/config.yaml diff --git a/airflow_prometheus_exporter/prometheus_exporter.py b/airflow_prometheus_exporter/prometheus_exporter.py index aab58fc..5f0b479 100644 --- a/airflow_prometheus_exporter/prometheus_exporter.py +++ b/airflow_prometheus_exporter/prometheus_exporter.py @@ -1,5 +1,6 @@ """Prometheus exporter for Airflow.""" - +import json +import pickle from contextlib import contextmanager from airflow.configuration import conf @@ -8,15 +9,13 @@ from airflow.settings import Session from airflow.utils.state import State from airflow.utils.log.logging_mixin import LoggingMixin -from airflow_prometheus_exporter.xcom_config import xcom_config from flask import Response from flask_admin import BaseView, expose -import json -import pickle from prometheus_client import generate_latest, REGISTRY from prometheus_client.core import GaugeMetricFamily from sqlalchemy import and_, func +from airflow_prometheus_exporter.xcom_config import load_xcom_config CANARY_DAG = "canary_dag" @@ -439,7 +438,8 @@ def collect(self): labels=["dag_id", "task_id"], ) - for tasks in xcom_config["xcom_params"]: + xcom_config = load_xcom_config() + for tasks in xcom_config.get("xcom_params", []): for param in get_xcom_params(tasks["task_id"]): xcom_value = extract_xcom_parameter(param.value) diff --git a/airflow_prometheus_exporter/xcom_config.py b/airflow_prometheus_exporter/xcom_config.py index 8fe626f..9ec01fa 100644 --- a/airflow_prometheus_exporter/xcom_config.py +++ b/airflow_prometheus_exporter/xcom_config.py @@ -1,12 +1,15 @@ import yaml -import os +from pathlib import Path -dir = os.path.dirname(__file__) -filename = os.path.join(dir, "../config.yaml") +CONFIG_FILE = Path.cwd() / "config.yaml" -xcom_config = {} -with open(filename) as file: - # The FullLoader parameter handles the conversion from YAML - # scalar values to Python the dictionary format - xcom_config = yaml.load(file, Loader=yaml.FullLoader) +def load_xcom_config(): + """Loads the XCom config if present.""" + try: + with open(CONFIG_FILE) as file: + # The FullLoader parameter handles the conversion from YAML + # scalar values to Python the dictionary format + return yaml.load(file, Loader=yaml.FullLoader) + except FileNotFoundError: + return {} diff --git a/setup.py b/setup.py index 76cd136..4635edc 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,7 @@ keywords='airflow_prometheus_exporter', name='airflow_prometheus_exporter', packages=find_packages(include=['airflow_prometheus_exporter']), + include_package_data=True, url='https://github.com/robinhood/airflow_prometheus_exporter', version='1.0.5', entry_points={