Commit: updates to user parsing functions and tools

threnjen committed Dec 2, 2024
1 parent bc28e2b commit a35ccdb
Showing 4 changed files with 42 additions and 21 deletions.
4 changes: 3 additions & 1 deletion modules/lambda_functions/generate_user_urls_lambda.py
@@ -2,6 +2,7 @@
 
 import numpy as np
 import pandas as pd
+from datetime import datetime
 
 from config import CONFIGS
 from utils.processing_functions import load_file_local_first, save_file_local_first
@@ -33,9 +34,10 @@ def lambda_handler(event, context):
     will be split into blocks and saved to S3 for the scraper
     to pick up."""
 
+    timestamp = datetime.now().strftime("%Y%m%d")
     users = load_file_local_first(
         path=f"ratings",
-        file_name="unique_ids.json",
+        file_name=f"unique_ids_{timestamp}.json",
     )
 
     user_ids = list(users.values())[0]
7 changes: 4 additions & 3 deletions modules/ratings_data_cleaner/main.py
@@ -1,6 +1,6 @@
 import gc
 import os
-from collections import defaultdict
+from datetime import datetime
 
 import pandas as pd
 from bs4 import BeautifulSoup
@@ -151,8 +151,9 @@ def _create_file_of_unique_user_ids(self, ratings_df: pd.DataFrame) -> list:
         ratings_df["username"] = ratings_df["username"].astype(str)
         unique_ids = {"list_of_ids": sorted(ratings_df["username"].unique().tolist())}
 
+        timestamp = datetime.now().strftime("%Y%m%d")
         save_file_local_first(
-            path="ratings", file_name="unique_ids.json", data=unique_ids
+            path="ratings", file_name=f"unique_ids_{timestamp}.json", data=unique_ids
         )
 
 
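Taken together, the two hunks above move the unique-IDs handoff to a date-stamped file: the ratings cleaner writes unique_ids_{timestamp}.json and the lambda loads the same name back. Each side calls datetime.now().strftime("%Y%m%d") independently, so the handoff only resolves if both run on the same calendar day. A minimal sketch of the shared convention (the unique_ids_file_name helper is hypothetical, not part of this commit):

from datetime import datetime

def unique_ids_file_name() -> str:
    # Same "%Y%m%d" suffix computed on both the writer and the reader side.
    return f"unique_ids_{datetime.now().strftime('%Y%m%d')}.json"

# Writer side (ratings_data_cleaner/main.py):
#     save_file_local_first(path="ratings", file_name=unique_ids_file_name(), data=unique_ids)
# Reader side (generate_user_urls_lambda.py):
#     users = load_file_local_first(path="ratings", file_name=unique_ids_file_name())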
46 changes: 31 additions & 15 deletions modules/users_data_cleaner/main.py
@@ -25,13 +25,15 @@
 class DirtyDataExtractor:
 
     def __init__(self) -> None:
-        self.total_entries = 0
+        pass
 
     def data_extraction_chain(self):
         """Main function to extract data from the XML files"""
         file_list_to_process = self._get_file_list()
-        all_entries = self._process_file_list(file_list_to_process)
-        users_df = self._create_table_from_data(all_entries)
+        all_ratings_with_dates = self._process_file_list_for_rating_dates(
+            file_list_to_process
+        )
+        users_df = self._create_table_from_data(all_ratings_with_dates)
         self._save_dfs_to_disk_or_s3(
             directory="dirty_dfs_directory", table_name="user_data", df=users_df
         )
@@ -46,22 +48,27 @@ def _get_file_list(self) -> list[str]:
         """Get the list of files to process"""
 
         xml_directory = USER_CONFIGS["output_xml_directory"]
+        # file_list_to_process = []
         file_list_to_process = get_s3_keys_based_on_env(xml_directory)
-        print(f"User files to process: {len(file_list_to_process)}\n\n")
         if not file_list_to_process:
             local_files = get_local_keys_based_on_env(xml_directory)
             file_list_to_process = [x for x in local_files if x.endswith(".xml")]
+        print(f"\nUser files to process: {len(file_list_to_process)}")
+        print(file_list_to_process[-5:])
         return file_list_to_process
 
-    def _process_file_list(self, file_list_to_process: list) -> list[dict]:
+    def _process_file_list_for_rating_dates(
+        self, file_list_to_process: list
+    ) -> list[dict]:
         """Process the list of files in the S3 bucket
         This function will process the list of files in the S3 bucket
         and extract the necessary information from the XML files. The
         function will return a list of dictionaries containing the data"""
 
-        all_entries = []
+        all_ratings_with_dates = []
+        users_parsed = 0
 
-        for file_name in file_list_to_process:
+        for file_name in file_list_to_process[:1]:
 
             local_open = load_file_local_first(
                 path=USER_CONFIGS["output_xml_directory"],
@@ -71,18 +78,24 @@ def _process_file_list(self, file_list_to_process: list) -> list[dict]:
             game_page = BeautifulSoup(local_open, features="xml")
 
             username = file_name.split("user_")[-1].split(".xml")[0]
-            print(f"\nParsing user {username}")
+            # print(f"\nParsing user {username}")
 
             one_user_reviews = self._get_ratings_from_user(username, game_page)
-            print(f"Ratings for user {username}: {len(one_user_reviews)}")
+            # print(f"Ratings for user {username}: {len(one_user_reviews)}")
 
-            self.total_entries += len(one_user_reviews)
+            users_parsed += 1
 
-            all_entries += one_user_reviews
+            all_ratings_with_dates += one_user_reviews
+
+            if users_parsed % 1000 == 0:
+                print(f"\nTotal number of users processed: {users_parsed}")
+                print(
+                    f"Last user processed: {username} with {len(one_user_reviews)} ratings"
+                )
 
-        print(f"\nTotal number of ratings ratings processed: {self.total_entries}")
+        print(f"\nTotal number of ratings ratings processed: {users_parsed}")
 
-        return all_entries
+        return all_ratings_with_dates
 
     def _get_ratings_from_user(
         self, username: str, user_entry: BeautifulSoup
@@ -101,10 +114,13 @@ def _get_ratings_from_user(
 
         return user_ratings
 
-    def _create_table_from_data(self, all_entries: dict[list]) -> pd.DataFrame:
+    def _create_table_from_data(
+        self, all_ratings_with_dates: dict[list]
+    ) -> pd.DataFrame:
         """Create a DataFrame from the data"""
         df = pd.DataFrame(
-            all_entries, columns=["username", "BGGId", "rating", "lastmodified"]
+            all_ratings_with_dates,
+            columns=["username", "BGGId", "rating", "lastmodified"],
         )
         df = df.sort_values(by="username").reset_index(drop=True)
         df = df.drop_duplicates()
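The body of _get_ratings_from_user stays collapsed behind the fold above, but the columns fed to pd.DataFrame imply each entry is a (username, BGGId, rating, lastmodified) tuple. A speculative, self-contained sketch of such a parser, assuming a BGG-style collection XML layout; the element and attribute names here are assumptions, not taken from the commit:

from bs4 import BeautifulSoup

def get_ratings_from_user(username: str, user_entry: BeautifulSoup) -> list[tuple]:
    # Hypothetical reconstruction: emit one (username, BGGId, rating,
    # lastmodified) tuple per <item>, matching the DataFrame columns above.
    user_ratings = []
    for item in user_entry.find_all("item"):
        rating = item.find("rating")
        status = item.find("status")
        if rating is None or status is None:
            continue  # skip items without a rating or a modification date
        user_ratings.append(
            (username, item.get("objectid"), rating.get("value"), status.get("lastmodified"))
        )
    return user_ratings

xml = '<items><item objectid="174430"><rating value="8"/><status lastmodified="2024-11-30"/></item></items>'
print(get_ratings_from_user("threnjen", BeautifulSoup(xml, features="xml")))
# [('threnjen', '174430', '8', '2024-11-30')]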
6 changes: 4 additions & 2 deletions utils/processing_functions.py
@@ -37,7 +37,9 @@ def get_s3_keys_based_on_env(directory: str):
 
 def get_local_keys_based_on_env(directory: str):
     directory = f"{WORKING_DIR}{directory}"
-    return [f"{directory}/{x}" for x in LocalFileHandler().list_files(directory)]
+    return sorted(
+        [f"{directory}/{x}" for x in LocalFileHandler().list_files(directory)]
+    )
 
 
 def save_file_local_first(path: str, file_name: str, data: Union[pd.DataFrame, dict]):
@@ -59,7 +61,7 @@ def load_file_local_first(path: str = None, file_name: str = ""):
 
     load_path = f"{WORKING_DIR}{file_path}"
 
-    print(f"Loading: {load_path}")
+    # print(f"Loading: {load_path}")
 
     try:
         # open from local_pile_path
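The sorted() wrapper is the substantive change in this file: a local directory listing otherwise comes back in arbitrary, platform-dependent order, so sorting makes local runs deterministic and keeps the last-five-files printout in _get_file_list meaningful (S3, by contrast, already returns keys in ascending lexicographic order). A minimal sketch of the same idea, with os.listdir standing in for LocalFileHandler().list_files as an assumption about what that handler wraps:

import os

def list_local_keys(directory: str) -> list[str]:
    # os.listdir returns names in arbitrary filesystem order; sorting gives
    # a stable lexicographic order that mirrors S3's ListObjectsV2 ordering.
    return sorted(f"{directory}/{name}" for name in os.listdir(directory))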
