Skip to content

Latest commit

 

History

History
95 lines (77 loc) · 2.3 KB

readme.md

File metadata and controls

95 lines (77 loc) · 2.3 KB

SNPedia-airflow

  • this repo is for learning how data pipeline works using apache airflow

  • the disease lists: ['Heart_disease', 'Stroke', 'High_blood_pressure', 'Chronic_kidney_disease']

Setup airflow download docker-compose.yaml file from this command

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.1/docker-compose.yaml'

create Dockerfile (for install external python libraryies )

FROM apache/airflow:latest
RUN pip install --no-cache-dir pymongo python-dotenv bs4

Import the librarys

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from bs4 import BeautifulSoup
from dotenv import load_dotenv

define DAG(direct acyclic graph)

start = EmptyOperator(task_id="start")
step = EmptyOperator(task_id="step")
end = EmptyOperator(task_id="end")

get_rs_links_from_diseases = [
    PythonOperator(
        task_id=f"get_rs_links_from_{disease}",
        python_callable=_get_rs_links_from_diseases,
        op_kwargs={"disease": disease},
    ) for disease in utils.DISEASE_LIST]

get_all_data_from_diseases = [
    PythonOperator(
        task_id=f"get_all_data_from_{disease}",
        python_callable=_get_all_data_from_diseases,
        op_kwargs={'affecting_disease': disease},
    ) for disease in utils.DISEASE_LIST]

combine_data = PythonOperator(
    task_id="combine_data",
    python_callable=_combine_data,
)

save_to_mongo_db = PythonOperator(
    task_id="save_to_mongo_db",
    python_callable=_svae_to_mongodb,
    op_kwargs={"jsonData": "/opt/airflow/dags/SNPedia.json"},
)

complete the DAG

start >> get_rs_links_from_diseases >> step >>  get_all_data_from_diseases >> combine_data >> save_to_mongo_db >> end

the shape of data

{
  affecting_disease: str,
  data: {
    GenoMagSummary:
      { Geno: str,
        Mag: str,
        Summary: str
      }[],
    Reference: str,
    Chromosome: str,
    Position: str,
    Gene: str,
    "isA": str,
    "RelatedPublications": {
      PMID: str,
      description: str,
    }[],
  }[],
  updatedAt: DateTime,
}[]