Commit a6c4310

🚧 update kubernetes

AntoinePELAMOURGUES committed Dec 3, 2024
1 parent f3a530c commit a6c4310
Showing 28 changed files with 344 additions and 347 deletions.
14 changes: 12 additions & 2 deletions Makefile
@@ -1,7 +1,7 @@
NAMESPACE1 = reco-movies
NAMESPACE2 = airflow

-.PHONY: help setup1 setup2 start stop down restart logs-supabase logs-airflow logs-api logs-fastapi clean network all namespace pv secrets configmaps deployments services ingress clean-kube-reco clean-kube-airflow apply-configmap start-minikube start-airflow pv-airflow reco
+.PHONY: help setup1 setup2 start stop down restart logs-supabase logs-airflow logs-api logs-fastapi clean network all namespace pv secrets configmaps deployments services ingress clean-kube-reco clean-kube-airflow apply-configmap start-minikube start-airflow pv-airflow reco start-mlflow

# Help command to list all available targets
help:
@@ -126,11 +126,21 @@ install-helm:
start-airflow:
	sudo apt-get update
	helm repo add apache-airflow https://airflow.apache.org
-	helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace -f kubernetes/airflow/my_values.yml
+	helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace -f kubernetes/airflow/my_airflow_values.yml
	kubectl apply -f kubernetes/storageclass/storageclass.yaml -n airflow
	kubectl apply -f kubernetes/persistent-volumes/airflow-local-dags-folder.yml -n airflow
	kubectl apply -f kubernetes/persistent-volumes/airflow-local-logs-folder.yml -n airflow
	kubectl apply -f kubernetes/persistent-volumes/mlfow_storage.yml
	kubectl apply -f kubernetes/secrets/airflow-secrets.yaml -n airflow
	kubectl apply -f kubernetes/configmaps/airflow_configmaps.yml -n airflow
	kubectl apply -f kubernetes/deployments/pgadmin-deployment.yml -n airflow
	kubectl apply -f kubernetes/services/pgadmin_service.yml -n airflow

start-mlflow:
	helm repo add bitnami https://charts.bitnami.com/bitnami
	helm repo update
	helm install mlf-ts bitnami/mlflow --namespace mlflow --create-namespace
	kubectl apply -f kubernetes/services/mlflow_service.yml -n mlflow


delete-pv-airflow:
7 changes: 0 additions & 7 deletions docker/mlflow/Dockerfile

This file was deleted.

File renamed without changes.
21 changes: 21 additions & 0 deletions docker/python_train_svd_model.py/Dockerfile
@@ -0,0 +1,21 @@
# Use a Python base image
FROM python:3.8-slim

# Install the required system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Set the working directory
WORKDIR /app

# Copy the required files
COPY ./train_models.py ./train_models.py
COPY ./requirements.txt ./requirements.txt

# Install the Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Default command to run the script
CMD ["python3", "train_models.py"]
6 changes: 6 additions & 0 deletions docker/python_train_svd_model.py/requirements.txt
@@ -0,0 +1,6 @@
pandas==2.0.3
scikit-surprise==1.1.4
psycopg2-binary==2.9.10
mlflow==2.17.2
scikit-learn==1.3.2
numpy==1.24.4
173 changes: 173 additions & 0 deletions docker/python_train_svd_model.py/train_models.py
@@ -0,0 +1,173 @@
import os
import pandas as pd
from surprise import Dataset, Reader
from surprise.prediction_algorithms.matrix_factorization import SVD
from surprise.model_selection import train_test_split
from surprise import accuracy
from datetime import datetime
from scipy.sparse import csr_matrix
from sklearn.neighbors import NearestNeighbors
import numpy as np
import psycopg2
import mlflow


def load_config():
    """Load the database configuration from environment variables."""
    return {
        'host': os.getenv('AIRFLOW_POSTGRESQL_SERVICE_HOST'),
        'database': os.getenv('DATABASE'),
        'user': os.getenv('USER'),
        'password': os.getenv('PASSWORD')
    }


def connect(config):
    """Connect to the PostgreSQL server and return the connection."""
    try:
        conn = psycopg2.connect(**config)
        print('Connected to the PostgreSQL server.')
        return conn
    except Exception as error:
        print(f"Connection error: {error}")
        return None


def fetch_ratings(table):
    """Fetch the rows of the given table and return them as a DataFrame."""
    config = load_config()
    conn = connect(config)

    if conn is not None:
        try:
            # Run a SQL query to fetch every row of the table
            query = f"SELECT * FROM {table};"
            df = pd.read_sql_query(query, conn)
            print("Data fetched successfully.")
            return df
        except Exception as e:
            print(f"Error fetching data: {e}")
            return None
        finally:
            conn.close()  # Always close the connection
    else:
        print("Failed to connect to the database.")
        return None


def train_SVD_model(df):
    """Train an SVD recommendation model and log it to MLflow.
    Args:
        df (pd.DataFrame): DataFrame with userId, movieId and rating columns.
    """
    # Start a new MLflow run
    mlflow.start_run()

    start_time = datetime.now()  # Start timing

    # Prepare the data for Surprise
    reader = Reader(rating_scale=(0.5, 5))
    data = Dataset.load_from_df(df[['userId', 'movieId', 'rating']], reader=reader)

    # Split the data into training and test sets
    trainset, testset = train_test_split(data, test_size=0.25)

    # Create and train the SVD model
    model = SVD(n_factors=150, n_epochs=30, lr_all=0.01, reg_all=0.05)
    model.fit(trainset)

    # Evaluate the model on the test set and compute the RMSE
    predictions = model.test(testset)
    acc = accuracy.rmse(predictions)

    # Round to 2 decimal places
    acc_rounded = round(acc, 2)

    print("Root mean squared error (RMSE):", acc_rounded)

    # Log the metric to MLflow
    mlflow.log_metric("RMSE", acc_rounded)

    # Log the model to MLflow
    mlflow.sklearn.log_model(model, "model_SVD")

    end_time = datetime.now()

    duration = end_time - start_time
    print(f"Training duration: {duration}")

    # End the MLflow run
    mlflow.end_run()
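# A minimal usage sketch (this helper is not part of the original commit):
# Surprise's predict() takes raw user/movie ids and returns a Prediction
# whose .est field is the estimated rating on the 0.5-5 scale. A logged
# model can be reloaded with mlflow.sklearn.load_model("runs:/<run_id>/model_SVD").
def predict_single(model, user_id, movie_id):
    """Hypothetical helper: estimated rating for one (user, movie) pair."""
    return model.predict(uid=user_id, iid=movie_id).est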



def create_X(df):
    """Build a sparse user-item matrix and the id/index mapping dictionaries.
    Args:
        df (pd.DataFrame): DataFrame with userId, movieId and rating columns.
    Returns:
        tuple: (sparse_matrix, user_mapper, movie_mapper, user_inv_mapper, movie_inv_mapper)
    """
    M = df['userId'].nunique()
    N = df['movieId'].nunique()

    user_mapper = dict(zip(np.unique(df["userId"]), list(range(M))))
    movie_mapper = dict(zip(np.unique(df["movieId"]), list(range(N))))

    user_inv_mapper = dict(zip(list(range(M)), np.unique(df["userId"])))
    movie_inv_mapper = dict(zip(list(range(N)), np.unique(df["movieId"])))

    user_index = [user_mapper[i] for i in df['userId']]
    item_index = [movie_mapper[i] for i in df['movieId']]

    X = csr_matrix((df["rating"], (user_index, item_index)), shape=(M, N))

    return X, user_mapper, movie_mapper, user_inv_mapper, movie_inv_mapper
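# A small usage sketch (helper not in the original commit): the mappers
# translate between raw MovieLens ids and matrix indices, e.g. to pull one
# movie's column of ratings out of the users x movies matrix X.
def movie_ratings_column(X, movie_mapper, movie_id):
    """Hypothetical helper: sparse column of all ratings for one movieId."""
    return X[:, movie_mapper[movie_id]]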


def train_matrix_model(df, k=10, metric='cosine'):
    """Train a KNN model on the sparse ratings matrix and log it to MLflow.
    Args:
        df (pd.DataFrame): DataFrame with the rating data.
        k (int): Number of neighbours to consider.
        metric (str): Distance metric for KNN.
    """
    # Start a new MLflow run
    mlflow.start_run()
    # Start timing
    start_time = datetime.now()
    X, user_mapper, movie_mapper, user_inv_mapper, movie_inv_mapper = create_X(df)
    # Transpose X so that movies are rows and users are columns
    X = X.T
    # Use k+1 neighbours because the movie itself is included in the results
    kNN = NearestNeighbors(n_neighbors=k + 1, algorithm="brute", metric=metric)

    kNN.fit(X)

    end_time = datetime.now()

    duration = end_time - start_time
    print(f"Training duration: {duration}")

    # Log the model's parameters to MLflow (including the training duration)
    mlflow.log_param("k_neighbors", k)
    mlflow.log_param("metric", metric)
    mlflow.log_param("training_duration", duration.total_seconds())
    # Log the model to MLflow
    mlflow.sklearn.log_model(kNN, "model_KNN")

    mlflow.end_run()  # End the MLflow run
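# A usage sketch (helper not in the original commit): querying the fitted
# KNN for movies similar to a given one. The model was fit on X.T, so each
# row is a movie; the nearest neighbour is the movie itself and is skipped.
def similar_movies(kNN, X, movie_mapper, movie_inv_mapper, movie_id, k=10):
    """Hypothetical helper: ids of the k movies most similar to movie_id."""
    _, neighbours = kNN.kneighbors(X.T[movie_mapper[movie_id]], n_neighbors=k + 1)
    return [movie_inv_mapper[i] for i in neighbours.flatten()[1:]]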

if __name__ == "__main__":
    # Point MLflow at the tracking server
    mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI"))
    ratings = fetch_ratings('ratings')
    print('Training the SVD model')
    train_SVD_model(ratings)
    print('Training the CSR-matrix KNN model')
    train_matrix_model(ratings)
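# Environment assumed by this entrypoint (values are illustrative, not from
# the original commit): MLFLOW_TRACKING_URI should point at the MLflow
# tracking service installed by the `start-mlflow` Makefile target, and
# DATABASE/USER/PASSWORD must match the credentials mounted into the pod:
#
#   MLFLOW_TRACKING_URI=http://<mlflow-tracking-service>.mlflow.svc:<port>
#   DATABASE=postgres  USER=postgres  PASSWORD=<from the sql-conn secret>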
27 changes: 17 additions & 10 deletions docker/python_transform/Dockerfile
@@ -1,21 +1,28 @@
# Use the official Python slim image
FROM python:3.8-slim

# Set the working directory
WORKDIR /app

-COPY ./build_features.py ./build_features.py
-
-COPY ./data_to_db.py ./data_to_db.py
-
-COPY ./start.sh ./start.sh
-
# Copy the requirements file
COPY ./requirements.txt ./requirements.txt

RUN mkdir -p data/to_ingest
+# Install necessary system packages and Python dependencies
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends gcc libpq-dev && \
+    pip install --upgrade pip && \
+    pip install -r requirements.txt && \
+    apt-get remove --purge -y gcc libpq-dev && \
+    apt-get autoremove -y && \
+    rm -rf /var/lib/apt/lists/*

-RUN pip install -r requirements.txt
+# Copy your application files
+COPY ./build_features.py ./build_features.py
+COPY ./data_to_db.py ./data_to_db.py
+COPY ./start.sh ./start.sh

-# Rendre le script start.sh exécutable
+# Make start.sh executable
RUN chmod +x start.sh

-# Spécifier la commande à exécuter lors du démarrage du conteneur
+# Command to run your application (if applicable)
CMD ["./start.sh"]
14 changes: 7 additions & 7 deletions docker/python_transform/data_to_db.py
@@ -6,30 +6,30 @@

# SQLAlchemy table definitions for the upsert operations
table_movies = table('movies',
-    column('movieId'),
+    column('movieid'),
    column('title'),
    column('genres'),
    column('year')
)

table_ratings = table('ratings',
    column('id'),
-    column('userId'),
-    column('movieId'),
+    column('userid'),
+    column('movieid'),
    column('rating'),
    column('timestamp'),
    column('bayesian_mean')
)

table_links = table('links',
    column('id'),
-    column('movieId'),
-    column('imdbId'),
-    column('tmdbId')
+    column('movieid'),
+    column('imdbid'),
+    column('tmdbid')
)

table_users = table('users',
-    column('userId'),
+    column('userid'),
    column('username'),
    column('email'),
    column('hached_password')
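# A sketch of the upsert these table clauses are meant for (conflict keys
# assumed, not shown in this diff): PostgreSQL INSERT ... ON CONFLICT via
# SQLAlchemy's postgresql dialect.
#
#   from sqlalchemy.dialects.postgresql import insert
#
#   def upsert_movie(conn, movieid, title, genres, year):
#       """Hypothetical helper: insert or update one `movies` row."""
#       stmt = insert(table_movies).values(
#           movieid=movieid, title=title, genres=genres, year=year)
#       stmt = stmt.on_conflict_do_update(
#           index_elements=['movieid'],
#           set_={'title': stmt.excluded.title,
#                 'genres': stmt.excluded.genres,
#                 'year': stmt.excluded.year})
#       conn.execute(stmt)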
5 changes: 3 additions & 2 deletions docker/python_transform/requirements.txt
@@ -2,7 +2,8 @@ numpy==1.24.4
pandas==1.5.3
bcrypt==3.2.0
passlib
-psycopg2==2.9.9
+psycopg2-binary==2.9.9
python-dateutil==2.9.0.post0
SQLAlchemy==2.0.30
-typing_extensions==4.11.0
+typing_extensions==4.11.0
+requests==2.31.0
@@ -41,7 +41,7 @@ def create_conn(**kwargs):
postgres_conn_conf = get_postgres_conn_conf()

with DAG(
-    dag_id='init_order',
+    dag_id='init_db',
    tags=['order', 'antoine'],
    default_args={
        'owner': 'airflow',
@@ -2,24 +2,11 @@
from airflow.utils.dates import days_ago
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.kubernetes.secret import Secret
-from kubernetes.client import models as k8s
-
-secret_database = Secret(
-    deploy_type="env",
-    deploy_target="DATABASE",
-    secret="sql-conn",
-)
-
-secret_user = Secret(
-    deploy_type="env",
-    deploy_target="USER",
-    secret="sql-conn",
-)
-
-secret_password = Secret(
+secret_password = Secret(
    deploy_type="env",
    deploy_target="PASSWORD",
-    secret="sql-conn",
+    secret="sql-conn"
)

with DAG(
@@ -29,15 +16,20 @@
        'owner': 'airflow',
        'start_date': days_ago(0, minute=1),
    },
    schedule_interval=None,  # no automatic scheduling
+    catchup=False
) as dag:

    python_transform = KubernetesPodOperator(
        task_id="python_transform",
-        image="antoinepela/projet_reco_movies:order-python-transform-latest",
+        image="antoinepela/projet_reco_movies:python-transform-latest",
        cmds=["/bin/bash", "-c", "/app/start.sh"],
        namespace="airflow",
+        env_vars={
+            'DATABASE': 'postgres',
+            'USER': 'postgres',
+        },
        secrets=[secret_password],
    )

    python_transform