Skip to content

Commit

Permalink
🔨 Update Dags
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoinePELAMOURGUES committed Nov 21, 2024
1 parent ca95a88 commit 96bab83
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 91 deletions.
8 changes: 0 additions & 8 deletions airflow/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,3 @@ AIRFLOW_UID=1000
AIRFLOW_GID=1000
AIRFLOW_PROJ_DIR=/home/antoine/jul24_cmlops_reco_film/airflow

# POSTGRES RECO_MOVIES DB
POSTGRES_USER=user
POSTGRES_PASSWORD=datascientest
POSTGRES_DB=reco_movies
PGADMIN_DEFAULT_EMAIL=[email protected]
PGADMIN_DEFAULT_PASSWORD=datascientest
POSTGRES_HOST=reco_movies_db
POSTGRES_PORT=5432
1 change: 1 addition & 0 deletions airflow/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ RUN apt-get update && apt-get install -y \
USER airflow

COPY requirements.txt .
COPY .env .

# Mettre à jour pip et installer les dépendances Python
RUN pip install --upgrade pip && \
Expand Down
109 changes: 87 additions & 22 deletions airflow/dags/predict_surprise_SVD.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,102 @@
import mlflow
import pickle
from datetime import datetime

# Initialisation de NumPy
np.import_array()
import psycopg2
from dotenv import load_dotenv
from contextlib import contextmanager

# Charger les variables d'environnement à partir du fichier .env
load_dotenv()


POSTGRES_USER= os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD= os.getenv('POSTGRES_PASSWORD')
POSTGRES_DB= os.getenv('POSTGRES_DB')
POSTGRES_HOST= os.getenv('POSTGRES_HOST')
POSTGRES_PORT= os.getenv('POSTGRES_PORT')

@contextmanager # Ajout du décorateur contextmanager
def get_db_connection():
"""
Gestionnaire de contexte pour la connexion à la base de données.
Ouvre une connexion et la ferme automatiquement après utilisation.
Utilisation:
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM table")
"""
conn = None
try:
conn = psycopg2.connect(
database=POSTGRES_DB,
host=POSTGRES_HOST,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
port=POSTGRES_PORT
)
print("Connection à la base de données OK")
yield conn
except psycopg2.Error as e:
print(f"Erreur lors de la connexion à la base de données: {e}")
raise
finally:
if conn is not None:
conn.close()
print("Connexion à la base de données fermée")

# Configuration de MLflow
mlflow.set_tracking_uri("http://mlflow_webserver:5000")
EXPERIMENT_NAME = "Movie_Recommendation_Experiment"
time = datetime.now()
run_name = f"{time}_Modèle SVD"

def read_ratings(ratings_csv: str, data_dir: str = "/opt/airflow/data/raw") -> pd.DataFrame:
"""Lit le fichier CSV contenant les évaluations des films."""
def load_model(pkl_files, directory = "/opt/airflow/models") :
"""Charge le modèle à partir d'un répertoire."""
# Vérifier si le répertoire existe
if not os.path.exists(directory):
raise FileNotFoundError(f"Le répertoire {directory} n'existe pas.")
# Charger le modèle
filepath = os.path.join(directory, pkl_files)
with open(filepath, 'rb') as file:
model = pickle.load(file)
print(f'Modèle chargé depuis {filepath}')
return model

def fetch_latest_ratings() -> pd.DataFrame:
"""Récupère 25 % des derniers enregistrements de la table ratings et les transforme en DataFrame."""
query = """
SELECT userId, movieId, rating
FROM ratings
ORDER BY id DESC
LIMIT (SELECT COUNT(*) FROM ratings) * 0.25
"""
try:
# Lire le fichier CSV et retourner un DataFrame Pandas
data = pd.read_csv(os.path.join(data_dir, ratings_csv))
print("Dataset ratings loaded")
return data
with get_db_connection() as conn:
df = pd.read_sql_query(query, conn)
print("Derniers enregistrements récupérés")
return df
except Exception as e:
print(f"Error loading data: {e}")
print(f"Erreur lors de la récupération des enregistrements: {e}")
raise

def train_model() -> tuple:
"""Entraîne le modèle de recommandation sur les données fournies et retourne le modèle et son RMSE."""
# Démarrer un nouveau run dans MLflow
with mlflow.start_run(run_name=run_name) as run:
# Charger les données d'évaluation des films
ratings = read_ratings('processed_ratings.csv')
ratings = fetch_latest_ratings()

# Préparer les données pour Surprise
reader = Reader(rating_scale=(0.5, 5))
data = Dataset.load_from_df(ratings[['userId', 'movieId', 'rating']], reader=reader)

# Diviser les données en ensembles d'entraînement et de test
trainset, testset = train_test_split(data, test_size=0.25)
trainset, testset = train_test_split(data, test_size=0.15)

# Créer et entraîner le modèle SVD
model = SVD(n_factors=150, n_epochs=30, lr_all=0.01, reg_all=0.05)
model = load_model('model_SVD.pkl')

model.fit(trainset)

# Tester le modèle sur l'ensemble de test et calculer RMSE
Expand All @@ -57,6 +116,16 @@ def train_model() -> tuple:

print("Valeur de l'écart quadratique moyen (RMSE) :", acc_rounded)

# Enregistrer les métriques dans MLflow pour suivi ultérieur
mlflow.log_param("n_factors", 150)
mlflow.log_param("n_epochs", 30)
mlflow.log_param("lr_all", 0.01)
mlflow.log_param("reg_all", 0.05)
# Arrondir à 2 chiffres après la virgule
acc_rounded = round(acc, 2)

print("Valeur de l'écart quadratique moyen (RMSE) :", acc_rounded)

# Enregistrer les métriques dans MLflow pour suivi ultérieur
mlflow.log_param("n_factors", 150)
mlflow.log_param("n_epochs", 30)
Expand All @@ -77,20 +146,16 @@ def train_model() -> tuple:

print(f"Meilleur RMSE actuel: {last_rmse}, Nouveau RMSE: {acc_rounded}")

if acc_rounded < last_rmse:
print("Nouveau modèle meilleur, sauvegarde...")
directory = '/opt/airflow/models/model_SVD.pkl'

directory = '/opt/airflow/model/model_svd.pkl'
with open(directory, 'wb') as file:
pickle.dump(model, file)
print(f'Modèle sauvegardé sous {directory}')

with open(directory, 'wb') as file:
pickle.dump(model, file)
print(f'Modèle sauvegardé sous {directory}')
else:
print("Ancien modèle conservé ...")
# Définition du DAG Airflow

svd_dag = DAG(
dag_id='SVD_train_and_compare_model',
dag_id='SVD_train_model',
description='SVD Model for Movie Recommendation',
tags=['antoine'],
schedule_interval='@daily',
Expand Down
127 changes: 67 additions & 60 deletions airflow/dags/scrapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@
import os
from dotenv import load_dotenv
import psycopg2
from contextlib import contextmanager # Ajout du module contextlib
import logging
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
import time

# Configurer le logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Charger les variables d'environnement depuis le fichier .env
load_dotenv()
Expand All @@ -18,6 +27,7 @@
POSTGRES_HOST= os.getenv('POSTGRES_HOST')
POSTGRES_PORT= os.getenv('POSTGRES_PORT')

@contextmanager # Ajout du décorateur contextmanager
def get_db_connection():
"""
Gestionnaire de contexte pour la connexion à la base de données.
Expand All @@ -41,7 +51,6 @@ def get_db_connection():
yield conn
except psycopg2.Error as e:
print(f"Erreur lors de la connexion à la base de données: {e}")
conn.rollback()
raise
finally:
if conn is not None:
Expand All @@ -50,6 +59,7 @@ def get_db_connection():

def scrape_imdb():
"""Scrape les données des films depuis IMDb et les insère dans Supabase."""
start_time = time.time()
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
Expand All @@ -66,6 +76,7 @@ def scrape_imdb():
cleaned_titles = [re.sub(r'^\d+\.\s*', '', title) for title in titles]
cleaned_links = [link.split('/')[2].split('?')[0].replace('tt', '') for link in links]


genres_list, year_list = [], []

# Boucle pour récupérer les détails de chaque film
Expand All @@ -87,48 +98,60 @@ def scrape_imdb():
})
year_list.append(year_elem.text if year_elem else None)

# Connexion à postgres pour insérer les données
with get_db_connection() as conn:
with conn.cursor() as cur:
# Créer la table movies si elle n'existe pas
cur.execute("""
CREATE TABLE IF NOT EXISTS movies (
movieId SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
genres TEXT,
year INT
)
""")

# Créer la table links si elle n'existe pas
cur.execute("""
CREATE TABLE IF NOT EXISTS links (
id SERIAL PRIMARY KEY,
movieId INT REFERENCES movies(movieId),
imdbId INT,
tmdbId INT)
)
""")

# Insérer les données des films dans la table movies
for title, genres, year in zip(cleaned_titles, genres_list, year_list):
cur.execute("INSERT INTO movies (title, genres, year) VALUES (%s, %s, %s)", (title, genres, year))
# Insérer les données des films dans la table links
for imdb_id in cleaned_links:
cur.execute("INSERT INTO links (imdbId) VALUES (%s)", (imdb_id,))
conn.commit()

print("Données insérées avec succès dans la base de données.")

def export_table_to_csv(table_name, csv_file_path):
"""Exporte une table de postgres vers un fichier CSV."""
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute(f"SELECT * FROM {table_name}")
rows = cur.fetchall()
df = pd.DataFrame(rows)
df.to_csv(csv_file_path, index=False)
print(f"Table {table_name} exportée vers {csv_file_path}")
try:
with get_db_connection() as conn:
with conn.cursor() as cur:
try:
# Récupération du dernier movieId
cur.execute("SELECT COALESCE(MAX(movieId), 0) FROM movies")
last_movie_id = cur.fetchone()[0]

# Liste pour stocker les insertions à effectuer
insert_movies = []
insert_links = []

for title, genres, year, link in zip(cleaned_titles, genres_list, year_list, cleaned_links):
cur.execute("SELECT 1 FROM movies WHERE title = %s AND year = %s", (title, year))
if cur.fetchone() is None:
last_movie_id += 1
genres_str = ','.join(genres) # Convertir la liste de genres en chaîne de caractères
insert_movies.append((title, genres_str, year))
insert_links.append((last_movie_id, link))
logger.info(f"Titres insérés: {insert_movies}")
logger.info(f"Liens insérés: {insert_links}")
else:
print(f"Le film {title} existe déjà dans la base de données.")
# Insertion des films
if insert_movies:
cur.executemany("""
INSERT INTO movies (title, genres, year)
VALUES (%s, %s, %s)
""", insert_movies)

# Insertion des liens
if insert_links:
cur.executemany("""
INSERT INTO links (movieId, imdbId)
VALUES (%s, %s)
""", insert_links)

conn.commit()
logger.info("Données insérées avec succès dans les tables movies & links.")
# Envoyer la métrique du nombre de films insérés
statsd = BaseHook.get_connection('statsd_default')
statsd.gauge('imdb_scraper.movies_inserted', len(insert_movies))

except Exception as e:
conn.rollback()
logger.error(f"Une erreur est survenue : {e}")
raise
finally:
end_time = time.time()
duration = end_time - start_time
# Envoyer la métrique de durée d'exécution
statsd.gauge('imdb_scraper.duration', duration)
# Envoyer la métrique de succès/échec
statsd.increment('imdb_scraper.success' if not e else 'imdb_scraper.failure')

# Arguments par défaut pour le DAG
default_args = {
Expand All @@ -152,21 +175,5 @@ def export_table_to_csv(table_name, csv_file_path):
dag=dag,
)

# Tâche pour exporter la table movies vers CSV
update_movies_task = PythonOperator(
task_id='update_movies_task',
python_callable=export_table_to_csv,
dag=dag,
op_kwargs={"table_name": "movies", "csv_file_path": "/opt/airflow/data/raw/processed_movies.csv"}
)

# Tâche pour exporter la table links vers CSV
update_links_task = PythonOperator(
task_id='update_links_task',
python_callable=export_table_to_csv,
dag=dag,
op_kwargs={"table_name": "links", "csv_file_path": "/opt/airflow/data/raw/processed_links.csv"}
)

# Définir l'ordre d'exécution des tâches dans le DAG
scrape_task >> update_movies_task >> update_links_task
scrape_task
2 changes: 1 addition & 1 deletion airflow/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ x-airflow-common: &airflow-common
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
- ../ml/data/processed/:/opt/airflow/data/raw
- ../ml/models/:/opt/airflow/model
- ../ml/models/:/opt/airflow/models
user: "0:0"
depends_on: &airflow-common-depends-on
redis:
Expand Down

0 comments on commit 96bab83

Please sign in to comment.