diff --git a/airflow/dags/predict_knn_model.py b/airflow/dags/predict_knn_model.py index 0648f33..9b15984 100644 --- a/airflow/dags/predict_knn_model.py +++ b/airflow/dags/predict_knn_model.py @@ -90,4 +90,19 @@ def run_training(**kwargs): ) if __name__ == "__main__": - my_dag.cli() \ No newline at end of file + my_dag.cli() + from airflow.utils.state import State + from airflow.models import DagBag + + dag_bag = DagBag() + dag = dag_bag.get_dag(dag_id='KNN_train_model') + dag.clear() + + # Exécuter les tâches du DAG + for task in dag.tasks: + task.run(ignore_ti_state=True) + + # Vérifier l'état des tâches + for task in dag.tasks: + ti = dag.get_task_instance(task.task_id) + assert ti.state == State.SUCCESS, f"Tâche {task.task_id} échouée" \ No newline at end of file diff --git a/airflow/dags/predict_surprise_SVD.py b/airflow/dags/predict_surprise_SVD.py index 400bb4f..0c57d76 100644 --- a/airflow/dags/predict_surprise_SVD.py +++ b/airflow/dags/predict_surprise_SVD.py @@ -105,4 +105,19 @@ def train_model() -> tuple: ) if __name__ == "__main__": - svd_dag.cli() \ No newline at end of file + svd_dag.cli() + from airflow.utils.state import State + from airflow.models import DagBag + + dag_bag = DagBag() + dag = dag_bag.get_dag(dag_id='SVD_train_and_compare_model') + dag.clear() + + # Exécuter les tâches du DAG + for task in dag.tasks: + task.run(ignore_ti_state=True) + + # Vérifier l'état des tâches + for task in dag.tasks: + ti = dag.get_task_instance(task.task_id) + assert ti.state == State.SUCCESS, f"Tâche {task.task_id} échouée" \ No newline at end of file diff --git a/airflow/dags/scrapping.py b/airflow/dags/scrapping.py index 5f1f95c..979f6b5 100644 --- a/airflow/dags/scrapping.py +++ b/airflow/dags/scrapping.py @@ -6,22 +6,47 @@ import pandas as pd import re import os -from supabase import create_client from dotenv import load_dotenv +import psycopg2 # Charger les variables d'environnement depuis le fichier .env load_dotenv() -def connect_to_supabase(): - """Établit une connexion à Supabase en utilisant les variables d'environnement.""" - supabase_url = os.environ.get("SUPABASE_URL") - supabase_key = os.environ.get("SUPABASE_KEY") - - if not all([supabase_url, supabase_key]): - raise ValueError( - "Les variables d'environnement SUPABASE_URL et SUPABASE_KEY doivent être définies." +POSTGRES_USER= os.getenv('POSTGRES_USER') +POSTGRES_PASSWORD= os.getenv('POSTGRES_PASSWORD') +POSTGRES_DB= os.getenv('POSTGRES_DB') +POSTGRES_HOST= os.getenv('POSTGRES_HOST') +POSTGRES_PORT= os.getenv('POSTGRES_PORT') + +def get_db_connection(): + """ + Gestionnaire de contexte pour la connexion à la base de données. + Ouvre une connexion et la ferme automatiquement après utilisation. + + Utilisation: + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute("SELECT * FROM table") + """ + conn = None + try: + conn = psycopg2.connect( + database=POSTGRES_DB, + host=POSTGRES_HOST, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + port=POSTGRES_PORT ) - return create_client(supabase_url, supabase_key) + print("Connection à la base de données OK") + yield conn + except psycopg2.Error as e: + print(f"Erreur lors de la connexion à la base de données: {e}") + conn.rollback() + raise + finally: + if conn is not None: + conn.close() + print("Connexion à la base de données fermée") def scrape_imdb(): """Scrape les données des films depuis IMDb et les insère dans Supabase.""" @@ -41,18 +66,13 @@ def scrape_imdb(): cleaned_titles = [re.sub(r'^\d+\.\s*', '', title) for title in titles] cleaned_links = [link.split('/')[2].split('?')[0].replace('tt', '') for link in links] - cover_list, genres_list, year_list = [], [], [] + genres_list, year_list = [], [] # Boucle pour récupérer les détails de chaque film for imdb_ref in cleaned_links: movie_page = requests.get(f"http://www.imdb.com/title/tt{imdb_ref}/", headers=headers) soup_movie = bs(movie_page.content, 'lxml') - # Récupérer l'image de couverture - image = soup_movie.find('img', class_="ipc-image") - link_img = image['src'] if image and 'src' in image.attrs else None - cover_list.append(link_img) - # Récupérer les genres du film genres = soup_movie.find_all('span', class_='ipc-chip__text') movie_genres = [i.text for i in genres[:-1]] @@ -67,62 +87,53 @@ def scrape_imdb(): }) year_list.append(year_elem.text if year_elem else None) - # Connexion à Supabase pour insérer les données - supabase = connect_to_supabase() - - # Récupérer le dernier movieId dans la base de données pour éviter les doublons - response = supabase.from_("movies").select("movieId").order("movieId", desc=True).limit(1).execute() - - max_movie_id = response.data[0]['movieId'] + 1 if response.data else 1 - - # Insertion des films dans Supabase - for title, year, genres, cover_link, imdb in zip(cleaned_titles, year_list, genres_list, cover_list, cleaned_links): - existing_movie_response = supabase.from_("movies").select("*").eq("title", title).eq("year", year).execute() - - if existing_movie_response.data: - print(f"Le film {title} - {year} est déjà présent dans la collection movies.") - else: - # Insérer le nouveau film dans la table movies - supabase.from_("movies").insert({ - 'movieId': max_movie_id, - 'title': title, - 'genres': genres, - 'year': year - }).execute() - - # Insérer le nouveau lien dans la table links - supabase.from_("links").insert({ - 'movieId': max_movie_id, - 'imdbId': imdb, - 'tmdbId': 0, - 'cover_link': cover_link - }).execute() - - print(f"Insertion du film {title}.") - max_movie_id += 1 + # Connexion à postgres pour insérer les données + with get_db_connection() as conn: + with conn.cursor() as cur: + # Créer la table movies si elle n'existe pas + cur.execute(""" + CREATE TABLE IF NOT EXISTS movies ( + movieId SERIAL PRIMARY KEY, + title VARCHAR(255) NOT NULL, + genres TEXT, + year INT + ) + """) + + # Créer la table links si elle n'existe pas + cur.execute(""" + CREATE TABLE IF NOT EXISTS links ( + id SERIAL PRIMARY KEY, + movieId INT REFERENCES movies(movieId), + imdbId INT, + tmdbId INT) + ) + """) + + # Insérer les données des films dans la table movies + for title, genres, year in zip(cleaned_titles, genres_list, year_list): + cur.execute("INSERT INTO movies (title, genres, year) VALUES (%s, %s, %s)", (title, genres, year)) + # Insérer les données des films dans la table links + for imdb_id in cleaned_links: + cur.execute("INSERT INTO links (imdbId) VALUES (%s)", (imdb_id,)) + conn.commit() + + print("Données insérées avec succès dans la base de données.") def export_table_to_csv(table_name, csv_file_path): - """Exporte une table de Supabase vers un fichier CSV.""" - supabase = connect_to_supabase() - - # Récupérer les données de la table spécifiée - response = supabase.from_(table_name).select("*").execute() - - if response.error: - print("Erreur lors de la récupération des données:", response.error) - return - - # Convertir les données en DataFrame Pandas et exporter en CSV - data = response.data - df = pd.DataFrame(data) - - df.to_csv(csv_file_path, index=False) - print(f"Données exportées vers {csv_file_path} avec succès.") + """Exporte une table de postgres vers un fichier CSV.""" + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT * FROM {table_name}") + rows = cur.fetchall() + df = pd.DataFrame(rows) + df.to_csv(csv_file_path, index=False) + print(f"Table {table_name} exportée vers {csv_file_path}") # Arguments par défaut pour le DAG default_args = { 'owner': 'airflow', - 'start_date': datetime(2024, 10, 31), + 'start_date': datetime(2024, 11, 19), } # Création du DAG @@ -159,3 +170,20 @@ def export_table_to_csv(table_name, csv_file_path): # Définir l'ordre d'exécution des tâches dans le DAG scrape_task >> update_movies_task >> update_links_task + +if __name__ == "__main__": + from airflow.utils.state import State + from airflow.models import DagBag + + dag_bag = DagBag() + dag = dag_bag.get_dag(dag_id='imdb_scraper_dag') + dag.clear() + + # Exécuter les tâches du DAG + for task in dag.tasks: + task.run(ignore_ti_state=True) + + # Vérifier l'état des tâches + for task in dag.tasks: + ti = dag.get_task_instance(task.task_id) + assert ti.state == State.SUCCESS, f"Tâche {task.task_id} échouée" diff --git a/airflow/requirements.txt b/airflow/requirements.txt index df8c63d..8eeef7d 100644 --- a/airflow/requirements.txt +++ b/airflow/requirements.txt @@ -8,4 +8,4 @@ scikit-surprise mlflow lxml requests -sqlalchemy \ No newline at end of file +psycopg2 \ No newline at end of file