diff --git a/.github/workflows/deploy_file_to_s3.yml b/.github/workflows/deploy_file_to_s3.yml index cf767f6..e90d29b 100644 --- a/.github/workflows/deploy_file_to_s3.yml +++ b/.github/workflows/deploy_file_to_s3.yml @@ -1,4 +1,4 @@ -name: PROD deployment to boardgamegeek ratings data cleaner +name: Deploy config file to S3 on: push: diff --git a/Dockerfiles/Dockerfile.ratings-data-cleaner b/Dockerfiles/Dockerfile.ratings-data-cleaner index adf8ebe..9b9aa76 100644 --- a/Dockerfiles/Dockerfile.ratings-data-cleaner +++ b/Dockerfiles/Dockerfile.ratings-data-cleaner @@ -13,13 +13,13 @@ RUN pip3 install pipenv RUN mkdir -p modules # Copy the source code into the container -COPY modules/user_data_cleaner modules/user_data_cleaner +COPY modules/ratings_data_cleaner modules/ratings_data_cleaner COPY utils utils COPY data data -COPY modules/user_data_cleaner/Pipfile* . +COPY modules/ratings_data_cleaner/Pipfile* . COPY config.py . # Install dependencies with pipenv RUN pipenv sync -ENTRYPOINT ["pipenv", "run", "python", "modules/user_data_cleaner/main.py"] \ No newline at end of file +ENTRYPOINT ["pipenv", "run", "python", "modules/ratings_data_cleaner/main.py"] \ No newline at end of file diff --git a/Dockerfiles/Dockerfile.users-data-cleaner b/Dockerfiles/Dockerfile.users-data-cleaner index 9b9aa76..f7d775b 100644 --- a/Dockerfiles/Dockerfile.users-data-cleaner +++ b/Dockerfiles/Dockerfile.users-data-cleaner @@ -13,13 +13,13 @@ RUN pip3 install pipenv RUN mkdir -p modules # Copy the source code into the container -COPY modules/ratings_data_cleaner modules/ratings_data_cleaner +COPY modules/users_data_cleaner modules/users_data_cleaner COPY utils utils COPY data data -COPY modules/ratings_data_cleaner/Pipfile* . +COPY modules/users_data_cleaner/Pipfile* . COPY config.py . 
# Install dependencies with pipenv RUN pipenv sync -ENTRYPOINT ["pipenv", "run", "python", "modules/ratings_data_cleaner/main.py"] \ No newline at end of file +ENTRYPOINT ["pipenv", "run", "python", "modules/users_data_cleaner/main.py"] \ No newline at end of file diff --git a/README.md b/README.md index 88a169f..be53f9d 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,7 @@ Return to the `aws_terraform_bgg` directory and run `make setup_boardgamegeek` - TEST LOCAL - `bgg_scraper.main.py` for GAME to test a single file locally - Use to test a single specific url file. Must have generated game urls first with step 02. - Run locally and pass the scraper type `game` as an arg, and an existing filename without directory or suffix from `data/prod/scraper_urls_raw_game` - - Example: `python bgg_scraper/main.py game group1_game_scraper_urls_raw` + - Example: `python bgg_scraper/main.py game group1_games_scraper_urls_raw_raw` - Only saves data locally to `data/prod/games/scraped_xml_raw` - TEST ON AWS - `lambda_functions.dev_bgg_scraper_fargate_trigger` for GAME will trigger process to run and write scraping on S3 @@ -135,7 +135,7 @@ Return to the `aws_terraform_bgg` directory and run `make setup_boardgamegeek` - TEST - `bgg_scraper.main.py` for USER - Use to test a single specific url file. Must have generated ratings urls first with step 05. - Run locally and pass both scraper type `user` as an arg, and an existing filename without directory or suffix from `data/prod/scraper_urls_raw_user` - - Example: `python bgg_scraper/main.py ratings group1_ratings_scraper_urls_raw` + - Example: `python bgg_scraper/main.py ratings group1_ratings_scraper_urls_raw_raw` - Only saves data locally to `data/prod/users/scraped_xml_raw` ## I added some new stuff to my deployment. How do I update it? 
diff --git a/aws_dagster_bgg/assets/assets.py b/aws_dagster_bgg/assets/assets.py index a6eb70a..1ab2f78 100644 --- a/aws_dagster_bgg/assets/assets.py +++ b/aws_dagster_bgg/assets/assets.py @@ -10,7 +10,7 @@ @asset -def bgg_games_csv( +def boardgame_ranks_csv( s3_resource: ConfigurableResource, lambda_resource: ConfigurableResource, config_resource: ConfigurableResource, @@ -49,8 +49,8 @@ def bgg_games_csv( ) -@asset(deps=["bgg_games_csv"]) -def game_scraper_urls( +@asset(deps=["boardgame_ranks_csv"]) +def games_scraper_urls_raw( lambda_resource: ConfigurableResource, s3_resource: ConfigurableResource, config_resource: ConfigurableResource, @@ -95,8 +95,8 @@ def game_scraper_urls( return True -@asset(deps=["game_scraper_urls"]) -def scraped_game_xmls( +@asset(deps=["games_scraper_urls_raw"]) +def games_scraped_xml_raw( ecs_resource: ConfigurableResource, s3_resource: ConfigurableResource, config_resource: ConfigurableResource, @@ -112,7 +112,7 @@ def scraped_game_xmls( return True -@asset(deps=["scraped_game_xmls"]) +@asset(deps=["games_scraped_xml_raw"]) def game_dfs_clean( s3_resource: ConfigurableResource, ecs_resource: ConfigurableResource, @@ -143,7 +143,7 @@ def game_dfs_clean( logger.info(data_sets) data_set_file_names = [ - f"{WORKING_ENV_DIR}{configs['game']['clean_dfs_directory']}/{x}_clean.pkl" + f"{WORKING_ENV_DIR}{configs['games']['clean_dfs_directory']}/{x}_clean.pkl" for x in data_sets ] logger.info(data_set_file_names) @@ -168,7 +168,7 @@ def game_dfs_clean( @asset(deps=["game_dfs_clean"]) -def ratings_scraper_urls( +def ratings_scraper_urls_raw( lambda_resource: ConfigurableResource, s3_resource: ConfigurableResource, config_resource: ConfigurableResource, @@ -211,8 +211,8 @@ def ratings_scraper_urls( return True -@asset(deps=["ratings_scraper_urls"]) -def scraped_ratings_xmls( +@asset(deps=["ratings_scraper_urls_raw"]) +def ratings_scraped_xml_raw( ecs_resource: ConfigurableResource, s3_resource: ConfigurableResource, config_resource: 
ConfigurableResource, @@ -228,8 +228,8 @@ def scraped_ratings_xmls( return True -@asset(deps=["scraped_ratings_xmls"]) -def ratings_data_df( +@asset(deps=["ratings_scraped_xml_raw"]) +def ratings_dfs_dirty( s3_resource: ConfigurableResource, ecs_resource: ConfigurableResource, config_resource: ConfigurableResource, @@ -280,7 +280,7 @@ def ratings_data_df( @asset -def user_scraper_urls( +def users_scraper_urls_raw( lambda_resource: ConfigurableResource, s3_resource: ConfigurableResource, config_resource: ConfigurableResource, @@ -325,8 +325,8 @@ def user_scraper_urls( return True -@asset(deps=["user_scraper_urls"]) -def scraped_user_xmls( +@asset(deps=["users_scraper_urls_raw"]) +def users_scraped_xml_raw( ecs_resource: ConfigurableResource, s3_resource: ConfigurableResource, config_resource: ConfigurableResource, @@ -342,6 +342,57 @@ def scraped_user_xmls( return True +@asset(deps=["users_scraped_xml_raw"]) +def user_dfs_dirty( + s3_resource: ConfigurableResource, + ecs_resource: ConfigurableResource, + config_resource: ConfigurableResource, +) -> bool: + """ + Launches the users data cleaner ECS task for the scraped users XML files, then checks users_data.pkl in the dirty dfs directory on S3 for a refreshed timestamp. Expects 30 raw XML files in prod and 1 otherwise. + """ + + configs = config_resource.get_config_file() + + bucket = configs["s3_scraper_bucket"] + key = f'{WORKING_ENV_DIR}{configs["users"]["output_xml_directory"]}' + + raw_ratings_files = s3_resource.list_file_keys(bucket=bucket, key=key) + + assert len(raw_ratings_files) == (30 if ENVIRONMENT == "prod" else 1) + + task_definition = ( + "bgg_users_data_cleaner" + if ENVIRONMENT == "prod" + else "dev_bgg_users_data_cleaner" + ) + + ecs_resource.launch_ecs_task(task_definition=task_definition) + + check_filenames = [ + f"{WORKING_ENV_DIR}{configs['users']['dirty_dfs_directory']}/users_data.pkl" + ] + logger.info(check_filenames) + + original_timestamps = { + key: s3_resource.get_last_modified( + bucket=bucket, + key=key, + ) + for key in check_filenames + } + + compare_timestamps_for_refresh( + 
original_timestamps=original_timestamps, + file_list_to_check=check_filenames, + location_bucket=bucket, + sleep_timer=300, + s3_resource=s3_resource, + ) + + return True + + @op def compare_timestamps_for_refresh( original_timestamps: dict, diff --git a/aws_dagster_bgg/jobs/__init__.py b/aws_dagster_bgg/jobs/__init__.py index 4de0580..37a81e5 100644 --- a/aws_dagster_bgg/jobs/__init__.py +++ b/aws_dagster_bgg/jobs/__init__.py @@ -3,5 +3,5 @@ bgg_job = define_asset_job("bgg_job", AssetSelection.all()) user_job = define_asset_job( - "user_job", selection=["user_scraper_urls", "scraped_user_xmls"] + "user_job", selection=["users_scraper_urls_raw", "users_scraped_xml_raw"] ) diff --git a/config.json b/config.json index 288a71e..479d34a 100644 --- a/config.json +++ b/config.json @@ -3,7 +3,8 @@ "game_dfs_dirty": "game_dfs_dirty", "scraper_task_definition": "bgg_scraper", "game_cleaner_task_definition": "bgg_game_data_cleaner", - "user_cleaner_task_definition": "bgg_user_data_cleaner", + "ratings_cleaner_task_definition": "bgg_ratings_data_cleaner", + "user_cleaner_task_definition": "bgg_users_data_cleaner", "ecs_cluster": "boardgamegeek", "orchestrator_task_definition": "bgg_orchestrator", "boardgamegeek_csv_filename": "boardgames_ranks.csv", diff --git a/modules/bgg_boardgame_file_retrieval/get_bgg_games_file.py b/modules/bgg_boardgame_file_retrieval/get_bgg_games_file.py index 8745c26..5b061ff 100644 --- a/modules/bgg_boardgame_file_retrieval/get_bgg_games_file.py +++ b/modules/bgg_boardgame_file_retrieval/get_bgg_games_file.py @@ -1,6 +1,7 @@ import os import time import zipfile +import csv from os.path import expanduser from tempfile import mkdtemp @@ -24,8 +25,8 @@ def initialize_driver(default_directory: str) -> webdriver.Chrome: options for the scraper to work. 
The function will return the initialized driver.""" - # if not os.environ.get("ENVIRONMENT", "dev") == "prod": - # return webdriver.Chrome() + if not os.environ.get("ENVIRONMENT", "dev") == "prod": + return webdriver.Chrome() chrome_options = ChromeOptions() chrome_options.add_argument("--headless=new") @@ -106,13 +107,32 @@ def lambda_handler(event: dict = None, context: dict = None) -> None: with zipfile.ZipFile(f"{default_directory}/Downloads/{filename}", "r") as zip_ref: zip_ref.extractall(extract_directory) + local_file = f"{extract_directory}/boardgames_ranks.csv" + output_file = f"{extract_directory}/boardgames_ranks.tsv" + + # Input and output file paths + input_file = local_file + output_file = output_file + + # Convert CSV to TSV + with open(input_file, "r") as csv_file, open( + output_file, "w", newline="" + ) as tsv_file: + csv_reader = csv.reader(csv_file) + tsv_writer = csv.writer(tsv_file, delimiter="\t") + + for row in csv_reader: + tsv_writer.writerow(row) + + print(f"Converted {input_file} to {output_file}") + wr.s3.upload( - local_file=f"{extract_directory}/boardgames_ranks.csv", + local_file=output_file, path=f"s3://{S3_SCRAPER_BUCKET}/data/prod/boardgames_ranks.csv", ) wr.s3.upload( - local_file=f"{extract_directory}/boardgames_ranks.csv", + local_file=output_file, path=f"s3://{S3_SCRAPER_BUCKET}/data/test/boardgames_ranks.csv", ) diff --git a/modules/lambda_functions/bgg_ratings_data_cleaner_fargate_trigger.py b/modules/lambda_functions/bgg_ratings_data_cleaner_fargate_trigger.py index 743d07f..d52a1ec 100644 --- a/modules/lambda_functions/bgg_ratings_data_cleaner_fargate_trigger.py +++ b/modules/lambda_functions/bgg_ratings_data_cleaner_fargate_trigger.py @@ -7,7 +7,7 @@ ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev") S3_SCRAPER_BUCKET = os.environ.get("S3_SCRAPER_BUCKET") -SCRAPER_TASK_DEFINITION = CONFIGS["ratings_cleaner_task_definition"] +TASK_DEFINITION = CONFIGS["ratings_cleaner_task_definition"] TERRAFORM_STATE_BUCKET = 
os.environ.get("TF_VAR_BUCKET") @@ -36,9 +36,7 @@ def lambda_handler(event, context): terraform_state_file = get_terraform_state_file_for_vpc() task_definition = ( - f"dev_{SCRAPER_TASK_DEFINITION}" - if ENVIRONMENT != "prod" - else SCRAPER_TASK_DEFINITION + f"dev_{TASK_DEFINITION}" if ENVIRONMENT != "prod" else TASK_DEFINITION ) print(task_definition) diff --git a/modules/ratings_data_cleaner/main.py b/modules/ratings_data_cleaner/main.py index c3831c8..bdf4f32 100644 --- a/modules/ratings_data_cleaner/main.py +++ b/modules/ratings_data_cleaner/main.py @@ -139,13 +139,14 @@ def _save_dfs_to_disk_or_s3(self, ratings_df: dict[pd.DataFrame]): def _create_file_of_unique_user_ids(self, ratings_df: pd.DataFrame) -> list: """Create a list of unique user IDs""" - user_ratings_count_df = ratings_df.groupby("username").count()["rating"] - ratings_names_less_than_5 = user_ratings_count_df[ - user_ratings_count_df < 5 - ].index - ratings_df = ratings_df.drop( - ratings_df[ratings_df["username"].isin(ratings_names_less_than_5)].index - ) + if ENVIRONMENT == "prod": + user_ratings_count_df = ratings_df.groupby("username").count()["rating"] + ratings_names_less_than_5 = user_ratings_count_df[ + user_ratings_count_df < 5 + ].index + ratings_df = ratings_df.drop( + ratings_df[ratings_df["username"].isin(ratings_names_less_than_5)].index + ) unique_ids = {"list_of_ids": ratings_df["username"].unique().tolist()} diff --git a/modules/user_data_cleaner/Pipfile b/modules/users_data_cleaner/Pipfile similarity index 100% rename from modules/user_data_cleaner/Pipfile rename to modules/users_data_cleaner/Pipfile diff --git a/modules/user_data_cleaner/Pipfile.lock b/modules/users_data_cleaner/Pipfile.lock similarity index 100% rename from modules/user_data_cleaner/Pipfile.lock rename to modules/users_data_cleaner/Pipfile.lock diff --git a/modules/user_data_cleaner/main.py b/modules/users_data_cleaner/main.py similarity index 100% rename from modules/user_data_cleaner/main.py rename to 
modules/users_data_cleaner/main.py