Skip to content

Commit

Permalink
🔨 Update DAGS
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoinePELAMOURGUES committed Nov 19, 2024
1 parent 0f79175 commit cf8c0f1
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 69 deletions.
17 changes: 16 additions & 1 deletion airflow/dags/predict_knn_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,19 @@ def run_training(**kwargs):
)

if __name__ == "__main__":
my_dag.cli()
my_dag.cli()
from airflow.utils.state import State
from airflow.models import DagBag

dag_bag = DagBag()
dag = dag_bag.get_dag(dag_id='KNN_train_model')
dag.clear()

# Exécuter les tâches du DAG
for task in dag.tasks:
task.run(ignore_ti_state=True)

# Vérifier l'état des tâches
for task in dag.tasks:
ti = dag.get_task_instance(task.task_id)
assert ti.state == State.SUCCESS, f"Tâche {task.task_id} échouée"
17 changes: 16 additions & 1 deletion airflow/dags/predict_surprise_SVD.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,19 @@ def train_model() -> tuple:
)

if __name__ == "__main__":
svd_dag.cli()
svd_dag.cli()
from airflow.utils.state import State
from airflow.models import DagBag

dag_bag = DagBag()
dag = dag_bag.get_dag(dag_id='SVD_train_and_compare_model')
dag.clear()

# Exécuter les tâches du DAG
for task in dag.tasks:
task.run(ignore_ti_state=True)

# Vérifier l'état des tâches
for task in dag.tasks:
ti = dag.get_task_instance(task.task_id)
assert ti.state == State.SUCCESS, f"Tâche {task.task_id} échouée"
160 changes: 94 additions & 66 deletions airflow/dags/scrapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,47 @@
import pandas as pd
import re
import os
from supabase import create_client
from dotenv import load_dotenv
import psycopg2

# Charger les variables d'environnement depuis le fichier .env
load_dotenv()

def connect_to_supabase():
"""Établit une connexion à Supabase en utilisant les variables d'environnement."""
supabase_url = os.environ.get("SUPABASE_URL")
supabase_key = os.environ.get("SUPABASE_KEY")

if not all([supabase_url, supabase_key]):
raise ValueError(
"Les variables d'environnement SUPABASE_URL et SUPABASE_KEY doivent être définies."
POSTGRES_USER= os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD= os.getenv('POSTGRES_PASSWORD')
POSTGRES_DB= os.getenv('POSTGRES_DB')
POSTGRES_HOST= os.getenv('POSTGRES_HOST')
POSTGRES_PORT= os.getenv('POSTGRES_PORT')

def get_db_connection():
"""
Gestionnaire de contexte pour la connexion à la base de données.
Ouvre une connexion et la ferme automatiquement après utilisation.
Utilisation:
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM table")
"""
conn = None
try:
conn = psycopg2.connect(
database=POSTGRES_DB,
host=POSTGRES_HOST,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
port=POSTGRES_PORT
)
return create_client(supabase_url, supabase_key)
print("Connection à la base de données OK")
yield conn
except psycopg2.Error as e:
print(f"Erreur lors de la connexion à la base de données: {e}")
conn.rollback()
raise
finally:
if conn is not None:
conn.close()
print("Connexion à la base de données fermée")

def scrape_imdb():
"""Scrape les données des films depuis IMDb et les insère dans Supabase."""
Expand All @@ -41,18 +66,13 @@ def scrape_imdb():
cleaned_titles = [re.sub(r'^\d+\.\s*', '', title) for title in titles]
cleaned_links = [link.split('/')[2].split('?')[0].replace('tt', '') for link in links]

cover_list, genres_list, year_list = [], [], []
genres_list, year_list = [], []

# Boucle pour récupérer les détails de chaque film
for imdb_ref in cleaned_links:
movie_page = requests.get(f"http://www.imdb.com/title/tt{imdb_ref}/", headers=headers)
soup_movie = bs(movie_page.content, 'lxml')

# Récupérer l'image de couverture
image = soup_movie.find('img', class_="ipc-image")
link_img = image['src'] if image and 'src' in image.attrs else None
cover_list.append(link_img)

# Récupérer les genres du film
genres = soup_movie.find_all('span', class_='ipc-chip__text')
movie_genres = [i.text for i in genres[:-1]]
Expand All @@ -67,62 +87,53 @@ def scrape_imdb():
})
year_list.append(year_elem.text if year_elem else None)

# Connexion à Supabase pour insérer les données
supabase = connect_to_supabase()

# Récupérer le dernier movieId dans la base de données pour éviter les doublons
response = supabase.from_("movies").select("movieId").order("movieId", desc=True).limit(1).execute()

max_movie_id = response.data[0]['movieId'] + 1 if response.data else 1

# Insertion des films dans Supabase
for title, year, genres, cover_link, imdb in zip(cleaned_titles, year_list, genres_list, cover_list, cleaned_links):
existing_movie_response = supabase.from_("movies").select("*").eq("title", title).eq("year", year).execute()

if existing_movie_response.data:
print(f"Le film {title} - {year} est déjà présent dans la collection movies.")
else:
# Insérer le nouveau film dans la table movies
supabase.from_("movies").insert({
'movieId': max_movie_id,
'title': title,
'genres': genres,
'year': year
}).execute()

# Insérer le nouveau lien dans la table links
supabase.from_("links").insert({
'movieId': max_movie_id,
'imdbId': imdb,
'tmdbId': 0,
'cover_link': cover_link
}).execute()

print(f"Insertion du film {title}.")
max_movie_id += 1
# Connexion à postgres pour insérer les données
with get_db_connection() as conn:
with conn.cursor() as cur:
# Créer la table movies si elle n'existe pas
cur.execute("""
CREATE TABLE IF NOT EXISTS movies (
movieId SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
genres TEXT,
year INT
)
""")

# Créer la table links si elle n'existe pas
cur.execute("""
CREATE TABLE IF NOT EXISTS links (
id SERIAL PRIMARY KEY,
movieId INT REFERENCES movies(movieId),
imdbId INT,
tmdbId INT)
)
""")

# Insérer les données des films dans la table movies
for title, genres, year in zip(cleaned_titles, genres_list, year_list):
cur.execute("INSERT INTO movies (title, genres, year) VALUES (%s, %s, %s)", (title, genres, year))
# Insérer les données des films dans la table links
for imdb_id in cleaned_links:
cur.execute("INSERT INTO links (imdbId) VALUES (%s)", (imdb_id,))
conn.commit()

print("Données insérées avec succès dans la base de données.")

def export_table_to_csv(table_name, csv_file_path):
"""Exporte une table de Supabase vers un fichier CSV."""
supabase = connect_to_supabase()

# Récupérer les données de la table spécifiée
response = supabase.from_(table_name).select("*").execute()

if response.error:
print("Erreur lors de la récupération des données:", response.error)
return

# Convertir les données en DataFrame Pandas et exporter en CSV
data = response.data
df = pd.DataFrame(data)

df.to_csv(csv_file_path, index=False)
print(f"Données exportées vers {csv_file_path} avec succès.")
"""Exporte une table de postgres vers un fichier CSV."""
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute(f"SELECT * FROM {table_name}")
rows = cur.fetchall()
df = pd.DataFrame(rows)
df.to_csv(csv_file_path, index=False)
print(f"Table {table_name} exportée vers {csv_file_path}")

# Arguments par défaut pour le DAG
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 10, 31),
'start_date': datetime(2024, 11, 19),
}

# Création du DAG
Expand Down Expand Up @@ -159,3 +170,20 @@ def export_table_to_csv(table_name, csv_file_path):

# Définir l'ordre d'exécution des tâches dans le DAG
scrape_task >> update_movies_task >> update_links_task

if __name__ == "__main__":
from airflow.utils.state import State
from airflow.models import DagBag

dag_bag = DagBag()
dag = dag_bag.get_dag(dag_id='imdb_scraper_dag')
dag.clear()

# Exécuter les tâches du DAG
for task in dag.tasks:
task.run(ignore_ti_state=True)

# Vérifier l'état des tâches
for task in dag.tasks:
ti = dag.get_task_instance(task.task_id)
assert ti.state == State.SUCCESS, f"Tâche {task.task_id} échouée"
2 changes: 1 addition & 1 deletion airflow/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ scikit-surprise
mlflow
lxml
requests
sqlalchemy
psycopg2

0 comments on commit cf8c0f1

Please sign in to comment.