diff --git a/.env.example b/.env.example old mode 100644 new mode 100755 diff --git a/.github/workflows/ci_airflow.yml b/.github/workflows/ci_airflow.yml old mode 100644 new mode 100755 diff --git a/.github/workflows/docker-deploy.yml b/.github/workflows/docker-deploy.yml old mode 100644 new mode 100755 diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml old mode 100644 new mode 100755 diff --git a/.github/workflows/test-databases.yml b/.github/workflows/test-databases.yml old mode 100644 new mode 100755 diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/Makefile b/Makefile old mode 100644 new mode 100755 index 4602202..4f767f4 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ NAMESPACE1 = reco-movies NAMESPACE2 = airflow -.PHONY: help setup1 setup2 start stop down restart logs-supabase logs-airflow logs-api logs-fastapi clean network all namespace pv secrets configmaps deployments services ingress clean-kube-reco clean-kube-airflow apply-configmap load-data-minikube install-airflow pv-airflow airflow reco +.PHONY: help setup1 setup2 start stop down restart logs-supabase logs-airflow logs-api logs-fastapi clean network all namespace pv secrets configmaps deployments services ingress clean-kube-reco clean-kube-airflow apply-configmap start-minikube start-airflow pv-airflow reco # Help command to list all available targets help: @@ -114,52 +114,29 @@ network: ###### MAKEFILE KUBERNETES all: namespace install-airflow pv-airflow pv secrets configmaps deployments services ingress +start-minikube: + minikube start --driver=docker --memory=8192 --cpus=4 + +install-helm: + curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 + chmod 700 get_helm.sh + ./get_helm.sh + # Installation de helm Airflow -install-airflow: +start-airflow: sudo apt-get update helm repo add apache-airflow https://airflow.apache.org helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace -f kubernetes/airflow/my_values.yml - kubectl apply -f kubernetes/airflow/airflow-local-dags-folder-pv.yml -n Airflow - kubectl apply -f kubernetes/airflow/airflow-local-dags-folder-pvc.yml -n airflow - kubectl apply -f kubernetes/airflow/airflow-local-logs-folder-pv.yml -n airflow - kubectl apply -f kubernetes/airflow/airflow-local-logs-folder-pvc.yml -n airflow - kubectl apply -f kubernetes/airflow/order/order-data-folder-pv.yaml - kubectl apply -f kubernetes/airflow/order/order-data-folder-pvc.yaml - kubectl apply -f kubernetes/secrets/airflow-secrets.yaml - kubectl apply -f kubernetes/airflow/order/python-transform-job.yaml -n airflow - kubectl apply -f kubernetes/airflow/order/python-load-job.yaml -n airflow - - -delete-airflow-statefulsets: - kubectl delete statefulset -n airflow airflow-triggerer || true - kubectl delete statefulset -n airflow airflow-worker || true - -pv-airflow: - kubectl apply -f kubernetes/airflow/airflow-local-dags-folder-pv.yml -n airflow --validate=false - kubectl apply -f kubernetes/airflow/airflow-local-dags-folder-pvc.yml -n airflow --validate=false - kubectl apply -f kubernetes/airflow/airflow-local-logs-folder-pv.yml -n airflow --validate=false - kubectl apply -f kubernetes/airflow/airflow-local-logs-folder-pvc.yml -n airflow --validate=false - kubectl apply -f kubernetes/airflow/order/order-data-folder-pv.yaml - kubectl apply -f kubernetes/airflow/order/order-data-folder-pvc.yaml + kubectl apply -f 
kubernetes/storageclass/storageclass.yaml -n airflow + kubectl apply -f kubernetes/persistent-volumes/airflow-local-dags-folder.yml -n airflow + kubectl apply -f kubernetes/persistent-volumes/airflow-local-logs-folder.yml -n airflow + kubectl apply -f kubernetes/secrets/airflow-secrets.yaml -n airflow + delete-pv-airflow: kubectl delete pv airflow-local-dags-folder || true kubectl delete pv airflow-local-logs-folder || true - kubectl delete pv order-data-folder || true - -airflow: namespace pv-airflow - helm -n airflow upgrade --install airflow apache-airflow/airflow -f kubernetes/airflow/my_values.yml - -# Chargement des données dans minikube : https://minikube.sigs.k8s.io/docs/handbook/filesync/ -load-data-minikube: - mkdir -p ~/.minikube/files/processed_raw - mkdir -p ~/.minikube/files/dags - mkdir -p ~/.minikube/files/logs - cp -r ml/data/processed/* ~/.minikube/files/processed_raw - cp -r postgres/init.sql ~/.minikube/files/init.sql - cp -r prometheus/prometheus.yml ~/.minikube/files/prometheus.yml - cp -r airflow/dags/* ~/.minikube/files/dags - minikube start + # Vérifie si kubectl est connecté à un cluster check-kube: diff --git a/README.md b/README.md old mode 100644 new mode 100755 diff --git a/airflow/.env.example b/airflow/.env.example old mode 100644 new mode 100755 diff --git a/airflow/Dockerfile b/airflow/Dockerfile old mode 100644 new mode 100755 diff --git a/airflow/dags/predict_knn_model.py b/airflow/dags/predict_knn_model.py old mode 100644 new mode 100755 diff --git a/airflow/dags/predict_surprise_SVD.py b/airflow/dags/predict_surprise_SVD.py old mode 100644 new mode 100755 diff --git a/airflow/dags/scrapping.py b/airflow/dags/scrapping.py old mode 100644 new mode 100755 diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml old mode 100644 new mode 100755 diff --git a/airflow/logs/.gitkeep b/airflow/logs/.gitkeep old mode 100644 new mode 100755 diff --git a/airflow/requirements.txt b/airflow/requirements.txt old mode 100644 new mode 100755 diff --git a/clean_docker.sh b/clean_docker.sh old mode 100644 new mode 100755 diff --git a/clean_kubernetes.sh b/clean_kubernetes.sh old mode 100644 new mode 100755 diff --git a/data/dags_airflow/init_order.py b/data/dags_airflow/init_order.py new file mode 100755 index 0000000..ff31918 --- /dev/null +++ b/data/dags_airflow/init_order.py @@ -0,0 +1,95 @@ +from airflow import DAG +from airflow.utils.dates import days_ago +from airflow.operators.python_operator import PythonOperator +from airflow import settings +from airflow.models.connection import Connection +from airflow.operators.postgres_operator import PostgresOperator +import os + +conn_keys = ['conn_id', 'conn_type', 'host', 'login', 'password', 'schema'] + +def get_postgres_conn_conf(): + postgres_conn_conf = {} + postgres_conn_conf['host'] = os.getenv("AIRFLOW_POSTGRESQL_SERVICE_HOST") + postgres_conn_conf['port'] = os.getenv("AIRFLOW_POSTGRESQL_SERVICE_PORT") + if (postgres_conn_conf['host'] == None): + raise TypeError("The AIRFLOW_POSTGRESQL_SERVICE_HOST isn't defined") + elif (postgres_conn_conf['port'] == None): + raise TypeError("The AIRFLOW_POSTGRESQL_SERVICE_PORT isn't defined") + postgres_conn_conf['conn_id'] = 'postgres' + postgres_conn_conf['conn_type'] = 'postgres' + postgres_conn_conf['login'] = 'postgres' + postgres_conn_conf['password'] = 'postgres' + postgres_conn_conf['schema'] = 'postgres' + return postgres_conn_conf + +def create_conn(**kwargs): + session = settings.Session() + print("Session created") + connections = 
session.query(Connection) + print("Connections listed") + if not kwargs['conn_id'] in [connection.conn_id for connection in connections]: + conn_params = { key: kwargs[key] for key in conn_keys } + conn = Connection(**conn_params) + session.add(conn) + session.commit() + print("Connection Created") + else: + print("Connection already exists") + session.close() + +postgres_conn_conf = get_postgres_conn_conf() + +with DAG( + dag_id='init_order', + tags=['order', 'antoine'], + default_args={ + 'owner': 'airflow', + 'start_date': days_ago(0, minute=1), + }, + catchup=False +) as dag: + + create_postgres_conn = PythonOperator( + task_id='create_postgres_conn', + python_callable=create_conn, + op_kwargs=postgres_conn_conf + ) + + create_tables = PostgresOperator( + task_id='create_tables', + postgres_conn_id='postgres', + sql=""" + CREATE TABLE IF NOT EXISTS movies ( + movieId SERIAL PRIMARY KEY, + title VARCHAR(200) NOT NULL, + genres TEXT, + year INT + ); + + CREATE TABLE IF NOT EXISTS ratings ( + id SERIAL PRIMARY KEY, + userId INT, + movieId INT REFERENCES movies(movieId), + rating FLOAT NOT NULL, + timestamp INT, + bayesian_mean FLOAT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS links ( + id SERIAL PRIMARY KEY, + movieId INT REFERENCES movies(movieId), + imdbId INT, + tmdbId INT + ); + + CREATE TABLE IF NOT EXISTS users ( + userId SERIAL PRIMARY KEY, + username VARCHAR(50) NOT NULL, + email VARCHAR(100) NOT NULL UNIQUE, + hached_password VARCHAR(300) NOT NULL + ); + """ +) + +create_postgres_conn >> create_tables diff --git a/data/dags_airflow/load_transform_save_db.py b/data/dags_airflow/load_transform_save_db.py new file mode 100755 index 0000000..336a514 --- /dev/null +++ b/data/dags_airflow/load_transform_save_db.py @@ -0,0 +1,46 @@ +from airflow import DAG +from airflow.utils.dates import days_ago +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.kubernetes.secret import Secret +from kubernetes.client import models as k8s + +secret_database = Secret( + deploy_type="env", + deploy_target="DATABASE", + secret="sql-conn", + namespace="airflow", +) + +secret_user = Secret( + deploy_type="env", + deploy_target="USER", + secret="sql-conn", + namespace="airflow", +) + +secret_password = Secret( + deploy_type="env", + deploy_target="PASSWORD", + secret="sql-conn", + namespace="airflow", +) + +with DAG( + dag_id='load_transform_save_db', + tags=['antoine'], + default_args={ + 'owner': 'airflow', + 'start_date': days_ago(0, minute=1), + }, + schedule_interval=None, # Pas de planification automatique, + catchup=False +) as dag: + + python_transform = KubernetesPodOperator( + task_id="python_transform", + image="antoinepela/projet_reco_movies:order-python-transform-latest", + cmds=["/bin/bash", "-c", "/app/start.sh"], + namespace= "airflow", + ) + +python_transform \ No newline at end of file diff --git a/data/dags_airflow/predict_knn_model.py b/data/dags_airflow/predict_knn_model.py new file mode 100755 index 0000000..d3fc508 --- /dev/null +++ b/data/dags_airflow/predict_knn_model.py @@ -0,0 +1,178 @@ +import pandas as pd +import numpy as np +from scipy.sparse import csr_matrix +from sklearn.neighbors import NearestNeighbors +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from datetime import datetime +import os +import pickle +import mlflow +import psycopg2 +from dotenv import load_dotenv +from contextlib import contextmanager +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator + 
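+# Ce DAG recharge le modèle KNN sérialisé (model_KNN.pkl), le ré-entraîne sur les 25 % d'évaluations les plus récentes lues dans PostgreSQL, sauvegarde le modèle mis à jour puis trace l'exécution dans MLflow.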
+ +# Charger les variables d'environnement à partir du fichier .env +load_dotenv() + +POSTGRES_USER= os.getenv('POSTGRES_USER') +POSTGRES_PASSWORD= os.getenv('POSTGRES_PASSWORD') +POSTGRES_DB= os.getenv('POSTGRES_DB') +POSTGRES_HOST= os.getenv('POSTGRES_HOST') +POSTGRES_PORT= os.getenv('POSTGRES_PORT') + + +@contextmanager # Ajout du décorateur contextmanager +def get_db_connection(): + """ + Gestionnaire de contexte pour la connexion à la base de données. + Ouvre une connexion et la ferme automatiquement après utilisation. + + Utilisation: + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute("SELECT * FROM table") + """ + conn = None + try: + conn = psycopg2.connect( + database=POSTGRES_DB, + host=POSTGRES_HOST, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + port=POSTGRES_PORT + ) + print("Connection à la base de données OK") + yield conn + except psycopg2.Error as e: + print(f"Erreur lors de la connexion à la base de données: {e}") + raise + finally: + if conn is not None: + conn.close() + print("Connexion à la base de données fermée") + +def load_model(pkl_files, directory = "/opt/airflow/models") : + """Charge le modèle à partir d'un répertoire.""" + # Vérifier si le répertoire existe + if not os.path.exists(directory): + raise FileNotFoundError(f"Le répertoire {directory} n'existe pas.") + # Charger le modèle + filepath = os.path.join(directory, pkl_files) + with open(filepath, 'rb') as file: + model = pickle.load(file) + print(f'Modèle chargé depuis {filepath}') + return model + +def fetch_latest_ratings() -> pd.DataFrame: + """Récupère 25 % des derniers enregistrements de la table ratings et les transforme en DataFrame.""" + query = """ + SELECT userId, movieId, rating + FROM ratings + ORDER BY id DESC + LIMIT (SELECT COUNT(*) FROM ratings) * 0.25 + """ + try: + with get_db_connection() as conn: + df = pd.read_sql_query(query, conn) + print("Derniers enregistrements récupérés") + return df + except Exception as e: + print(f"Erreur lors de la récupération des enregistrements: {e}") + raise + +# Configuration de MLflow +mlflow.set_tracking_uri("http://mlflow_webserver:5000") +EXPERIMENT_NAME = "Movie_Recommendation_Experiment" +time = datetime.now() +run_name = f"{time}_Modèle KNN" + +def create_X(df): + """Generates a sparse user-item rating matrix.""" + M = df['userid'].nunique() + N = df['movieid'].nunique() + + user_mapper = dict(zip(np.unique(df["userid"]), list(range(M)))) + movie_mapper = dict(zip(np.unique(df["movieid"]), list(range(N)))) + + user_index = [user_mapper[i] for i in df['userid']] + item_index = [movie_mapper[i] for i in df['movieid']] + + X = csr_matrix((df["rating"], (user_index, item_index)), shape=(M, N)) + + return X + +def train_model(df, k=10): + """Trains the KNN model on the training data.""" + X = create_X(df) + + X = X.T # Transpose to have users in rows + + kNN = load_model('model_KNN.pkl') + + model = kNN.fit(X) + + return model + +def save_model(model, filepath: str) -> None: + """Sauvegarde le modèle entraîné dans un fichier.""" + directory = os.path.join(filepath, 'model_KNN.pkl') + with open(directory, 'wb') as file: + pickle.dump(model, file) + print(f'Modèle sauvegardé sous {filepath}/model_KNN.pkl') + +def run_training(**kwargs): + """Main function to train the model.""" + # Démarrer un nouveau run dans MLflow + with mlflow.start_run(run_name=run_name) as run: + # Load data + ratings = fetch_latest_ratings() + # Train KNN model + model_knn = train_model(ratings) + save_model(model_knn, '/opt/airflow/models/') + 
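+ # NB : les hyperparamètres consignés ci-dessous sont codés en dur et supposés correspondre à la configuration du modèle chargé depuis model_KNN.pkl ; à titre d'esquisse, on pourrait aussi consigner une métrique, par exemple mlflow.log_metric("n_ratings", len(ratings)).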
+ # Enregistrer les métriques dans MLflow pour suivi ultérieur + mlflow.log_param("n_neighbors", 11) + mlflow.log_param("algorithm", "brute") + mlflow.log_param("metric", "cosine") + +# Define Airflow DAG +my_dag = DAG( + dag_id='KNN_train_model', + description='KNN Model for Movie Recommendation', + tags=['antoine'], + schedule_interval='@daily', + default_args={ + 'owner': 'airflow', + 'start_date': datetime(2024, 11, 22), + } +) + +# Create a task to train the model and evaluate its performance +train_task = PythonOperator( + task_id='train_model', + python_callable=run_training, + dag=my_dag, +) + +train_task_kube = KubernetesPodOperator( + task_id="train_model_kube", + name="train_model_kube", + namespace="airflow", + image="airflow-mlflow:latest", + cmds=["python", "predict_knn_model.py"], + get_logs=True, + env_vars={ + 'POSTGRES_USER': POSTGRES_USER, + 'POSTGRES_PASSWORD': POSTGRES_PASSWORD, + 'POSTGRES_DB': POSTGRES_DB, + 'POSTGRES_HOST': POSTGRES_HOST, + 'POSTGRES_PORT': POSTGRES_PORT, + }, + dag=my_dag +) + +# Define dependencies between tasks +train_task >> train_task_kube \ No newline at end of file diff --git a/data/dags_airflow/predict_surprise_SVD.py b/data/dags_airflow/predict_surprise_SVD.py new file mode 100755 index 0000000..b9b08f2 --- /dev/null +++ b/data/dags_airflow/predict_surprise_SVD.py @@ -0,0 +1,175 @@ +import numpy as np +import pandas as pd +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +import os +from surprise import Dataset, Reader +from surprise.prediction_algorithms.matrix_factorization import SVD +from surprise.model_selection import train_test_split +from surprise import accuracy +import mlflow +import pickle +from datetime import datetime +import psycopg2 +from dotenv import load_dotenv +from contextlib import contextmanager + +# Charger les variables d'environnement à partir du fichier .env +load_dotenv() + + +POSTGRES_USER= os.getenv('POSTGRES_USER') +POSTGRES_PASSWORD= os.getenv('POSTGRES_PASSWORD') +POSTGRES_DB= os.getenv('POSTGRES_DB') +POSTGRES_HOST= os.getenv('POSTGRES_HOST') +POSTGRES_PORT= os.getenv('POSTGRES_PORT') + +@contextmanager # Ajout du décorateur contextmanager +def get_db_connection(): + """ + Gestionnaire de contexte pour la connexion à la base de données. + Ouvre une connexion et la ferme automatiquement après utilisation. 
+ + Utilisation: + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute("SELECT * FROM table") + """ + conn = None + try: + conn = psycopg2.connect( + database=POSTGRES_DB, + host=POSTGRES_HOST, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + port=POSTGRES_PORT + ) + print("Connection à la base de données OK") + yield conn + except psycopg2.Error as e: + print(f"Erreur lors de la connexion à la base de données: {e}") + raise + finally: + if conn is not None: + conn.close() + print("Connexion à la base de données fermée") + +# Configuration de MLflow +mlflow.set_tracking_uri("http://mlflow_webserver:5000") +EXPERIMENT_NAME = "Movie_Recommendation_Experiment" +time = datetime.now() +run_name = f"{time}_Modèle SVD" + +def load_model(pkl_files, directory = "/opt/airflow/models") : + """Charge le modèle à partir d'un répertoire.""" + # Vérifier si le répertoire existe + if not os.path.exists(directory): + raise FileNotFoundError(f"Le répertoire {directory} n'existe pas.") + # Charger le modèle + filepath = os.path.join(directory, pkl_files) + with open(filepath, 'rb') as file: + model = pickle.load(file) + print(f'Modèle chargé depuis {filepath}') + return model + +def fetch_latest_ratings() -> pd.DataFrame: + """Récupère 25 % des derniers enregistrements de la table ratings et les transforme en DataFrame.""" + query = """ + SELECT userId, movieId, rating + FROM ratings + ORDER BY id DESC + LIMIT (SELECT COUNT(*) FROM ratings) * 0.25 + """ + try: + with get_db_connection() as conn: + df = pd.read_sql_query(query, conn) + print("Derniers enregistrements récupérés") + return df + except Exception as e: + print(f"Erreur lors de la récupération des enregistrements: {e}") + raise + +def train_model() -> None: + """Entraîne le modèle SVD sur les dernières évaluations, journalise le RMSE dans MLflow et sauvegarde le modèle.""" + # Démarrer un nouveau run dans MLflow + with mlflow.start_run(run_name=run_name) as run: + # Charger les données d'évaluation des films + ratings = fetch_latest_ratings() + + # Préparer les données pour Surprise + reader = Reader(rating_scale=(0.5, 5)) + data = Dataset.load_from_df(ratings[['userid', 'movieid', 'rating']], reader=reader) + + # Diviser les données en ensembles d'entraînement et de test + trainset, testset = train_test_split(data, test_size=0.15) + + # Créer et entraîner le modèle SVD + model = load_model('model_SVD.pkl') + + model.fit(trainset) + + # Tester le modèle sur l'ensemble de test et calculer RMSE + predictions = model.test(testset) + acc = accuracy.rmse(predictions) + # Arrondir à 2 chiffres après la virgule + acc_rounded = round(acc, 2) + + print("Valeur de l'écart quadratique moyen (RMSE) :", acc_rounded) + + # Enregistrer les paramètres et la métrique dans MLflow pour suivi ultérieur + mlflow.log_param("n_factors", 150) + mlflow.log_param("n_epochs", 30) + mlflow.log_param("lr_all", 0.01) + mlflow.log_param("reg_all", 0.05) + mlflow.log_metric("rmse", acc_rounded) # nécessaire pour la recherche sur metrics.rmse ci-dessous + + # Récupérer le dernier RMSE enregistré dans MLflow pour comparaison + mlflow.set_experiment(EXPERIMENT_NAME) + + # Récupérer les dernières exécutions triées par RMSE décroissant, en prenant la première (meilleure) + runs = mlflow.search_runs(order_by=["metrics.rmse desc"], max_results=1) +
print("Chargement des anciens RMSE pour comparaison") + + if not runs.empty: + last_rmse = runs.iloc[0]["metrics.rmse"] + else: + last_rmse = float('inf') # Si aucun run n'est trouvé, retourner une valeur infinie + + print(f"Meilleur RMSE actuel: {last_rmse}, Nouveau RMSE: {acc_rounded}") + + directory = '/opt/airflow/models/model_SVD.pkl' + + with open(directory, 'wb') as file: + pickle.dump(model, file) + print(f'Modèle sauvegardé sous {directory}') + +# Définition du DAG Airflow + +svd_dag = DAG( + dag_id='SVD_train_model', + description='SVD Model for Movie Recommendation', + tags=['antoine'], + schedule_interval='@daily', + default_args={ + 'owner': 'airflow', + 'start_date': datetime(2024, 11, 22), + } +) + +# Tâches du DAG + +train_task = PythonOperator( + task_id='train_model', + python_callable=train_model, + dag=svd_dag, + queue="kubernetes", +) diff --git a/data/dags_airflow/scrapping.py b/data/dags_airflow/scrapping.py new file mode 100755 index 0000000..631ab0c --- /dev/null +++ b/data/dags_airflow/scrapping.py @@ -0,0 +1,260 @@ +from airflow import DAG +from airflow.operators.python import PythonOperator +from datetime import datetime +import requests +from bs4 import BeautifulSoup as bs +import pandas as pd +import re +import os +from dotenv import load_dotenv +import psycopg2 as psycopg2_binary +from contextlib import contextmanager +import logging +from airflow.utils.dates import days_ago +from airflow.hooks.base_hook import BaseHook +import time +from airflow.exceptions import AirflowNotFoundException + +# Configurer le logger pour suivre les événements et les erreurs +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Charger les variables d'environnement depuis le fichier .env +load_dotenv() +POSTGRES_USER = os.getenv('POSTGRES_USER') +POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD') +POSTGRES_DB = os.getenv('POSTGRES_DB') +POSTGRES_HOST = os.getenv('POSTGRES_HOST') +POSTGRES_PORT = os.getenv('POSTGRES_PORT') +tmdb_token = os.getenv("TMDB_API_TOKEN") + +@contextmanager +def get_db_connection(): + """Gestionnaire de contexte pour la connexion à la base de données. + Ouvre une connexion et la ferme automatiquement après utilisation. 
+ """ + conn = None + try: + # Établir la connexion à la base de données PostgreSQL + conn = psycopg2_binary.connect( + database=POSTGRES_DB, + host=POSTGRES_HOST, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + port=POSTGRES_PORT + ) + logger.info("Connection à la base de données OK") + yield conn # Retourner la connexion pour utilisation dans le bloc 'with' + except psycopg2_binary.Error as e: + logger.error(f"Erreur lors de la connexion à la base de données: {e}") + raise # Lever l'exception si une erreur se produit + finally: + if conn is not None: + conn.close() # Fermer la connexion à la fin du bloc 'with' + logger.info("Connexion à la base de données fermée") + +def scrape_imdb_first_page(task_instance): + """Scrape les données des films depuis IMDb et les renvoie sous forme de listes.""" + start_time = time.time() + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + + try: + # Récupérer la page des box-offices d'IMDb + page = requests.get("https://www.imdb.com/chart/boxoffice", headers=headers) + page.raise_for_status() # Vérifier que la requête a réussi + + soup = bs(page.content, 'lxml') + + # Extraire les liens et titres des films + links = [a['href'] for a in soup.find_all('a', class_='ipc-title-link-wrapper')] + cleaned_links = [link.split('/')[2].split('?')[0].replace('tt', '') for link in links] + + logger.info("Liens IMDB nettoyés: %s", cleaned_links) + + task_instance.xcom_push(key="cleaned_links", value=cleaned_links) + + except requests.RequestException as e: + logger.error(f"Erreur lors de la récupération de la page IMDb: {e}") + + finally: + end_time = time.time() + duration = end_time - start_time + + logger.info(f"Durée du scraping IMDb: {duration} secondes") + +def genres_request(task_instance): + """Effectue des requêtes à l'API TMDB pour récupérer les informations des films.""" + url = "https://api.themoviedb.org/3/genre/movie/list?language=en" + + headers = { + "accept": "application/json", + "Authorization": f"Bearer {tmdb_token}" + } + + response = requests.get(url, headers=headers) + + if response.status_code == 200: + data = response.json() + genres = {str(genre["id"]): genre["name"] for genre in data["genres"]} + task_instance.xcom_push(key="genres", value=genres) + logger.info("Genres récupérés avec succès: %s", genres) + + +def api_tmdb_request(task_instance): + """Effectue des requêtes à l'API TMDB pour récupérer les informations des films.""" + results = {} + cleaned_links = task_instance.xcom_pull(task_ids="scrape_imdb_task", key="cleaned_links") + genres = task_instance.xcom_pull(task_ids="get_genres_task", key="genres") + logger.info("Liens nettoyés recus via XCom: %s", cleaned_links) + logger.info("Genres recus via XCom: %s", genres) + + for index, movie_id in enumerate(cleaned_links): + url = f"https://api.themoviedb.org/3/find/tt{movie_id}?external_source=imdb_id" + logger.info("Url pour le film index %s: %s", index, url) + headers = { + "accept": "application/json", + "Authorization": f"Bearer {tmdb_token}" + } + + response = requests.get(url, headers=headers) + + if response.status_code == 200: + data = response.json() + logger.info("Données reçues pour le film index %s: %s", index, data) + if data["movie_results"]: + movie_info = data["movie_results"][0] + movie_info = data["movie_results"][0] + release_date = movie_info["release_date"] + release_year = release_date.split("-")[0] # Extraire l'année + + results[str(index)] = { + "tmdb_id": 
movie_info["id"], + "title": movie_info["title"], + "genre_ids": movie_info['genre_ids'], + "imbd_id": movie_id, + "date": release_date, + "year": release_year, # Ajouter l'année extraite + "genres": [genres[str(genre_id)] for genre_id in movie_info['genre_ids']] + } + + else: + results[str(index)] = {"error": f"Request failed with status code {response.status_code}"} + + task_instance.xcom_push(key="api_results", value=results) + +def insert_data_movies(task_instance): + """Insère les données des films dans la base de données.""" + start_time = time.time() + api_results = task_instance.xcom_pull(task_ids="scrape_movies_infos_task", key="api_results") + try: + with get_db_connection() as conn: + with conn.cursor() as cur: + for index, movie_data in api_results.items(): + count = 0 + # Exécuter la requête pour récupérer la valeur maximale de movieId + cur.execute("SELECT MAX(movieId) FROM movies") + max_movie_id = cur.fetchone()[0] + movieId = max_movie_id + 1 + # Vérifier si une erreur a été retournée pour ce film + if "error" in movie_data: + logger.error(f"Erreur pour le film index {index}: {movie_data['error']}") + continue + + title = movie_data["title"] + genres = movie_data["genres"] + imdb_id = movie_data["imbd_id"] + tmdb_id = movie_data["tmdb_id"] + year = movie_data["year"] + + # Éviter les doublons dans la base de données + cur.execute("SELECT 1 FROM movies WHERE title = %s AND year = %s", (title, year)) + + if cur.fetchone() is None: # Si le film n'existe pas déjà + count += 1 + genres_str = ','.join(genres) # Convertir la liste de genres en chaîne de caractères + # Insertion du film et récupération de l'ID inséré + cur.execute(""" + INSERT INTO movies (movieId, title, genres, year) + VALUES (%s, %s, %s, %s) + """, (movieId, title, genres_str, year)) + + # Insertion du lien avec l'ID du film + cur.execute(""" + INSERT INTO links (movieId, imdbId, tmdbId) + VALUES (%s, %s, %s) + """, (movieId, imdb_id, tmdb_id)) + + logger.info(f"Film inséré: {title} avec ID {movieId}") + else: + logger.info(f"Le film {title} existe déjà dans la base de données.") + + conn.commit() # Valider les changements dans la base de données + + logger.info("Données insérées avec succès dans les tables movies & links.") + + logger.info(f"Nombre de films insérés: {count}") + + except Exception as e: + logger.error(f"Erreur lors de la connexion à la base de données: {e}") + + finally: + end_time = time.time() + duration = end_time - start_time + + logger.info(f"Durée de l'insertion des données: {duration} secondes") + +# Arguments par défaut pour le DAG +default_args = { + 'owner': 'airflow', + 'start_date': datetime(2024, 11, 19), +} + +# Création du DAG principal +dag_scraping_and_inserting_movies = DAG( + dag_id='imdb_scraper_updating_db_dag', + description='Scraping IMDb and updating database', + tags=['antoine'], + default_args=default_args, + schedule_interval='@daily', +) + +# Tâche pour récupérer les genres depuis TMDB +get_genres_task = PythonOperator( + task_id='get_genres_task', + python_callable=genres_request, + dag=dag_scraping_and_inserting_movies, + queue="kubernetes", +) + +# Tâche pour scraper IMDb et récupérer les liens +scrape_imdb_task = PythonOperator( + task_id='scrape_imdb_task', + python_callable=scrape_imdb_first_page, + dag=dag_scraping_and_inserting_movies, + queue="kubernetes", +) + +# Tâche pour récupérer les infos des films depuis TMDB +scrape_infos_task = PythonOperator( + task_id='scrape_movies_infos_task', + python_callable=api_tmdb_request, + op_kwargs={'cleaned_links': '{{ 
task_instance.xcom_pull(task_ids="scrape_imdb_task") }}', 'genres': '{{ task_instance.xcom_pull(task_ids="get_genres_task") }}'}, + dag=dag_scraping_and_inserting_movies, + queue="kubernetes", +) + +# Tâche pour insérer les données dans la base +insert_data_task = PythonOperator( + task_id='insert_data_movies_task', + python_callable=insert_data_movies, + op_kwargs={ + 'api_results': '{{ task_instance.xcom_pull(task_ids="scrape_movies_infos_task") }}', + }, + dag=dag_scraping_and_inserting_movies, + queue="kubernetes" +) + +# Définir l'ordre d'exécution des tâches dans le DAG +get_genres_task >> scrape_imdb_task >> scrape_infos_task >> insert_data_task \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml old mode 100644 new mode 100755 diff --git a/api/Dockerfile b/docker/fastapi/Dockerfile old mode 100644 new mode 100755 similarity index 100% rename from api/Dockerfile rename to docker/fastapi/Dockerfile diff --git a/api/__init__.py b/docker/fastapi/__init__.py old mode 100644 new mode 100755 similarity index 100% rename from api/__init__.py rename to docker/fastapi/__init__.py diff --git a/api/auth.py b/docker/fastapi/auth.py old mode 100644 new mode 100755 similarity index 100% rename from api/auth.py rename to docker/fastapi/auth.py diff --git a/api/database.py b/docker/fastapi/database.py old mode 100644 new mode 100755 similarity index 100% rename from api/database.py rename to docker/fastapi/database.py diff --git a/api/main.py b/docker/fastapi/main.py old mode 100644 new mode 100755 similarity index 100% rename from api/main.py rename to docker/fastapi/main.py diff --git a/api/predict.py b/docker/fastapi/predict.py old mode 100644 new mode 100755 similarity index 100% rename from api/predict.py rename to docker/fastapi/predict.py diff --git a/api/requirements.txt b/docker/fastapi/requirements.txt old mode 100644 new mode 100755 similarity index 100% rename from api/requirements.txt rename to docker/fastapi/requirements.txt diff --git a/mlflow/Dockerfile b/docker/mlflow/Dockerfile old mode 100644 new mode 100755 similarity index 100% rename from mlflow/Dockerfile rename to docker/mlflow/Dockerfile diff --git a/kubernetes/airflow/order/docker/prod/python_scrapping/Dockerfile b/docker/python_scrapping/Dockerfile similarity index 100% rename from kubernetes/airflow/order/docker/prod/python_scrapping/Dockerfile rename to docker/python_scrapping/Dockerfile diff --git a/kubernetes/airflow/order/docker/prod/python_scrapping/main.py b/docker/python_scrapping/main.py similarity index 100% rename from kubernetes/airflow/order/docker/prod/python_scrapping/main.py rename to docker/python_scrapping/main.py diff --git a/kubernetes/airflow/order/docker/prod/python_load/requirements.txt b/docker/python_scrapping/requirements.txt similarity index 100% rename from kubernetes/airflow/order/docker/prod/python_load/requirements.txt rename to docker/python_scrapping/requirements.txt diff --git a/docker/python_transform/Dockerfile b/docker/python_transform/Dockerfile new file mode 100644 index 0000000..e567246 --- /dev/null +++ b/docker/python_transform/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.8-slim + +WORKDIR /app + +COPY ./build_features.py ./build_features.py + +COPY ./data_to_db.py ./data_to_db.py + +COPY ./start.sh ./start.sh + +COPY ./requirements.txt ./requirements.txt + +RUN mkdir -p data/to_ingest + +RUN pip install -r requirements.txt + +# Rendre le script start.sh exécutable +RUN chmod +x start.sh + +# Spécifier la commande à exécuter lors du démarrage du 
conteneur +CMD ["./start.sh"] \ No newline at end of file diff --git a/docker/python_transform/build_features.py b/docker/python_transform/build_features.py new file mode 100644 index 0000000..0077837 --- /dev/null +++ b/docker/python_transform/build_features.py @@ -0,0 +1,229 @@ +import pandas as pd +import os +from passlib.context import CryptContext +import requests + +# Configuration du contexte pour le hachage des mots de passe +bcrypt_context = CryptContext(schemes=['bcrypt'], deprecated='auto') + +def download_and_save_file(url, raw_data_relative_path): + """ + Télécharge les fichiers CSV depuis l'URL donnée et les enregistre dans le chemin spécifié. + + Args: + url (str): L'URL de base pour télécharger les fichiers. + raw_data_relative_path (str): Chemin relatif où les fichiers seront enregistrés. + """ + filenames = ['links.csv', 'movies.csv', 'ratings.csv'] + os.makedirs(raw_data_relative_path, exist_ok=True) # Crée le répertoire si nécessaire + + for filename in filenames: + data_url = os.path.join(url, filename) + try: + response = requests.get(data_url) + response.raise_for_status() # Assure que la requête a réussi + + file_path = os.path.join(raw_data_relative_path, filename) + with open(file_path, 'wb') as file: + file.write(response.content) # Écrit le contenu dans le fichier + print(f"File saved to {file_path}") + + except requests.exceptions.RequestException as e: + print(f"Error downloading {filename}: {e}") + except IOError as e: + print(f"Error saving {filename}: {e}") + +def load_data(raw_data_relative_path): + """ + Charge les données des fichiers CSV dans des DataFrames pandas. + + Args: + raw_data_relative_path (str): Chemin vers le répertoire contenant les fichiers CSV. + + Returns: + tuple: DataFrames pour les évaluations, les films et les liens. + """ + try: + df_ratings = pd.read_csv(f'{raw_data_relative_path}/ratings.csv') + df_movies = pd.read_csv(f'{raw_data_relative_path}/movies.csv') + df_links = pd.read_csv(f'{raw_data_relative_path}/links.csv') + print(f'Ratings, movies and links loaded from {raw_data_relative_path} directory') + return df_ratings, df_movies, df_links + except FileNotFoundError as e: + print(f"File not found: {e}") + except pd.errors.EmptyDataError as e: + print(f"No data: {e}") + except Exception as e: + print(f"An error occurred while loading data: {e}") + +def bayesienne_mean(df, M, C): + """ + Calcule la moyenne bayésienne des notes d'un film. + + Args: + df (pd.Series): La série de notes du film. + M (float): La moyenne brute des notes des films. + C (float): La moyenne de la quantité de notes. + + Returns: + float: La moyenne bayésienne calculée. + """ + moy_ba = (C * M + df.sum()) / (C + df.count()) + return moy_ba + +def preprocessing_ratings(df_ratings) -> pd.DataFrame: + """ + Applique la moyenne bayésienne sur les évaluations des films. + + Args: + df_ratings (pd.DataFrame): DataFrame contenant les évaluations. + + Returns: + pd.DataFrame: DataFrame contenant les évaluations traitées avec la moyenne bayésienne. 
+ """ + + # Statistiques par film : quantité et moyenne des notes + movies_stats = df_ratings.groupby('movieId').agg({'rating': ['count', 'mean']}) + + # Renommer les colonnes + movies_stats.columns = ['count', 'mean'] + + # Calculer les moyennes nécessaires pour la moyenne bayésienne + C = movies_stats['count'].mean() # Moyenne de la quantité de notes + M = movies_stats['mean'].mean() # Moyenne brute des notes + + # Calculer la moyenne bayésienne par film + movies_stats['bayesian_mean'] = movies_stats.apply( + lambda x: bayesienne_mean(df_ratings[df_ratings['movieId'] == x.name]['rating'], M, C), axis=1) + + # Ajouter la colonne bayesian_mean au DataFrame original + df_ratings = df_ratings.merge(movies_stats[['bayesian_mean']], on='movieId', how='left') + + print("Application de la moyenne bayésienne sur la colonne rating effectuée") + + return df_ratings + +def preprocessing_movies(df_movies) -> pd.DataFrame: + """ + Traite le fichier CSV des films et extrait les informations nécessaires. + + Args: + df_movies (pd.DataFrame): DataFrame contenant les films. + + Returns: + pd.DataFrame: DataFrame contenant les films traités. + """ + + print("Création d'une colonne year et passage des genres en liste de genres") + + # Séparer les genres sur les pipes et les joindre par des virgules + df_movies['genres'] = df_movies['genres'].apply(lambda x: ', '.join(x.split("|"))) + + # Extraction de l'année et mise à jour du titre + df_movies['year'] = df_movies['title'].str.extract(r'\((\d{4})\)')[0] + + # Nettoyer le titre en retirant l'année + df_movies['title'] = df_movies['title'].str.replace(r' \(\d{4}\)', '', regex=True) + + # Remplir les valeurs manquantes avec la méthode forward fill + df_movies.ffill(inplace=True) + + return df_movies + +def preprocessing_links(df_links) -> pd.DataFrame: + """ + Modifie le type de tmdbId dans le dataset des liens. + + Args: + df_links (pd.DataFrame): DataFrame contenant les liens. + + Returns: + pd.DataFrame: DataFrame contenant les liens traités. + + """ + + print('Modification du type de la colonne tmdbId en int') + # Remplacer les valeurs manquantes par 0 et convertir en entier + df_links['tmdbId'] = df_links.tmdbId.fillna(0).astype(int) + + return df_links + +def create_users() -> pd.DataFrame: + """ + Crée un DataFrame d'utilisateurs fictifs avec mots de passe hachés. + + Returns: + pd.DataFrame: DataFrame contenant les utilisateurs créés. + """ + + print("Création des utilisateurs _ fichier users.csv") + username = [] + email = [] + password = [] + + for i in range(1, 501): + username.append('user'+str(i)) + email.append('user'+str(i)+'@example.com') + password.append('password'+str(i)) + + hached_password = [bcrypt_context.hash(i) for i in password] + + # Créer un DataFrame + df_users = pd.DataFrame({'username': username, 'email': email, 'hached_password': hached_password}) + + return df_users + +def save_data(df_ratings, df_movies, df_links, df_users, data_directory): + """ + Enregistre les DataFrames traités dans des fichiers CSV. + + Args: + df_ratings (pd.DataFrame): DataFrame contenant les évaluations traitées. + df_movies (pd.DataFrame): DataFrame contenant les films traités. + df_links (pd.DataFrame): DataFrame contenant les liens traités. + df_users (pd.DataFrame): DataFrame contenant les utilisateurs créés. + data_directory (str): Répertoire où enregistrer les fichiers CSV. 
+ """ + + os.makedirs(data_directory, exist_ok=True) # Crée le répertoire si nécessaire + + try: + # Enregistrement des fichiers CSV + df_ratings.to_csv(f'{data_directory}/processed_ratings.csv', index=False) + df_movies.to_csv(f'{data_directory}/processed_movies.csv', index=False) + df_links.to_csv(f'{data_directory}/processed_links.csv', index=False) + df_users.to_csv(f'{data_directory}/users.csv', index=False) + + print(f'Processed ratings, movies, links and users loaded in {data_directory}') + + except IOError as e: + print(f"Error saving files: {e}") + +if __name__ == "__main__": + raw_data_relative_path="app/data/to_ingest/bronze" + bucket_folder_url="https://mlops-project-db.s3.eu-west-1.amazonaws.com/movie_recommandation/" + + data_directory = "app/data/to_ingest/silver" + + download_and_save_file(url=bucket_folder_url, raw_data_relative_path=raw_data_relative_path) + + # Chargement des données à partir du chemin spécifié + try: + df_ratings, df_movies, df_links = load_data(raw_data_relative_path) + + # Prétraitement des données + if not any(df is None for df in [df_ratings, df_movies, df_links]): + df_ratings = preprocessing_ratings(df_ratings) + df_movies = preprocessing_movies(df_movies) + df_links = preprocessing_links(df_links) + + # Création d'utilisateurs fictifs + df_users = create_users() + + # Sauvegarde des données traitées dans le répertoire spécifié + save_data(df_ratings, df_movies, df_links, df_users, data_directory) + else: + print("Une ou plusieurs DataFrames n'ont pas pu être chargées.") + + except Exception as e: + print(f"An error occurred in the main execution flow: {e}") \ No newline at end of file diff --git a/docker/python_transform/data_to_db.py b/docker/python_transform/data_to_db.py new file mode 100644 index 0000000..f92c815 --- /dev/null +++ b/docker/python_transform/data_to_db.py @@ -0,0 +1,118 @@ +import os +import psycopg2 +import pandas as pd +from sqlalchemy import create_engine, table, column +from sqlalchemy.dialects.postgresql import insert + +# Définition des tables SQLAlchemy pour les opérations d'upsert +table_movies = table('movies', + column('movieId'), + column('title'), + column('genres'), + column('year') +) + +table_ratings = table('ratings', + column('id'), + column('userId'), + column('movieId'), + column('rating'), + column('timestamp'), + column('bayesian_mean') +) + +table_links = table('links', + column('id'), + column('movieId'), + column('imdbId'), + column('tmdbId') +) + +table_users = table('users', + column('userId'), + column('username'), + column('email'), + column('hached_password') +) + +def load_config(): + """Charge la configuration de la base de données à partir des variables d'environnement.""" + return { + 'host': os.getenv('AIRFLOW_POSTGRESQL_SERVICE_HOST'), + 'database': os.getenv('DATABASE'), + 'user': os.getenv('USER'), + 'password': os.getenv('PASSWORD') + } + +def connect(config): + """Connecte au serveur PostgreSQL et retourne la connexion.""" + try: + conn = psycopg2.connect(**config) + print('Connected to the PostgreSQL server.') + return conn + except (psycopg2.DatabaseError, Exception) as error: + print(f"Connection error: {error}") + return None + +def execute_query_psql(query, config): + """Exécute une requête SQL pour insérer des données dans une table.""" + conn_string = f'postgresql://{config["user"]}:{config["password"]}@{config["host"]}/{config["database"]}' + + try: + with create_engine(conn_string).begin() as conn: + res = conn.execute(query) + return res.rowcount # Retourne le nombre de lignes 
affectées par la requête + except Exception as e: + print(f"Error executing query: {e}") + return 0 + +def upsert_to_psql(table, df): + """Insère ou met à jour des enregistrements dans une table. + + Args: + table: La table SQLAlchemy cible. + df (pd.DataFrame): DataFrame contenant les données à insérer ou mettre à jour. + + """ + + # Préparation des données pour l'upsert + dict_data = df.where(pd.notnull(df), None).to_dict(orient='records') + + # Création de l'instruction d'insertion avec gestion des conflits + insert_stmt = insert(table).values(dict_data) + + do_update_stmt = insert_stmt.on_conflict_do_update( + index_elements=[list(table.columns)[0]], # la première colonne de chaque table est sa clé primaire (movieId, id, userId) + set_={col.name: insert_stmt.excluded[col.name] for col in table.columns} + ) + + rowcount = execute_query_psql(do_update_stmt, config) + + if rowcount > 0: + print(f'{rowcount} rows have been inserted or updated in {table.name}') + else: + print(f'No rows were inserted or updated in {table.name}') + +if __name__ == '__main__': + data_directory = 'app/data' + + config = load_config() + + # Chargement et traitement des fichiers CSV par morceaux + for filename, table in [ + (f'{data_directory}/to_ingest/silver/processed_ratings.csv', table_ratings), + (f'{data_directory}/to_ingest/silver/processed_movies.csv', table_movies), + (f'{data_directory}/to_ingest/silver/processed_links.csv', table_links), + (f'{data_directory}/to_ingest/silver/users.csv', table_users) + ]: + try: + for chunk in pd.read_csv(filename, chunksize=1000): # Lire par morceaux de 1000 lignes + upsert_to_psql(table, chunk) + print(f"Finished processing {filename}") + + except FileNotFoundError as e: + print(f"File not found: {e}") + except pd.errors.EmptyDataError as e: + print(f"No data in file: {e}") + except Exception as e: + print(f"An error occurred while processing {filename}: {e}") \ No newline at end of file diff --git a/kubernetes/airflow/order/docker/prod/python_scrapping/requirements.txt b/docker/python_transform/requirements.txt similarity index 53% rename from kubernetes/airflow/order/docker/prod/python_scrapping/requirements.txt rename to docker/python_transform/requirements.txt index c8044ea..606b86b 100644 --- a/kubernetes/airflow/order/docker/prod/python_scrapping/requirements.txt +++ b/docker/python_transform/requirements.txt @@ -1,10 +1,8 @@ -greenlet==3.0.3 numpy==1.24.4 pandas==1.5.3 +bcrypt==3.2.0 +passlib psycopg2==2.9.9 python-dateutil==2.9.0.post0 -pytz==2024.1 -six==1.16.0 SQLAlchemy==2.0.30 -typing_extensions==4.11.0 -bcrypt==3.2.0 \ No newline at end of file +typing_extensions==4.11.0 \ No newline at end of file diff --git a/docker/python_transform/start.sh b/docker/python_transform/start.sh new file mode 100644 index 0000000..4827390 --- /dev/null +++ b/docker/python_transform/start.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# Exécute le premier script +python3 build_features.py + +# Vérifie si le premier script a réussi +if [ $?
-eq 0 ]; then + # Exécute le deuxième script + python3 data_to_db.py +else + echo "Erreur lors de l'exécution de build_features.py" + exit 1 +fi \ No newline at end of file diff --git a/kubernetes/airflow/airflow-local-dags-folder-pv.yml b/kubernetes/airflow/airflow-local-dags-folder-pv.yml deleted file mode 100644 index bf8b6a0..0000000 --- a/kubernetes/airflow/airflow-local-dags-folder-pv.yml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: v1 -kind: PersistentVolume -metadata: - name: airflow-local-dags-folder -spec: - storageClassName: local-path - capacity: - storage: 2Gi - accessModes: - - ReadWriteOnce - claimRef: - namespace: airflow - name: airflow-local-dags-folder - hostPath: - path: "/home/antoine/jul24_cmlops_reco_film/kubernetes/airflow/dags" diff --git a/kubernetes/airflow/airflow-local-dags-folder-pvc.yml b/kubernetes/airflow/airflow-local-dags-folder-pvc.yml deleted file mode 100644 index f147b3b..0000000 --- a/kubernetes/airflow/airflow-local-dags-folder-pvc.yml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: airflow-local-dags-folder - namespace: airflow -spec: - storageClassName: local-path - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 2Gi diff --git a/kubernetes/airflow/airflow-local-logs-folder-pv.yml b/kubernetes/airflow/airflow-local-logs-folder-pv.yml deleted file mode 100644 index ad652d7..0000000 --- a/kubernetes/airflow/airflow-local-logs-folder-pv.yml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: v1 -kind: PersistentVolume -metadata: - name: airflow-local-logs-folder -spec: - storageClassName: local-path - capacity: - storage: 2Gi - accessModes: - - ReadWriteOnce - claimRef: - namespace: airflow - name: airflow-local-logs-folder - hostPath: - path: "/home/antoine/jul24_cmlops_reco_film/kubernetes/airflow/logs" diff --git a/kubernetes/airflow/airflow-local-logs-folder-pvc.yml b/kubernetes/airflow/airflow-local-logs-folder-pvc.yml deleted file mode 100644 index 0b04fec..0000000 --- a/kubernetes/airflow/airflow-local-logs-folder-pvc.yml +++ /dev/null @@ -1,12 +0,0 @@ -# Persistent Volume Claims -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: airflow-local-logs-folder - namespace: airflow -spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 5Gi diff --git a/kubernetes/airflow/dags/init_order.py b/kubernetes/airflow/dags/init_order.py old mode 100644 new mode 100755 index ba26aeb..ff31918 --- a/kubernetes/airflow/dags/init_order.py +++ b/kubernetes/airflow/dags/init_order.py @@ -57,9 +57,39 @@ def create_conn(**kwargs): ) create_tables = PostgresOperator( - task_id='create_tables', - postgres_conn_id='postgres', - sql='sql/init.sql' - ) + task_id='create_tables', + postgres_conn_id='postgres', + sql=""" + CREATE TABLE IF NOT EXISTS movies ( + movieId SERIAL PRIMARY KEY, + title VARCHAR(200) NOT NULL, + genres TEXT, + year INT + ); + + CREATE TABLE IF NOT EXISTS ratings ( + id SERIAL PRIMARY KEY, + userId INT, + movieId INT REFERENCES movies(movieId), + rating FLOAT NOT NULL, + timestamp INT, + bayesian_mean FLOAT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS links ( + id SERIAL PRIMARY KEY, + movieId INT REFERENCES movies(movieId), + imdbId INT, + tmdbId INT + ); + + CREATE TABLE IF NOT EXISTS users ( + userId SERIAL PRIMARY KEY, + username VARCHAR(50) NOT NULL, + email VARCHAR(100) NOT NULL UNIQUE, + hached_password VARCHAR(300) NOT NULL + ); + """ +) create_postgres_conn >> create_tables diff --git a/kubernetes/airflow/dags/load_order.py 
b/kubernetes/airflow/dags/load_order.py deleted file mode 100644 index 249b9be..0000000 --- a/kubernetes/airflow/dags/load_order.py +++ /dev/null @@ -1,65 +0,0 @@ -from airflow import DAG -from airflow.utils.dates import days_ago -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.kubernetes.secret import Secret -from kubernetes.client import models as k8s - -secret_database = Secret( - deploy_type="env", - deploy_target="DATABASE", - secret="sql-conn" -) - -secret_user = Secret( - deploy_type="env", - deploy_target="USER", - secret="sql-conn" -) - -secret_password = Secret( - deploy_type="env", - deploy_target="PASSWORD", - secret="sql-conn" -) - -volume = k8s.V1Volume( - name="order-data-folder", - persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource( - claim_name="order-data-folder" - ) -) - -volume_mount = k8s.V1VolumeMount( - name="order-data-folder", - mount_path="/app/data/to_ingest" -) - -with DAG( - dag_id='load_order', - tags=['order', 'antoine'], - default_args={ - 'owner': 'airflow', - 'start_date': days_ago(0, minute=1), - }, - schedule_interval='0 17 * * *', - catchup=False -) as dag: - - python_transform = KubernetesPodOperator( - task_id="python_transform", - image="chokosk8/order-python-transform", - cmds=["python3", "main.py"], - volumes=[volume], - volume_mounts=[volume_mount] - ) - - python_load = KubernetesPodOperator( - task_id="python_load", - image="chokosk8/order-python-load", - cmds=["python3", "main.py"], - secrets=[secret_database, secret_user, secret_password], - volumes=[volume], - volume_mounts=[volume_mount] - ) - -python_transform >> python_load \ No newline at end of file diff --git a/kubernetes/airflow/dags/load_transform_save_db.py b/kubernetes/airflow/dags/load_transform_save_db.py new file mode 100755 index 0000000..336a514 --- /dev/null +++ b/kubernetes/airflow/dags/load_transform_save_db.py @@ -0,0 +1,46 @@ +from airflow import DAG +from airflow.utils.dates import days_ago +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.kubernetes.secret import Secret +from kubernetes.client import models as k8s + +secret_database = Secret( + deploy_type="env", + deploy_target="DATABASE", + secret="sql-conn", + namespace="airflow", +) + +secret_user = Secret( + deploy_type="env", + deploy_target="USER", + secret="sql-conn", + namespace="airflow", +) + +secret_password = Secret( + deploy_type="env", + deploy_target="PASSWORD", + secret="sql-conn", + namespace="airflow", +) + +with DAG( + dag_id='load_transform_save_db', + tags=['antoine'], + default_args={ + 'owner': 'airflow', + 'start_date': days_ago(0, minute=1), + }, + schedule_interval=None, # Pas de planification automatique, + catchup=False +) as dag: + + python_transform = KubernetesPodOperator( + task_id="python_transform", + image="antoinepela/projet_reco_movies:order-python-transform-latest", + cmds=["/bin/bash", "-c", "/app/start.sh"], + namespace= "airflow", + ) + +python_transform \ No newline at end of file diff --git a/kubernetes/airflow/dags/predict_knn_model.py b/kubernetes/airflow/dags/predict_knn_model.py old mode 100644 new mode 100755 diff --git a/kubernetes/airflow/dags/predict_surprise_SVD.py b/kubernetes/airflow/dags/predict_surprise_SVD.py old mode 100644 new mode 100755 diff --git a/kubernetes/airflow/dags/scrapping.py b/kubernetes/airflow/dags/scrapping.py old mode 100644 new mode 100755 diff --git a/kubernetes/airflow/dags/sql/init.sql b/kubernetes/airflow/dags/sql/init.sql deleted file 
mode 100644 index 4445edd..0000000 --- a/kubernetes/airflow/dags/sql/init.sql +++ /dev/null @@ -1,43 +0,0 @@ - - -CREATE TABLE IF NOT EXISTS movies ( - movieId SERIAL PRIMARY KEY, - title VARCHAR(200) NOT NULL, - genres TEXT, - year INT -); - --- -- Charger les données à partir du fichier CSV --- COPY movies(movieId, title, genres, year) FROM '/docker-entrypoint-initdb.d/processed_movies.csv' DELIMITER ',' CSV HEADER; - -CREATE TABLE IF NOT EXISTS ratings ( - id SERIAL PRIMARY KEY, - userId INT, - movieId INT REFERENCES movies(movieId), - rating FLOAT NOT NULL, - timestamp INT, - bayesian_mean FLOAT NOT NULL -); - --- -- Charger les données à partir du fichier CSV --- COPY ratings(userId, movieId, rating, timestamp, bayesian_mean) FROM '/docker-entrypoint-initdb.d/processed_ratings.csv' DELIMITER ',' CSV HEADER; - -CREATE TABLE IF NOT EXISTS links ( - id SERIAL PRIMARY KEY, - movieId INT REFERENCES movies(movieId), - imdbId INT, - tmdbId INT -); - --- -- Charger les données à partir du fichier CSV --- COPY links(movieId, imdbId, tmdbId) FROM '/docker-entrypoint-initdb.d/processed_links.csv' DELIMITER ',' CSV HEADER; - -CREATE TABLE IF NOT EXISTS users ( - userId SERIAL PRIMARY KEY, - username VARCHAR(50) NOT NULL, - email VARCHAR(100) NOT NULL UNIQUE, - hached_password VARCHAR(300) NOT NULL -); - --- -- Charger les données à partir du fichier CSV --- COPY users(username, email, hached_password) FROM '/docker-entrypoint-initdb.d/users.csv' DELIMITER ',' CSV HEADER; diff --git a/kubernetes/airflow/logs/scheduler/latest b/kubernetes/airflow/logs/scheduler/latest deleted file mode 120000 index 2c4787c..0000000 --- a/kubernetes/airflow/logs/scheduler/latest +++ /dev/null @@ -1 +0,0 @@ -2024-11-25 \ No newline at end of file diff --git a/kubernetes/airflow/my_values.yml b/kubernetes/airflow/my_values.yml old mode 100644 new mode 100755 index d5cae3b..3969d5e --- a/kubernetes/airflow/my_values.yml +++ b/kubernetes/airflow/my_values.yml @@ -14,6 +14,8 @@ gid: 1000 scheduler: env: - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL - value: "30" + value: "10" executor: CeleryKubernetesExecutor + +webserverSecretKey: 370562eacc440d7d9a9a1ad86eef576c diff --git a/kubernetes/airflow/order/docker/prod/python_load/Dockerfile b/kubernetes/airflow/order/docker/prod/python_load/Dockerfile deleted file mode 100644 index 08b78ca..0000000 --- a/kubernetes/airflow/order/docker/prod/python_load/Dockerfile +++ /dev/null @@ -1,13 +0,0 @@ -FROM python:3.8-slim - -RUN apt-get update -y && apt-get install -y libpq-dev gcc - -WORKDIR /app - -RUN mkdir -p data/to_ingest/silver - -COPY ./main.py ./main.py - -COPY ./requirements.txt ./requirements.txt - -RUN pip install -r requirements.txt diff --git a/kubernetes/airflow/order/docker/prod/python_load/main.py b/kubernetes/airflow/order/docker/prod/python_load/main.py deleted file mode 100644 index a373265..0000000 --- a/kubernetes/airflow/order/docker/prod/python_load/main.py +++ /dev/null @@ -1,160 +0,0 @@ -import os -import psycopg2 -import pandas as pd -from sqlalchemy import create_engine, table, column -from sqlalchemy.dialects.postgresql import insert -from datetime import datetime -import json - -# Définition des tables SQLAlchemy pour les opérations d'upsert -table_movies = table('movies', - column('movieId'), - column('title'), - column('genres'), - column('year') -) - -table_ratings = table('ratings', - column('id'), - column('userId'), - column('movieId'), - column('rating'), - column('timestamp'), - column('bayesian_mean') -) - -table_links = table('links', - 
column('id'), - column('movieId'), - column('imdbId'), - column('tmdbId') -) - -table_users = table('users', - column('userId'), - column('username'), - column('email'), - column('hached_password') -) - -def load_config(): - """Charge la configuration de la base de données à partir des variables d'environnement.""" - config = {} - config['host'] = os.getenv('AIRFLOW_POSTGRESQL_SERVICE_HOST') - config['database'] = os.getenv('DATABASE') - config['user'] = os.getenv('USER') - config['password'] = os.getenv('PASSWORD') - return config - -def connect(config): - """Connecte au serveur PostgreSQL.""" - try: - with psycopg2.connect(**config) as conn: - print('Connected to the PostgreSQL server.') - except (psycopg2.DatabaseError, Exception) as error: - print(error) - -def execute_query_psql(query, config): - """Exécute une requête SQL pour insérer des données dans une table.""" - conn_string = 'postgresql://' + config['user'] + ':' + config['password'] + '@' + config['host'] + '/' + config['database'] - try: - db = create_engine(conn_string) - with db.begin() as conn: - res = conn.execute(query) - return res.rowcount # Retourne le nombre de lignes affectées par la requête - except (Exception, psycopg2.DatabaseError) as error: - print(error) - -def upsert_movies_to_psql(table_movies, df_movies): - """Insère ou met à jour des enregistrements dans la table movies.""" - dict_customer = [{k: v if pd.notnull(v) else None for k, v in m.items()} for m in df_movies.to_dict(orient='records')] - - # Création de l'instruction d'insertion avec gestion des conflits - insert_stmt = insert(table_movies).values(dict_customer) - - do_update_stmt = insert_stmt.on_conflict_do_update( - index_elements=[table_movies.c.movieId], - set_={ - 'movieId': insert_stmt.excluded.movieId, - 'title': insert_stmt.excluded.title, - 'genres': insert_stmt.excluded.genres, - 'year': insert_stmt.excluded.year, - } - ) - - rowcount = execute_query_psql(do_update_stmt, config) - print(f'{rowcount} movies rows has been inserted or updated') - -def upsert_ratings_to_psql(table_ratings, df_ratings): - """Insère ou met à jour des enregistrements dans la table ratings.""" - dict_product = [{k: v if pd.notnull(v) else None for k, v in m.items()} for m in df_ratings.to_dict(orient='records')] - - insert_stmt = insert(table_ratings).values(dict_product) - - do_update_stmt = insert_stmt.on_conflict_do_update( - index_elements=[table_ratings.c.id], - set_={ - 'id': insert_stmt.excluded.id, - 'userId': insert_stmt.excluded.userId, - 'movieId': insert_stmt.excluded.movieId, - 'rating': insert_stmt.excluded.rating, - 'timestamp': insert_stmt.excluded.timestamp, - 'bayesian_mean': insert_stmt.excluded.bayesian_mean - } - ) - - rowcount = execute_query_psql(do_update_stmt, config) - print(f'{rowcount} ratings rows has been inserted or updated') - -def upsert_links_to_psql(table_links, df_links): - """Insère ou met à jour des enregistrements dans la table links.""" - dict_product = [{k: v if pd.notnull(v) else None for k, v in m.items()} for m in df_links.to_dict(orient='records')] - - insert_stmt = insert(table_links).values(dict_product) - - do_update_stmt = insert_stmt.on_conflict_do_update( - index_elements=[table_links.c.id], # Correction ici : utiliser table_links au lieu de table_ratings - set_={ - 'id': insert_stmt.excluded.id, - 'movieId': insert_stmt.excluded.movieId, - 'imdbId': insert_stmt.excluded.imdbId, - 'tmdbId': insert_stmt.excluded.tmdbId - } - ) - - rowcount = execute_query_psql(do_update_stmt, config) - print(f'{rowcount} links 
rows has been inserted or updated') - -def upsert_users_to_psql(table_users, df_users): - """Insère ou met à jour des enregistrements dans la table users.""" - dict_product = [{k: v if pd.notnull(v) else None for k, v in m.items()} for m in df_users.to_dict(orient='records')] - - insert_stmt = insert(table_users).values(dict_product) - - do_update_stmt = insert_stmt.on_conflict_do_update( - index_elements=[table_users.c.userId], - set_={ - 'userId': insert_stmt.excluded.userId, - 'username': insert_stmt.excluded.username, # Correction ici : utiliser username au lieu de movieId - 'email': insert_stmt.excluded.email, - 'hached_password': insert_stmt.excluded.hached_password # Correction ici : utiliser hached_password au lieu de tmdbId - } - ) - - rowcount = execute_query_psql(do_update_stmt, config) - print(f'{rowcount} users rows has been inserted or updated') - - -if __name__ == '__main__': - data_directory = 'data' - config = load_config() - print(os.environ) - print(config) - df_ratings = pd.read_csv(f'{data_directory}/to_ingest/silver/processed_ratings.csv') - upsert_ratings_to_psql(table_ratings, df_ratings) - df_movies = pd.read_csv(f'{data_directory}/to_ingest/silver/processed_movies.csv') - upsert_movies_to_psql(table_movies, df_movies) - df_links = pd.read_csv(f'{data_directory}/to_ingest/silver/processed_links.csv') - upsert_links_to_psql(table_links, df_links) - df_users = pd.read_csv(f'{data_directory}/to_ingest/silver/users.csv') - upsert_users_to_psql(table_users, df_users) \ No newline at end of file diff --git a/kubernetes/airflow/order/docker/prod/python_transform/Dockerfile b/kubernetes/airflow/order/docker/prod/python_transform/Dockerfile deleted file mode 100644 index 70fe6b3..0000000 --- a/kubernetes/airflow/order/docker/prod/python_transform/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM python:3.8-slim - -WORKDIR /app - -COPY ./build_features.py ./build_features.py - -COPY ./requirements.txt ./requirements.txt - -RUN mkdir -p data/to_ingest - -RUN pip install -r requirements.txt diff --git a/kubernetes/airflow/order/docker/prod/python_transform/build_features.py b/kubernetes/airflow/order/docker/prod/python_transform/build_features.py deleted file mode 100644 index c48630c..0000000 --- a/kubernetes/airflow/order/docker/prod/python_transform/build_features.py +++ /dev/null @@ -1,137 +0,0 @@ -import pandas as pd -import os -from passlib.context import CryptContext - -bcrypt_context = CryptContext(schemes=['bcrypt'], deprecated='auto') - -def load_data(data_directory): - df_ratings = pd.read_csv(f'{data_directory}/to_ingest/bronze/ratings.csv') - df_movies = pd.read_csv(f'{data_directory}/to_ingest/bronze/movies.csv') - df_links = pd.read_csv(f'{data_directory}/to_ingest/bronze/links.csv') - print(f'ratings, movies and links loaded from {data_directory}/to_ingest/bronze directory') - return df_ratings, df_movies, df_links - -def bayesienne_mean(df, M, C): - """ - Calcule la moyenne bayésienne des notes d'un film. - - Args: - df (pd.Series): La série de notes du film. - M (float): La moyenne brute des notes des films. - C (float): La moyenne de la quantité de notes. - - Returns: - float: La moyenne bayésienne calculée. - """ - moy_ba = (C * M + df.sum()) / (C + df.count()) - return moy_ba - -def preprocessing_ratings(df_ratings) -> pd.DataFrame: - """ - Lecture du fichier CSV des évaluations et application de la moyenne bayésienne. - - Args: - ratings_file (str): Chemin vers le fichier CSV contenant les évaluations. 
- - Returns: - pd.DataFrame: DataFrame contenant les évaluations traitées. - """ - - # Statistiques par film : quantité et moyenne des notes - movies_stats = df_ratings.groupby('movieId').agg({'rating': ['count', 'mean']}) - movies_stats.columns = ['count', 'mean'] - - # Calculer les moyennes nécessaires pour la moyenne bayésienne - C = movies_stats['count'].mean() # Moyenne de la quantité de notes - M = movies_stats['mean'].mean() # Moyenne brute des notes - - # Calculer la moyenne bayésienne par film - movies_stats['bayesian_mean'] = movies_stats.apply( - lambda x: bayesienne_mean(df_ratings[df_ratings['movieId'] == x.name]['rating'], M, C), axis=1) - - # Ajouter la colonne bayesian_mean au DataFrame original - df_ratings = df_ratings.merge(movies_stats[['bayesian_mean']], on='movieId', how='left') - - print("Application de la moyenne bayésienne sur la colonne rating effectuée") - - return df_ratings - -def preprocessing_movies(df_movies) -> pd.DataFrame: - """ - Lecture du fichier CSV des films et traitement des données. - - Args: - movies_file (str): Chemin vers le fichier CSV contenant les films. - - Returns: - pd.DataFrame: DataFrame contenant les films traités. - """ - - # Traitement des genres et extraction de l'année - print("Création d'une colonne year et passage des genres en liste de genres") - - # Séparer les genres sur les pipes - # Séparer les genres sur les pipes et les joindre par des virgules - df_movies['genres'] = df_movies['genres'].apply(lambda x: ', '.join(x.split("|"))) - - # Extraction de l'année et mise à jour du titre - df_movies['year'] = df_movies['title'].str.extract(r'\((\d{4})\)')[0] - - # Nettoyer le titre en retirant l'année - df_movies['title'] = df_movies['title'].str.replace(r' \(\d{4}\)', '', regex=True) - - # Remplir les valeurs manquantes avec la méthode forward fill - df_movies.ffill(inplace=True) - - return df_movies - -def preprocessing_links(df_links) -> pd.DataFrame: - """ - Chargement du dataset des liens et modification du type de tmdbId. - - Args: - links_file (str): Chemin vers le fichier CSV contenant les liens. - - Returns: - pd.DataFrame: DataFrame contenant les liens traités. 
- """ - - print('Modification du type de la colonne tmdbId en int') - # Remplacer les valeurs manquantes par 0 et convertir en entier - df_links['tmdbId'] = df_links.tmdbId.fillna(0).astype(int) - - return df_links - -def create_users(): - print("Création des utilisateurs _ fichier users.csv") - username = [] - email = [] - password = [] - - for i in range(1, 501): - username.append('user'+str(i)) - email.append('user'+str(i)+'@example.com') - password.append('password'+str(i)) - - hached_password = [bcrypt_context.hash(i) for i in password] - - # Créer un DataFrame - df_users = pd.DataFrame({'username': username, 'email': email, 'hached_password': hached_password}) - - return df_users - -def save_data(df_ratings, df_movies, df_links, df_users, data_directory): - df_ratings.to_csv(f'{data_directory}/to_ingest/silver/processed_ratings.csv', index=False) - df_movies.to_csv(f'{data_directory}/to_ingest/silver/processed_movies.csv', index=False) - df_links.to_csv(f'{data_directory}/to_ingest/silver/processed_links.csv', index=False) - df_users.to_csv(f'{data_directory}/to_ingest/silver/users.csv', index=False) - print(f'processed_ratings, processed_movies, processed_links and users loaded in {data_directory}/to_ingest/silver directory') - -if __name__ == "__main__": - data_directory = 'data' - df_ratings, df_movies, df_links = load_data(data_directory) - preprocessing_ratings(df_ratings) - preprocessing_movies(df_movies) - preprocessing_links(df_links) - df_users = create_users() - save_data(df_ratings, df_movies, df_links, df_users, data_directory) \ No newline at end of file diff --git a/kubernetes/airflow/order/docker/prod/python_transform/requirements.txt b/kubernetes/airflow/order/docker/prod/python_transform/requirements.txt deleted file mode 100644 index 029ea4a..0000000 --- a/kubernetes/airflow/order/docker/prod/python_transform/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -numpy==1.24.4 -pandas==1.5.3 -bcrypt==3.2.0 -passlib \ No newline at end of file diff --git a/kubernetes/airflow/order/order-data-folder-pv.yaml b/kubernetes/airflow/order/order-data-folder-pv.yaml deleted file mode 100644 index 7350026..0000000 --- a/kubernetes/airflow/order/order-data-folder-pv.yaml +++ /dev/null @@ -1,14 +0,0 @@ -apiVersion: v1 -kind: PersistentVolume -metadata: - name: order-data-folder - labels: - type: local -spec: - storageClassName: local-path - capacity: - storage: 2Gi - accessModes: - - ReadWriteOnce - hostPath: - path: "/home/antoine/jul24_cmlops_reco_film/kubernetes/airflow/order/data/to_ingest" diff --git a/kubernetes/airflow/order/order-data-folder-pvc.yaml b/kubernetes/airflow/order/order-data-folder-pvc.yaml deleted file mode 100644 index 02d4f67..0000000 --- a/kubernetes/airflow/order/order-data-folder-pvc.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: order-data-folder - namespace: airflow -spec: - storageClassName: local-path - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 2Gi diff --git a/kubernetes/airflow/order/python-load-job.yaml b/kubernetes/airflow/order/python-load-job.yaml deleted file mode 100644 index 9b914ee..0000000 --- a/kubernetes/airflow/order/python-load-job.yaml +++ /dev/null @@ -1,22 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: python-load -spec: - template: - spec: - containers: - - name: python-load - image: antoinepela/projet_reco_movies:order-python-build-latest - command: ["python", "main.py"] - envFrom: - - secretRef: - name: sql-conn - volumeMounts: - - name: 
order-data-folder - mountPath: /app/data/to_ingest - restartPolicy: Never - volumes: - - name: order-data-folder - hostPath: - path: /home/antoine/jul24_cmlops_reco_film/kubernetes/airflow/order/data/to_ingest diff --git a/kubernetes/airflow/order/python-transform-job.yaml b/kubernetes/airflow/order/python-transform-job.yaml deleted file mode 100644 index b3d45b0..0000000 --- a/kubernetes/airflow/order/python-transform-job.yaml +++ /dev/null @@ -1,19 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: python-transform -spec: - template: - spec: - containers: - - name: python-transform - image: antoinepela/projet_reco_movies:order-python-transform-latest - command: ["python", "build_features.py"] - volumeMounts: - - name: order-data-folder - mountPath: /app/data/to_ingest - restartPolicy: Never - volumes: - - name: order-data-folder - persistentVolumeClaim: - claimName: order-data-folder diff --git a/kubernetes/configmaps/configmaps.yml b/kubernetes/configmaps/configmaps.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/fastapi-deployment.yml b/kubernetes/deployments/fastapi-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/grafana-deployment.yml b/kubernetes/deployments/grafana-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/minio-deployment.yml b/kubernetes/deployments/minio-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/mlflow-deployment.yml b/kubernetes/deployments/mlflow-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/node-exporter-deployment.yml b/kubernetes/deployments/node-exporter-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/pgadmin-deployment.yml b/kubernetes/deployments/pgadmin-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/postgres-api-deployment.yml b/kubernetes/deployments/postgres-api-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/postgres-exporter-deployment.yml b/kubernetes/deployments/postgres-exporter-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/postgres-mlflow-deployment.yml b/kubernetes/deployments/postgres-mlflow-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/prometheus-deployment.yml b/kubernetes/deployments/prometheus-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/deployments/streamlit-deployment.yml b/kubernetes/deployments/streamlit-deployment.yml old mode 100644 new mode 100755 diff --git a/kubernetes/ingress/ingress.yml b/kubernetes/ingress/ingress.yml old mode 100644 new mode 100755 diff --git a/kubernetes/namespace/namespace.yml b/kubernetes/namespace/namespace.yml old mode 100644 new mode 100755 diff --git a/kubernetes/persistent-volumes/airflow-local-dags-folder.yml b/kubernetes/persistent-volumes/airflow-local-dags-folder.yml new file mode 100755 index 0000000..f1d49f0 --- /dev/null +++ b/kubernetes/persistent-volumes/airflow-local-dags-folder.yml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: PersistentVolume +metadata: + name: airflow-local-dags-folder +spec: + capacity: + storage: 5Gi + accessModes: + - ReadWriteOnce + storageClassName: standard + claimRef: + namespace: airflow + name: airflow-local-dags-folder + hostPath: + path: /data/dags_airflow + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: airflow-local-dags-folder + namespace: airflow +spec: + accessModes: + - 
ReadWriteOnce + resources: + requests: + storage: 5Gi + storageClassName: standard diff --git a/kubernetes/persistent-volumes/airflow-local-logs-folder.yml b/kubernetes/persistent-volumes/airflow-local-logs-folder.yml new file mode 100755 index 0000000..6de16a4 --- /dev/null +++ b/kubernetes/persistent-volumes/airflow-local-logs-folder.yml @@ -0,0 +1,30 @@ +apiVersion: v1 +kind: PersistentVolume +metadata: + name: airflow-local-logs-folder +spec: + capacity: + storage: 2Gi # Capacité du PV + accessModes: + - ReadWriteOnce # Mode d'accès + storageClassName: standard + claimRef: + namespace: airflow # Namespace pour le PVC qui utilise ce PV + name: airflow-local-logs-folder + hostPath: + path: /data/logs_airflow + +--- +# Persistent Volume Claims +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: airflow-local-logs-folder + namespace: airflow # Namespace où le PVC sera créé +spec: + accessModes: + - ReadWriteOnce # Mode d'accès requis pour le PVC + storageClassName: standard + resources: + requests: + storage: 2Gi # Capacité demandée par le PVC diff --git a/kubernetes/persistent-volumes/fastapi-persistent-volume.yml b/kubernetes/persistent-volumes/fastapi-persistent-volume.yml old mode 100644 new mode 100755 diff --git a/kubernetes/persistent-volumes/grafana-persistent-volume.yml b/kubernetes/persistent-volumes/grafana-persistent-volume.yml old mode 100644 new mode 100755 diff --git a/kubernetes/persistent-volumes/minio-persistent-volumes.yml b/kubernetes/persistent-volumes/minio-persistent-volumes.yml old mode 100644 new mode 100755 diff --git a/kubernetes/persistent-volumes/pgadmin-persistent-volume.yml b/kubernetes/persistent-volumes/pgadmin-persistent-volume.yml old mode 100644 new mode 100755 diff --git a/kubernetes/persistent-volumes/postgres-api-persistent-volumes.yml b/kubernetes/persistent-volumes/postgres-api-persistent-volumes.yml old mode 100644 new mode 100755 diff --git a/kubernetes/persistent-volumes/prometheus-persistent-volume.yml b/kubernetes/persistent-volumes/prometheus-persistent-volume.yml old mode 100644 new mode 100755 diff --git a/kubernetes/secrets/airflow-secrets.yaml b/kubernetes/secrets/airflow-secrets.yaml old mode 100644 new mode 100755 index be1d66a..e510055 --- a/kubernetes/secrets/airflow-secrets.yaml +++ b/kubernetes/secrets/airflow-secrets.yaml @@ -2,6 +2,7 @@ apiVersion: v1 kind: Secret metadata: name: sql-conn + namespace: airflow type: Opaque data: DATABASE: cG9zdGdyZXM= # base64 encoded diff --git a/kubernetes/secrets/secrets.yml b/kubernetes/secrets/secrets.yml old mode 100644 new mode 100755 diff --git a/kubernetes/services/services.yml b/kubernetes/services/services.yml old mode 100644 new mode 100755 diff --git a/kubernetes/storageclass/storageclass.yaml b/kubernetes/storageclass/storageclass.yaml new file mode 100644 index 0000000..be90ae1 --- /dev/null +++ b/kubernetes/storageclass/storageclass.yaml @@ -0,0 +1,7 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: local-path +provisioner: rancher.io/local-path +volumeBindingMode: WaitForFirstConsumer +reclaimPolicy: Delete diff --git a/ml/notebooks/.gitkeep b/ml/notebooks/.gitkeep old mode 100644 new mode 100755 diff --git a/ml/notebooks/test.ipynb b/ml/notebooks/test.ipynb old mode 100644 new mode 100755 diff --git a/ml/notebooks/test_Antoine.ipynb b/ml/notebooks/test_Antoine.ipynb old mode 100644 new mode 100755 diff --git a/ml/notebooks/test_mikhael.ipynb b/ml/notebooks/test_mikhael.ipynb old mode 100644 new mode 100755 diff --git a/ml/src/__init__.py 
b/ml/src/__init__.py old mode 100644 new mode 100755 diff --git a/ml/src/config b/ml/src/config old mode 100644 new mode 100755 diff --git a/ml/src/data/.gitkeep b/ml/src/data/.gitkeep old mode 100644 new mode 100755 diff --git a/ml/src/data/__init__.py b/ml/src/data/__init__.py old mode 100644 new mode 100755 diff --git a/ml/src/data/check_structure.py b/ml/src/data/check_structure.py old mode 100644 new mode 100755 diff --git a/ml/src/data/import_raw_data.py b/ml/src/data/import_raw_data.py old mode 100644 new mode 100755 diff --git a/ml/src/features/.gitkeep b/ml/src/features/.gitkeep old mode 100644 new mode 100755 diff --git a/ml/src/features/__init__.py b/ml/src/features/__init__.py old mode 100644 new mode 100755 diff --git a/ml/src/features/build_features.py b/ml/src/features/build_features.py old mode 100644 new mode 100755 diff --git a/ml/src/models/.gitkeep b/ml/src/models/.gitkeep old mode 100644 new mode 100755 diff --git a/ml/src/models/__init__.py b/ml/src/models/__init__.py old mode 100644 new mode 100755 diff --git a/ml/src/models/train_model.py b/ml/src/models/train_model.py old mode 100644 new mode 100755 diff --git a/ml/src/requirements.txt b/ml/src/requirements.txt old mode 100644 new mode 100755 diff --git a/ml/src/visualization/.gitkeep b/ml/src/visualization/.gitkeep old mode 100644 new mode 100755 diff --git a/ml/src/visualization/__init__.py b/ml/src/visualization/__init__.py old mode 100644 new mode 100755 diff --git a/ml/src/visualization/visualize.py b/ml/src/visualization/visualize.py old mode 100644 new mode 100755 diff --git a/notebooks/1_exploration_data.ipynb b/notebooks/1_exploration_data.ipynb old mode 100644 new mode 100755 diff --git a/notebooks/2_recommandation_KNN.ipynb b/notebooks/2_recommandation_KNN.ipynb old mode 100644 new mode 100755 diff --git a/notebooks/3_test_surprise_models_cross_validation.ipynb b/notebooks/3_test_surprise_models_cross_validation.ipynb old mode 100644 new mode 100755 diff --git a/notebooks/4_Gridsearch_SVD.ipynb b/notebooks/4_Gridsearch_SVD.ipynb old mode 100644 new mode 100755 diff --git a/notebooks/5_train_svd_model.ipynb b/notebooks/5_train_svd_model.ipynb old mode 100644 new mode 100755 diff --git a/notebooks/6_import_data_mongodb.ipynb b/notebooks/6_import_data_mongodb.ipynb old mode 100644 new mode 100755 diff --git a/notebooks/7_scrapping_cover.ipynb b/notebooks/7_scrapping_cover.ipynb old mode 100644 new mode 100755 diff --git a/notebooks/test_Antoine.ipynb b/notebooks/test_Antoine.ipynb old mode 100644 new mode 100755 diff --git a/notebooks/test_predict.ipynb b/notebooks/test_predict.ipynb old mode 100644 new mode 100755 diff --git a/postgres/.env.example b/postgres/.env.example old mode 100644 new mode 100755 diff --git a/postgres/docker-compose.yml b/postgres/docker-compose.yml old mode 100644 new mode 100755 diff --git a/postgres/init.sql b/postgres/init.sql old mode 100644 new mode 100755 diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml old mode 100644 new mode 100755 diff --git a/reco_movies_project.jpeg b/reco_movies_project.jpeg old mode 100644 new mode 100755 diff --git a/requirements-dev.txt b/requirements-dev.txt old mode 100644 new mode 100755 diff --git a/requirements-ref.txt b/requirements-ref.txt old mode 100644 new mode 100755 diff --git a/streamlit/Dockerfile b/streamlit/Dockerfile old mode 100644 new mode 100755 diff --git a/streamlit/app/.streamlit/config.toml b/streamlit/app/.streamlit/config.toml old mode 100644 new mode 100755 diff --git a/streamlit/app/__init__.py 
b/streamlit/app/__init__.py old mode 100644 new mode 100755 diff --git a/streamlit/app/app.py b/streamlit/app/app.py old mode 100644 new mode 100755 diff --git a/streamlit/app/images/datascientest.png b/streamlit/app/images/datascientest.png old mode 100644 new mode 100755 diff --git a/streamlit/app/images/netflix-catalogue.jpg b/streamlit/app/images/netflix-catalogue.jpg old mode 100644 new mode 100755 diff --git a/streamlit/app/images/video.png b/streamlit/app/images/video.png old mode 100644 new mode 100755 diff --git a/streamlit/app/pages/1_Contexte & Objectifs.py b/streamlit/app/pages/1_Contexte & Objectifs.py old mode 100644 new mode 100755 diff --git "a/streamlit/app/pages/2_Choix_Mod\303\250le.py" "b/streamlit/app/pages/2_Choix_Mod\303\250le.py" old mode 100644 new mode 100755 diff --git a/streamlit/app/pages/3_Gestion BDD.py b/streamlit/app/pages/3_Gestion BDD.py old mode 100644 new mode 100755 diff --git a/streamlit/app/pages/4_Authentification.py b/streamlit/app/pages/4_Authentification.py old mode 100644 new mode 100755 diff --git a/streamlit/app/pages/5_Application.py b/streamlit/app/pages/5_Application.py old mode 100644 new mode 100755 diff --git a/streamlit/app/pages/6_Monitoring.py b/streamlit/app/pages/6_Monitoring.py old mode 100644 new mode 100755 diff --git a/streamlit/app/style.css b/streamlit/app/style.css old mode 100644 new mode 100755 diff --git a/streamlit/app/utils.py b/streamlit/app/utils.py old mode 100644 new mode 100755 diff --git a/streamlit/requirements.txt b/streamlit/requirements.txt old mode 100644 new mode 100755 diff --git a/tests/Dags_airflow/test_knn_model.py b/tests/Dags_airflow/test_knn_model.py old mode 100644 new mode 100755 diff --git a/tests/Dags_airflow/test_scrapping.py b/tests/Dags_airflow/test_scrapping.py old mode 100644 new mode 100755 diff --git a/tests/Dags_airflow/test_surprise_svd.py b/tests/Dags_airflow/test_surprise_svd.py old mode 100644 new mode 100755 diff --git a/tests/__init__.py b/tests/__init__.py old mode 100644 new mode 100755 diff --git a/tests/requirements-test.txt b/tests/requirements-test.txt old mode 100644 new mode 100755 diff --git a/tests/test_api_auth.py b/tests/test_api_auth.py old mode 100644 new mode 100755 diff --git a/tests/test_api_predict.py b/tests/test_api_predict.py old mode 100644 new mode 100755 diff --git a/tests/test_build_features.py b/tests/test_build_features.py old mode 100644 new mode 100755 index 73647a0..bd65016 --- a/tests/test_build_features.py +++ b/tests/test_build_features.py @@ -1,81 +1,42 @@ - -import pytest +import unittest import pandas as pd -import numpy as np +import importlib.util +import sys import os -from ml.src.features.build_features import ( - bayesienne_mean, - preprocessing_ratings, - preprocessing_movies, - preprocessing_links -) - -@pytest.fixture -def sample_ratings_df(): - return pd.DataFrame({ - 'userId': [1, 1, 2, 2], - 'movieId': [1, 2, 1, 2], - 'rating': [4.0, 3.0, 5.0, 4.0], - 'timestamp': [1000000, 1000001, 1000002, 1000003] - }) - -@pytest.fixture -def sample_movies_df(): - return pd.DataFrame({ - 'movieId': [1, 2], - 'title': ['Movie 1 (2020)', 'Movie 2 (2019)'], - 'genres': ['Action|Adventure', 'Comedy|Drama'] - }) - -@pytest.fixture -def sample_links_df(): - return pd.DataFrame({ - 'movieId': [1, 2], - 'imdbId': [111, 222], - 'tmdbId': [123.0, np.nan] - }) -def test_bayesienne_mean(): - series = pd.Series([4.0, 3.0, 5.0]) - M = 4.0 # moyenne globale - C = 3.0 # nombre moyen de votes - result = bayesienne_mean(series, M, C) - assert isinstance(result, float) - 
assert 3.0 <= result <= 5.0 +# Définir le chemin du fichier à tester +module_file_path = '/home/antoine/jul24_cmlops_reco_film/kubernetes/airflow/order/docker/prod/python_transform/build_features.py' -def test_preprocessing_ratings(sample_ratings_df, tmp_path): - # Créer un fichier temporaire pour les tests - temp_file = tmp_path / "ratings.csv" - sample_ratings_df.to_csv(temp_file, index=False) +# Charger le module +spec = importlib.util.spec_from_file_location("build_features", module_file_path) +build_features = importlib.util.module_from_spec(spec) +sys.modules["build_features"] = build_features +spec.loader.exec_module(build_features) - # Tester la fonction - result = preprocessing_ratings(str(temp_file)) - assert isinstance(result, pd.DataFrame) - assert 'bayesian_mean' in result.columns - assert len(result) == len(sample_ratings_df) -def test_preprocessing_movies(sample_movies_df, tmp_path): - # Créer un fichier temporaire pour les tests - temp_file = tmp_path / "movies.csv" - sample_movies_df.to_csv(temp_file, index=False) +class TestDataProcessing(unittest.TestCase): - # Tester la fonction - result = preprocessing_movies(str(temp_file)) + def setUp(self): + self.raw_data_relative_path = "app/data/to_ingest/bronze" + self.data_directory = "app/data/to_ingest/silver" - assert isinstance(result, pd.DataFrame) - assert 'year' in result.columns - assert result['genres'].iloc[0] == 'Action, Adventure' - assert result['year'].iloc[0] == '2020' + def test_download_and_save_file(self): + url = "https://mlops-project-db.s3.eu-west-1.amazonaws.com/movie_recommandation/" + build_features.download_and_save_file(url, self.raw_data_relative_path) + self.assertTrue(os.path.exists(os.path.join(self.raw_data_relative_path, 'ratings.csv'))) + self.assertTrue(os.path.exists(os.path.join(self.raw_data_relative_path, 'movies.csv'))) + self.assertTrue(os.path.exists(os.path.join(self.raw_data_relative_path, 'links.csv'))) -def test_preprocessing_links(sample_links_df, tmp_path): - # Créer un fichier temporaire pour les tests - temp_file = tmp_path / "links.csv" - sample_links_df.to_csv(temp_file, index=False) + def test_load_data(self): + dfs = build_features.load_data(self.raw_data_relative_path) + self.assertIsInstance(dfs[0], pd.DataFrame) # Vérifie que c'est un DataFrame + self.assertIsInstance(dfs[1], pd.DataFrame) # Vérifie que c'est un DataFrame + self.assertIsInstance(dfs[2], pd.DataFrame) # Vérifie que c'est un DataFrame - # Tester la fonction - result = preprocessing_links(str(temp_file)) + def test_create_users(self): + users_df = build_features.create_users() + self.assertEqual(len(users_df), 500) # Vérifie que 500 utilisateurs sont créés - assert isinstance(result, pd.DataFrame) - assert result['tmdbId'].dtype == 'int64' - assert result['tmdbId'].iloc[1] == 0 # Vérifier que la valeur NaN a été remplacée par 0 \ No newline at end of file +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_data_loader.py b/tests/test_data_loader.py new file mode 100644 index 0000000..7625aed --- /dev/null +++ b/tests/test_data_loader.py @@ -0,0 +1,48 @@ +import unittest +import pandas as pd +import os +import importlib.util +import sys + + + +# Définir le chemin du fichier à tester +module_file_path = '/home/antoine/jul24_cmlops_reco_film/kubernetes/airflow/order/docker/prod/python_transform/data_loader.py' + +# Charger le module +spec = importlib.util.spec_from_file_location("data_loader", module_file_path) +data_loader = importlib.util.module_from_spec(spec) 
+sys.modules["data_loader"] = data_loader +spec.loader.exec_module(data_loader) + + +class TestDataProcessing(unittest.TestCase): + + def setUp(self): + self.config = data_loader.load_config() # Chargez votre configuration ici + + def test_load_config(self): + self.assertIsNotNone(self.config['host']) + self.assertIsNotNone(self.config['database']) + self.assertIsNotNone(self.config['user']) + self.assertIsNotNone(self.config['password']) + + def test_execute_query(self): + query = "SELECT 1;" # Exemple simple qui devrait toujours réussir + result = data_loader.execute_query_psql(query, self.config) + self.assertEqual(result, 1) # + + def test_upsert_to_psql(self): + # Créez un DataFrame d'exemple pour tester l'upsert + df_test = pd.DataFrame({ + 'id': [1], + 'userId': [1], + 'movieId': [1], + 'rating': [5], + 'timestamp': [1234567890], + 'bayesian_mean': [4.5] + }) + data_loader.upsert_to_psql(data_loader.table_ratings, df_test) # + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_db.py b/tests/test_db.py old mode 100644 new mode 100755 diff --git a/tests/test_db_creation.sql b/tests/test_db_creation.sql old mode 100644 new mode 100755 diff --git a/tests/test_streamlit_app.py b/tests/test_streamlit_app.py old mode 100644 new mode 100755 diff --git a/tests/test_train_model.py b/tests/test_train_model.py old mode 100644 new mode 100755