diff --git a/.gitignore b/.gitignore index 227b083e..6c4b19da 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ ipython_config.py # Idea .idea + +**/.DS_store diff --git a/Data-Engineering/Airflow/Scripts/clean_data.py b/Data-Engineering/Airflow/Scripts/clean_data.py new file mode 100644 index 00000000..e4063544 --- /dev/null +++ b/Data-Engineering/Airflow/Scripts/clean_data.py @@ -0,0 +1,261 @@ +#!/bin/python3 +""" +EZUAF DEMO +created Vincent Charbonnier, 2023 +clean fruit&veg dataset in SQL DBs +""" + +import subprocess + +# List of libraries to install +libraries_to_install = ["fuzzywuzzy", "pycountry", "python-Levenshtein"] + +# Run the pip command to install the libraries +for library in libraries_to_install: + try: + subprocess.run(["pip", "install", library], check=True) + print(f"{library} library installed successfully.") + except subprocess.CalledProcessError: + print(f"Failed to install {library} library.") + +# import the necessary libraries +from fuzzywuzzy import process +import pycountry +import requests +import sys +import argparse +import logging + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Parse command line arguments +parser = argparse.ArgumentParser() +parser.add_argument("-db", "--database_type", help="Type of SQL database (e.g. mysql, postgresql)", required=True) +parser.add_argument("-H", "--host", help="SQL hostname", required=True) +parser.add_argument("-u", "--user", help="SQL username", required=True) +parser.add_argument("-p", "--password", help="SQL password", required=True) +parser.add_argument("-P", "--port", help="SQL port", default=3306, type=int) +parser.add_argument("-d", "--database", help="SQL database name", required=True) +parser.add_argument("-t", "--table", help="SQL table name", required=True) +args = parser.parse_args() +if args.database_type != None: db_type=args.database_type +else: + print("hostname required") + sys.exit() +if args.host != None: db_host=args.host +else: + print("hostname required") + sys.exit() +if args.user != None: db_user=args.user +else: + print("username is required") + sys.exit() +if args.password != None: db_password=args.password +else: + print("password is required") + sys.exit() +if args.database != None: db_name=args.database +else: + print("database name is required") + sys.exit() +if args.table != None: table_name=args.table +else: + print("table name is required") + sys.exit() +if args.port != None: db_port=args.port +else: + print("port name is required") + +# Define a minimum match score for fuzzy matching +MIN_MATCH_SCORE = 70 + +# Define a function to validate the country name using fuzzy matching +def validate_country_name_fuzzy(country_name): + countries = [c.name for c in pycountry.countries] + match = process.extractOne(country_name, countries) + if match[1] >= MIN_MATCH_SCORE: + country = pycountry.countries.get(name=match[0]) + return country.alpha_2 + else: + return None + +# Define a function to validate the currency code using fuzzy matching +def validate_currency_code(currency_code): + currencies = [c.alpha_3 for c in pycountry.currencies] + match = process.extractOne(currency_code, currencies) + if match[1] >= MIN_MATCH_SCORE: + return match[0] + else: + return None + +def get_currency_code(country_name): + """ + Given a country name, returns its currency code using pycountry library. + If the country name is not found, returns None. 
+ """ + try: + country_code = pycountry.countries.search_fuzzy(country_name)[0].alpha_3 + if country_code in ['AUT', 'BEL', 'CYP', 'EST', 'FIN', 'FRA', 'DEU', 'GRC', 'IRL', 'ITA', 'LVA', 'LTU', 'LUX', 'MLT', 'NLD', 'PRT', 'SVK', 'SVN', 'ESP']: + return 'EUR' + currency_code = pycountry.currencies.get(numeric=pycountry.countries.get(alpha_3=country_code).numeric).alpha_3 + return currency_code + except LookupError: + print(f"E: Could not find currency for country {country_name}") + return None + +def get_database_connection(database_type): + """ + Given the database type, returns a connection to the database. + If the database type is not supported, returns None. + """ + if database_type == "mysql": + # Define the MySQL database parameters + host = db_host + user = db_user + password = db_password + database = db_name + table = table_name + port = db_port + + # Connect to the MySQL database + print("# Connecting to the MySQL database...") + try: + import MySQLdb + except ImportError: + print("Error: MySQLdb library not found. Please install it using pip.") + sys.exit() + + cnx = MySQLdb.Connection(user=user, password=password, + host=host, port=int(port), database=database) + return cnx + elif database_type == "postgresql": + # Define the PostgreSQL database parameters + host = db_host + user = db_user + password = db_password + database = db_name + table = table_name + port = db_port + + # Connect to the PostgreSQL database + print("# Connecting to the PostgreSQL database...") + try: + import psycopg2 + except ImportError: + print("Error: psycopg2 library not found. Please install it using pip.") + sys.exit() + + cnx = psycopg2.connect(host=host, port=port, user=user, + password=password, database=database) + return cnx + else: + print("Error: Unsupported database type.") + return None + +def execute_query(cursor, query, params=None): + """ + Given a cursor, a query, and optional parameters, executes the query and returns the result. + If an error occurs, prints an error message and returns None. 
+    """
+    try:
+        cursor.execute(query, params)
+        return cursor.fetchall()
+    except Exception as e:
+        print(f"Error: {str(e)}")
+        return None
+
+def update_database_table(database_type, database, table):
+    # Create a connection to the database
+    cnx = get_database_connection(database_type)
+    if cnx is None:
+        return
+
+    # Create a cursor to execute SQL queries
+    cursor = cnx.cursor()
+
+    # Read the database table
+    print(f"# Reading the {database_type} database table {database}.{table}...")
+    query = f"SELECT * FROM {table}"
+    rows = execute_query(cursor, query)
+
+    # Define the API endpoint for fetching the latest exchange rates
+    exchange_rate_api_endpoint = "https://api.exchangerate-api.com/v4/latest/EUR"
+
+    # Fetch the latest exchange rates from the API
+    print("# Fetching the latest exchange rates from the API...")
+    response = requests.get(exchange_rate_api_endpoint)
+    if response.status_code == 200:
+        exchange_rates = response.json().get("rates")
+    else:
+        # Handle the case where the API request fails
+        print("Error: Unable to fetch exchange rates from API")
+        sys.exit()
+
+    # Iterate over the rows and validate the country names
+    print("# Analyzing the data...")
+    updated_rows = []
+    for row in rows:
+        country = str(row[9])
+        curr_code = str(row[7])
+        # Validate the country name and get the currency code
+        try:
+            country_obj = pycountry.countries.search_fuzzy(country)[0]
+            # Not every pycountry entry has an official_name, so fall back to the short name
+            country_name = getattr(country_obj, "official_name", country_obj.name)
+            currency_code = get_currency_code(country_name)
+        except LookupError:
+            print(f"E: Could not find currency for country {country}")
+            continue
+
+        # If the currency code is already EUR, add the row to updated_rows without performing any currency conversion
+        if curr_code == "EUR":
+            updated_rows.append((row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], country_name, row[10]))
+        else:
+            # Convert the currency value to EUR using the exchange rate
+            currency_rate = exchange_rates.get(currency_code)
+
+            if currency_rate is not None:
+                currency_code = "EUR"
+
+                # Convert row[3] and row[5] to float
+                try:
+                    unitprice = float(row[3])
+                    quantity = float(row[5])
+                except ValueError:
+                    print("Error: Unable to convert to numeric value for multiplication.")
+                    continue  # Skip this iteration and move to the next row
+
+                # Calculate the unit price in Euro
+                unit_euro = unitprice / currency_rate
+
+                # Calculate the total sales and round to two decimal places
+                totalsales = round(unit_euro * quantity, 2)
+
+                updated_rows.append((row[0], row[1], row[2], unit_euro, row[4], quantity, totalsales, currency_code, row[8], country_name, row[10]))
+
+    # Write the corrected data back to the database table
+    print(f"# Updating the {database_type} database table {db_name}.{table_name} with validated country names and currencies...")
+    query = f"TRUNCATE TABLE {table}"
+    cursor.execute(query)
+    query = f"INSERT INTO {table} (productid, product, type, unitprice, unit, qty, totalsales, currency, store, country, year) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+    cursor.executemany(query, updated_rows)
+    cnx.commit()
+
+    # Get details from the database cursor
+    cursor.execute(f"SELECT * FROM {table} LIMIT 10")
+    column_names = [column[0] for column in cursor.description]
+    results = cursor.fetchall()
+
+    # Print the results from the database
+    print(f"# Results from the {database_type} database {db_name}:")
+    print(column_names)
+    for result in results:
+        print(result)
+
+    # Close the database connection
+    cnx.close()
+    print(f"# Done updating {db_type} database {db_name}.")
+
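+# Example invocation (this mirrors how the Airflow DAGs call this script; the
+# connection values shown are placeholders, not real credentials):
+#   python clean_data.py -db mysql -H <db_host> -u <db_user> -p <db_password> \
+#       -P 3306 -d <db_name> -t <table_name>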
+# Call the function to update the SQL database
+update_database_table(db_type, db_name, table_name)
diff --git a/Data-Engineering/Airflow/load_s3_csv_to_mariadb_db_czech.py b/Data-Engineering/Airflow/load_s3_csv_to_mariadb_db_czech.py
new file mode 100644
index 00000000..9461b2dd
--- /dev/null
+++ b/Data-Engineering/Airflow/load_s3_csv_to_mariadb_db_czech.py
@@ -0,0 +1,132 @@
+# This DAG downloads a CSV file from MinIO and imports it into MariaDB
+#
+# This DAG assumes that you have already created the following connections in Airflow:
+# 1. s3_connection: A connection to your MinIO instance
+# 2. mysql_connection: A connection to your MariaDB instance
+#
+#
+# created by Dirk Derichsweiler & Vincent Charbonnier & Isabelle Steinhauser
+# 2023-08-08
+#
+
+# Define the AWS credentials and bucket/file information
+
+from datetime import datetime, timedelta
+from airflow import DAG
+from airflow.operators.python_operator import PythonOperator
+import boto3
+import pandas as pd
+import MySQLdb
+import logging
+import os
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime.now(),  # Set the start date to the current date and time
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+# Define the op_kwargs dictionary
+op_kwargs = {
+    'bucket_name': 'ezaf-demo',
+    'file_name': 'Czechia_sales_data_2019_2023.csv',
+    's3_endpoint': 'https://home.hpe-qa16-ezaf.com',
+    'access_key': 'minioadmin',
+    'secret_key': 'minioadmin',
+    'db_host': '10.227.209.61',
+    'db_port': '3306',
+    'db_user': 'hpepoc',
+    'db_password': 'Admin@12345',
+    'db_name': 'czech_mysql_store1',
+    'table_name': 'czech'
+}
+
+def read_csv_from_s3(bucket_name, file_name, s3_endpoint, access_key=None, secret_key=None):
+    s3 = boto3.client(
+        's3',
+        verify=False,
+        endpoint_url=s3_endpoint,
+        aws_access_key_id=access_key,
+        aws_secret_access_key=secret_key
+    )
+    obj = s3.get_object(Bucket=bucket_name, Key=file_name)
+    df = pd.read_csv(obj['Body'])
+    return df
+
+def import_csv_to_mariadb(df, db_host, db_port, db_user, db_password, db_name, table_name):
+    conn = MySQLdb.Connection(
+        host=db_host,
+        port=int(db_port),
+        user=db_user,
+        password=db_password,
+        database=db_name
+    )
+    cursor = conn.cursor()
+
+    # Create table if it doesn't exist
+    create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ("
+    for column in df.columns:
+        # Use FLOAT for the numeric columns, VARCHAR for everything else
+        if column == 'UNITPRICE' or column == 'TOTALSALES':
+            create_table_query += f"{column} FLOAT, "
+        else:
+            create_table_query += f"{column} VARCHAR(255), "
+    create_table_query = create_table_query[:-2] + ")"
+    cursor.execute(create_table_query)
+
+    # Convert 'UNITPRICE' column to numeric type
+    df['UNITPRICE'] = df['UNITPRICE'].astype(float)
+
+    # Insert data into the table
+    insert_query = f"INSERT INTO {table_name} ({', '.join(df.columns)}) VALUES ({', '.join(['%s'] * len(df.columns))})"
+    cursor.executemany(insert_query, df.values.tolist())
+
+    conn.commit()
+    cursor.close()
+    conn.close()
+
+def process_csv_file(bucket_name, file_name, s3_endpoint, access_key=None, secret_key=None,
+                     db_host=None, db_port=None, db_user=None, db_password=None, db_name=None, table_name=None):
+    df = read_csv_from_s3(bucket_name, file_name, s3_endpoint, access_key, secret_key)
+
+    # Output the headers
+    headers = df.columns.tolist()
+    logging.info("Headers: %s", headers)
+
+    import_csv_to_mariadb(df, db_host, db_port, db_user, db_password, db_name, table_name)
+
+# Define the function to run the clean_data.py script
+def
run_clean_data_script(db_host=None, db_port=None, db_user=None, db_password=None, db_name=None, table_name=None): + import subprocess + + script_path = "/usr/local/airflow/dags/gitdags/Scripts/clean_data.py" # Replace with the actual path to clean_data.py + command = [ + "python", + script_path, + "-db", "mysql", + "-H", db_host, + "-u", db_user, + "-p", db_password, + "-P", db_port, + "-d", db_name, + "-t", table_name + ] + + subprocess.run(command, check=True) + +with DAG('load_s3_csv_to_mysql_db_czech', default_args=default_args, schedule_interval='0 7 * * *') as dag: + process_csv = PythonOperator( + task_id='process_csv', + python_callable=process_csv_file, + op_kwargs=op_kwargs + ) + + clean_data = PythonOperator( + task_id='clean_data', + python_callable=run_clean_data_script, + op_kwargs=op_kwargs + ) + + process_csv >> clean_data diff --git a/Data-Engineering/Airflow/load_s3_csv_to_mariadb_db_germany.py b/Data-Engineering/Airflow/load_s3_csv_to_mariadb_db_germany.py new file mode 100644 index 00000000..1455db83 --- /dev/null +++ b/Data-Engineering/Airflow/load_s3_csv_to_mariadb_db_germany.py @@ -0,0 +1,132 @@ +# This DAG downloads a CSV file from MinIO and imports it into MariaDB +# +# This DAG assumes that you have already created the following connections in Airflow: +# 1. s3_connection: A connection to your MinIO instance +# 2. mysql_connection: A connection to your MariaDB instance +# +# +# created by Dirk Derichsweiler & Vincent Charbonnier & Isabelle Steinhauser +# 2023-08-08 +# + +# Define the AWS credentials and bucket/file information + +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +import boto3 +import pandas as pd +import MySQLdb +import logging +import os + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime.now(), # Set the start date to the current date and time + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +# Define the op_kwargs dictionary +op_kwargs = { + 'bucket_name': 'ezaf-demo', + 'file_name': 'Germany_sales_data_2019_2023.csv', + 's3_endpoint': 'https://home.hpe-qa16-ezaf.com:31900', + 'access_key': 'minioadmin', + 'secret_key': 'minioadmin', + 'db_host': '10.227.209.61', + 'db_port': '3306', + 'db_user': 'hpepoc', + 'db_password': 'Admin@12345', + 'db_name': 'german_mysql_store1', + 'table_name': 'germany' +} + +def read_csv_from_s3(bucket_name, file_name, s3_endpoint, access_key=None, secret_key=None): + s3 = boto3.client( + 's3', + verify=False, + endpoint_url=s3_endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key + ) + obj = s3.get_object(Bucket=bucket_name, Key=file_name) + df = pd.read_csv(obj['Body']) + return df + +def import_csv_to_mariadb(df, db_host, db_port, db_user, db_password, db_name, table_name): + conn = MySQLdb.Connection( + host=db_host, + port=int(db_port), + user=db_user, + password=db_password, + database=db_name + ) + cursor = conn.cursor() + + # Create table if it doesn't exist + create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} (" + for column in df.columns: + # Skip the 'PRICE' column + if column == 'UNITPRICE' or column == 'TOTALSALES': + create_table_query += f"{column} FLOAT, " + else: + create_table_query += f"{column} VARCHAR(255), " + create_table_query = create_table_query[:-2] + ")" + cursor.execute(create_table_query) + + # Convert 'PRICE' column to numeric type + df['UNITPRICE'] = df['UNITPRICE'].astype(float) + + # Insert data into the table + 
insert_query = f"INSERT INTO {table_name} ({', '.join(df.columns)}) VALUES ({', '.join(['%s'] * len(df.columns))})" + cursor.executemany(insert_query, df.values.tolist()) + + conn.commit() + cursor.close() + conn.close() + +def process_csv_file(bucket_name, file_name, s3_endpoint, access_key=None, secret_key=None, + db_host=None, db_port=None, db_user=None, db_password=None, db_name=None, table_name=None): + df = read_csv_from_s3(bucket_name, file_name, s3_endpoint, access_key, secret_key) + + # Output the headers + headers = df.columns.tolist() + logging.info("Headers: %s", headers) + + import_csv_to_mariadb(df, db_host, db_port, db_user, db_password, db_name, table_name) + +# Define the function to run the clean_data.py script +def run_clean_data_script(db_host=None, db_port=None, db_user=None, db_password=None, db_name=None, table_name=None): + import subprocess + + script_path = "/usr/local/airflow/dags/gitdags/Scripts/clean_data.py" # Replace with the actual path to clean_data.py + command = [ + "python", + script_path, + "-db", "mysql", + "-H", db_host, + "-u", db_user, + "-p", db_password, + "-P", db_port, + "-d", db_name, + "-t", table_name + ] + + subprocess.run(command, check=True) + +with DAG('load_s3_csv_to_mysql_db_germany', default_args=default_args, schedule_interval='0 7 * * *') as dag: + process_csv = PythonOperator( + task_id='process_csv', + python_callable=process_csv_file, + op_kwargs=op_kwargs + ) + + clean_data = PythonOperator( + task_id='clean_data', + python_callable=run_clean_data_script, + op_kwargs=op_kwargs + ) + + process_csv >> clean_data diff --git a/Data-Engineering/Airflow/load_s3_csv_to_mariadb_db_swiss.py b/Data-Engineering/Airflow/load_s3_csv_to_mariadb_db_swiss.py new file mode 100644 index 00000000..671e01bd --- /dev/null +++ b/Data-Engineering/Airflow/load_s3_csv_to_mariadb_db_swiss.py @@ -0,0 +1,132 @@ +# This DAG downloads a CSV file from MinIO and imports it into MariaDB +# +# This DAG assumes that you have already created the following connections in Airflow: +# 1. s3_connection: A connection to your MinIO instance +# 2. 
mysql_connection: A connection to your MariaDB instance +# +# +# created by Dirk Derichsweiler & Vincent Charbonnier & Isabelle Steinhauser +# 2023-08-08 +# + +# Define the AWS credentials and bucket/file information + +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +import boto3 +import pandas as pd +import MySQLdb +import logging +import os + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime.now(), # Set the start date to the current date and time + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +# Define the op_kwargs dictionary +op_kwargs = { + 'bucket_name': 'ezaf-demo', + 'file_name': 'Switzerland_sales_data_2019_2023.csv', + 's3_endpoint': 'https://home.hpe-qa16-ezaf.com:31900', + 'access_key': 'minioadmin', + 'secret_key': 'minioadmin', + 'db_host': '10.227.209.65', + 'db_port': '3306', + 'db_user': 'hpepoc', + 'db_password': 'Admin@12345', + 'db_name': 'swiss_mariadb_store1', + 'table_name': 'swiss' +} + +def read_csv_from_s3(bucket_name, file_name, s3_endpoint, access_key=None, secret_key=None): + s3 = boto3.client( + 's3', + verify=False, + endpoint_url=s3_endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key + ) + obj = s3.get_object(Bucket=bucket_name, Key=file_name) + df = pd.read_csv(obj['Body']) + return df + +def import_csv_to_mariadb(df, db_host, db_port, db_user, db_password, db_name, table_name): + conn = MySQLdb.Connection( + host=db_host, + port=int(db_port), + user=db_user, + password=db_password, + database=db_name + ) + cursor = conn.cursor() + + # Create table if it doesn't exist + create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} (" + for column in df.columns: + # Skip the 'PRICE' column + if column == 'UNITPRICE' or column == 'TOTALSALES': + create_table_query += f"{column} FLOAT, " + else: + create_table_query += f"{column} VARCHAR(255), " + create_table_query = create_table_query[:-2] + ")" + cursor.execute(create_table_query) + + # Convert 'PRICE' column to numeric type + df['UNITPRICE'] = df['UNITPRICE'].astype(float) + + # Insert data into the table + insert_query = f"INSERT INTO {table_name} ({', '.join(df.columns)}) VALUES ({', '.join(['%s'] * len(df.columns))})" + cursor.executemany(insert_query, df.values.tolist()) + + conn.commit() + cursor.close() + conn.close() + +def process_csv_file(bucket_name, file_name, s3_endpoint, access_key=None, secret_key=None, + db_host=None, db_port=None, db_user=None, db_password=None, db_name=None, table_name=None): + df = read_csv_from_s3(bucket_name, file_name, s3_endpoint, access_key, secret_key) + + # Output the headers + headers = df.columns.tolist() + logging.info("Headers: %s", headers) + + import_csv_to_mariadb(df, db_host, db_port, db_user, db_password, db_name, table_name) + +# Define the function to run the clean_data.py script +def run_clean_data_script(db_host=None, db_port=None, db_user=None, db_password=None, db_name=None, table_name=None): + import subprocess + + script_path = "/usr/local/airflow/dags/gitdags/Scripts/clean_data.py" # Replace with the actual path to clean_data.py + command = [ + "python", + script_path, + "-db", "mysql", + "-H", db_host, + "-u", db_user, + "-p", db_password, + "-P", db_port, + "-d", db_name, + "-t", table_name + ] + + subprocess.run(command, check=True) + +with DAG('load_s3_csv_to_mariadb_swiss', default_args=default_args, schedule_interval='0 7 * * *') as dag: + process_csv = PythonOperator( + 
task_id='process_csv', + python_callable=process_csv_file, + op_kwargs=op_kwargs + ) + + clean_data = PythonOperator( + task_id='clean_data', + python_callable=run_clean_data_script, + op_kwargs=op_kwargs + ) + + process_csv >> clean_data diff --git a/Data-Science/MLflow/bike-sharing-mlflow.ipynb b/Data-Science/MLflow/bike-sharing-mlflow.ipynb index 3527652f..71f68a46 100644 --- a/Data-Science/MLflow/bike-sharing-mlflow.ipynb +++ b/Data-Science/MLflow/bike-sharing-mlflow.ipynb @@ -30,25 +30,6 @@ "### Set Expriment" ] }, - { - "cell_type": "code", - "execution_count": 39, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2022/11/18 19:24:12 INFO mlflow.tracking.fluent: Experiment with name 'bike-sharing-exp' does not exist. Creating a new experiment.\n" - ] - } - ], - "source": [ - "# Set up an experiment with set_exp from ezmllib.mlflow\n", - "experiment_name = 'bike-sharing-exp'\n", - "mlflow.set_experiment(experiment_name)" - ] - }, { "cell_type": "markdown", "metadata": { @@ -125,6 +106,25 @@ "os.mkdir(\"model_artifacts\")" ] }, + { + "cell_type": "code", + "execution_count": 39, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2022/11/18 19:24:12 INFO mlflow.tracking.fluent: Experiment with name 'bike-sharing-exp' does not exist. Creating a new experiment.\n" + ] + } + ], + "source": [ + "# Set up an experiment with set_exp from ezmllib.mlflow\n", + "experiment_name = 'bike-sharing-exp'\n", + "mlflow.set_experiment(experiment_name)" + ] + }, { "cell_type": "markdown", "metadata": { diff --git a/Data-Science/Ray/Ray-RuntimeEnv-Proxy/ray-runtime-env-packages-vlidate.ipynb b/Data-Science/Ray/Ray-RuntimeEnv-Proxy/ray-runtime-env-packages-vlidate.ipynb new file mode 100644 index 00000000..2e5f6b9e --- /dev/null +++ b/Data-Science/Ray/Ray-RuntimeEnv-Proxy/ray-runtime-env-packages-vlidate.ipynb @@ -0,0 +1,243 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "id": "4c6f181b-2e1f-465d-aabd-e5d0de5781d6", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-08-18 15:11:16,796\tINFO client_builder.py:237 -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "check1\n", + "check2\n", + "check3\n", + "hello Avi, welcome to Ray !\n", + "check4\n", + "\u001b[2m\u001b[36m(A pid=804, ip=10.224.1.240)\u001b[0m configuration_dtl :: ===== :: {'MINIO_HOST_URL': 'home.hpe-apps-ezaf.com:31900', 'MINIO_ACCESS_KEY': 'minioadmin', 'MINIO_SECRET_KEY': 'minioadmin', 'SOURCE_PATH': '/source/feed-2.csv', 'GENERATED_PATH': '/source/generated-data.csv', 'storage_uri': 's3://experiments/ray/pickels/logisticregression/model', 'protocol_version': 'v2', 'bucket_name': 'experiments', 'EZAF_ENV': 'hpe-staging-ezaf', 'token_url': 'https://keycloak.{0}.com/realms/UA/protocol/openid-connect/token'}\n", + "\u001b[2m\u001b[36m(A pid=804, ip=10.224.1.240)\u001b[0m MINIO_CLIENT \n", + "\u001b[2m\u001b[36m(A pid=804, ip=10.224.1.240)\u001b[0m shape============== [20, 7]\n", + "\u001b[2m\u001b[36m(A pid=804, ip=10.224.1.240)\u001b[0m X.values[0]============== [ 0. 10. 3. 0. 1. 2. 
10.59] ======= [0.0, 10.0, 3.0, 0.0, 1.0, 2.0, 10.59]\n", + "( step customer age gender merchant category amount\n", + "0 0 10 3 0 1 2 10.590000\n", + "1 0 7 2 0 1 2 21.920000\n", + "2 0 14 3 1 2 0 44.260000\n", + "3 0 3 3 1 3 0 324.500000\n", + "4 0 15 1 1 0 2 26.070000\n", + "5 0 12 2 0 1 2 10.880000\n", + "6 0 13 4 1 1 2 8.050000\n", + "7 0 1 3 0 1 2 25.590000\n", + "8 0 5 5 0 1 2 15.260000\n", + "9 0 6 3 1 1 2 20.730000\n", + "10 1 0 2 1 1 2 4.650000\n", + "11 1 16 3 1 1 2 5.810000\n", + "12 1 9 3 0 0 2 56.620000\n", + "13 1 8 3 1 1 2 29.320000\n", + "14 1 2 2 0 0 2 24.190000\n", + "15 1 17 2 0 0 2 6.790000\n", + "16 1 18 3 0 1 2 74.530000\n", + "17 1 4 4 0 4 1 255.140000\n", + "18 1 4 4 0 4 1 21.930000\n", + "19 1 11 4 1 1 2 5.360000\n", + "20 0 4 3 0 3 0 23.227007\n", + "21 0 12 3 1 2 0 81.762797\n", + "22 0 3 3 0 3 0 296.833337\n", + "23 0 11 3 0 2 0 37.741261\n", + "24 0 3 3 0 3 0 323.072259\n", + "25 1 4 4 0 4 1 248.122677\n", + "26 0 3 3 0 3 0 312.878222\n", + "27 0 3 3 0 3 0 269.867841\n", + "28 0 5 3 0 3 0 25.990152\n", + "29 0 12 3 0 2 0 82.936343\n", + "30 0 3 3 0 3 0 303.397758\n", + "31 0 9 3 0 2 0 33.647811, fraud\n", + "0 0\n", + "1 0\n", + "2 1\n", + "3 1\n", + "4 0\n", + "5 0\n", + "6 0\n", + "7 0\n", + "8 0\n", + "9 0\n", + "10 0\n", + "11 0\n", + "12 0\n", + "13 0\n", + "14 0\n", + "15 0\n", + "16 0\n", + "17 1\n", + "18 1\n", + "19 0\n", + "20 1\n", + "21 1\n", + "22 1\n", + "23 1\n", + "24 1\n", + "25 1\n", + "26 1\n", + "27 1\n", + "28 1\n", + "29 1\n", + "30 1\n", + "31 1)\n", + "check5\n" + ] + } + ], + "source": [ + "import ray\n", + "'''\n", + "SAMPLE CODE TO VALIDATE RUNTIME_ENV\n", + "'''\n", + "runtime_env = {\n", + " \"eager_install\":True,\n", + " \"pip\": [\"imblearn\",\"minio\", \"urllib3\", \"requests\", \"pandas\", \"scipy\", \"kfp\"], \n", + " \"env_vars\":{\"HTTP_PROXY\": \"http://10.78.90.46:80\", \"HTTPS_PROXY\": \"http://10.78.90.46:80\", \"http_proxy\": \"http://10.78.90.46:80\", \"https_proxy\": \"http://10.78.90.46:80\"},\n", + " \"pip_check\": False\n", + "}\n", + "\n", + "ray.init(\n", + " address=\"ray://kuberay-head-svc.kuberay:10001\", \n", + " ignore_reinit_error=True, \n", + " runtime_env=runtime_env, \n", + " _temp_dir=\"/tmp/ray/\", \n", + " local_mode=False,\n", + " namespace=\"kuberay\")\n", + "\n", + "print(\"check1\")\n", + "@ray.remote(runtime_env=runtime_env)\n", + "# @ray.remote\n", + "class A:\n", + " def __call__(self, *args, **kwargs):\n", + " return \"hello Avi, welcome to Ray !\"\n", + " \n", + " def overspaling_smote(self):\n", + " import json\n", + " from minio import Minio\n", + " from imblearn.over_sampling import SMOTE\n", + " import urllib3\n", + " import uuid\n", + " import requests\n", + " import pandas as pd\n", + " from requests.packages.urllib3.exceptions import InsecureRequestWarning\n", + " requests.packages.urllib3.disable_warnings(InsecureRequestWarning)\n", + "\n", + " configuration_dtl = {\n", + " \"MINIO_HOST_URL\":\"home.hpe-apps-ezaf.com:31900\",\n", + " \"MINIO_ACCESS_KEY\":\"minioadmin\",\n", + " \"MINIO_SECRET_KEY\":\"minioadmin\",\n", + " \"SOURCE_PATH\":\"/source/feed-2.csv\",\n", + " \"GENERATED_PATH\":\"/source/generated-data.csv\",\n", + " \"storage_uri\": \"s3://experiments/ray/pickels/logisticregression/model\", \n", + " \"protocol_version\": \"v2\",\n", + " \"bucket_name\": \"experiments\",\n", + " \"EZAF_ENV\":\"hpe-staging-ezaf\",\n", + " \"token_url\":\"https://keycloak.{0}.com/realms/UA/protocol/openid-connect/token\"\n", + " }\n", + "\n", + " print(\"configuration_dtl :: ===== ::\", 
json.loads(json.dumps(configuration_dtl)))\n", + " MINIO_CLIENT_INFR = Minio(\n", + " endpoint= configuration_dtl.get('MINIO_HOST_URL'), \n", + " access_key=configuration_dtl.get('MINIO_ACCESS_KEY'), \n", + " secret_key=configuration_dtl.get('MINIO_SECRET_KEY'),\n", + " secure=True,\n", + " http_client = urllib3.PoolManager(cert_reqs='CERT_NONE'))\n", + "\n", + " print(\"MINIO_CLIENT\", MINIO_CLIENT_INFR)\n", + " csv_file = MINIO_CLIENT_INFR.get_object(configuration_dtl.get('bucket_name'), configuration_dtl.get('GENERATED_PATH'))\n", + " data = pd.read_csv(csv_file)\n", + " data.head(5)\n", + " data_reduced = data.drop(['zipcodeOri','zipMerchant'],axis=1)\n", + " data_reduced.loc[:,['customer','merchant','category']].astype('category')\n", + "\n", + " # turning object columns type to categorical for easing the transformation process\n", + " col_categorical = data_reduced.select_dtypes(include= ['object']).columns\n", + " for col in col_categorical:\n", + " data_reduced[col] = data_reduced[col].astype('category')\n", + " data_reduced[col_categorical] = data_reduced[col_categorical].apply(lambda x: x.cat.codes)\n", + " data_reduced.head(5)\n", + "\n", + " # In contrast, model inference is the process of using a trained model to infer a result from live data.\n", + " X = data_reduced.drop(['fraud'], axis=1)\n", + " y = data_reduced['fraud']\n", + " print(\"shape==============\", [len(X.values), len(X.values[0])])\n", + " print(\"X.values[0]==============\", X.values[0], \"=======\", list(X.values[0]))\n", + "\n", + " '''\n", + " 1. Using SMOTE (Synthetic Minority Oversampling Technique) for balancing the dataset. \n", + " 2. Resulted counts show that now we have exact number of class instances (1 and 0).\n", + " '''\n", + " sm = SMOTE(random_state=42,k_neighbors=2)\n", + " X_res, y_res = sm.fit_resample(X, y)\n", + " y_res = pd.DataFrame(y_res)\n", + "\n", + " return X_res,y_res\n", + "\n", + "print(\"check2\")\n", + "a = A.remote()\n", + "print(\"check3\")\n", + "print(ray.get(a.__call__.remote()))\n", + "print(\"check4\")\n", + "print(ray.get(a.overspaling_smote.remote()))\n", + "print(\"check5\")\n", + "ray.shutdown()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c1cf7f7e-ab17-4bfb-939a-da407cce01a3", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8f24bc91-7e68-466e-973c-67861514dc3f", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "raw", + "id": "789d387f-84fa-47f4-b596-96fb7cfef6f5", + "metadata": { + "tags": [] + }, + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/Data-Science/Ray/Ray-RuntimeEnv-Proxy/readme.md b/Data-Science/Ray/Ray-RuntimeEnv-Proxy/readme.md index 1b167a53..bca16e3a 100644 --- a/Data-Science/Ray/Ray-RuntimeEnv-Proxy/readme.md +++ b/Data-Science/Ray/Ray-RuntimeEnv-Proxy/readme.md @@ -20,4 +20,13 @@ To complete the tutorial follow simple steps below: 1. Login to you EzAF cluster. 2. Create a new notebook server using the `jupyter-data-science` image. 3. Clone the repository locally. -4. 
Launch the `ray-runtimeenv-workaround.ipynb` notebook file. \ No newline at end of file +4. Launch the `ray-runtimeenv-workaround.ipynb` notebook file. + + +## EXAMPLE-3 +With environmental variables such as (runtime_env, num_cpus, proxy, etc.) +To complete the tutorial follow simple steps below: +1. Login to you EzAF cluster. +2. Create a new notebook server using the `jupyter-data-science` image. +3. Clone the repository locally. +4. Launch the `ray-runtime-env-packages-vlidate.ipynb` notebook file. \ No newline at end of file diff --git a/Data-Science/Ray/news_recommendation.ipynb b/Data-Science/Ray/news_recommendation.ipynb index 6cd14323..d7c733c7 100644 --- a/Data-Science/Ray/news_recommendation.ipynb +++ b/Data-Science/Ray/news_recommendation.ipynb @@ -1,6 +1,7 @@ { "cells": [ { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -192,10 +193,11 @@ "#ray.init(address=\"ray://kuberay-head-svc.kuberay:10001\", runtime_env={\"working_dir\": \"./\"})\n", "\n", "# Run this line, all required packages exist in Ray cluster\n", - "ray.init(address=\"ray://kuberay-head-svc.kuberay:10001\", runtime_env={\"working_dir\": \"./\"]})" + "ray.init(address=\"ray://kuberay-head-svc.kuberay:10001\", runtime_env={\"working_dir\": \"./\"})" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -218,6 +220,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -320,6 +323,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -356,6 +360,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": { "tags": [] @@ -374,6 +379,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -381,6 +387,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -401,6 +408,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -464,6 +472,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -491,6 +500,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -498,6 +508,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -515,6 +526,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -549,6 +561,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -581,6 +594,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": { "tags": [] @@ -609,6 +623,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": { "tags": [] diff --git a/E2E-Demos/Question-Answering/01.create-vectorstore.ipynb b/E2E-Demos/Question-Answering/01.create-vectorstore.ipynb index c2084156..83876c1f 100644 --- a/E2E-Demos/Question-Answering/01.create-vectorstore.ipynb +++ b/E2E-Demos/Question-Answering/01.create-vectorstore.ipynb @@ -84,11 +84,13 @@ ] }, { - "cell_type": "raw", + "cell_type": "code", + "execution_count": null, "id": "bafbff81-1b65-41fb-9d65-a26e88779d37", "metadata": { "tags": [] }, + "outputs": [], "source": [ "def load_docs(source_dir: str) -> list:\n", " \"\"\"Load all documents in a the given directory.\"\"\"\n", diff --git a/E2E-Demos/Question-Answering/02.serve-vectorstore.ipynb b/E2E-Demos/Question-Answering/02.serve-vectorstore.ipynb index f10bb26e..b10afe60 100644 --- a/E2E-Demos/Question-Answering/02.serve-vectorstore.ipynb +++ b/E2E-Demos/Question-Answering/02.serve-vectorstore.ipynb @@ -137,8 +137,8 
@@ "metadata": {}, "outputs": [], "source": [ - "predictor_image = (input(\"Enter the name of the predictor image (default: gcr.io/mapr-252711/ezua-demos/vectorstore:v0.1.0): \")\n", - " or \"gcr.io/mapr-252711/ezua-demos/vectorstore:v0.1.0\")" + "predictor_image = (input(\"Enter the name of the predictor image (default: dpoulopoulos/vectorstore:b98a87f): \")\n", + " or \"dpoulopoulos/vectorstore:b98a87f\")" ] }, { diff --git a/E2E-Demos/Question-Answering/04.serve-llm.ipynb b/E2E-Demos/Question-Answering/04.serve-llm.ipynb index ee03260f..8e770e79 100644 --- a/E2E-Demos/Question-Answering/04.serve-llm.ipynb +++ b/E2E-Demos/Question-Answering/04.serve-llm.ipynb @@ -89,10 +89,10 @@ "metadata": {}, "outputs": [], "source": [ - "PREDICTOR_IMAGE = (input(\"Enter the name of the predictor image (default: gcr.io/mapr-252711/ezua-demos/llm-predictor:v0.1.0): \")\n", - " or \"gcr.io/mapr-252711/ezua-demos/llm-predictor:v0.1.0\")\n", - "TRANSFORMER_IMAGE = (input(\"Enter the name of the transformer image (default: gcr.io/mapr-252711/ezua-demos/llm-transformer:v0.1.0): \")\n", - " or \"gcr.io/mapr-252711/ezua-demos/llm-transformer:v0.1.0\")" + "PREDICTOR_IMAGE = (input(\"Enter the name of the predictor image (default: dpoulopoulos/llm-predictor:b98a87f): \")\n", + " or \"dpoulopoulos/llm-predictor:b98a87f\")\n", + "TRANSFORMER_IMAGE = (input(\"Enter the name of the transformer image (default: dpoulopoulos/llm-transformer:b98a87f): \")\n", + " or \"dpoulopoulos/llm-transformer:b98a87f\")" ] }, { diff --git a/E2E-Demos/Question-Answering/README.md b/E2E-Demos/Question-Answering/README.md index f23b4900..d87343fe 100644 --- a/E2E-Demos/Question-Answering/README.md +++ b/E2E-Demos/Question-Answering/README.md @@ -10,19 +10,43 @@ To complete the tutorial follow the steps below: - Login to Ezmeral Unified Analytics cluster. - Create a new notebook server using the `jupyter-data-science` image. + +![select-jupyter-image](images/image-selection.jpg) + +- Wait for notebook to be ready and then "Connect" to newly created notebook. + - Clone the repository locally. -- Create a new conda environment using the specified `environment.yaml` file: + + Open the Terminal within the launcher: + + ![select-jupyter-image](images/nb-launcher.jpg) + + ```bash + git clone https://github.com/HPEEzmeral/ezua-tutorials.git ``` + + ```bash + cd ezua-tutorials/E2E-Demos/Question-Answering + ``` + +- Create a new conda environment using the specified `environment.yaml` file: + + ```bash conda env create -f environment.yaml ``` + - Activate the new conda environment: - ``` + + ```bash conda activate question-answering ``` + - Add the new conda environment as an ipykernel: - ``` + + ```bash python -m ipykernel install --user --name=question-answering ``` + - Launch the five Notebooks in order and execute the code cells. Make sure to select the `question-answering` environment kernel for each Notebook. ## How It Works @@ -40,6 +64,7 @@ KServe allows you to create an inference service using a custom predictor. Since 1. LLM Model: Move into the `llm` directory and build the Docker image using the provided Dockerfile. > For your convenience, you can use the pre-built images we have prepared for you: +> > - Vector Store: `gcr.io/mapr-252711/ezua-demos/vectorstore:v0.1.0` > - LLM Predictor: `gcr.io/mapr-252711/ezua-demos/llm-predictor:v0.1.0` > - LLM Transformer: `gcr.io/mapr-252711/ezua-demos/llm-transformer:v0.1.0` @@ -63,4 +88,4 @@ The last Notebook outlines the user's perspective. The application flow is depic 1. 
LLM Transformer: Get the `4` most relevant documents from the Vector Store response. 1. LLM Transformer: Create a new request to the LLM Predictor passing the `4` most relevant documents as context and the user's question. 1. LLM Predictor: Accept the request from the LLM Transformer, extract the user's question as well as the context, and answer the user's question based on the relevant context. -1. LLM Predictor: Respond to the LLM Transformer with the completion prediction. \ No newline at end of file +1. LLM Predictor: Respond to the LLM Transformer with the completion prediction. diff --git a/E2E-Demos/Question-Answering/images/image-selection.jpg b/E2E-Demos/Question-Answering/images/image-selection.jpg new file mode 100644 index 00000000..fe144e90 Binary files /dev/null and b/E2E-Demos/Question-Answering/images/image-selection.jpg differ diff --git a/E2E-Demos/Question-Answering/images/nb-launcher.jpg b/E2E-Demos/Question-Answering/images/nb-launcher.jpg new file mode 100644 index 00000000..604c7097 Binary files /dev/null and b/E2E-Demos/Question-Answering/images/nb-launcher.jpg differ
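
The request flow above can be sketched from the client's point of view. This is only an illustration: the service URLs and payload field names below are placeholders, not the actual schema exposed by the tutorial's vector store and LLM inference services.

```python
import requests

# Placeholder endpoints -- the real URLs come from the InferenceServices
# created in notebooks 02 and 04.
VECTORSTORE_URL = "https://vectorstore.example.com/v1/models/vectorstore:predict"
LLM_URL = "https://llm.example.com/v1/models/llm:predict"

question = "How do I create a notebook server?"

# 1. Ask the vector store for relevant documents and keep the 4 best matches.
docs_response = requests.post(VECTORSTORE_URL, json={"instances": [question]})
context_docs = docs_response.json()["predictions"][:4]

# 2. Send the question together with the retrieved context to the LLM predictor.
payload = {"instances": [{"question": question, "context": "\n\n".join(context_docs)}]}
answer = requests.post(LLM_URL, json=payload).json()
print(answer)
```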