Merge pull request #1 from NYPL/initial-commit
Initial commit
aaronfriedman6 authored Apr 4, 2024
2 parents 7d34b98 + dc83f61 commit 85a25f1
Showing 18 changed files with 840 additions and 2 deletions.
35 changes: 35 additions & 0 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Run Python unit tests

on:
  pull_request:
    types: [ labeled, unlabeled, opened, reopened, synchronize ]

jobs:
  changelog:
    name: Updates changelog
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: dangoslen/changelog-enforcer@v3
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repo
        uses: actions/checkout@v3

      - name: Set up Python 3.12
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'
          cache: 'pip'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r devel_requirements.txt
          pip install -r requirements.txt
      - name: Run linter and test suite
        run: |
          make lint
          make test
6 changes: 6 additions & 0 deletions .gitignore
@@ -0,0 +1,6 @@
.DS_Store
__pycache__/
.vscode/
*env/
*.py[cod]
*$py.class
1 change: 1 addition & 0 deletions .python-version
@@ -0,0 +1 @@
3.12.0
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,3 @@
## 2024-04-04 -- v0.0.1
### Added
- Initial commit without any recovery queries
10 changes: 10 additions & 0 deletions Dockerfile
@@ -0,0 +1,10 @@
FROM python:3.12
ADD . /src
WORKDIR /src

COPY requirements.txt ./
RUN pip install --upgrade pip && \
pip install -r requirements.txt

COPY . .
CMD ["python", "./main.py"]
21 changes: 21 additions & 0 deletions Makefile
@@ -0,0 +1,21 @@
.DEFAULT: help

help:
	@echo "make help"
	@echo "    display this help statement"
	@echo "make run"
	@echo "    run the application in development mode"
	@echo "make test"
	@echo "    run associated test suite with pytest"
	@echo "make lint"
	@echo "    lint project files using the black linter"

run:
	export ENVIRONMENT=devel; \
	python main.py

test:
	pytest tests

lint:
	black ./ --check --exclude="(env/)|(tests/)"
54 changes: 52 additions & 2 deletions README.md
@@ -1,2 +1,52 @@
# shoppertrak-poller
Hits Shoppertrak API for visits data and sends to Kinesis
# LocationVisitsPoller

The LocationVisitsPoller periodically hits the ShopperTrak API for the number of incoming and outgoing visitors per location per day and writes the data to the LocationVisits Kinesis stream for ingest into the [BIC](https://github.com/NYPL/BIC). Because the ShopperTrak API has onerous rate limits and no QA version, the poller is only deployed to production and has no QA deployment.

## Running locally
* Add your `AWS_PROFILE` to the config file for the environment you want to run
* Alternatively, you can manually export it (e.g. `export AWS_PROFILE=<profile>`)
* Run `ENVIRONMENT=<env> python main.py`
* `<env>` should be the config filename without the `.yaml` suffix. Note that running the poller with `production.yaml` will actually send records to the production Kinesis stream -- it is not meant to be used for development purposes.
* `make run` will run the poller using the development environment
* Alternatively, to build and run a Docker container, run:
```
docker image build -t location-visits-poller:local .
docker container run -e ENVIRONMENT=<env> -e AWS_ACCESS_KEY_ID=<> -e AWS_SECRET_ACCESS_KEY=<> location-visits-poller:local
```

## Git workflow
This repo has only two branches: [`main`](https://github.com/NYPL/location-visits-poller/tree/main), which contains the latest and greatest commits, and [`production`](https://github.com/NYPL/location-visits-poller/tree/production), which contains what's in our production environment.

### Workflow
- Cut a feature branch off of `main`
- Commit changes to your feature branch
- File a pull request against `main` and assign a reviewer
- In order for the PR to be accepted, it must pass all unit tests, have no lint issues, and update the CHANGELOG (or contain the Skip-Changelog label in GitHub)
- After the PR is accepted, merge into `main`
- Merge `main` > `production`
- Deploy app to production and confirm it works

## Deployment
The poller is deployed as an AWS ECS service to a [prod](https://us-east-1.console.aws.amazon.com/ecs/home?region=us-east-1#/clusters/location-visits-poller-production/services) environment only. To upload a new version of this service, create a new release in GitHub off of the `production` branch and tag it `production-vX.X.X`. The GitHub Actions deploy-production workflow will then deploy the code to ECR and update the ECS service appropriately.

## Environment variables
Every variable not marked as optional below is required for the poller to run. There are additional optional variables that can be used for development purposes -- `devel.yaml` sets each of these and they are described below. Note that the `production.yaml` file is actually read by the deployed service, so do not change it unless you want to change how the service will behave in the wild -- it is not meant for local testing.

| Name | Notes |
| ------------- | ------------- |
| `AWS_REGION` | Always `us-east-1`. The AWS region used for the Redshift, S3, KMS, and Kinesis clients. |
| `SHOPPERTRAK_API_BASE_URL` | ShopperTrak API base URL to which the poller sends requests. This is not a full endpoint, as either the `site` or `allsites` endpoints may be used. |
| `MAX_RETRIES` | Number of times to try hitting the ShopperTrak API if it's busy before throwing an error |
| `S3_BUCKET` | S3 bucket for the cache. This can be empty when `IGNORE_CACHE` is `True`. |
| `S3_RESOURCE` | Name of the resource for the S3 cache. This can be empty when `IGNORE_CACHE` is `True`. |
| `LOCATION_VISITS_SCHEMA_URL` | Platform API endpoint from which to retrieve the LocationVisits Avro schema |
| `KINESIS_BATCH_SIZE` | How many records should be sent to Kinesis at once. Kinesis supports up to 500 records per batch. This can be empty when `IGNORE_KINESIS` is `True`. |
| `KINESIS_STREAM_ARN` | Encrypted ARN for the Kinesis stream the poller sends the encoded data to |
| `SHOPPERTRAK_USERNAME` | Encrypted ShopperTrak API username |
| `SHOPPERTRAK_PASSWORD` | Encrypted ShopperTrak API password |
| `LOG_LEVEL` (optional) | What level of logs should be output. Set to `info` by default. |
| `LAST_POLL_DATE` (optional) | If `IGNORE_CACHE` is `True`, the starting state. The first date to be queried will be the day *after* this date. If `IGNORE_CACHE` is `False`, this field is not read. |
| `END_DATE` (optional) | The most recent date to query for. Only read when `IGNORE_CACHE` is `True`; if left blank, yesterday's date is used. |
| `IGNORE_CACHE` (optional) | Whether fetching and setting the state from S3 should *not* be done. If this is `True`, the `LAST_POLL_DATE` will be used for the initial state. |
| `IGNORE_KINESIS` (optional) | Whether sending the encoded records to Kinesis should *not* be done |
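Note that environment variables are always strings, so the boolean flags above (`IGNORE_CACHE`, `IGNORE_KINESIS`) only count as enabled when set to the literal string `True`. A minimal sketch of this pattern, mirroring the `os.environ.get(...) == "True"` comparison in `pipeline_controller.py` (`env_flag` is a hypothetical helper for illustration, not part of the poller):

```python
import os


def env_flag(name: str) -> bool:
    # Env vars are strings, so compare against the literal "True" --
    # an unset variable or any other value is treated as False.
    return os.environ.get(name, "False") == "True"


os.environ["IGNORE_CACHE"] = "True"
os.environ.pop("IGNORE_KINESIS", None)
assert env_flag("IGNORE_CACHE") is True
assert env_flag("IGNORE_KINESIS") is False
```

This also means values like `true` or `1` would be read as `False`, so config files should use the capitalized `True` spelling shown in `devel.yaml`.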
15 changes: 15 additions & 0 deletions config/devel.yaml
@@ -0,0 +1,15 @@
---
PLAINTEXT_VARIABLES:
    AWS_REGION: us-east-1
    SHOPPERTRAK_API_BASE_URL: https://stws.shoppertrak.com/Traffic/Customer/v1.0/service/
    LOCATION_VISITS_SCHEMA_URL:
    MAX_RETRIES: 3
    LOG_LEVEL: info
    LAST_POLL_DATE: 2024-03-25
    END_DATE: 2024-03-27
    IGNORE_CACHE: True
    IGNORE_KINESIS: True
ENCRYPTED_VARIABLES:
    SHOPPERTRAK_USERNAME: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAGUwYwYJKoZIhvcNAQcGoFYwVAIBADBPBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDMwZFr7dqHccVpGsIgIBEIAigggkYIMh62vSRtwzJWr5Q0FGN5rX29BeCX/JcP1uHV7Jtw==
    SHOPPERTRAK_PASSWORD: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAGswaQYJKoZIhvcNAQcGoFwwWgIBADBVBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDO0tpI+96T7sLWve0QIBEIAoq3KNN5xKl69tvlBReWIegDOru/Ejk1yxRwCoEd3f/J4A8FLm+vfVng==
...
5 changes: 5 additions & 0 deletions devel_requirements.txt
@@ -0,0 +1,5 @@
black
pytest
pytest-freezer
pytest-mock
requests-mock
6 changes: 6 additions & 0 deletions lib/__init__.py
@@ -0,0 +1,6 @@
from .shoppertrak_api_client import (
    ShopperTrakApiClient,
    ShopperTrakApiClientError,
    ALL_SITES_ENDPOINT,
    SINGLE_SITE_ENDPOINT,
)
87 changes: 87 additions & 0 deletions lib/pipeline_controller.py
@@ -0,0 +1,87 @@
import os
import pytz

from datetime import datetime, timedelta
from lib import ShopperTrakApiClient, ALL_SITES_ENDPOINT
from nypl_py_utils.classes.avro_encoder import AvroEncoder
from nypl_py_utils.classes.kinesis_client import KinesisClient
from nypl_py_utils.classes.s3_client import S3Client
from nypl_py_utils.functions.log_helper import create_log


class PipelineController:
    """Class for orchestrating pipeline runs"""

    def __init__(self):
        self.logger = create_log("pipeline_controller")
        self.shoppertrak_api_client = ShopperTrakApiClient(
            os.environ["SHOPPERTRAK_USERNAME"], os.environ["SHOPPERTRAK_PASSWORD"]
        )
        self.avro_encoder = AvroEncoder(os.environ["LOCATION_VISITS_SCHEMA_URL"])

        self.yesterday = datetime.now(pytz.timezone("US/Eastern")).date() - timedelta(
            days=1
        )
        self.ignore_cache = os.environ.get("IGNORE_CACHE", False) == "True"
        if not self.ignore_cache:
            self.s3_client = S3Client(
                os.environ["S3_BUCKET"], os.environ["S3_RESOURCE"]
            )

        self.ignore_kinesis = os.environ.get("IGNORE_KINESIS", False) == "True"
        if not self.ignore_kinesis:
            self.kinesis_client = KinesisClient(
                os.environ["KINESIS_STREAM_ARN"], int(os.environ["KINESIS_BATCH_SIZE"])
            )

    def run(self):
        """Main method for the class -- runs the pipeline"""
        all_sites_start_date = self._get_poll_date(0) + timedelta(days=1)
        all_sites_end_date = (
            datetime.strptime(os.environ["END_DATE"], "%Y-%m-%d").date()
            if self.ignore_cache
            else self.yesterday
        )
        self.logger.info(
            f"Getting all sites data from {all_sites_start_date} through "
            f"{all_sites_end_date}"
        )
        self.process_all_sites_data(all_sites_end_date, 0)
        if not self.ignore_cache:
            self.s3_client.close()
        if not self.ignore_kinesis:
            self.kinesis_client.close()

    def process_all_sites_data(self, end_date, batch_num):
        """Gets visits data from all available sites for the given day(s)"""
        last_poll_date = self._get_poll_date(batch_num)
        poll_date = last_poll_date + timedelta(days=1)
        if poll_date <= end_date:
            poll_date_str = poll_date.strftime("%Y%m%d")
            self.logger.info(f"Beginning batch {batch_num+1}: {poll_date_str}")

            all_sites_xml_root = self.shoppertrak_api_client.query(
                ALL_SITES_ENDPOINT, poll_date_str
            )
            results = self.shoppertrak_api_client.parse_response(
                all_sites_xml_root, poll_date_str
            )

            encoded_records = self.avro_encoder.encode_batch(results)
            if not self.ignore_kinesis:
                self.kinesis_client.send_records(encoded_records)
            if not self.ignore_cache:
                self.s3_client.set_cache({"last_poll_date": poll_date.isoformat()})

            self.logger.info(f"Finished batch {batch_num+1}: {poll_date_str}")
            self.process_all_sites_data(end_date, batch_num + 1)

    def _get_poll_date(self, batch_num):
        """Retrieves the last poll date from the S3 cache or the config"""
        if self.ignore_cache:
            poll_str = os.environ["LAST_POLL_DATE"]
            poll_date = datetime.strptime(poll_str, "%Y-%m-%d").date()
            return poll_date + timedelta(days=batch_num)
        else:
            poll_str = self.s3_client.fetch_cache()["last_poll_date"]
            return datetime.strptime(poll_str, "%Y-%m-%d").date()
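The date arithmetic in `process_all_sites_data` and `_get_poll_date` can be illustrated standalone: when `IGNORE_CACHE` is `True`, batch `n` starts from `LAST_POLL_DATE + n` days and processes the following day, walking forward one day per batch until the end date is covered. A simplified sketch under those assumptions (`batches` is a hypothetical helper for illustration, not part of the poller, and the recursion is flattened into a loop):

```python
from datetime import date, timedelta


def batches(last_poll_date: date, end_date: date) -> list[date]:
    # Mirrors the controller's logic with IGNORE_CACHE=True: each batch
    # polls the day after (last_poll_date + batch_num), stopping once
    # the computed poll date passes end_date.
    processed = []
    batch_num = 0
    while True:
        poll_date = last_poll_date + timedelta(days=batch_num + 1)
        if poll_date > end_date:
            return processed
        processed.append(poll_date)
        batch_num += 1


# With the devel.yaml values LAST_POLL_DATE=2024-03-25 and
# END_DATE=2024-03-27, two daily batches are processed:
assert batches(date(2024, 3, 25), date(2024, 3, 27)) == [
    date(2024, 3, 26),
    date(2024, 3, 27),
]
```

Note that the first date queried is the day *after* `LAST_POLL_DATE`, matching the description of that variable in the README's environment-variable table.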
