From ce10ebfb3dd2dc6670881a7279550f4379b35b33 Mon Sep 17 00:00:00 2001 From: Cody Baker Date: Fri, 13 Sep 2024 16:23:25 -0400 Subject: [PATCH 1/3] try to reduce RAM usage --- .../_map_binned_s3_logs_to_dandisets.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py index 9f0d3ac..1eb324e 100644 --- a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py +++ b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py @@ -111,7 +111,7 @@ def _map_binned_logs_to_dandiset( dandiset_id = dandiset.identifier dandiset_log_folder_path = dandiset_logs_folder_path / dandiset_id - all_reduced_s3_logs_per_blob_id = dict() + all_reduced_s3_logs_per_blob_id_aggregated_by_day = dict() blob_id_to_asset_path = dict() total_bytes_across_versions_by_blob_id = dict() dandiset_versions = list(dandiset.get_versions()) @@ -191,8 +191,9 @@ def _map_binned_logs_to_dandiset( ) reordered_reduced_s3_log["date"] = [entry[:10] for entry in reordered_reduced_s3_log["timestamp"]] - reduced_s3_logs_per_day.append(reordered_reduced_s3_log) - all_reduced_s3_logs_per_blob_id[blob_id] = reordered_reduced_s3_log + aggregated_activity_by_day = _aggregate_activity_by_day(reduced_s3_logs_per_day=[reordered_reduced_s3_log]) + reduced_s3_logs_per_day.append(aggregated_activity_by_day) + all_reduced_s3_logs_per_blob_id_aggregated_by_day[blob_id] = aggregated_activity_by_day total_bytes = sum(reduced_s3_log_binned_by_blob_id["bytes_sent"]) total_bytes_per_asset_path[asset.path] = total_bytes @@ -204,8 +205,8 @@ def _map_binned_logs_to_dandiset( continue # No activity found (possible dandiset version was never accessed); skip to next version version_summary_by_day_file_path = dandiset_version_log_folder_path / "version_summary_by_day.tsv" - _write_aggregated_activity_by_day( - reduced_s3_logs_per_day=reduced_s3_logs_per_day, file_path=version_summary_by_day_file_path + aggregated_activity_by_day.to_csv( + path_or_buf=version_summary_by_day_file_path, mode="w", sep="\t", header=True, index=False ) version_summary_by_region_file_path = dandiset_version_log_folder_path / "version_summary_by_region.tsv" @@ -218,7 +219,7 @@ def _map_binned_logs_to_dandiset( total_bytes_per_asset_path=total_bytes_per_asset_path, file_path=version_summary_by_asset_file_path ) - if len(all_reduced_s3_logs_per_blob_id) == 0: + if len(all_reduced_s3_logs_per_blob_id_aggregated_by_day) == 0: return None # No activity found (possible dandiset was never accessed); skip to next version # Single path across versions could have been replaced at various points by a new blob @@ -228,13 +229,13 @@ def _map_binned_logs_to_dandiset( dandiset_summary_by_day_file_path = dandiset_log_folder_path / "dandiset_summary_by_day.tsv" _write_aggregated_activity_by_day( - reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id.values(), + reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_day.values(), file_path=dandiset_summary_by_day_file_path, ) dandiset_summary_by_region_file_path = dandiset_log_folder_path / "dandiset_summary_by_region.tsv" _write_aggregated_activity_by_region( - reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id.values(), + reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_day.values(), file_path=dandiset_summary_by_region_file_path, ) From 8f5581847ac755145425059081dde23a5dd9179b Mon Sep 17 00:00:00 2001 From: Cody Baker Date: Fri, 13 Sep 2024 18:01:58 -0400 Subject: [PATCH 2/3] version bump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d58f17b..6074eb8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ packages = ["src/dandi_s3_log_parser"] [project] name = "dandi_s3_log_parser" -version="0.4.1" +version="0.4.2" authors = [ { name="Cody Baker", email="cody.c.baker.phd@gmail.com" }, ] From 7240e3e7e0bd651b3294c9c97a03cf7e18ec73c0 Mon Sep 17 00:00:00 2001 From: Cody Baker Date: Sat, 14 Sep 2024 01:39:28 -0400 Subject: [PATCH 3/3] fix --- .../_map_binned_s3_logs_to_dandisets.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py index 1eb324e..28ceb43 100644 --- a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py +++ b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py @@ -112,6 +112,7 @@ def _map_binned_logs_to_dandiset( dandiset_log_folder_path = dandiset_logs_folder_path / dandiset_id all_reduced_s3_logs_per_blob_id_aggregated_by_day = dict() + all_reduced_s3_logs_per_blob_id_aggregated_by_region = dict() blob_id_to_asset_path = dict() total_bytes_across_versions_by_blob_id = dict() dandiset_versions = list(dandiset.get_versions()) @@ -130,7 +131,8 @@ def _map_binned_logs_to_dandiset( dandiset_version = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id) - reduced_s3_logs_per_day = [] + all_reduced_s3_logs_aggregated_by_day_for_version = [] + all_reduced_s3_logs_aggregated_by_region_for_version = [] total_bytes_per_asset_path = dict() dandiset_version_assets = list(dandiset_version.get_assets()) for asset in tqdm.tqdm( @@ -191,27 +193,37 @@ def _map_binned_logs_to_dandiset( ) reordered_reduced_s3_log["date"] = [entry[:10] for entry in reordered_reduced_s3_log["timestamp"]] + + # Aggregate per asset to save memory (most impactful for 000108) aggregated_activity_by_day = _aggregate_activity_by_day(reduced_s3_logs_per_day=[reordered_reduced_s3_log]) - reduced_s3_logs_per_day.append(aggregated_activity_by_day) + all_reduced_s3_logs_aggregated_by_day_for_version.append(aggregated_activity_by_day) all_reduced_s3_logs_per_blob_id_aggregated_by_day[blob_id] = aggregated_activity_by_day + aggregated_activity_by_region = _aggregate_activity_by_region( + reduced_s3_logs_per_day=[reordered_reduced_s3_log] + ) + all_reduced_s3_logs_aggregated_by_region_for_version.append(aggregated_activity_by_region) + all_reduced_s3_logs_per_blob_id_aggregated_by_region[blob_id] = aggregated_activity_by_region + total_bytes = sum(reduced_s3_log_binned_by_blob_id["bytes_sent"]) total_bytes_per_asset_path[asset.path] = total_bytes blob_id_to_asset_path[blob_id] = asset.path total_bytes_across_versions_by_blob_id[blob_id] = total_bytes - if len(reduced_s3_logs_per_day) == 0: + if len(all_reduced_s3_logs_aggregated_by_day_for_version) == 0: continue # No activity found (possible dandiset version was never accessed); skip to next version version_summary_by_day_file_path = dandiset_version_log_folder_path / "version_summary_by_day.tsv" - aggregated_activity_by_day.to_csv( - path_or_buf=version_summary_by_day_file_path, mode="w", sep="\t", header=True, index=False + _write_aggregated_activity_by_day( + reduced_s3_logs_per_day=all_reduced_s3_logs_aggregated_by_day_for_version, + file_path=version_summary_by_day_file_path, ) version_summary_by_region_file_path = dandiset_version_log_folder_path / "version_summary_by_region.tsv" _write_aggregated_activity_by_region( - reduced_s3_logs_per_day=reduced_s3_logs_per_day, file_path=version_summary_by_region_file_path + reduced_s3_logs_per_day=all_reduced_s3_logs_aggregated_by_region_for_version, + file_path=version_summary_by_region_file_path, ) version_summary_by_asset_file_path = dandiset_version_log_folder_path / "version_summary_by_asset.tsv" @@ -235,7 +247,7 @@ def _map_binned_logs_to_dandiset( dandiset_summary_by_region_file_path = dandiset_log_folder_path / "dandiset_summary_by_region.tsv" _write_aggregated_activity_by_region( - reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_day.values(), + reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_region.values(), file_path=dandiset_summary_by_region_file_path, )