Fresh round of Drogon fixes (#26)
* added another bad line; adjusted name of helper to clarify

* fix for CLI

* fix for CLI

* fix for CLI

* fix for CLI

* fix for CLI

* try to suppress ranges

* restored int range checking

* ignore particular file during glob

* try to resolve race condition

* move per job temp folders out of home

* limit for testing

* limit testing number; add error logging for ordering step

* fix iteration during ordering

* add ordering tqdm

* release lock

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* debug

* debugging

* debug

---------

Co-authored-by: CodyCBakerPhD <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
3 people authored Aug 12, 2024
1 parent acc51cc commit b38b4c7
Showing 10 changed files with 134 additions and 66 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/publish_to_pypi_on_github_release.yml
@@ -9,21 +9,21 @@ jobs:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          python -m pip install --upgrade build
          python -m pip install --upgrade twine
      - name: Build package
        run: python -m build

      - name: Publish to PyPI
        uses: pypa/[email protected]
        with:
5 changes: 5 additions & 0 deletions src/dandi_s3_log_parser/_buffered_text_reader.py
@@ -22,6 +22,7 @@ def __init__(self, *, file_path: str | pathlib.Path, maximum_buffer_size_in_byte
        self.buffer_size_in_bytes = int(maximum_buffer_size_in_bytes / 3)

        self.total_file_size = pathlib.Path(file_path).stat().st_size
        self.number_of_buffers = int(self.total_file_size / self.buffer_size_in_bytes) + 1
        self.offset = 0

    def __iter__(self):
@@ -60,3 +61,7 @@ def __next__(self) -> list[str]:
        self.offset += self.buffer_size_in_bytes - len(last_line.encode("utf-8"))

        return buffer

    def __len__(self) -> int:
        """Return the number of iterations needed to read the entire file."""
        return self.number_of_buffers
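The new `__len__` exposes the precomputed buffer count, so consumers can give progress bars a real total. A minimal usage sketch, assuming the class is named `BufferedTextReader` and is importable from the module above; the log path and the `process_buffer` callback are placeholders:

```python
import tqdm

from dandi_s3_log_parser._buffered_text_reader import BufferedTextReader  # assumed class name


def process_buffer(lines: list[str]) -> None:
    """Placeholder for whatever per-buffer parsing the caller performs."""
    pass


reader = BufferedTextReader(
    file_path="2024-08-01.log",  # placeholder raw S3 log file
    maximum_buffer_size_in_bytes=10**9,  # ~1 GB budget; the class reserves a third of this per buffer
)

# len(reader) now reports number_of_buffers, so tqdm can render a total and an ETA.
for buffer in tqdm.tqdm(iterable=reader, total=len(reader), desc="Reading raw log..."):
    process_buffer(buffer)
```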
40 changes: 19 additions & 21 deletions src/dandi_s3_log_parser/_command_line_interface.py
@@ -6,7 +6,7 @@
import click
from typing import Literal

from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs
from ._dandi_s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs
from .testing._helpers import find_random_example_line
from ._config import REQUEST_TYPES

@@ -26,21 +26,6 @@
    required=True,
    type=click.Path(writable=True),
)
@click.option(
    "--mode",
    help=(
        """How to resolve the case when files already exist in the folder containing parsed logs.
        "w" will overwrite existing content, "a" will append or create if the file does not yet exist.
        The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate
        over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration.
        HINT: If this iteration is done in chronological order, the resulting parsed logs will also maintain that order.
        """
    ),
    required=False,
    type=click.Choice(["w", "a"]),
    default="a",
)
@click.option(
    "--excluded_ips",
    help="A comma-separated list of IP addresses to exclude from parsing.",
@@ -52,15 +37,28 @@
"--maximum_number_of_workers",
help="The maximum number of workers to distribute tasks across.",
required=False,
type=click.IntRange(1, os.cpu_count()),
type=click.IntRange(min=1, max=os.cpu_count()),
default=1,
)
@click.option(
"--maximum_buffer_size_in_bytes",
help=""""
The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
source text files.
Actual total RAM usage will be higher due to overhead and caching.
Automatically splits this total amount over the maximum number of workers if `maximum_number_of_workers` is
greater than one.
""",
required=False,
type=click.IntRange(min=10**6), # Minimum of 1 MB
default=4 * 10**9,
)
def parse_all_dandi_raw_s3_logs_cli(
    base_raw_s3_log_folder_path: str,
    parsed_s3_log_folder_path: str,
    mode: Literal["w", "a"] = "a",
    excluded_ips: str | None = None,
    maximum_number_of_workers: int = 1,
    excluded_ips: str | None,
    maximum_number_of_workers: int,
    maximum_buffer_size_in_bytes: int,
) -> None:
    split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else []
    handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None
@@ -70,9 +68,9 @@ def parse_all_dandi_raw_s3_logs_cli(
    parse_all_dandi_raw_s3_logs(
        base_raw_s3_log_folder_path=base_raw_s3_log_folder_path,
        parsed_s3_log_folder_path=parsed_s3_log_folder_path,
        mode=mode,
        excluded_ips=handled_excluded_ips,
        maximum_number_of_workers=maximum_number_of_workers,
        maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
    )
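With `--mode` removed and `--maximum_buffer_size_in_bytes` added, a direct Python call to the underlying function now mirrors what the CLI forwards. A hedged sketch; the folder paths are placeholders and the import path is simply the module shown in this diff:

```python
import pathlib

from dandi_s3_log_parser._dandi_s3_log_file_parser import parse_all_dandi_raw_s3_logs

parse_all_dandi_raw_s3_logs(
    base_raw_s3_log_folder_path=pathlib.Path("/mnt/backup/dandi/dandiarchive-logs"),  # placeholder source
    parsed_s3_log_folder_path=pathlib.Path("/opt/dandi/parsed_s3_logs"),  # placeholder destination
    excluded_ips=None,  # or a collections.defaultdict(bool) keyed by IP, as the CLI builds above
    maximum_number_of_workers=4,
    maximum_buffer_size_in_bytes=4 * 10**9,  # the CLI default; split evenly across workers
)
```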


43 changes: 38 additions & 5 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
@@ -12,7 +12,7 @@
import importlib.metadata

import pandas
from pydantic import Field, validate_call
from pydantic import validate_call, Field
import tqdm

from ._ip_utils import (
@@ -21,6 +21,7 @@
from ._s3_log_file_parser import parse_raw_s3_log
from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._order_parsed_logs import order_parsed_logs
from ._ip_utils import _load_ip_address_to_region_cache, _save_ip_address_to_region_cache, _IP_HASH_TO_REGION_FILE_PATH


@validate_call
@@ -69,7 +70,7 @@ def parse_all_dandi_raw_s3_logs(
    parsed_s3_log_folder_path.mkdir(exist_ok=True)

    # Create a fresh temporary directory in the home folder and then fresh subfolders for each job
    temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp"
    temporary_base_folder_path = parsed_s3_log_folder_path / ".temp"
    temporary_base_folder_path.mkdir(exist_ok=True)

    # Clean up any previous tasks that failed to clean themselves up
@@ -93,7 +94,10 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
        split_by_slash = raw_asset_id.split("/")
        return split_by_slash[0] + "_" + split_by_slash[-1]

    daily_raw_s3_log_file_paths = list(base_raw_s3_log_folder_path.rglob(pattern="*.log"))
    # Workaround to particular issue with current repo storage structure on Drogon
    daily_raw_s3_log_file_paths = set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - set(
        [pathlib.Path("/mnt/backup/dandi/dandiarchive-logs/stats/start-end.log")]
    )

    if maximum_number_of_workers == 1:
        for raw_s3_log_file_path in tqdm.tqdm(
@@ -114,12 +118,20 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
                order_results=False,  # Will immediately reorder all files at the end
            )
    else:
        ip_hash_to_region = _load_ip_address_to_region_cache()

        per_job_temporary_folder_paths = list()
        for job_index in range(maximum_number_of_workers):
            per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
            per_job_temporary_folder_path.mkdir(exist_ok=True)
            per_job_temporary_folder_paths.append(per_job_temporary_folder_path)

            # Must have one cache copy per job to avoid race conditions
            ip_hash_to_region_file_path = per_job_temporary_folder_path / "ip_hash_to_region.yaml"
            _save_ip_address_to_region_cache(
                ip_hash_to_region=ip_hash_to_region, ip_hash_to_region_file_path=ip_hash_to_region_file_path
            )

        maximum_buffer_size_in_bytes_per_job = maximum_buffer_size_in_bytes // maximum_number_of_workers

        futures = []
@@ -148,6 +160,19 @@ def asset_id_handler(*, raw_asset_id: str) -> str:

print("\n\nParallel parsing complete!\n\n")

# Merge IP cache files
for job_index in range(maximum_number_of_workers):
per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
ip_hash_to_region_file_path = per_job_temporary_folder_path / "ip_hash_to_region.yaml"

new_ip_hash_to_region = _load_ip_address_to_region_cache(
ip_hash_to_region_file_path=ip_hash_to_region_file_path
)
ip_hash_to_region.update(new_ip_hash_to_region)
_save_ip_address_to_region_cache(
ip_hash_to_region=ip_hash_to_region, ip_hash_to_region_file_path=_IP_HASH_TO_REGION_FILE_PATH
)

for per_job_temporary_folder_path in tqdm.tqdm(
iterable=per_job_temporary_folder_paths,
desc="Merging results across jobs...",
@@ -164,7 +189,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
                total=len(per_job_parsed_s3_log_file_paths),
                position=1,
                leave=False,
                mininterval=1.0,
                mininterval=3.0,
            ):
                merged_temporary_file_path = temporary_output_folder_path / per_job_parsed_s3_log_file_path.name

@@ -175,6 +200,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
                    path_or_buf=merged_temporary_file_path, mode="a", sep="\t", header=header, index=False
                )

        print("\n\n")

    # Always apply this step at the end to be sure we maintained chronological order
    # (even if you think order of iteration itself was performed chronologically)
    # This step also adds the index counter to the TSV
@@ -183,7 +210,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
        ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path,
    )

    shutil.rmtree(path=temporary_output_folder_path, ignore_errors=True)
    shutil.rmtree(path=temporary_base_folder_path, ignore_errors=True)

    return None
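The race-condition fix above boils down to a simple pattern: seed every worker with its own copy of the shared IP-to-region cache, let each process read and write only its private YAML, then fold the per-job copies back into the global cache once the pool finishes. A stripped-down sketch of that pattern using the cache helpers from `_ip_utils`; the temporary folder and worker count are illustrative:

```python
import pathlib

from dandi_s3_log_parser._ip_utils import (
    _load_ip_address_to_region_cache,
    _save_ip_address_to_region_cache,
)

temporary_folder_path = pathlib.Path("/tmp/demo_parse_job")  # illustrative per-run scratch space
maximum_number_of_workers = 4

# Seed one private cache file per job so no two processes ever write the same YAML.
ip_hash_to_region = _load_ip_address_to_region_cache()
for job_index in range(maximum_number_of_workers):
    per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
    per_job_temporary_folder_path.mkdir(parents=True, exist_ok=True)
    _save_ip_address_to_region_cache(
        ip_hash_to_region=ip_hash_to_region,
        ip_hash_to_region_file_path=per_job_temporary_folder_path / "ip_hash_to_region.yaml",
    )

# ... each worker is then pointed at its own file via `ip_hash_to_region_file_path` ...

# Merge the per-job caches back into one dict and persist it to the default global location.
for job_index in range(maximum_number_of_workers):
    per_job_cache_path = temporary_folder_path / f"job_{job_index}" / "ip_hash_to_region.yaml"
    ip_hash_to_region.update(_load_ip_address_to_region_cache(ip_hash_to_region_file_path=per_job_cache_path))
_save_ip_address_to_region_cache(ip_hash_to_region=ip_hash_to_region)
```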

@@ -213,6 +240,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:

    job_index = os.getpid() % maximum_number_of_workers
    per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
    ip_hash_to_region_file_path = per_job_temporary_folder_path / "ip_hash_to_region.yaml"

    # Define error catching stuff as part of the try clause
    # so that if there is a problem within that, it too is caught
@@ -236,6 +264,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
            tqdm_kwargs=dict(position=job_index + 1, leave=False),
            maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
            order_results=False,  # Always disable this for parallel processing
            ip_hash_to_region_file_path=ip_hash_to_region_file_path,
        )
    except Exception as exception:
        with open(file=parallel_errors_file_path, mode="a") as io:
@@ -256,6 +285,7 @@ def parse_dandi_raw_s3_log(
    tqdm_kwargs: dict | None = None,
    maximum_buffer_size_in_bytes: int = 4 * 10**9,
    order_results: bool = True,
    ip_hash_to_region_file_path: str | pathlib.Path | None = None,
) -> None:
    """
    Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
@@ -300,6 +330,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
    This is strongly suggested, but a common case of disabling it is if ordering is intended to be applied after
    multiple steps of processing instead of during this operation.
    """
    raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path)
    parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path)
    tqdm_kwargs = tqdm_kwargs or dict()

    bucket = "dandiarchive"
@@ -329,6 +361,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
        tqdm_kwargs=tqdm_kwargs,
        maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
        order_results=order_results,
        ip_hash_to_region_file_path=ip_hash_to_region_file_path,
    )

    return None
17 changes: 12 additions & 5 deletions src/dandi_s3_log_parser/_ip_utils.py
@@ -10,6 +10,7 @@
import ipinfo
import requests
import yaml
from pydantic import FilePath

from ._config import (
    _IP_HASH_TO_REGION_FILE_PATH,
@@ -50,18 +51,24 @@ def _get_latest_github_ip_ranges() -> list[str]:
    return all_github_ips


def _load_ip_address_to_region_cache() -> dict[str, str]:
def _load_ip_address_to_region_cache(ip_hash_to_region_file_path: FilePath | None = None) -> dict[str, str]:
    """Load the IP address to region cache from disk."""
    if not _IP_HASH_TO_REGION_FILE_PATH.exists():
    ip_hash_to_region_file_path = ip_hash_to_region_file_path or _IP_HASH_TO_REGION_FILE_PATH

    if not ip_hash_to_region_file_path.exists():
        return dict()

    with open(file=_IP_HASH_TO_REGION_FILE_PATH, mode="r") as stream:
    with open(file=ip_hash_to_region_file_path, mode="r") as stream:
        return yaml.load(stream=stream, Loader=yaml.SafeLoader)


def _save_ip_address_to_region_cache(ip_hash_to_region: dict[str, str]) -> None:
def _save_ip_address_to_region_cache(
    ip_hash_to_region: dict[str, str], ip_hash_to_region_file_path: FilePath | None = None
) -> None:
    """Save the IP address to region cache to disk."""
    with open(file=_IP_HASH_TO_REGION_FILE_PATH, mode="w") as stream:
    ip_hash_to_region_file_path = ip_hash_to_region_file_path or _IP_HASH_TO_REGION_FILE_PATH

    with open(file=ip_hash_to_region_file_path, mode="w") as stream:
        yaml.dump(data=ip_hash_to_region, stream=stream)


43 changes: 36 additions & 7 deletions src/dandi_s3_log_parser/_order_parsed_logs.py
@@ -1,5 +1,12 @@
import pathlib
import datetime
import traceback
import importlib.metadata

import pandas
import tqdm

from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH


def order_parsed_logs(
@@ -8,12 +15,34 @@ def order_parsed_logs(
"""Order the contents of all parsed log files chronologically."""
ordered_parsed_s3_log_folder_path.mkdir(exist_ok=True)

for unordered_parsed_s3_log_file_path in unordered_parsed_s3_log_folder_path.iterdir():
unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_parsed_s3_log_file_path, header=0)
ordered_parsed_s3_log = unordered_parsed_s3_log.sort_values(by="timestamp")
errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors"
errors_folder_path.mkdir(exist_ok=True)

dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
date = datetime.datetime.now().strftime("%y%m%d")
ordering_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_ordering_errors.txt"

unordered_file_paths = list(unordered_parsed_s3_log_folder_path.glob("*.tsv"))
for unordered_parsed_s3_log_file_path in tqdm.tqdm(
iterable=unordered_file_paths,
total=len(unordered_file_paths),
desc="Ordering parsed logs...",
position=0,
leave=True,
mininterval=3.0,
):
try:
error_message = f"Ordering {unordered_parsed_s3_log_file_path}...\n\n"

unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_parsed_s3_log_file_path, header=0)
ordered_parsed_s3_log = unordered_parsed_s3_log.sort_values(by="timestamp")

# correct index of first column
ordered_parsed_s3_log.index = range(len(ordered_parsed_s3_log))
# correct index of first column
ordered_parsed_s3_log.index = range(len(ordered_parsed_s3_log))

ordered_parsed_s3_log_file_path = ordered_parsed_s3_log_folder_path / unordered_parsed_s3_log_file_path.name
ordered_parsed_s3_log.to_csv(path_or_buf=ordered_parsed_s3_log_file_path, sep="\t", header=True, index=True)
ordered_parsed_s3_log_file_path = ordered_parsed_s3_log_folder_path / unordered_parsed_s3_log_file_path.name
ordered_parsed_s3_log.to_csv(path_or_buf=ordered_parsed_s3_log_file_path, sep="\t", header=True, index=True)
except Exception as exception:
with open(file=ordering_errors_file_path, mode="a") as io:
error_message += f"{type(exception)}: {str(exception)}\n\n{traceback.format_exc()}\n\n"
io.write(error_message)
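After all parsing and merging, the ordering pass is run once over the output folder; any file that fails to sort is appended to a versioned, dated ordering-errors file under the parser's errors/ folder instead of aborting the whole run. A minimal call sketch; the folder paths are placeholders and the keyword names are assumed to match the variables used in the function body:

```python
import pathlib

from dandi_s3_log_parser._order_parsed_logs import order_parsed_logs

order_parsed_logs(
    unordered_parsed_s3_log_folder_path=pathlib.Path("/opt/dandi/parsed_s3_logs/.temp/ordering"),  # placeholder
    ordered_parsed_s3_log_folder_path=pathlib.Path("/opt/dandi/parsed_s3_logs"),  # placeholder final output
)
```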
