Merge pull request #2 from NYPL/include-recovery-data
Query Redshift to find missing and unrecovered data from the past 30 days and query ShopperTrak for it
aaronfriedman6 authored Apr 24, 2024
2 parents 85a25f1 + c06a322 commit 096563a
Showing 12 changed files with 799 additions and 138 deletions.
58 changes: 58 additions & 0 deletions .github/workflows/deploy-production.yml
@@ -0,0 +1,58 @@
name: Deploy to production if tagged as production release

on:
  release:
    types: [ released ]

permissions:
  id-token: write
  contents: read

jobs:
  check_production_tag:
    name: Check if the release is tagged as production
    runs-on: ubuntu-latest
    outputs:
      has_production_tag: ${{ steps.check-production-tag.outputs.run_jobs }}
    steps:
      - name: check production tag ${{ github.ref }}
        id: check-production-tag
        run: |
          if [[ ${{ github.ref }} =~ refs\/tags\/production ]]; then
            echo "run_jobs=true" >> $GITHUB_OUTPUT
          else
            echo "run_jobs=false" >> $GITHUB_OUTPUT
          fi
  publish_production:
    needs: [ check_production_tag ]
    if: needs.check_production_tag.outputs.has_production_tag == 'true'
    name: Publish image to ECR and update ECS stack
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repo
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::946183545209:role/GithubActionsDeployerRole
          aws-region: us-east-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v2

      - name: Build, tag, and push image to Amazon ECR
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          ECR_REPOSITORY: location-visits-poller
          IMAGE_TAG: ${{ github.sha }}
        run: |
          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG .
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
          docker tag $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG $ECR_REGISTRY/$ECR_REPOSITORY:production-latest
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:production-latest
      - name: Force ECS Update
        run: |
          aws ecs update-service --cluster location-visits-poller-production --service location-visits-poller-production --force-new-deployment
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 2024-04-24 -- v1.0.0
### Added
- Perform recovery queries on past thirty days of missing data

## 2024-04-04 -- v0.0.1
### Added
- Initial commit without any recovery queries
3 changes: 2 additions & 1 deletion README.md
@@ -49,4 +49,5 @@ Every variable not marked as optional below is required for the poller to run. T
| `LAST_POLL_DATE` (optional) | If `IGNORE_CACHE` is `True`, the starting state. The first date to be queried will be the day *after* this date. If `IGNORE_CACHE` is `False`, this field is not read. |
| `LAST_END_DATE` (optional) | The most recent date to query for. If this is left blank, it will be yesterday. |
| `IGNORE_CACHE` (optional) | Whether fetching and setting the state from S3 should *not* be done. If this is `True`, the `LAST_POLL_DATE` will be used for the initial state. |
| `IGNORE_KINESIS` (optional) | Whether sending the encoded records to Kinesis should *not* be done |
| `IGNORE_KINESIS` (optional) | Whether sending the encoded records to Kinesis should *not* be done |
| `IGNORE_UPDATE` (optional) | Whether marking old records as stale in Redshift should *not* be done |
7 changes: 6 additions & 1 deletion config/devel.yaml
@@ -1,15 +1,20 @@
---
PLAINTEXT_VARIABLES:
AWS_REGION: us-east-1
REDSHIFT_DB_NAME: dev
SHOPPERTRAK_API_BASE_URL: https://stws.shoppertrak.com/Traffic/Customer/v1.0/service/
LOCATION_VISITS_SCHEMA_URL:
LOCATION_VISITS_SCHEMA_URL: https://qa-platform.nypl.org/api/v0.1/current-schemas/LocationVisits
MAX_RETRIES: 3
LOG_LEVEL: info
LAST_POLL_DATE: 2024-03-25
END_DATE: 2024-03-27
IGNORE_CACHE: True
IGNORE_KINESIS: True
IGNORE_UPDATE: True
ENCRYPTED_VARIABLES:
REDSHIFT_DB_HOST: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAKIwgZ8GCSqGSIb3DQEHBqCBkTCBjgIBADCBiAYJKoZIhvcNAQcBMB4GCWCGSAFlAwQBLjARBAzfC7it2NYa1YnFy+4CARCAW1NrqGfSfPptEF4epEXl8Hr6ntrJcZ0DE1GeGt2c2mMnMt7hcim39NTgxHvi5DIldF4J2UJzgG0KBs5UGRU9dyjrQu5zLfdB8Kv50d1lHfvqM2aDQAK0SQ188BY=
REDSHIFT_DB_USER: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAG0wawYJKoZIhvcNAQcGoF4wXAIBADBXBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDDqSUOKwGRhPNnJwhQIBEIAqJ/XgIWvG3fz4FnBPSzL640Apqd6SeIJ+KoU5N6Ka9RjN9eyBSBtU+WZJ
REDSHIFT_DB_PASSWORD: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAGswaQYJKoZIhvcNAQcGoFwwWgIBADBVBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDI90LTVkj0mhVM9dKgIBEIAoAV0TJogqQmNrh7BohDJ9hS7h4oKPcQy3z8kIRoW4Fltz3/zAP0HDew==
SHOPPERTRAK_USERNAME: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAGUwYwYJKoZIhvcNAQcGoFYwVAIBADBPBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDMwZFr7dqHccVpGsIgIBEIAigggkYIMh62vSRtwzJWr5Q0FGN5rX29BeCX/JcP1uHV7Jtw==
SHOPPERTRAK_PASSWORD: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAGswaQYJKoZIhvcNAQcGoFwwWgIBADBVBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDO0tpI+96T7sLWve0QIBEIAoq3KNN5xKl69tvlBReWIegDOru/Ejk1yxRwCoEd3f/J4A8FLm+vfVng==
...
18 changes: 18 additions & 0 deletions config/production.yaml
@@ -0,0 +1,18 @@
---
PLAINTEXT_VARIABLES:
AWS_REGION: us-east-1
REDSHIFT_DB_NAME: production
SHOPPERTRAK_API_BASE_URL: https://stws.shoppertrak.com/Traffic/Customer/v1.0/service/
LOCATION_VISITS_SCHEMA_URL: https://platform.nypl.org/api/v0.1/current-schemas/LocationVisits
S3_BUCKET: bic-poller-states-production
S3_RESOURCE: location_visits_poller_state.json
KINESIS_BATCH_SIZE: 500
MAX_RETRIES: 3
ENCRYPTED_VARIABLES:
REDSHIFT_DB_HOST: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAKIwgZ8GCSqGSIb3DQEHBqCBkTCBjgIBADCBiAYJKoZIhvcNAQcBMB4GCWCGSAFlAwQBLjARBAzfC7it2NYa1YnFy+4CARCAW1NrqGfSfPptEF4epEXl8Hr6ntrJcZ0DE1GeGt2c2mMnMt7hcim39NTgxHvi5DIldF4J2UJzgG0KBs5UGRU9dyjrQu5zLfdB8Kv50d1lHfvqM2aDQAK0SQ188BY=
REDSHIFT_DB_USER: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAG0wawYJKoZIhvcNAQcGoF4wXAIBADBXBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDDqSUOKwGRhPNnJwhQIBEIAqJ/XgIWvG3fz4FnBPSzL640Apqd6SeIJ+KoU5N6Ka9RjN9eyBSBtU+WZJ
REDSHIFT_DB_PASSWORD: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAGswaQYJKoZIhvcNAQcGoFwwWgIBADBVBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDI90LTVkj0mhVM9dKgIBEIAoAV0TJogqQmNrh7BohDJ9hS7h4oKPcQy3z8kIRoW4Fltz3/zAP0HDew==
SHOPPERTRAK_USERNAME: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAGUwYwYJKoZIhvcNAQcGoFYwVAIBADBPBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDMwZFr7dqHccVpGsIgIBEIAigggkYIMh62vSRtwzJWr5Q0FGN5rX29BeCX/JcP1uHV7Jtw==
SHOPPERTRAK_PASSWORD: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAGswaQYJKoZIhvcNAQcGoFwwWgIBADBVBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDO0tpI+96T7sLWve0QIBEIAoq3KNN5xKl69tvlBReWIegDOru/Ejk1yxRwCoEd3f/J4A8FLm+vfVng==
KINESIS_STREAM_ARN: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAKkwgaYGCSqGSIb3DQEHBqCBmDCBlQIBADCBjwYJKoZIhvcNAQcBMB4GCWCGSAFlAwQBLjARBAzK/AO63EjbacrBSckCARCAYnWDJG+KL514QqwqguhZ7PTrA/NXbD9n9e41gNXF3qnxmHD4UHN5pGCzalRsPnyNanz6N0Up0LsvPKKkCFJb5F7Zqh9XroXKPRNs1DRTPmHqgtLyTPgJ88LnGUTzUrIDTqmi
...
54 changes: 54 additions & 0 deletions helpers/query_helper.py
@@ -0,0 +1,54 @@
_REDSHIFT_HOURS_QUERY = """
SELECT sierra_code, weekday, regular_open, regular_close
FROM {hours_table} LEFT JOIN {codes_table}
ON {hours_table}.drupal_location_id = {codes_table}.drupal_code
WHERE date_of_change IS NULL;"""

_REDSHIFT_CREATE_TABLE_QUERY = """
CREATE TEMPORARY TABLE #recoverable_site_dates AS
SELECT shoppertrak_site_id, increment_start::DATE AS increment_date
FROM {table}
WHERE NOT is_healthy_data
AND is_fresh
AND increment_start >= '{start_date}'
AND increment_start < '{end_date}'
GROUP BY shoppertrak_site_id, increment_date;"""

_REDSHIFT_KNOWN_QUERY = """
SELECT #recoverable_site_dates.shoppertrak_site_id, orbit, increment_start,
id, is_healthy_data, enters, exits
FROM #recoverable_site_dates LEFT JOIN {table}
ON #recoverable_site_dates.shoppertrak_site_id = {table}.shoppertrak_site_id
AND #recoverable_site_dates.increment_date = {table}.increment_start::DATE
WHERE is_fresh;"""

_REDSHIFT_UPDATE_QUERY = """
UPDATE {table} SET is_fresh = False
WHERE id IN ({ids});"""

REDSHIFT_DROP_QUERY = "DROP TABLE #recoverable_site_dates;"

REDSHIFT_RECOVERABLE_QUERY = """
SELECT *
FROM #recoverable_site_dates
ORDER BY increment_date, shoppertrak_site_id;"""


def build_redshift_hours_query(hours_table, codes_table):
    return _REDSHIFT_HOURS_QUERY.format(
        hours_table=hours_table, codes_table=codes_table
    )


def build_redshift_create_table_query(table, start_date, end_date):
    return _REDSHIFT_CREATE_TABLE_QUERY.format(
        table=table, start_date=start_date, end_date=end_date
    )


def build_redshift_known_query(table):
    return _REDSHIFT_KNOWN_QUERY.format(table=table)


def build_redshift_update_query(table, ids):
    return _REDSHIFT_UPDATE_QUERY.format(table=table, ids=ids)
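
A minimal sketch of how these builders compose queries, assuming a dev table name and made-up dates and ids (the poller derives the real table names from the REDSHIFT_DB_NAME environment variable in lib/pipeline_controller.py):

# Illustrative usage only; all argument values below are hypothetical examples.
from helpers.query_helper import (
    build_redshift_create_table_query,
    build_redshift_update_query,
)

create_query = build_redshift_create_table_query(
    "location_visits_dev", "2024-03-25", "2024-04-23"
)
# Returns the CREATE TEMPORARY TABLE #recoverable_site_dates statement scoped to
# location_visits_dev and the given date range.

update_query = build_redshift_update_query("location_visits_dev", "101,102,103")
# Returns: UPDATE location_visits_dev SET is_fresh = False WHERE id IN (101,102,103);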
164 changes: 156 additions & 8 deletions lib/pipeline_controller.py
@@ -2,9 +2,22 @@
import pytz

from datetime import datetime, timedelta
from lib import ShopperTrakApiClient, ALL_SITES_ENDPOINT
from helpers.query_helper import (
    build_redshift_create_table_query,
    build_redshift_hours_query,
    build_redshift_known_query,
    build_redshift_update_query,
    REDSHIFT_DROP_QUERY,
    REDSHIFT_RECOVERABLE_QUERY,
)
from lib import (
    ShopperTrakApiClient,
    ALL_SITES_ENDPOINT,
    SINGLE_SITE_ENDPOINT,
)
from nypl_py_utils.classes.avro_encoder import AvroEncoder
from nypl_py_utils.classes.kinesis_client import KinesisClient
from nypl_py_utils.classes.redshift_client import RedshiftClient
from nypl_py_utils.classes.s3_client import S3Client
from nypl_py_utils.functions.log_helper import create_log

@@ -15,13 +28,29 @@ class PipelineController:
    def __init__(self):
        self.logger = create_log("pipeline_controller")
        self.shoppertrak_api_client = ShopperTrakApiClient(
            os.environ["SHOPPERTRAK_USERNAME"], os.environ["SHOPPERTRAK_PASSWORD"]
            os.environ["SHOPPERTRAK_USERNAME"],
            os.environ["SHOPPERTRAK_PASSWORD"],
            dict(),
        )
        self.redshift_client = RedshiftClient(
            os.environ["REDSHIFT_DB_HOST"],
            os.environ["REDSHIFT_DB_NAME"],
            os.environ["REDSHIFT_DB_USER"],
            os.environ["REDSHIFT_DB_PASSWORD"],
        )
        self.avro_encoder = AvroEncoder(os.environ["LOCATION_VISITS_SCHEMA_URL"])

        self.yesterday = datetime.now(pytz.timezone("US/Eastern")).date() - timedelta(
            days=1
        )
        redshift_suffix = ""
        if os.environ["REDSHIFT_DB_NAME"] != "production":
            redshift_suffix = "_" + os.environ["REDSHIFT_DB_NAME"]
        self.redshift_visits_table = "location_visits" + redshift_suffix
        self.redshift_hours_table = "location_hours" + redshift_suffix
        self.redshift_branch_codes_table = "branch_codes_map" + redshift_suffix

        self.ignore_update = os.environ.get("IGNORE_UPDATE", False) == "True"
        self.ignore_cache = os.environ.get("IGNORE_CACHE", False) == "True"
        if not self.ignore_cache:
            self.s3_client = S3Client(
@@ -36,6 +65,9 @@ def __init__(self):

    def run(self):
        """Main method for the class -- runs the pipeline"""
        self.logger.info("Getting regular branch hours from Redshift")
        self.shoppertrak_api_client.location_hours_dict = self.get_location_hours_dict()

        all_sites_start_date = self._get_poll_date(0) + timedelta(days=1)
        all_sites_end_date = (
            datetime.strptime(os.environ["END_DATE"], "%Y-%m-%d").date()
@@ -47,24 +79,45 @@
f"{all_sites_end_date}"
)
self.process_all_sites_data(all_sites_end_date, 0)
self.logger.info("Finished querying for all sites data")
if not self.ignore_cache:
self.s3_client.close()

broken_start_date = self.yesterday - timedelta(days=29)
self.logger.info(
f"Attempting to recover previously unhealthy data from {broken_start_date} "
f"up to {all_sites_start_date}"
)
self.process_broken_orbits(broken_start_date, all_sites_start_date)
self.logger.info("Finished attempting to recover unhealthy data")
if not self.ignore_kinesis:
self.kinesis_client.close()

def get_location_hours_dict(self):
"""
Queries Redshift for each location's current regular hours and returns a map
from (branch_code, weekday) to (regular_open, regular_close)
"""
        self.redshift_client.connect()
        raw_hours = self.redshift_client.execute_query(
            build_redshift_hours_query(
                self.redshift_hours_table, self.redshift_branch_codes_table
            )
        )
        self.redshift_client.close_connection()
        return {(row[0], row[1]): (row[2], row[3]) for row in raw_hours}

    def process_all_sites_data(self, end_date, batch_num):
        """Gets visits data from all available sites for the given day(s)"""
        last_poll_date = self._get_poll_date(batch_num)
        poll_date = last_poll_date + timedelta(days=1)
        if poll_date <= end_date:
            poll_date_str = poll_date.strftime("%Y%m%d")
            self.logger.info(f"Beginning batch {batch_num+1}: {poll_date_str}")

            self.logger.info(f"Beginning batch {batch_num+1}: {poll_date.isoformat()}")
            all_sites_xml_root = self.shoppertrak_api_client.query(
                ALL_SITES_ENDPOINT, poll_date_str
                ALL_SITES_ENDPOINT, poll_date
            )
            results = self.shoppertrak_api_client.parse_response(
                all_sites_xml_root, poll_date_str
                all_sites_xml_root, poll_date
            )

            encoded_records = self.avro_encoder.encode_batch(results)
@@ -73,9 +126,104 @@ def process_all_sites_data(self, end_date, batch_num):
            if not self.ignore_cache:
                self.s3_client.set_cache({"last_poll_date": poll_date.isoformat()})

            self.logger.info(f"Finished batch {batch_num+1}: {poll_date_str}")
            self.logger.info(f"Finished batch {batch_num+1}: {poll_date.isoformat()}")
            self.process_all_sites_data(end_date, batch_num + 1)

    def process_broken_orbits(self, start_date, end_date):
        """
        Re-queries individual sites with unhealthy data from the past 30 days (a limit
        set by the API) to see if any data has since been recovered
        """
        create_table_query = build_redshift_create_table_query(
            self.redshift_visits_table, start_date, end_date
        )
        self.redshift_client.connect()
        self.redshift_client.execute_transaction([(create_table_query, None)])
        recoverable_site_dates = self.redshift_client.execute_query(
            REDSHIFT_RECOVERABLE_QUERY
        )
        known_data = self.redshift_client.execute_query(
            build_redshift_known_query(self.redshift_visits_table)
        )
        self.redshift_client.execute_transaction([(REDSHIFT_DROP_QUERY, None)])

        # For all the site/date pairs with unhealthy data, form a dictionary of the
        # currently stored data for those sites on those dates where the key is (site
        # ID, orbit, timestamp) and the value is (Redshift ID, is_healthy_data, enters,
        # exits). This is to mark old rows as stale and to prevent sending duplicate
        # records when only some of the data for a site needs to be recovered on a
        # particular date (e.g. when only one of several orbits is broken, or when an
        # orbit goes down in the middle of the day).
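        # Illustrative entry (values are made-up examples):
        #   ("ma", 1, datetime(2024, 4, 1, 9, 0)): (123456, False, 7, 5)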
        known_data_dict = dict()
        if known_data:
            known_data_dict = {
                (row[0], row[1], row[2]): (row[3], row[4], row[5], row[6])
                for row in known_data
            }
        self._recover_data(recoverable_site_dates, known_data_dict)
        self.redshift_client.close_connection()

    def _recover_data(self, site_dates, known_data_dict):
        """
        Individually query the ShopperTrak API for each site/date pair with any
        unhealthy data. Then check to see if the returned data is actually "recovered"
        data, as it may have never been unhealthy to begin with. If so, send to Kinesis.
        """
        for row in site_dates:
            site_xml_root = self.shoppertrak_api_client.query(
                SINGLE_SITE_ENDPOINT + row[0], row[1]
            )
            if site_xml_root:
                site_results = self.shoppertrak_api_client.parse_response(
                    site_xml_root, row[1], is_recovery_mode=True
                )
                self._process_recovered_data(site_results, known_data_dict)

    def _process_recovered_data(self, recovered_data, known_data_dict):
        """
        Check that ShopperTrak "recovered" data was actually unhealthy to begin with
        and, if so, encode, send to Kinesis, and mark old Redshift rows as stale
        """
        results = []
        stale_ids = []
        for fresh_row in recovered_data:
            key = (
                fresh_row["shoppertrak_site_id"],
                fresh_row["orbit"],
                datetime.strptime(fresh_row["increment_start"], "%Y-%m-%d %H:%M:%S"),
            )
            if key not in known_data_dict:
                results.append(fresh_row)
            else:
                known_row = known_data_dict[key]
                if not known_row[1]:  # previously unhealthy data
                    results.append(fresh_row)
                    stale_ids.append(str(known_row[0]))
                elif (  # previously healthy data that doesn't match the new API data
                    fresh_row["enters"] != known_row[2]
                    or fresh_row["exits"] != known_row[3]
                ):
                    self.logger.warning(
                        f"Different healthy data found in API and Redshift: {key} "
                        f"mapped to {fresh_row} in the API and {known_row} in Redshift"
                    )

        # Mark old rows for successfully recovered data as stale
        if stale_ids:
            self.logger.info(f"Updating {len(stale_ids)} stale records")
            update_query = build_redshift_update_query(
                self.redshift_visits_table, ",".join(stale_ids)
            )
            if not self.ignore_update:
                self.redshift_client.execute_transaction([(update_query, None)])

        if results:
            encoded_records = self.avro_encoder.encode_batch(results)
            if not self.ignore_kinesis:
                self.kinesis_client.send_records(encoded_records)
        else:
            self.logger.info("No recovered data found")

    def _get_poll_date(self, batch_num):
        """Retrieves the last poll date from the S3 cache or the config"""
        if self.ignore_cache: