Migrate yaml file validation from JSON schema to Pydantic schemas #118

Merged
merged 31 commits on Aug 27, 2024
31 commits
d210426
Reorder imports, standard library first
augusto-herrmann Aug 21, 2024
5991071
Remove unused import
augusto-herrmann Aug 21, 2024
05b77f5
Add module docstring
augusto-herrmann Aug 21, 2024
f651f0b
Create Pydantic schemas, migrate the tests that validate yaml files
augusto-herrmann Aug 21, 2024
dbda6e6
Reorder and remove unused imports
augusto-herrmann Aug 22, 2024
b515ce3
Apply black formatting
augusto-herrmann Aug 22, 2024
e73c656
Define method directly (remove unnecessary indirection)
augusto-herrmann Aug 22, 2024
26c2a3a
Rename Pydantic models to disambiguate
augusto-herrmann Aug 22, 2024
12bdc44
Remove unnecessary class abstraction
augusto-herrmann Aug 22, 2024
862eebd
Use json values instead of Python in Airflow variables
augusto-herrmann Aug 22, 2024
f19baf0
Factor out file read into its own method
augusto-herrmann Aug 22, 2024
0252124
Add missing parameter in example
augusto-herrmann Aug 22, 2024
7d91d1c
Refactor code to use Pydantic schema (partial)
augusto-herrmann Aug 22, 2024
8b42676
Rename class to fetch search terms and fix its docstring
augusto-herrmann Aug 23, 2024
0fb5243
Remove unused import
augusto-herrmann Aug 23, 2024
e0143f7
Apply black formatting
augusto-herrmann Aug 23, 2024
2f342f3
Add missing fields in Pydantic schemas
augusto-herrmann Aug 23, 2024
555cf6a
Use Pydantic schemas in DAG generation
augusto-herrmann Aug 23, 2024
a77d9ec
Fix field defaults
augusto-herrmann Aug 23, 2024
6d43f7c
Add field validator to search parameter, so that it's always a list
augusto-herrmann Aug 23, 2024
05deb93
Fix webhook in Discord mock send test
augusto-herrmann Aug 23, 2024
09b7a78
Fix expected data structure in test parameters (partial)
augusto-herrmann Aug 23, 2024
a319b5b
Merge PR #122 into PR #118
augusto-herrmann Aug 26, 2024
1c7b265
Fix indentation
augusto-herrmann Aug 26, 2024
2cf73f9
Fix default for doc_md
augusto-herrmann Aug 26, 2024
9171c1a
Change tags data type to set
augusto-herrmann Aug 26, 2024
9cd7c73
Fix expected data structure in test parameters (partial)
augusto-herrmann Aug 26, 2024
6b9851e
Fix default for dag tags
augusto-herrmann Aug 26, 2024
a178305
Fix expected data structure in test parameters (partial)
augusto-herrmann Aug 26, 2024
0e00319
fix fixture terms_from_db_example
edulauer Aug 27, 2024
71f0b9c
Remove old dataclasses
augusto-herrmann Aug 27, 2024
1 change: 1 addition & 0 deletions dag_confs/examples_and_tests/all_parameters_example.yaml
@@ -9,6 +9,7 @@ dag:
     - pessoa 2
   schedule: 0 8 * * MON-FRI
   search:
+    header: Pesquisa no DOU
     terms:
       - dados abertos
       - governo aberto
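
The single added line gives this search block a header parameter, one of the fields now declared in the Pydantic schema. With Pydantic in place, validating a file like this one no longer needs a JSON schema document. A rough sketch, reusing the illustrative SearchConfigSketch model from the sketch after the commit list (validate_config itself is also illustrative, not the project's real entry point):

import yaml
from pydantic import ValidationError


def validate_config(path: str) -> None:
    """Hedged sketch: load a dag_confs YAML file and validate its search block."""
    with open(path, encoding="utf-8") as config_file:
        content = yaml.safe_load(config_file)
    try:
        # SearchConfigSketch is the illustrative model shown after the commit list;
        # the real models live in src/schemas.py.
        SearchConfigSketch.model_validate(content["dag"]["search"])
    except ValidationError as error:
        raise SystemExit(f"{path} failed validation:\n{error}")
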
103 changes: 58 additions & 45 deletions src/dou_dag_generator.py
@@ -13,7 +13,6 @@
 import os
 import sys
 import textwrap
-from dataclasses import asdict
 from datetime import datetime, timedelta
 from typing import Dict, List, Optional, Union
 from functools import reduce
@@ -35,6 +34,7 @@
 from utils.date import get_trigger_date, template_ano_mes_dia_trigger_local_time
 from notification.notifier import Notifier
 from parsers import DAGConfig, YAMLParser
+from schemas import FetchTermsConfig
 from searchers import BaseSearcher, DOUSearcher, QDSearcher, INLABSSearcher


@@ -91,7 +91,7 @@ def merge_two(dict1, dict2):

 def result_as_html(specs: DAGConfig) -> bool:
     """Só utiliza resultado HTML apenas para email"""
-    return specs.discord_webhook and specs.slack_webhook
+    return specs.report.discord and specs.report.slack


 class DouDigestDagGenerator:
@@ -153,7 +153,7 @@ def prepare_doc_md(specs: DAGConfig, config_file: str) -> str:
         Returns:
             str: The DAG documentation in markdown format.
         """
-        config = asdict(specs)
+        config = specs.model_dump()
         # options that won't show in the "DAG Docs"
         del config["description"]
         del config["doc_md"]
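
For context on the asdict to model_dump change just above: once the spec is a Pydantic model instead of a dataclass, dumping it to a plain dict goes through Pydantic's own API. A minimal stand-alone illustration (ExampleSpecs is a stand-in, not the project's DAGConfig):

from typing import Optional

from pydantic import BaseModel


class ExampleSpecs(BaseModel):
    description: str = ""
    doc_md: Optional[str] = None


# Pydantic v2 equivalent of dataclasses.asdict(specs):
config = ExampleSpecs(description="demo").model_dump()
# config == {"description": "demo", "doc_md": None}
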
@@ -201,7 +201,7 @@ def _get_safe_schedule(self, specs: DAGConfig, default_schedule: str) -> str:
         """

         schedule = default_schedule
-        id_based_minute = self._hash_dag_id(specs.dag_id, 60)
+        id_based_minute = self._hash_dag_id(specs.id, 60)
         schedule_without_min = " ".join(schedule.split(" ")[1:])
         schedule = f"{id_based_minute} {schedule_without_min}"

@@ -262,7 +262,7 @@ def generate_dags(self):

         for filepath in files_list:
             dag_specs = self.parser(filepath).parse()
-            dag_id = dag_specs.dag_id
+            dag_id = dag_specs.id
             globals()[dag_id] = self.create_dag(dag_specs, filepath)

     def perform_searches(
@@ -385,9 +385,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
            the term_list from a database
         """
         # Prepare the markdown documentation
-        doc_md = (
-            self.prepare_doc_md(specs, config_file) if specs.doc_md else specs.doc_md
-        )
+        doc_md = self.prepare_doc_md(specs, config_file) if specs.doc_md else None
         # DAG parameters
         default_args = {
             "owner": specs.owner,
@@ -401,64 +399,78 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:

         schedule = self._update_schedule(specs)
         dag = DAG(
-            specs.dag_id,
+            specs.id,
             default_args=default_args,
             schedule=schedule,
             description=specs.description,
             doc_md=doc_md,
             catchup=False,
             params={"trigger_date": "2022-01-02T12:00"},
-            tags=specs.dag_tags,
+            tags=specs.tags,
         )

         with dag:

             with TaskGroup(group_id="exec_searchs") as tg_exec_searchs:
-                counter = 0
-                for subsearch in specs.search:
-                    counter += 1
-                    if subsearch["sql"]:
+
+                # is it a single search or a list of searchers?
+                if isinstance(specs.search, list):
+                    searches = specs.search
+                else:
+                    searches = [specs.search]
+
+                for counter, subsearch in enumerate(searches, 1):
+
+                    # are terms to be fetched from a database?
+                    terms_come_from_db: bool = isinstance(
+                        subsearch.terms, FetchTermsConfig
+                    ) and getattr(subsearch.terms, "from_db_select", None)
+
+                    # determine the terms list
+                    term_list = []
+                    # is it a directly defined list of terms or is it a
+                    # configuration for fetching terms from a data source?
+                    if isinstance(subsearch.terms, list):
+                        term_list = subsearch.terms
+                    elif terms_come_from_db:
                         select_terms_from_db_task = PythonOperator(
                             task_id=f"select_terms_from_db_{counter}",
                             python_callable=self.select_terms_from_db,
                             op_kwargs={
-                                "sql": subsearch["sql"],
-                                "conn_id": subsearch["conn_id"],
+                                "sql": subsearch.terms.from_db_select.sql,
+                                "conn_id": subsearch.terms.from_db_select.conn_id,
                             },
                         )
-                    term_list = (
-                        "{{ ti.xcom_pull(task_ids='exec_searchs.select_terms_from_db_"
-                        + str(counter)
-                        + "') }}"
-                    )
+                        term_list = (
+                            "{{ ti.xcom_pull(task_ids='exec_searchs.select_terms_from_db_"
+                            + str(counter)
+                            + "') }}"
+                        )

                     exec_search_task = PythonOperator(
                         task_id=f"exec_search_{counter}",
                         python_callable=self.perform_searches,
                         op_kwargs={
-                            "header": subsearch["header"],
-                            "sources": subsearch["sources"],
-                            "territory_id": subsearch["territory_id"],
-                            "term_list": subsearch["terms"] or term_list,
-                            "dou_sections": subsearch["dou_sections"],
-                            "search_date": subsearch["search_date"],
-                            "field": subsearch["field"],
-                            "is_exact_search": subsearch["is_exact_search"],
-                            "ignore_signature_match": subsearch[
-                                "ignore_signature_match"
-                            ],
-                            "force_rematch": subsearch["force_rematch"],
-                            "full_text": subsearch["full_text"],
-                            "use_summary": subsearch["use_summary"],
-                            "department": subsearch["department"],
+                            "header": subsearch.header,
+                            "sources": subsearch.sources,
+                            "territory_id": subsearch.territory_id,
+                            "term_list": term_list,
+                            "dou_sections": subsearch.dou_sections,
+                            "search_date": subsearch.date,
+                            "field": subsearch.field,
+                            "is_exact_search": subsearch.is_exact_search,
+                            "ignore_signature_match": subsearch.ignore_signature_match,
+                            "force_rematch": subsearch.force_rematch,
+                            "full_text": subsearch.full_text,
+                            "use_summary": subsearch.use_summary,
+                            "department": subsearch.department,
                             "result_as_email": result_as_html(specs),
                         },
                     )

-                    if subsearch["sql"]:
-                        (
-                            select_terms_from_db_task >> exec_search_task
-                        )  # pylint: disable=pointless-statement
+                    if terms_come_from_db:
+                        # pylint: disable=pointless-statement
+                        select_terms_from_db_task >> exec_search_task

             has_matches_task = BranchPythonOperator(
                 task_id="has_matches",
@@ -467,12 +479,12 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
                     "search_result": "{{ ti.xcom_pull(task_ids="
                     + str(
                         [
-                            f"exec_searchs.exec_search_{count + 1}"
-                            for count in range(counter)
+                            f"exec_searchs.exec_search_{count}"
+                            for count in range(1, len(searches) + 1)
                         ]
                     )
                     + ") }}",
-                    "skip_null": specs.skip_null,
+                    "skip_null": specs.report.skip_null,
                 },
             )

@@ -485,15 +497,16 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
                     "search_report": "{{ ti.xcom_pull(task_ids="
                     + str(
                         [
-                            f"exec_searchs.exec_search_{count + 1}"
-                            for count in range(counter)
+                            f"exec_searchs.exec_search_{count}"
+                            for count in range(1, len(searches) + 1)
                         ]
                     )
                     + ") }}",
                     "report_date": template_ano_mes_dia_trigger_local_time,
                 },
             )

+            # pylint: disable=pointless-statement
             tg_exec_searchs >> has_matches_task

             has_matches_task >> [send_notification_task, skip_notification_task]
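
Two patterns in the refactored create_dag above are worth calling out: search options are now read through attribute access on the Pydantic objects, and specs.search is normalized to a list before enumerating, so task ids such as exec_search_1, exec_search_2 stay stable. A stand-alone sketch of the normalization, using plain dicts in place of the real schema objects:

from typing import List, Union


def as_search_list(search: Union[dict, List[dict]]) -> List[dict]:
    """Accept either a single search block or a list of them."""
    return search if isinstance(search, list) else [search]


searches = as_search_list({"header": "Pesquisa no DOU"})
for counter, subsearch in enumerate(searches, 1):
    task_id = f"exec_search_{counter}"  # exec_search_1, exec_search_2, ...

# The ids later pulled via XCom are rebuilt the same way:
task_ids = [
    f"exec_searchs.exec_search_{count}" for count in range(1, len(searches) + 1)
]
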
19 changes: 11 additions & 8 deletions src/notification/discord_sender.py
@@ -1,17 +1,20 @@
-import requests
 import re
+
+import requests
+
 from notification.isender import ISender
+from schemas import ReportConfig


 class DiscordSender(ISender):
     highlight_tags = ("__", "__")

-    def __init__(self, specs) -> None:
-        self.webhook_url = specs.discord_webhook
-        self.hide_filters = specs.hide_filters
-        self.header_text = specs.header_text
-        self.footer_text = specs.footer_text
-        self.no_results_found_text = specs.no_results_found_text
+    def __init__(self, report_config: ReportConfig) -> None:
+        self.webhook_url = report_config.discord["webhook"]
+        self.hide_filters = report_config.hide_filters
+        self.header_text = report_config.header_text
+        self.footer_text = report_config.footer_text
+        self.no_results_found_text = report_config.no_results_found_text

     def send(self, search_report: list, report_date: str = None):
         """Parse the content, and send message to Discord"""
@@ -73,4 +76,4 @@ def _remove_html_tags(self, text):
         # Define a regular expression pattern to match HTML tags
         clean = re.compile('<.*?>')
         # Substitute HTML tags with an empty string
-        return re.sub(clean, '', text)
+        return re.sub(clean, '', text)
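
The sender now receives only the report configuration instead of the whole spec, and the Discord webhook moves under report_config.discord["webhook"]. A hedged sketch of constructing such an object, with ReportConfigSketch, its defaults, and the literal values below all made up for illustration (the real ReportConfig is defined in src/schemas.py):

from typing import Optional

from pydantic import BaseModel


class ReportConfigSketch(BaseModel):
    """Illustrative fields, inferred from the attributes read in the diffs."""
    discord: Optional[dict] = None
    slack: Optional[dict] = None
    hide_filters: bool = False
    header_text: Optional[str] = None
    footer_text: Optional[str] = None
    no_results_found_text: Optional[str] = None
    skip_null: bool = True


report = ReportConfigSketch(discord={"webhook": "https://discord.com/api/webhooks/..."})
webhook_url = report.discord["webhook"]  # what DiscordSender.__init__ reads
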