bump pydantic to v2
bukosabino committed Sep 29, 2023
1 parent 02f9097 commit 7763403
Showing 10 changed files with 163 additions and 141 deletions.
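
The hunks shown below are mostly quote-style and line-wrapping changes (black-style formatting); the pydantic model definitions affected by the v2 bump are presumably in src/etls/utils.py, which is not among the hunks visible in this excerpt. As a rough illustration only, with a stand-in model and field names borrowed from the metadata keys used later in this diff, a v1-to-v2 move usually looks like this:

# Illustration only: typical pydantic v1 -> v2 changes behind a bump like this one.
# The stand-in model below is hypothetical; the project's real models are not shown here.
from datetime import datetime

from pydantic import BaseModel, field_validator


class ExampleMetadata(BaseModel):
    identificador: str
    fecha_publicacion: str

    # pydantic v2: @validator(...) becomes @field_validator(...)
    @field_validator("fecha_publicacion")
    @classmethod
    def check_fecha(cls, value: str) -> str:
        datetime.strptime(value, "%Y%m%d")  # raises ValueError if not YYYYMMDD
        return value


doc = ExampleMetadata(identificador="BOE-A-2023-1", fecha_publicacion="20230929")
print(doc.model_dump())  # v2 replacement for the deprecated doc.dict()
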
6 changes: 3 additions & 3 deletions src/email/send_email.py
@@ -12,9 +12,9 @@
 def send_email(config_loader, subject: str, content: str) -> None:
     logger = lg.getLogger(send_email.__name__)
     logger.info("Sending email")
-    sg = SendGridAPIClient(api_key=os.environ.get('SENDGRID_API_KEY'))
-    from_email = Email(config_loader['admin_email'])
-    to_email = To(config_loader['admin_email'])
+    sg = SendGridAPIClient(api_key=os.environ.get("SENDGRID_API_KEY"))
+    from_email = Email(config_loader["admin_email"])
+    to_email = To(config_loader["admin_email"])
     content = Content("text/plain", content)
     mail = Mail(from_email, to_email, subject, content)
     response = sg.client.mail.send.post(request_body=mail.get())
10 changes: 5 additions & 5 deletions src/etls/etl_common.py
@@ -36,13 +36,13 @@ def _split_documents(self, docs: tp.List[BOEMetadataDocument]) -> tp.List[Docume
             loader = BOETextLoader(file_path=doc.filepath, metadata=doc.dict())
             documents = loader.load()
             text_splitter = CharacterTextSplitter(
-                separator=self._config_loader['separator'],
-                chunk_size=self._config_loader['chunk_size'],
-                chunk_overlap=self._config_loader['chunk_overlap']
+                separator=self._config_loader["separator"],
+                chunk_size=self._config_loader["chunk_size"],
+                chunk_overlap=self._config_loader["chunk_overlap"],
             )
             docs_chunks += text_splitter.split_documents(documents)
             if doc:
-                logger.info('Removing file %s', doc.filepath)
+                logger.info("Removing file %s", doc.filepath)
                 os.remove(doc.filepath)
         logger.info("Splitted %s documents in %s chunks", len(docs), len(docs_chunks))
         return docs_chunks

@@ -56,7 +56,7 @@ def _load_database(self, docs_chunks: tp.List[Document]) -> None:

     def _log_database_stats(self) -> None:
         logger = lg.getLogger(self._log_database_stats.__name__)
-        index_name = self._config_loader['vector_store_index_name']
+        index_name = self._config_loader["vector_store_index_name"]
         logger.info(pinecone.describe_index(index_name))
         index = pinecone.Index(index_name)
         logger.info(index.describe_index_stats())
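
The _split_documents hunk above keeps metadata=doc.dict(). Under pydantic v2 that call still works but is deprecated in favour of model_dump(); a minimal sketch with a stand-in model (the real BOEMetadataDocument is defined in src/etls/utils.py and is not shown here):

from pydantic import BaseModel


class Doc(BaseModel):  # stand-in only; not the real BOEMetadataDocument
    filepath: str
    titulo: str


doc = Doc(filepath="/tmp/boe.txt", titulo="Example")
print(doc.model_dump())  # pydantic v2 name for serialising a model to a dict
print(doc.dict())        # still accepted in v2, but emits a deprecation warning
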
5 changes: 2 additions & 3 deletions src/etls/etl_daily.py
@@ -5,11 +5,10 @@
 from src.etls.scrapper.boe import BOEScrapper
 from src.initialize import initialize_app

-if __name__ == '__main__':
+if __name__ == "__main__":
     INIT_OBJECTS = initialize_app()
     etl_job = ETL(
-        config_loader=INIT_OBJECTS.config_loader,
-        vector_store=INIT_OBJECTS.vector_store
+        config_loader=INIT_OBJECTS.config_loader, vector_store=INIT_OBJECTS.vector_store
     )
     boe_scrapper = BOEScrapper()
     day = date.today()
13 changes: 8 additions & 5 deletions src/etls/etl_initial.py
@@ -5,16 +5,19 @@
 from src.etls.scrapper.boe import BOEScrapper
 from src.initialize import initialize_app

-if __name__ == '__main__':
+if __name__ == "__main__":
     INIT_OBJECTS = initialize_app()
     etl_job = ETL(
-        config_loader=INIT_OBJECTS.config_loader,
-        vector_store=INIT_OBJECTS.vector_store
+        config_loader=INIT_OBJECTS.config_loader, vector_store=INIT_OBJECTS.vector_store
     )
     boe_scrapper = BOEScrapper()
     docs = boe_scrapper.download_days(
-        date_start=datetime.strptime(INIT_OBJECTS.config_loader['date_start'], '%Y/%m/%d').date(),
-        date_end=datetime.strptime(INIT_OBJECTS.config_loader['date_end'], '%Y/%m/%d').date(),
+        date_start=datetime.strptime(
+            INIT_OBJECTS.config_loader["date_start"], "%Y/%m/%d"
+        ).date(),
+        date_end=datetime.strptime(
+            INIT_OBJECTS.config_loader["date_end"], "%Y/%m/%d"
+        ).date(),
     )
     if docs:
         etl_job.run(docs)
11 changes: 5 additions & 6 deletions src/etls/scrapper/base.py
@@ -6,17 +6,16 @@


 class BaseScrapper(ABC):
-
     @abstractmethod
-    def download_days(self, date_start: date, date_end: date) -> tp.List[BOEMetadataDocument]:
-        """Download all the documents between two dates (from date_start to date_end)
-        """
+    def download_days(
+        self, date_start: date, date_end: date
+    ) -> tp.List[BOEMetadataDocument]:
+        """Download all the documents between two dates (from date_start to date_end)"""
         pass

     @abstractmethod
     def download_day(self, day: date) -> tp.List[BOEMetadataDocument]:
-        """Download all the documents for a specific date.
-        """
+        """Download all the documents for a specific date."""
         pass

     @abstractmethod
128 changes: 71 additions & 57 deletions src/etls/scrapper/boe.py
@@ -10,8 +10,11 @@
 from requests.exceptions import HTTPError

 from src.etls.scrapper.base import BaseScrapper
-from src.etls.utils import (BOEMetadataDocument, BOEMetadataDocument2,
-                            BOEMetadataReferencia)
+from src.etls.utils import (
+    BOEMetadataDocument,
+    BOEMetadataDocument2,
+    BOEMetadataReferencia,
+)
 from src.initialize import initialize_logging

 initialize_logging()
@@ -23,63 +26,79 @@ def _extract_metadata(soup) -> tp.Dict:
     # Metadatos
     identificador = soup.documento.metadatos.identificador
     if identificador:
-        metadata_dict['identificador'] = identificador.get_text()
+        metadata_dict["identificador"] = identificador.get_text()

     if numero_oficial := soup.documento.metadatos.numero_oficial:
-        metadata_dict['numero_oficial'] = numero_oficial.get_text()
+        metadata_dict["numero_oficial"] = numero_oficial.get_text()

     if departamento := soup.documento.metadatos.departamento:
-        metadata_dict['departamento'] = departamento.get_text()
+        metadata_dict["departamento"] = departamento.get_text()

     if rango := soup.documento.metadatos.rango:
-        metadata_dict['rango'] = rango.get_text()
+        metadata_dict["rango"] = rango.get_text()

     if titulo := soup.documento.metadatos.titulo:
-        metadata_dict['titulo'] = titulo.get_text()
+        metadata_dict["titulo"] = titulo.get_text()

     if url_pdf := soup.documento.metadatos.url_pdf:
-        metadata_dict['url_pdf'] = url_pdf.get_text()
+        metadata_dict["url_pdf"] = url_pdf.get_text()

     if origen_legislativo := soup.documento.metadatos.origen_legislativo:
-        metadata_dict['origen_legislativo'] = origen_legislativo.get_text()
+        metadata_dict["origen_legislativo"] = origen_legislativo.get_text()

     if fecha_publicacion := soup.documento.metadatos.fecha_publicacion:
-        metadata_dict['fecha_publicacion'] = fecha_publicacion.get_text()
+        metadata_dict["fecha_publicacion"] = fecha_publicacion.get_text()

     if fecha_disposicion := soup.documento.metadatos.fecha_disposicion:
-        metadata_dict['fecha_disposicion'] = fecha_disposicion.get_text()
+        metadata_dict["fecha_disposicion"] = fecha_disposicion.get_text()

-    metadata_dict['anio'] = datetime.strptime(fecha_publicacion.get_text(), '%Y%m%d').strftime('%Y')
+    metadata_dict["anio"] = datetime.strptime(
+        fecha_publicacion.get_text(), "%Y%m%d"
+    ).strftime("%Y")

     # Analisis
     if observaciones := soup.documento.analisis.observaciones:
-        metadata_dict['observaciones'] = observaciones.get_text()
+        metadata_dict["observaciones"] = observaciones.get_text()

     if ambito_geografico := soup.documento.analisis.ambito_geografico:
-        metadata_dict['ambito_geografico'] = ambito_geografico.get_text()
+        metadata_dict["ambito_geografico"] = ambito_geografico.get_text()

     if modalidad := soup.documento.analisis.modalidad:
-        metadata_dict['modalidad'] = modalidad.get_text()
+        metadata_dict["modalidad"] = modalidad.get_text()

     if tipo := soup.documento.analisis.tipo:
-        metadata_dict['tipo'] = tipo.get_text()
+        metadata_dict["tipo"] = tipo.get_text()

-    metadata_dict['materias'] = [
-        materia.get_text() for materia in soup.select('documento > analisis > materias > materia')
+    metadata_dict["materias"] = [
+        materia.get_text()
+        for materia in soup.select("documento > analisis > materias > materia")
     ]
-    metadata_dict['alertas'] = [
-        alerta.get_text() for alerta in soup.select('documento > analisis > alertas > alerta')
+    metadata_dict["alertas"] = [
+        alerta.get_text()
+        for alerta in soup.select("documento > analisis > alertas > alerta")
     ]
-    metadata_dict['notas'] = [
-        nota.get_text() for nota in soup.select('documento > analisis > notas > nota')
+    metadata_dict["notas"] = [
+        nota.get_text() for nota in soup.select("documento > analisis > notas > nota")
     ]
-    metadata_dict['ref_posteriores'] = [
-        BOEMetadataReferencia(id=ref['referencia'], palabra=ref.palabra.get_text(), texto=ref.texto.get_text())
-        for ref in soup.select('documento > analisis > referencias > posteriores > posterior')
+    metadata_dict["ref_posteriores"] = [
+        BOEMetadataReferencia(
+            id=ref["referencia"],
+            palabra=ref.palabra.get_text(),
+            texto=ref.texto.get_text(),
+        )
+        for ref in soup.select(
+            "documento > analisis > referencias > posteriores > posterior"
+        )
     ]
-    metadata_dict['ref_anteriores'] = [
-        BOEMetadataReferencia(id=ref['referencia'], palabra=ref.palabra.get_text(), texto=ref.texto.get_text())
-        for ref in soup.select('documento > analisis > referencias > anteriores > anterior')
+    metadata_dict["ref_anteriores"] = [
+        BOEMetadataReferencia(
+            id=ref["referencia"],
+            palabra=ref.palabra.get_text(),
+            texto=ref.texto.get_text(),
+        )
+        for ref in soup.select(
+            "documento > analisis > referencias > anteriores > anterior"
+        )
     ]
     return metadata_dict

@@ -94,25 +113,25 @@ def _list_links_day(url: str) -> tp.List[str]:
     logger.info("Scrapping day: %s", url)
     response = requests.get(url)
     response.raise_for_status()
-    soup = BeautifulSoup(response.text, 'lxml')
+    soup = BeautifulSoup(response.text, "lxml")
     id_links = [
-        url.text.split('?id=')[-1]
+        url.text.split("?id=")[-1]
         for section in soup.find_all(
-            lambda tag: tag.name == "seccion" and 'num' in tag.attrs and (
-                tag.attrs['num'] == '1' or tag.attrs['num'] == 'T'
-            )
+            lambda tag: tag.name == "seccion"
+            and "num" in tag.attrs
+            and (tag.attrs["num"] == "1" or tag.attrs["num"] == "T")
         )
-        for url in section.find_all('urlxml')
+        for url in section.find_all("urlxml")
     ]
     logger.info("Scrapped day successfully %s (%s BOE documents)", url, len(id_links))
     return id_links


 class BOEScrapper(BaseScrapper):
-
-    def download_days(self, date_start: date, date_end: date) -> tp.List[BOEMetadataDocument]:
-        """Download all the documents between two dates (from date_start to date_end)
-        """
+    def download_days(
+        self, date_start: date, date_end: date
+    ) -> tp.List[BOEMetadataDocument]:
+        """Download all the documents between two dates (from date_start to date_end)"""
         logger = lg.getLogger(self.download_days.__name__)
         logger.info("Downloading BOE content from day %s to %s", date_start, date_end)
         delta = timedelta(days=1)
Expand All @@ -126,8 +145,7 @@ def download_days(self, date_start: date, date_end: date) -> tp.List[BOEMetadata
return metadata_documents

def download_day(self, day: date) -> tp.List[BOEMetadataDocument]:
"""Download all the documents for a specific date.
"""
"""Download all the documents for a specific date."""
logger = lg.getLogger(self.download_day.__name__)
logger.info("Downloading BOE content for day %s", day)
day_str = day.strftime("%Y%m%d")
@@ -141,7 +159,9 @@ def download_day(self, day: date) -> tp.List[BOEMetadataDocument]:
                     metadata_doc = self.download_document(url_document)
                     metadata_documents.append(metadata_doc)
                 except HTTPError:
-                    logger.error("Not scrapped document %s on day %s", url_document, day_url)
+                    logger.error(
+                        "Not scrapped document %s on day %s", url_document, day_url
+                    )
         except HTTPError:
             logger.error("Not scrapped document on day %s", day_url)
         logger.info("Downloaded BOE content for day %s", day)
@@ -159,14 +179,11 @@ def download_document(self, url: str) -> BOEMetadataDocument:
         logger.info("Scrapping document: %s", url)
         response = requests.get(url)
         response.raise_for_status()
-        soup = BeautifulSoup(response.text, 'lxml')
-        with tempfile.NamedTemporaryFile('w', delete=False) as fn:
+        soup = BeautifulSoup(response.text, "lxml")
+        with tempfile.NamedTemporaryFile("w", delete=False) as fn:
             text = soup.select_one("documento > texto").get_text()
             fn.write(text)
-        metadata_doc = BOEMetadataDocument(
-            filepath=fn.name,
-            **_extract_metadata(soup)
-        )
+        metadata_doc = BOEMetadataDocument(filepath=fn.name, **_extract_metadata(soup))
         logger.info("Scrapped document successfully %s", url)
         return metadata_doc

@@ -182,21 +199,18 @@ def download_document_txt(self, url: str) -> BOEMetadataDocument2:
         logger.info("Scrapping document: %s", url)
         response = requests.get(url)
         response.raise_for_status()
-        with tempfile.NamedTemporaryFile('w', delete=False) as fn:
-            soup = BeautifulSoup(response.text, 'html.parser')  # 'html5lib'
-            text = soup.find('div', id='textoxslt').get_text()
-            text = unicodedata.normalize('NFKC', text)
+        with tempfile.NamedTemporaryFile("w", delete=False) as fn:
+            soup = BeautifulSoup(response.text, "html.parser")  # 'html5lib'
+            text = soup.find("div", id="textoxslt").get_text()
+            text = unicodedata.normalize("NFKC", text)
             fn.write(text)
-            span_tag = soup.find('span', class_='puntoConso')
+            span_tag = soup.find("span", class_="puntoConso")
             if span_tag:
                 span_tag = span_tag.extract()
                 # TODO: link to span_tag.a['href'] to improve the split by articles -> https://www.boe.es/buscar/act.php?id=BOE-A-2022-14630
-            title = soup.find('h3', class_='documento-tit').get_text()
+            title = soup.find("h3", class_="documento-tit").get_text()
         metadata_doc = BOEMetadataDocument2(
-            filepath=fn.name,
-            title=title,
-            url=url,
-            document_id=url.split('?id=')[-1]
+            filepath=fn.name, title=title, url=url, document_id=url.split("?id=")[-1]
         )
         logger.info("Scrapped document successfully %s", url)
         return metadata_doc
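
_extract_metadata builds BOEMetadataReferencia objects and download_document unpacks the resulting dict into BOEMetadataDocument(filepath=fn.name, **_extract_metadata(soup)). A minimal sketch of how pydantic v2 validates that keyword pattern, using a stand-in class with the field names taken from the calls above (id, palabra, texto); the real class in src/etls/utils.py may differ:

from pydantic import BaseModel, ValidationError


class Referencia(BaseModel):  # stand-in for BOEMetadataReferencia; types are assumptions
    id: str
    palabra: str
    texto: str


data = {"id": "BOE-A-2022-14630", "palabra": "example", "texto": "example text"}
ref = Referencia(**data)  # same keyword-unpacking pattern as in the hunks above
print(ref.model_dump())

try:
    Referencia(id="BOE-A-2022-14630", palabra="example")  # 'texto' missing
except ValidationError as exc:
    print(exc)  # pydantic v2 reports each missing or invalid field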