Archive scrape output files to GCP Cloud Storage #133

Merged · 5 commits · Aug 1, 2024
37 changes: 37 additions & 0 deletions openstates/cli/update.py
@@ -2,6 +2,7 @@
import contextlib
import datetime
import glob
from google.cloud import storage # type: ignore
import importlib
import inspect
import logging
@@ -28,6 +29,11 @@

ALL_ACTIONS = ("scrape", "import")

# Settings to archive scraped output to GCP Cloud Storage
GCP_PROJECT = os.environ.get("GCP_PROJECT", None)
BUCKET_NAME = os.environ.get("BUCKET_NAME", None)
SCRAPE_LAKE_PREFIX = os.environ.get("BUCKET_PREFIX", "legislation")


class _Unset:
pass
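
For reference, a minimal sketch of the environment configuration these new settings read; the values below are placeholder assumptions, not values taken from this PR:

import os

# Placeholder values -- substitute your own project and bucket.
os.environ["GCP_PROJECT"] = "example-gcp-project"    # required for archiving (no default)
os.environ["BUCKET_NAME"] = "example-scrape-bucket"  # required for archiving (no default)
os.environ["BUCKET_PREFIX"] = "legislation"          # optional; defaults to "legislation"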
@@ -96,6 +102,7 @@ def do_scrape(
]
)

last_scrape_end_datetime = datetime.datetime.utcnow()
for scraper_name, scrape_args in scrapers.items():
ScraperCls = juris.scrapers[scraper_name]
if (
@@ -124,6 +131,7 @@ def do_scrape(
file_archiving_enabled=args.archive,
)
partial_report = scraper.do_scrape(**scrape_args, session=session)
last_scrape_end_datetime = partial_report["end"]
stats.write_stats(
[
{
@@ -157,6 +165,7 @@ def do_scrape(
file_archiving_enabled=args.archive,
)
report[scraper_name] = scraper.do_scrape(**scrape_args)
last_scrape_end_datetime = report[scraper_name]["end"]
session = scrape_args.get("session", "")
if session:
stats.write_stats(
@@ -189,9 +198,37 @@ def do_scrape(
]
)

# optionally upload scrape output to cloud storage,
# but do not archive if realtime mode is enabled, as realtime mode has its own archiving process
if args.archive and not args.realtime:
archive_to_cloud_storage(datadir, juris, last_scrape_end_datetime)

return report


def archive_to_cloud_storage(
datadir: str, juris: State, last_scrape_end_datetime: datetime.datetime
) -> None:
# check if we have necessary settings
if GCP_PROJECT is None or BUCKET_NAME is None:
logger.error(
"Scrape archiving is turned on, but necessary settings are missing. No archive was done."
)
return
cloud_storage_client = storage.Client()
bucket = cloud_storage_client.bucket(BUCKET_NAME, GCP_PROJECT)
jurisdiction_id = juris.jurisdiction_id.replace("ocd-jurisdiction/", "")
destination_prefix = (
f"{SCRAPE_LAKE_PREFIX}/{jurisdiction_id}/{last_scrape_end_datetime.isoformat()}"
)

# read files in directory and upload
for file_path in glob.glob(datadir + "/*.json"):
blob_name = os.path.join(destination_prefix, os.path.basename(file_path))
blob = bucket.blob(blob_name)
blob.upload_from_filename(file_path)


def do_import(juris: State, args: argparse.Namespace) -> dict[str, typing.Any]:
# import inside here to avoid loading Django code unnecessarily
from openstates.data.models import Jurisdiction as DatabaseJurisdiction
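
For context, a minimal standalone sketch of the google-cloud-storage upload pattern that archive_to_cloud_storage above relies on; the project, bucket, prefix, and local directory below are placeholder assumptions, not values from this PR:

import glob
import os

from google.cloud import storage

# The client picks up credentials from the environment (e.g. Application Default Credentials).
client = storage.Client()
bucket = client.bucket("example-scrape-bucket", "example-gcp-project")

# Objects are keyed as <prefix>/<jurisdiction>/<scrape end datetime>/<filename>.
prefix = "legislation/country:us/state:nc/government/2024-08-01T12:00:00"

for file_path in glob.glob("/tmp/scrape-output/*.json"):
    blob = bucket.blob(os.path.join(prefix, os.path.basename(file_path)))
    blob.upload_from_filename(file_path)  # upload the local JSON file to the bucket

Because the prefix embeds the scrape end timestamp, each run writes its files under its own path rather than overwriting a previous run's output.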