
Commit

Merge pull request #7 from cal-itp/fix-output
Skip writing parquet output files if there were no validation errors
atvaccaro authored Apr 7, 2022
2 parents (a6a9d1e + 37c1a10) · commit 26bfc65
Showing 5 changed files with 18 additions and 12 deletions.
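
In brief: the CI workflow pins setup-gcloud to the v0 release tag instead of master, the pre-commit black hook moves from 19.10b0 to 22.3.0 (which produces the small reformat in tests/test_download.py), typer is bumped to 0.4.1, and validate_gcs_bucket now downloads the RT files before zipping the GTFS schedule, echoes a full traceback when validation raises, and skips the aggregate-counts parquet upload when there are no errors to report.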
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -11,7 +11,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v2
-      - uses: google-github-actions/setup-gcloud@master
+      - uses: google-github-actions/setup-gcloud@v0
         with:
           service_account_key: ${{ secrets.GCP_SA_KEY }}
           export_default_credentials: true
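
Pinning the action to the v0 release tag rather than the floating master branch keeps CI from silently picking up breaking changes to setup-gcloud.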
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -10,7 +10,7 @@ repos:
       - id: end-of-file-fixer
       - id: check-added-large-files
   - repo: https://github.com/psf/black
-    rev: 19.10b0
+    rev: 22.3.0
     hooks:
       - id: black
   - repo: https://github.com/pycqa/isort
20 changes: 12 additions & 8 deletions gtfs_rt_validator_api.py
@@ -1,11 +1,12 @@
-__version__ = "0.0.3"
+__version__ = "0.0.5"
 
 import concurrent
 import json
 import multiprocessing
 import os
 import shutil
 import subprocess
+import traceback
 from collections import defaultdict
 from concurrent.futures import ProcessPoolExecutor
 from pathlib import Path
@@ -184,27 +185,29 @@ def validate_gcs_bucket(
         dst_path_gtfs = f"{tmp_dir_name}/gtfs"
         dst_path_rt = f"{tmp_dir_name}/rt"
 
-        # fetch and zip gtfs schedule
-        download_gtfs_schedule_zip(gtfs_schedule_path, dst_path_gtfs, fs)
-
         # fetch rt data
         if gtfs_rt_glob_path is None:
             raise ValueError("One of gtfs rt glob path or date must be specified")
 
         num_files = download_rt_files(dst_path_rt, fs, glob_path=gtfs_rt_glob_path)
 
+        # fetch and zip gtfs schedule
+        download_gtfs_schedule_zip(gtfs_schedule_path, dst_path_gtfs, fs)
+
         logger.info(f"validating {num_files} files")
         validate(f"{dst_path_gtfs}.zip", dst_path_rt, verbose=verbose)
 
         if results_bucket and aggregate_counts:
             logger.info(f"Saving aggregate counts as: {results_bucket}")
 
             error_counts = rollup_error_counts(dst_path_rt)
-            df = pd.DataFrame(error_counts)
-
-            with NamedTemporaryFile() as tmp_file:
-                df.to_parquet(tmp_file.name)
-                fs.put(tmp_file.name, results_bucket)
+            if error_counts:
+                df = pd.DataFrame(error_counts)
+
+                with NamedTemporaryFile() as tmp_file:
+                    df.to_parquet(tmp_file.name)
+                    fs.put(tmp_file.name, results_bucket)
 
         elif results_bucket and not aggregate_counts:
             # validator stores results as {filename}.results.json
@@ -228,6 +231,7 @@ def validate_gcs_bucket(
             fs.put(final_files, results_bucket)
 
     except Exception as e:
+        typer.echo(f"got exception during validation: {traceback.format_exc()}")
        raise e
 
    finally:
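
Pulled out of the diff, the change to the aggregate-counts branch amounts to the guard sketched below. This is a minimal reconstruction for illustration, not the module itself: the save_aggregate_counts wrapper is hypothetical, error_counts stands in for the list of records returned by the module's rollup_error_counts helper, and fs is assumed to be an fsspec-style filesystem such as gcsfs.GCSFileSystem.

    import pandas as pd
    from tempfile import NamedTemporaryFile

    def save_aggregate_counts(error_counts, fs, results_bucket):
        # Hypothetical wrapper; error_counts is a list of dicts as produced
        # by rollup_error_counts(), fs an fsspec-style filesystem.
        if not error_counts:
            # New behavior: a clean run uploads nothing at all, where the
            # old code wrote an empty parquet file to the results bucket.
            return
        df = pd.DataFrame(error_counts)
        with NamedTemporaryFile() as tmp_file:
            df.to_parquet(tmp_file.name)  # needs pyarrow or fastparquet
            fs.put(tmp_file.name, results_bucket)

One consequence for consumers of results_bucket: after this change, a missing parquet file means a clean validation run, not a failed one.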
2 changes: 1 addition & 1 deletion requirements.txt
@@ -5,7 +5,7 @@ google-auth==1.32.1
 gtfs-realtime-bindings==0.0.7
 pytest==6.2.5
 black==19.10b0
-typer==0.4.0
+typer==0.4.1
 pendulum==2.1.2
 structlog==21.5.0
 calitp==0.0.8
4 changes: 3 additions & 1 deletion tests/test_download.py
@@ -52,7 +52,9 @@ def test_validation_manual():
     )
 
     download_rt_files(
-        dir_rt, fs, glob_path=f"{GCS_BASE_DIR}/gtfs_rt_126/*/126/0/*",
+        dir_rt,
+        fs,
+        glob_path=f"{GCS_BASE_DIR}/gtfs_rt_126/*/126/0/*",
     )
 
     print("validating")
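
The reformatting of this call is mechanical: after the bump to black 22.3.0 in .pre-commit-config.yaml, the pre-existing trailing comma triggers black's magic trailing comma handling, which splits the argument list one per line.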
