From a35ccdbf31de26760d24bc5e11308040b00ada3d Mon Sep 17 00:00:00 2001
From: threnjen
Date: Mon, 2 Dec 2024 08:34:10 -0800
Subject: [PATCH] updates to user parsing functions and tools

---
 .../generate_user_urls_lambda.py     |  4 +-
 modules/ratings_data_cleaner/main.py |  7 +--
 modules/users_data_cleaner/main.py   | 46 +++++++++++++------
 utils/processing_functions.py        |  6 ++-
 4 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/modules/lambda_functions/generate_user_urls_lambda.py b/modules/lambda_functions/generate_user_urls_lambda.py
index c707578..5565ea5 100644
--- a/modules/lambda_functions/generate_user_urls_lambda.py
+++ b/modules/lambda_functions/generate_user_urls_lambda.py
@@ -2,6 +2,7 @@
 
 import numpy as np
 import pandas as pd
+from datetime import datetime
 
 from config import CONFIGS
 from utils.processing_functions import load_file_local_first, save_file_local_first
@@ -33,9 +34,10 @@ def lambda_handler(event, context):
     will be split into blocks and saved to S3 for the scraper to pick
     up."""
 
+    timestamp = datetime.now().strftime("%Y%m%d")
     users = load_file_local_first(
         path=f"ratings",
-        file_name="unique_ids.json",
+        file_name=f"unique_ids_{timestamp}.json",
     )
 
     user_ids = list(users.values())[0]
diff --git a/modules/ratings_data_cleaner/main.py b/modules/ratings_data_cleaner/main.py
index 02f0a08..fb365f3 100644
--- a/modules/ratings_data_cleaner/main.py
+++ b/modules/ratings_data_cleaner/main.py
@@ -1,6 +1,6 @@
-import gc
 import os
-from collections import defaultdict
+
+from datetime import datetime
 
 import pandas as pd
 from bs4 import BeautifulSoup
@@ -151,8 +151,9 @@ def _create_file_of_unique_user_ids(self, ratings_df: pd.DataFrame) -> list:
         ratings_df["username"] = ratings_df["username"].astype(str)
 
         unique_ids = {"list_of_ids": sorted(ratings_df["username"].unique().tolist())}
+        timestamp = datetime.now().strftime("%Y%m%d")
         save_file_local_first(
-            path="ratings", file_name="unique_ids.json", data=unique_ids
+            path="ratings", file_name=f"unique_ids_{timestamp}.json", data=unique_ids
         )
diff --git a/modules/users_data_cleaner/main.py b/modules/users_data_cleaner/main.py
index 3ce30de..5579c76 100644
--- a/modules/users_data_cleaner/main.py
+++ b/modules/users_data_cleaner/main.py
@@ -25,13 +25,15 @@ class DirtyDataExtractor:
 
     def __init__(self) -> None:
-        self.total_entries = 0
+        pass
 
     def data_extraction_chain(self):
         """Main function to extract data from the XML files"""
         file_list_to_process = self._get_file_list()
-        all_entries = self._process_file_list(file_list_to_process)
-        users_df = self._create_table_from_data(all_entries)
+        all_ratings_with_dates = self._process_file_list_for_rating_dates(
+            file_list_to_process
+        )
+        users_df = self._create_table_from_data(all_ratings_with_dates)
         self._save_dfs_to_disk_or_s3(
             directory="dirty_dfs_directory", table_name="user_data", df=users_df
         )
@@ -46,22 +48,27 @@
     def _get_file_list(self) -> list[str]:
         """Get the list of files to process"""
         xml_directory = USER_CONFIGS["output_xml_directory"]
+        # file_list_to_process = []
         file_list_to_process = get_s3_keys_based_on_env(xml_directory)
-        print(f"User files to process: {len(file_list_to_process)}\n\n")
         if not file_list_to_process:
             local_files = get_local_keys_based_on_env(xml_directory)
             file_list_to_process = [x for x in local_files if x.endswith(".xml")]
+        print(f"\nUser files to process: {len(file_list_to_process)}")
+        print(file_list_to_process[-5:])
         return file_list_to_process
 
-    def _process_file_list(self, file_list_to_process: list) -> list[dict]:
+    def _process_file_list_for_rating_dates(
+        self, file_list_to_process: list
+    ) -> list[dict]:
         """Process the list of files in the S3 bucket
         This function will process the list of files in the S3 bucket
         and extract the necessary information from the XML files.
         The function will return a list of dictionaries containing the data"""
-        all_entries = []
+        all_ratings_with_dates = []
+        users_parsed = 0
 
         for file_name in file_list_to_process:
             local_open = load_file_local_first(
                 path=USER_CONFIGS["output_xml_directory"],
                 file_name=file_name,
             )
             game_page = BeautifulSoup(local_open, features="xml")
 
             username = file_name.split("user_")[-1].split(".xml")[0]
-            print(f"\nParsing user {username}")
+            # print(f"\nParsing user {username}")
 
             one_user_reviews = self._get_ratings_from_user(username, game_page)
-            print(f"Ratings for user {username}: {len(one_user_reviews)}")
+            # print(f"Ratings for user {username}: {len(one_user_reviews)}")
+
+            users_parsed += 1
 
-            self.total_entries += len(one_user_reviews)
+            all_ratings_with_dates += one_user_reviews
 
-            all_entries += one_user_reviews
+            if users_parsed % 1000 == 0:
+                print(f"\nTotal number of users processed: {users_parsed}")
+                print(
+                    f"Last user processed: {username} with {len(one_user_reviews)} ratings"
+                )
 
-        print(f"\nTotal number of ratings ratings processed: {self.total_entries}")
+        print(f"\nTotal number of users processed: {users_parsed}")
 
-        return all_entries
+        return all_ratings_with_dates
 
     def _get_ratings_from_user(
         self, username: str, user_entry: BeautifulSoup
...
 
         return user_ratings
 
-    def _create_table_from_data(self, all_entries: dict[list]) -> pd.DataFrame:
+    def _create_table_from_data(
+        self, all_ratings_with_dates: list[dict]
+    ) -> pd.DataFrame:
         """Create a DataFrame from the data"""
         df = pd.DataFrame(
-            all_entries, columns=["username", "BGGId", "rating", "lastmodified"]
+            all_ratings_with_dates,
+            columns=["username", "BGGId", "rating", "lastmodified"],
         )
         df = df.sort_values(by="username").reset_index(drop=True)
         df = df.drop_duplicates()
diff --git a/utils/processing_functions.py b/utils/processing_functions.py
index 08ab7d4..ff65cae 100644
--- a/utils/processing_functions.py
+++ b/utils/processing_functions.py
@@ -37,7 +37,9 @@ def get_s3_keys_based_on_env(directory: str):
 
 def get_local_keys_based_on_env(directory: str):
     directory = f"{WORKING_DIR}{directory}"
-    return [f"{directory}/{x}" for x in LocalFileHandler().list_files(directory)]
+    return sorted(
+        [f"{directory}/{x}" for x in LocalFileHandler().list_files(directory)]
+    )
 
 
 def save_file_local_first(path: str, file_name: str, data: Union[pd.DataFrame, dict]):
@@ -59,7 +61,7 @@ def load_file_local_first(path: str = None, file_name: str = ""):
 
     load_path = f"{WORKING_DIR}{file_path}"
 
-    print(f"Loading: {load_path}")
+    # print(f"Loading: {load_path}")
 
     try:
         # open from local_pile_path
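
Note on the date-stamped handoff introduced above: the ratings cleaner writes
unique_ids_{timestamp}.json and lambda_handler rebuilds the same name by calling
datetime.now().strftime("%Y%m%d") on its own clock, so the lookup only succeeds
when both sides run on the same calendar day (and in the same timezone). If the
lambda can fire a day later, resolving the newest dated key instead of
reconstructing the name is one way to decouple the two clocks. The sketch below
is illustrative and not part of this patch: find_latest_unique_ids is a
hypothetical helper, and file_names is assumed to be the key list returned by
the repo's existing get_s3_keys_based_on_env()/get_local_keys_based_on_env().

import re
from datetime import datetime


def find_latest_unique_ids(file_names: list[str]) -> str | None:
    """Return the newest unique_ids_YYYYMMDD.json key, or None if none exist.

    Hypothetical helper, not part of this patch; `file_names` is assumed to be
    the listing of the ratings directory from the repo's key-listing helpers.
    """
    pattern = re.compile(r"unique_ids_(\d{8})\.json$")
    dated = []
    for name in file_names:
        match = pattern.search(name)
        if match is None:
            continue
        try:
            # Parse the stamp so a malformed 8-digit suffix is skipped rather
            # than winning a plain string comparison.
            dated.append((datetime.strptime(match.group(1), "%Y%m%d"), name))
        except ValueError:
            continue
    return max(dated)[1] if dated else None


if __name__ == "__main__":
    # Illustrative keys only.
    keys = [
        "ratings/unique_ids_20241130.json",
        "ratings/unique_ids_20241202.json",
        "ratings/raw_ratings.xml",
    ]
    print(find_latest_unique_ids(keys))  # ratings/unique_ids_20241202.json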
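
An alternative design choice with the same effect: have the writer also copy the
dated file to a fixed alias (e.g. a plain unique_ids.json) so readers never need
date logic, and keep the dated copies purely as history. Either approach avoids
coupling the reader's clock to the writer's.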