From 3f611bac52eecefdb55a243377ce78af97b8aa22 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Wed, 9 Aug 2023 14:14:12 -0400 Subject: [PATCH 1/5] fetch rt secrets every 5 minutes --- .../gtfs_rt_archiver_v3/tasks.py | 11 ++++------- .../gtfs_rt_archiver_v3/ticker.py | 13 ++++++++++++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/tasks.py b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/tasks.py index e90ed535cd..46671fabee 100644 --- a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/tasks.py +++ b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/tasks.py @@ -3,7 +3,7 @@ from datetime import datetime from functools import wraps from pathlib import Path -from typing import Optional +from typing import Mapping, Optional import humanize import pendulum @@ -98,15 +98,12 @@ def increment_task_signals_counter( ).inc() -auth_dict = None last_fetch_file = None @huey.on_startup() -def load_auth_dict(): - global auth_dict, last_fetch_file - # TODO: this isn't ideal, we probably could store the keys from get_secrets_by_label() in consumer.py - auth_dict = os.environ +def load_globals(): + global last_fetch_file last_fetch_file = os.environ["LAST_FETCH_FILE"] @@ -129,7 +126,7 @@ def inner(*args, **kwargs): @huey.task(expires=int(os.getenv("CALITP_FETCH_EXPIRATION_SECONDS", 5))) @scoped -def fetch(tick: datetime, config: GTFSDownloadConfig): +def fetch(tick: datetime, config: GTFSDownloadConfig, auth_dict: Mapping[str, str]): labels = dict( record_name=config.name, record_uri=config.url, diff --git a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py index 8930ad534f..6b529b32e2 100644 --- a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py +++ b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py @@ -5,7 +5,7 @@ import time from datetime import datetime, timezone from pathlib import Path -from typing import List, Tuple +from typing import List, Mapping, Tuple import pendulum import schedule # type: ignore @@ -54,6 +54,16 @@ def get_configs() -> Tuple[pendulum.DateTime, List[GTFSDownloadConfig]]: return latest.ts, configs +@ttl_cache(ttl=300) +def get_secrets() -> Mapping[str, str]: + start = pendulum.now() + secrets = get_secrets_by_label("gtfs_rt") + typer.secho( + f"took {(pendulum.now() - start).in_words()} to load {len(secrets)} secrets" + ) + return secrets + + def main( port: int = int(os.getenv("TICKER_PROMETHEUS_PORT", 9102)), load_env_secrets: bool = False, @@ -82,6 +92,7 @@ def tick(second: int) -> None: fetch( tick=dt, config=config, + auth_dict=get_secrets(), ) typer.secho( f"took {(pendulum.now() - start).in_words()} to enqueue {len(configs)} fetches" From ba6a50401c9c0a4ae2b9beca4eacd9b7ec8abd4f Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Wed, 9 Aug 2023 14:14:42 -0400 Subject: [PATCH 2/5] bump archiver image tag --- .../apps/overlays/gtfs-rt-archiver-v3-prod/kustomization.yaml | 2 +- .../apps/overlays/gtfs-rt-archiver-v3-test/kustomization.yaml | 2 +- services/gtfs-rt-archiver-v3/pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kubernetes/apps/overlays/gtfs-rt-archiver-v3-prod/kustomization.yaml b/kubernetes/apps/overlays/gtfs-rt-archiver-v3-prod/kustomization.yaml index 8084021d24..6cddc756df 100644 --- a/kubernetes/apps/overlays/gtfs-rt-archiver-v3-prod/kustomization.yaml +++ b/kubernetes/apps/overlays/gtfs-rt-archiver-v3-prod/kustomization.yaml @@ -10,4 +10,4 @@ resources: images: - name: 'gtfs-rt-archiver' newName: 'ghcr.io/cal-itp/data-infra/gtfs-rt-archiver-v3' - newTag: '2023.7.20' + newTag: '2023.8.9' diff --git a/kubernetes/apps/overlays/gtfs-rt-archiver-v3-test/kustomization.yaml b/kubernetes/apps/overlays/gtfs-rt-archiver-v3-test/kustomization.yaml index 1d42b0d53f..3c52450592 100644 --- a/kubernetes/apps/overlays/gtfs-rt-archiver-v3-test/kustomization.yaml +++ b/kubernetes/apps/overlays/gtfs-rt-archiver-v3-test/kustomization.yaml @@ -18,4 +18,4 @@ patches: images: - name: 'gtfs-rt-archiver' newName: 'ghcr.io/cal-itp/data-infra/gtfs-rt-archiver-v3' - newTag: '2023.7.20' + newTag: '2023.8.9' diff --git a/services/gtfs-rt-archiver-v3/pyproject.toml b/services/gtfs-rt-archiver-v3/pyproject.toml index 62d7b4788b..0abf01fade 100644 --- a/services/gtfs-rt-archiver-v3/pyproject.toml +++ b/services/gtfs-rt-archiver-v3/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gtfs-rt-archiver" -version = "2023.7.20" +version = "2023.8.9" description = "" authors = ["Andrew Vaccaro "] From 317562d8afacd1645d8504dbfd9ee202f8263287 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Wed, 9 Aug 2023 14:16:50 -0400 Subject: [PATCH 3/5] update docs --- services/gtfs-rt-archiver-v3/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/gtfs-rt-archiver-v3/README.md b/services/gtfs-rt-archiver-v3/README.md index 3f42677648..4c3450582c 100644 --- a/services/gtfs-rt-archiver-v3/README.md +++ b/services/gtfs-rt-archiver-v3/README.md @@ -87,7 +87,7 @@ Code changes require building and pushing a new Docker image, as well as applyin 4. Finally, apply changes using `kubectl` as described above. 1. Currently, the image is built/pushed on merges to main but the Kubernetes manifests are not applied. -### Fixing download configurations +### Changing download configurations GTFS download configurations (for both Schedule and RT) are sourced from the [GTFS Dataset table](https://airtable.com/appPnJWrQ7ui4UmIl/tbl5V6Vjs4mNQgYbc) in the California Transit Airtable base, and we have [specific documentation](https://docs.google.com/document/d/1IO8x9-31LjwmlBDH0Jri-uWI7Zygi_IPc9nqd7FPEQM/edit#heading=h.b2yta6yeugar) for modifying the table. (Both of these Airtable links require authentication/access to Airtable.) You may need to make URL or authentication adjustments in this table. This data is downloaded daily into our infrastructure and will propagate to the GTFS Schedule and RT downloads; you may execute the [Airtable download job](https://o1d2fa0877cf3fb10p-tp.appspot.com/dags/airtable_loader_v2/grid) manually after making edits to "deploy" the changes more quickly. -Another possible intervention is updating or adding authentication information in [Secret Manager](https://console.cloud.google.com/security/secret-manager). You may create new versions . **As of 2023-04-10 the archiver does not automatically pick up new/modified secrets; you must restart the archiver for changes to take effect.** +Another possible intervention is updating or adding authentication information in [Secret Manager](https://console.cloud.google.com/security/secret-manager). You may create new versions of existing secrets, or add entirely new secrets. Secrets must be tagged with `gtfs_rt: true` to be loaded as secrets in the archiver; secrets are refreshed every 5 minutes by the ticker. From ea904b373c2166ac498c06a00067c64323e3ac4c Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Wed, 9 Aug 2023 14:42:31 -0400 Subject: [PATCH 4/5] use schedule to get both configs and secrets --- .../gtfs-rt-archiver-consumer.yaml | 2 +- .../gtfs-rt-archiver-ticker.yaml | 2 +- .../gtfs-rt-archiver-v3/docker-compose.yml | 4 +-- .../gtfs_rt_archiver_v3/consumer.py | 10 +------ .../gtfs_rt_archiver_v3/ticker.py | 29 +++++++++---------- 5 files changed, 18 insertions(+), 29 deletions(-) diff --git a/kubernetes/apps/manifests/gtfs-rt-archiver-v3/gtfs-rt-archiver-consumer.yaml b/kubernetes/apps/manifests/gtfs-rt-archiver-v3/gtfs-rt-archiver-consumer.yaml index 48ff83a355..1fc3ba851e 100644 --- a/kubernetes/apps/manifests/gtfs-rt-archiver-v3/gtfs-rt-archiver-consumer.yaml +++ b/kubernetes/apps/manifests/gtfs-rt-archiver-v3/gtfs-rt-archiver-consumer.yaml @@ -24,7 +24,7 @@ spec: - name: app image: gtfs-rt-archiver command: ["python"] - args: ["-m", "gtfs_rt_archiver_v3.consumer", "--load-env-secrets"] + args: ["-m", "gtfs_rt_archiver_v3.consumer"] envFrom: - configMapRef: name: archiver-app-vars diff --git a/kubernetes/apps/manifests/gtfs-rt-archiver-v3/gtfs-rt-archiver-ticker.yaml b/kubernetes/apps/manifests/gtfs-rt-archiver-v3/gtfs-rt-archiver-ticker.yaml index 4b1ca60933..743adc2323 100644 --- a/kubernetes/apps/manifests/gtfs-rt-archiver-v3/gtfs-rt-archiver-ticker.yaml +++ b/kubernetes/apps/manifests/gtfs-rt-archiver-v3/gtfs-rt-archiver-ticker.yaml @@ -24,7 +24,7 @@ spec: - name: app image: gtfs-rt-archiver command: ["python"] - args: ["-m", "gtfs_rt_archiver_v3.ticker", "--load-env-secrets"] + args: ["-m", "gtfs_rt_archiver_v3.ticker"] envFrom: - configMapRef: name: archiver-app-vars diff --git a/services/gtfs-rt-archiver-v3/docker-compose.yml b/services/gtfs-rt-archiver-v3/docker-compose.yml index 0d37ad908c..ed2e3cf8f9 100644 --- a/services/gtfs-rt-archiver-v3/docker-compose.yml +++ b/services/gtfs-rt-archiver-v3/docker-compose.yml @@ -38,8 +38,8 @@ services: gtfs-rt-archiver-v3-ticker: <<: *gtfs-rt-archiver-v3-common - command: python -m gtfs_rt_archiver_v3.ticker --load-env-secrets + command: python -m gtfs_rt_archiver_v3.ticker gtfs-rt-archiver-v3-consumer: <<: *gtfs-rt-archiver-v3-common - command: python -m gtfs_rt_archiver_v3.consumer --load-env-secrets + command: python -m gtfs_rt_archiver_v3.consumer diff --git a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/consumer.py b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/consumer.py index 5406150d7d..cf43bf0f09 100644 --- a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/consumer.py +++ b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/consumer.py @@ -9,7 +9,6 @@ import sentry_sdk import typer -from calitp_data_infra.auth import get_secrets_by_label # type: ignore from huey.constants import WORKER_THREAD # type: ignore from huey.consumer_options import ConsumerConfig # type: ignore from prometheus_client import start_http_server @@ -42,17 +41,10 @@ def set_exception_fingerprint(event, hint): return event -def main( - port: int = int(os.getenv("CONSUMER_PROMETHEUS_PORT", 9102)), - load_env_secrets: bool = False, -): +def main(port: int = int(os.getenv("CONSUMER_PROMETHEUS_PORT", 9102))): sentry_sdk.init(before_send=set_exception_fingerprint) start_http_server(port) - if load_env_secrets: - for key, value in get_secrets_by_label("gtfs_rt").items(): - os.environ[key] = value - config = ConsumerConfig( workers=int( os.getenv("CALITP_HUEY_CONSUMER_WORKERS", 16) diff --git a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py index 6b529b32e2..79700c0c3e 100644 --- a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py +++ b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py @@ -5,13 +5,12 @@ import time from datetime import datetime, timezone from pathlib import Path -from typing import List, Mapping, Tuple +from typing import List, Mapping, Optional import pendulum import schedule # type: ignore import sentry_sdk import typer -from cachetools.func import ttl_cache from calitp_data_infra.auth import get_secrets_by_label # type: ignore from calitp_data_infra.storage import ( # type: ignore GTFSDownloadConfig, @@ -25,9 +24,12 @@ from .metrics import AIRTABLE_CONFIGURATION_AGE, TICKS from .tasks import fetch, huey +configs: Optional[List[GTFSDownloadConfig]] = None +secrets: Optional[Mapping[str, str]] = None -@ttl_cache(ttl=300) -def get_configs() -> Tuple[pendulum.DateTime, List[GTFSDownloadConfig]]: + +def get_configs(): + global configs typer.secho("pulling updated configs from airtable") latest = get_latest(GTFSDownloadConfigExtract) fs = get_fs() @@ -51,32 +53,26 @@ def get_configs() -> Tuple[pendulum.DateTime, List[GTFSDownloadConfig]]: f"found {len(configs)} configs in airtable {latest.path} {age} seconds old" ) AIRTABLE_CONFIGURATION_AGE.set(age) - return latest.ts, configs -@ttl_cache(ttl=300) -def get_secrets() -> Mapping[str, str]: +def get_secrets(): + global secrets start = pendulum.now() secrets = get_secrets_by_label("gtfs_rt") typer.secho( f"took {(pendulum.now() - start).in_words()} to load {len(secrets)} secrets" ) - return secrets def main( port: int = int(os.getenv("TICKER_PROMETHEUS_PORT", 9102)), - load_env_secrets: bool = False, touch_file: Path = Path(os.environ["LAST_TICK_FILE"]), ): assert isinstance(touch_file, Path) sentry_sdk.init(environment=os.getenv("AIRFLOW_ENV")) start_http_server(port) - - if load_env_secrets: - for key, value in get_secrets_by_label("gtfs_rt").items(): - os.environ[key] = value - + get_configs() + get_secrets() typer.secho("flushing huey") huey.flush() @@ -86,13 +82,12 @@ def tick(second: int) -> None: dt = datetime.now(timezone.utc).replace(second=second, microsecond=0) typer.secho(f"ticking {dt}") TICKS.inc() - extracted_at, configs = get_configs() random.shuffle(configs) for config in configs: fetch( tick=dt, config=config, - auth_dict=get_secrets(), + auth_dict=secrets, ) typer.secho( f"took {(pendulum.now() - start).in_words()} to enqueue {len(configs)} fetches" @@ -101,6 +96,8 @@ def tick(second: int) -> None: schedule.every().minute.at(":00").do(tick, second=0) schedule.every().minute.at(":20").do(tick, second=20) schedule.every().minute.at(":40").do(tick, second=40) + schedule.every().minute.at(":45").do(get_configs) + schedule.every().minute.at(":45").do(get_secrets) typer.secho(f"ticking starting at {pendulum.now()}!") while True: From bc0821b289570563b755cf6e75e542971d033f71 Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Wed, 9 Aug 2023 14:48:09 -0400 Subject: [PATCH 5/5] make mypy pass --- services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py index 79700c0c3e..6a709bd389 100644 --- a/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py +++ b/services/gtfs-rt-archiver-v3/gtfs_rt_archiver_v3/ticker.py @@ -72,7 +72,9 @@ def main( sentry_sdk.init(environment=os.getenv("AIRFLOW_ENV")) start_http_server(port) get_configs() + assert configs is not None get_secrets() + assert secrets is not None typer.secho("flushing huey") huey.flush()