Merge pull request #133 from openstates/scrape-archive-to-gcp-storage
Archive scrape output files to GCP Cloud Storage
jessemortenson authored Aug 1, 2024
2 parents 7ac7b73 + 8472cc9 commit 3c83c0b
Showing 3 changed files with 328 additions and 1 deletion.
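Configuration note: the archive step added here runs only when the update CLI's archive option is set (the diff checks args.archive) and realtime mode is off, and it reads its settings from the environment at import time. A minimal sketch of the expected environment, with placeholder project and bucket names (not values from this PR):

import os

os.environ["GCP_PROJECT"] = "my-gcp-project"     # required; no default
os.environ["BUCKET_NAME"] = "my-scrape-archive"  # required; no default
os.environ["BUCKET_PREFIX"] = "legislation"      # optional; defaults to "legislation"

If GCP_PROJECT or BUCKET_NAME is unset, the new code logs an error and skips archiving rather than failing the scrape.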
37 changes: 37 additions & 0 deletions openstates/cli/update.py
@@ -2,6 +2,7 @@
import contextlib
import datetime
import glob
from google.cloud import storage # type: ignore
import importlib
import inspect
import logging
@@ -28,6 +29,11 @@

ALL_ACTIONS = ("scrape", "import")

# Settings to archive scraped output to GCP Cloud Storage
GCP_PROJECT = os.environ.get("GCP_PROJECT", None)
BUCKET_NAME = os.environ.get("BUCKET_NAME", None)
SCRAPE_LAKE_PREFIX = os.environ.get("BUCKET_PREFIX", "legislation")


class _Unset:
pass
@@ -96,6 +102,7 @@ def do_scrape(
]
)

last_scrape_end_datetime = datetime.datetime.utcnow()
for scraper_name, scrape_args in scrapers.items():
ScraperCls = juris.scrapers[scraper_name]
if (
@@ -124,6 +131,7 @@ def do_scrape(
file_archiving_enabled=args.archive,
)
partial_report = scraper.do_scrape(**scrape_args, session=session)
last_scrape_end_datetime = partial_report["end"]
stats.write_stats(
[
{
@@ -157,6 +165,7 @@ def do_scrape(
file_archiving_enabled=args.archive,
)
report[scraper_name] = scraper.do_scrape(**scrape_args)
last_scrape_end_datetime = report[scraper_name]["end"]
session = scrape_args.get("session", "")
if session:
stats.write_stats(
@@ -189,9 +198,37 @@ def do_scrape(
]
)

# optionally upload scrape output to cloud storage
# but do not archive if realtime mode is enabled, as realtime mode has its own archiving process
if args.archive and not args.realtime:
archive_to_cloud_storage(datadir, juris, last_scrape_end_datetime)

return report


def archive_to_cloud_storage(
datadir: str, juris: State, last_scrape_end_datetime: datetime.datetime
) -> None:
# check if we have necessary settings
if GCP_PROJECT is None or BUCKET_NAME is None:
logger.error(
"Scrape archiving is turned on, but necessary settings are missing. No archive was done."
)
return
cloud_storage_client = storage.Client()
bucket = cloud_storage_client.bucket(BUCKET_NAME, GCP_PROJECT)
jurisdiction_id = juris.jurisdiction_id.replace("ocd-jurisdiction/", "")
destination_prefix = (
f"{SCRAPE_LAKE_PREFIX}/{jurisdiction_id}/{last_scrape_end_datetime.isoformat()}"
)

# read files in directory and upload
for file_path in glob.glob(datadir + "/*.json"):
blob_name = os.path.join(destination_prefix, os.path.basename(file_path))
blob = bucket.blob(blob_name)
blob.upload_from_filename(file_path)


def do_import(juris: State, args: argparse.Namespace) -> dict[str, typing.Any]:
# import here to avoid loading Django code unnecessarily
from openstates.data.models import Jurisdiction as DatabaseJurisdiction
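
The new archive_to_cloud_storage helper names each uploaded object as {BUCKET_PREFIX}/{shortened jurisdiction id}/{scrape end time}/{source filename}. As a rough spot-check sketch using the same client library (the bucket, project, jurisdiction, and timestamp below are illustrative assumptions, not values from this PR):

import os
from google.cloud import storage

# Illustrative prefix: default "legislation" prefix, plus the jurisdiction id with
# "ocd-jurisdiction/" stripped, plus the scrape end datetime in ISO format.
prefix = "legislation/country:us/state:nc/government/2024-08-01T12:00:00"

client = storage.Client(project=os.environ["GCP_PROJECT"])
for blob in client.list_blobs(os.environ["BUCKET_NAME"], prefix=prefix):
    print(blob.name)  # e.g. .../2024-08-01T12:00:00/bill_0001.json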