Skip to content

Commit

Permalink
Fix broken build due to XCOM config
Browse files Browse the repository at this point in the history
  • Loading branch information
Abhishek Ray committed Jan 2, 2020
1 parent 9335fa1 commit 38559b6
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 15 deletions.
9 changes: 9 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include README.md
include MANIFEST.in
include setup.cfg
include setup.py
recursive-include airflow_prometheus_exporter/ *

recursive-exclude * __pycache__
recursive-exclude * *.py[co]
recursive-exclude * .*.sw[a-z]
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ value of configurable parameter in xcom table

xcom fields is deserialized as a dictionary and if key is found for a paticular task-id, the value is reported as a guage

add task / key combinations in config.yaml:
```
Add task / key combinations in config.yaml:

```bash
xcom_params:
-
task_id: abc
Expand Down
File renamed without changes.
10 changes: 5 additions & 5 deletions airflow_prometheus_exporter/prometheus_exporter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Prometheus exporter for Airflow."""

import json
import pickle
from contextlib import contextmanager

from airflow.configuration import conf
Expand All @@ -8,15 +9,13 @@
from airflow.settings import Session
from airflow.utils.state import State
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow_prometheus_exporter.xcom_config import xcom_config
from flask import Response
from flask_admin import BaseView, expose
import json
import pickle
from prometheus_client import generate_latest, REGISTRY
from prometheus_client.core import GaugeMetricFamily
from sqlalchemy import and_, func

from airflow_prometheus_exporter.xcom_config import load_xcom_config

CANARY_DAG = "canary_dag"

Expand Down Expand Up @@ -439,7 +438,8 @@ def collect(self):
labels=["dag_id", "task_id"],
)

for tasks in xcom_config["xcom_params"]:
xcom_config = load_xcom_config()
for tasks in xcom_config.get("xcom_params", []):
for param in get_xcom_params(tasks["task_id"]):
xcom_value = extract_xcom_parameter(param.value)

Expand Down
19 changes: 11 additions & 8 deletions airflow_prometheus_exporter/xcom_config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import yaml
import os
from pathlib import Path

dir = os.path.dirname(__file__)
filename = os.path.join(dir, "../config.yaml")
CONFIG_FILE = Path.cwd() / "config.yaml"


xcom_config = {}
with open(filename) as file:
# The FullLoader parameter handles the conversion from YAML
# scalar values to Python the dictionary format
xcom_config = yaml.load(file, Loader=yaml.FullLoader)
def load_xcom_config():
"""Loads the XCom config if present."""
try:
with open(CONFIG_FILE) as file:
# The FullLoader parameter handles the conversion from YAML
# scalar values to Python the dictionary format
return yaml.load(file, Loader=yaml.FullLoader)
except FileNotFoundError:
return {}
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
keywords='airflow_prometheus_exporter',
name='airflow_prometheus_exporter',
packages=find_packages(include=['airflow_prometheus_exporter']),
include_package_data=True,
url='https://github.com/robinhood/airflow_prometheus_exporter',
version='1.0.5',
entry_points={
Expand Down

0 comments on commit 38559b6

Please sign in to comment.