Process lamp files, initial (#92)
* Process lamp files, initial

* Fixing testing job

* Merging with Hamima's code

Co-authored-by: Hamima Nasrin <[email protected]>

* Cleaning up imports

* Fixing imports

* use previous stop id for departures

* move s3 bucket to prod (new location,) uncomment s3 upload, nicer docstrings

* import

* speelling

* rename

* Run job hourly

---------

Co-authored-by: Hamima Nasrin <[email protected]>
devinmatte and hamima-halim authored Apr 7, 2024
1 parent 592350a commit 2d354a9
Showing 16 changed files with 485 additions and 129 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,28 @@
name: test

on:
pull_request:
push:
branches:
- main

jobs:
backend:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Setup Poetry
run: |
curl -sSL https://install.python-poetry.org | python3 -
poetry install
- name: test code with pytest
run: |
poetry run python -m pytest
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,3 +1,4 @@
.env.local
**/__pycache__
.python-version
.python-version
.temp
2 changes: 1 addition & 1 deletion .vscode/extensions.json
@@ -1,3 +1,3 @@
{
"recommendations": ["ms-python.black-formatter"]
"recommendations": ["ms-python.black-formatter", "ms-python.flake8"]
}
2 changes: 2 additions & 0 deletions README.md
@@ -7,6 +7,8 @@ So far we have:
- Store MBTA Alerts data daily.
- Store number of trips with new trains on Orange and Red line daily.
- Store Bluebikes station status data every 5 min.
- Store ridership data
- Process and store speed restrictions

To add a new lambda function, put the methods you need in a new file in chalicelib/.
Then add your trigger in app.py.
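
A minimal sketch of that pattern, mirroring the schedules registered in app.py below; the module name new_job, the function names, and the daily 10:00 UTC schedule are hypothetical, not part of this commit:

# chalicelib/new_job.py (hypothetical module)
def run_new_job():
    """Fetch, process, and store the new dataset."""
    ...


# app.py -- register the trigger next to the existing schedules
from chalice import Cron
from chalicelib import new_job


# hypothetical schedule: daily at 10:00 UTC
@app.schedule(Cron("0", "10", "*", "*", "?", "*"))
def store_new_job_data(event):
    new_job.run_new_job()
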
2 changes: 2 additions & 0 deletions deploy.sh
@@ -15,6 +15,8 @@ echo "Deploying version $GIT_VERSION | $GIT_SHA"

# Adding some datadog tags to get better data
DD_TAGS="git.commit.sha:$GIT_SHA,git.repository_url:github.com/transitmatters/data-ingestion"
DD_GIT_REPOSITORY_URL="github.com/transitmatters/data-ingestion"
DD_GIT_COMMIT_SHA="$GIT_SHA"

poetry export -f requirements.txt --output ingestor/requirements.txt --without-hashes

7 changes: 7 additions & 0 deletions ingestor/app.py
@@ -15,6 +15,7 @@
predictions,
landing,
trip_metrics,
lamp,
yankee,
)

@@ -161,3 +162,9 @@ def store_landing_data(event):
@app.schedule(Cron("0/5", "0-6,9-23", "*", "*", "?", "*"))
def update_yankee_shuttles(event):
yankee.update_shuttles()


# Runs hourly (on the hour) from either 4 AM -> 1 AM or 5 AM -> 2 AM Eastern depending on DST
@app.schedule(Cron("0", "0-6,9-23", "*", "*", "?", "*"))
def process_daily_lamp(event):
lamp.ingest_lamp_data()
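
Chalice Cron expressions are evaluated in UTC, which is why the "0-6,9-23" hour range above lands at different Eastern wall-clock times across DST. A standalone sketch (not part of the commit) that prints the Eastern run times implied by those UTC hours for a sample date:

from datetime import datetime, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("US/Eastern")
UTC_HOURS = list(range(0, 7)) + list(range(9, 24))  # the "0-6,9-23" hour field

# each run fires at the top of the hour in UTC; convert to Eastern for a sample date
for hour in UTC_HOURS:
    run_utc = datetime(2024, 4, 7, hour, 0, tzinfo=timezone.utc)
    print(run_utc.astimezone(EASTERN).strftime("%Y-%m-%d %H:%M %Z"))
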
Empty file added ingestor/chalicelib/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions ingestor/chalicelib/lamp/__init__.py
@@ -0,0 +1,3 @@
__all__ = ["ingest_lamp_data"]

from .ingest import ingest_lamp_data
Empty file.
Empty file.
150 changes: 150 additions & 0 deletions ingestor/chalicelib/lamp/ingest.py
@@ -0,0 +1,150 @@
from datetime import date
import io
from typing import Tuple
import requests
import pandas as pd

from .utils import format_dateint, get_current_service_date
from ..parallel import make_parallel
from ..s3 import upload_df_as_csv


LAMP_INDEX_URL = "https://performancedata.mbta.com/lamp/subway-on-time-performance-v1/index.csv"
RAPID_DAILY_URL_TEMPLATE = "https://performancedata.mbta.com/lamp/subway-on-time-performance-v1/{YYYY_MM_DD}-subway-on-time-performance-v1.parquet"
S3_BUCKET = "tm-mbta-performance"
# month and day are not zero-padded
S3_KEY_TEMPLATE = "Events-lamp/daily-data/{stop_id}/Year={YYYY}/Month={_M}/Day={_D}/events.csv"


# LAMP columns to fetch from parquet files
INPUT_COLUMNS = [
"service_date",
"route_id",
"trip_id",
"stop_id",
"direction_id",
"stop_sequence",
"vehicle_id",
"vehicle_label",
"move_timestamp", # departure time from the previous station
"stop_timestamp", # arrival time at the current station
]

# columns that should be output to s3 events.csv
OUTPUT_COLUMNS = [
"service_date",
"route_id",
"trip_id",
"direction_id",
"stop_id",
"stop_sequence",
"vehicle_id",
"vehicle_label",
"event_type",
"event_time",
]


def _local_save(s3_key, stop_events):
"""TODO remove this temp code, it saves the output files locally!"""
import os

s3_key = ".temp/" + s3_key
if not os.path.exists(os.path.dirname(s3_key)):
os.makedirs(os.path.dirname(s3_key))
stop_events.to_csv(s3_key)


def _process_arrival_departure_times(pq_df: pd.DataFrame) -> pd.DataFrame:
"""Process and collate arrivals and departures for a timetable of events.
Before: TODO add example
After: TODO add example
"""
    # NB: While we can generally trust df dtypes fetched from parquet files, since the files are compressed with columnar metadata,
    # there's some numerical imprecision that numpy seems to introduce on M1 machines,
    # which affects how epoch timestamps are cast to datetimes. Maybe not a problem on the AWS machines, though?
pq_df["dep_time"] = pd.to_datetime(pq_df["move_timestamp"], unit="s", utc=True).dt.tz_convert("US/Eastern")
pq_df["arr_time"] = pd.to_datetime(pq_df["stop_timestamp"], unit="s", utc=True).dt.tz_convert("US/Eastern")

# explode departure and arrival times
arr_df = pq_df[pq_df["arr_time"].notna()]
arr_df = arr_df.assign(event_type="ARR").rename(columns={"arr_time": "event_time"})
arr_df = arr_df[OUTPUT_COLUMNS]

dep_df = pq_df[pq_df["dep_time"].notna()]
dep_df = dep_df.assign(event_type="DEP").rename(columns={"dep_time": "event_time"}).drop(columns=["arr_time"])

    # these departures are from the previous stop! so set them to the previous stop id
    # find the stop id for the departure whose sequence number precedes the recorded one
# stop sequences don't necessarily increment by 1 or with a reliable pattern
dep_df = dep_df.sort_values(by=["stop_sequence"])
dep_df = pd.merge_asof(
dep_df,
dep_df,
on=["stop_sequence"],
by=[
"service_date", # comment for faster performance
"route_id",
"trip_id",
"vehicle_id",
"vehicle_label", # comment for faster performance
"direction_id",
"event_type", # comment for faster performance
],
direction="backward",
suffixes=("_curr", "_prev"),
allow_exact_matches=False, # don't want to match on itself
)
# use CURRENT time, but PREVIOUS stop id
dep_df = dep_df.rename(columns={"event_time_curr": "event_time", "stop_id_prev": "stop_id"})[OUTPUT_COLUMNS]

# stitch together arrivals and departures
return pd.concat([arr_df, dep_df])


def fetch_pq_file_from_remote(service_date: date) -> pd.DataFrame:
"""Fetch a parquet file from LAMP for a given service date."""
# TODO(check if file exists in index, throw if it doesn't)
url = RAPID_DAILY_URL_TEMPLATE.format(YYYY_MM_DD=service_date.strftime("%Y-%m-%d"))
result = requests.get(url)
return pd.read_parquet(io.BytesIO(result.content), columns=INPUT_COLUMNS, engine="pyarrow")


def ingest_pq_file(pq_df: pd.DataFrame) -> pd.DataFrame:
    """Process and transform columns for the full day's events."""
pq_df["direction_id"] = pq_df["direction_id"].astype("int16")
pq_df["service_date"] = pq_df["service_date"].apply(format_dateint)

processed_daily_events = _process_arrival_departure_times(pq_df)
return processed_daily_events.sort_values(by=["event_time"])


def upload_to_s3(stop_id_and_events: Tuple[str, pd.DataFrame], service_date: date) -> None:
"""Upload events to s3 as a .csv file."""
# unpack from iterable
stop_id, stop_events = stop_id_and_events

# Upload to s3 as csv
s3_key = S3_KEY_TEMPLATE.format(stop_id=stop_id, YYYY=service_date.year, _M=service_date.month, _D=service_date.day)
# _local_save(s3_key, stop_events)
upload_df_as_csv(S3_BUCKET, s3_key, stop_events)
return [stop_id]


_parallel_upload = make_parallel(upload_to_s3)


def ingest_lamp_data():
"""Ingest and upload today's LAMP data."""
service_date = get_current_service_date()
pq_df = fetch_pq_file_from_remote(service_date)
processed_daily_events = ingest_pq_file(pq_df)

# split daily events by stop_id and parallel upload to s3
stop_event_groups = processed_daily_events.groupby("stop_id")
_parallel_upload(stop_event_groups, service_date)


if __name__ == "__main__":
ingest_lamp_data()
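
The merge_asof call above is the subtle part: a recorded departure actually left the previous stop, so each row keeps its own event_time but takes the prior row's stop_id. A small self-contained sketch with toy data (not LAMP output) showing how the backward as-of join with allow_exact_matches=False achieves that:

import pandas as pd

# toy departures for one trip, already sorted by stop_sequence;
# sequence numbers need not increment by 1 or follow a reliable pattern
dep_df = pd.DataFrame(
    {
        "trip_id": ["t1", "t1", "t1"],
        "stop_sequence": [10, 30, 60],
        "stop_id": ["A", "B", "C"],
        "event_time": ["05:00", "05:05", "05:11"],
    }
)

merged = pd.merge_asof(
    dep_df,
    dep_df,
    on="stop_sequence",
    by="trip_id",
    direction="backward",
    suffixes=("_curr", "_prev"),
    allow_exact_matches=False,  # a row never matches itself
)

# keep the CURRENT departure time but the PREVIOUS stop id
result = merged.rename(columns={"event_time_curr": "event_time", "stop_id_prev": "stop_id"})
print(result[["trip_id", "stop_sequence", "stop_id", "event_time"]])
# the first row has no previous stop, so its stop_id comes out as NaN
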
Empty file.
29 changes: 29 additions & 0 deletions ingestor/chalicelib/lamp/tests/test_util.py
@@ -0,0 +1,29 @@
from datetime import date, datetime
from ..utils import EASTERN_TIME, service_date


def test_service_date():
assert service_date(datetime(2023, 12, 15, 3, 0, 0)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 15, 5, 45, 0)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 15, 7, 15, 0)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 15, 23, 59, 59)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 16, 0, 0, 0)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 16, 2, 59, 59)) == date(2023, 12, 15)


def test_localized_datetime():
assert service_date(datetime(2023, 12, 15, 3, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 15, 5, 45, 0, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 15, 7, 15, 0, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 15, 23, 59, 59, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 16, 0, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)
assert service_date(datetime(2023, 12, 16, 2, 59, 59, tzinfo=EASTERN_TIME)) == date(2023, 12, 15)


def test_edt_vs_est_datetimes():
assert service_date(datetime(2023, 11, 5, 23, 59, 59, tzinfo=EASTERN_TIME)) == date(2023, 11, 5)
assert service_date(datetime(2023, 11, 6, 0, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 11, 5)
assert service_date(datetime(2023, 11, 6, 1, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 11, 5)
assert service_date(datetime(2023, 11, 6, 2, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 11, 5)
# 3am EST is 4am EDT
assert service_date(datetime(2023, 11, 6, 3, 0, 0, tzinfo=EASTERN_TIME)) == date(2023, 11, 6)
25 changes: 25 additions & 0 deletions ingestor/chalicelib/lamp/utils.py
@@ -0,0 +1,25 @@
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo

EASTERN_TIME = ZoneInfo("US/Eastern")


def service_date(ts: datetime) -> date:
    # In practice a None TZ means UTC, but we want to be explicit here:
    # in many places we have an implied Eastern time
ts = ts.replace(tzinfo=EASTERN_TIME)

if ts.hour >= 3 and ts.hour <= 23:
return date(ts.year, ts.month, ts.day)

prior = ts - timedelta(days=1)
return date(prior.year, prior.month, prior.day)


def get_current_service_date() -> date:
return service_date(datetime.now(EASTERN_TIME))


def format_dateint(dtint: int) -> str:
    """Safely converts a dateint of the form YYYYMMDD to a YYYY-MM-DD string."""
return datetime.strptime(str(dtint), "%Y%m%d").strftime("%Y-%m-%d")
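
format_dateint is the one helper the tests above don't cover; a quick standalone check (the function body mirrors utils.py, the input values are just examples):

from datetime import datetime

def format_dateint(dtint: int) -> str:
    """Safely converts a dateint of the form YYYYMMDD to a YYYY-MM-DD string."""
    return datetime.strptime(str(dtint), "%Y%m%d").strftime("%Y-%m-%d")

assert format_dateint(20240407) == "2024-04-07"
assert format_dateint(20231216) == "2023-12-16"
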