WIP: initial repo setup and basic listener #1

Open · wants to merge 1 commit into main
6 changes: 6 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,6 @@
version: 2
updates:
  - package-ecosystem: "github-actions"
    directory: "/"
    schedule:
      interval: weekly
121 changes: 121 additions & 0 deletions .github/workflows/aws-ecs-deploy.yml
@@ -0,0 +1,121 @@
name: Build and Deploy to AWS ECS
on:
  push:
    branches:
      - "main"
      - "update-action"
  release:
    types:
      - created
  workflow_dispatch:
    inputs:
      env:
        description: "AWS Env"
        required: true
        default: "dev"
      ref:
        description: "Branch, Tag, or Full SHA"
        required: true
env:
  AWS_APP_NAME: police-data-trust-scraper
  AWS_REGION: us-east-1
  DOCKERFILE: ./Dockerfile
  DOCKER_PATH: ./
jobs:
  setup_env:
    name: Set-up environment
    runs-on: ubuntu-latest
    steps:
      - name: Debug Action
        uses: hmarr/[email protected]
      - name: Checkout
        uses: actions/checkout@v4
        with:
          ref: ${{ github.event.inputs.ref }}
          # full history so `git describe --tags` can resolve release tags
          fetch-depth: 0
      - name: Set AWS Env & Image Tag per workflow
        id: set-env
        # write to $GITHUB_OUTPUT: the env context is not available to
        # job-level outputs, so $GITHUB_ENV alone would not propagate.
        run: |
          SHORT_SHA=$(git rev-parse --short HEAD)
          if [[ "$GITHUB_EVENT_NAME" == "push" ]]; then
            echo AWS_APPENV="$AWS_APP_NAME"-dev >> $GITHUB_OUTPUT
            echo IMAGE_TAG=$SHORT_SHA >> $GITHUB_OUTPUT
          fi
          if [[ "$GITHUB_EVENT_NAME" == "release" ]]; then
            RELEASE_TAG=$(git describe --tags)
            echo AWS_APPENV="$AWS_APP_NAME"-prod >> $GITHUB_OUTPUT
            echo IMAGE_TAG=$RELEASE_TAG >> $GITHUB_OUTPUT
          fi
          if [[ "$GITHUB_EVENT_NAME" == "workflow_dispatch" ]]; then
            INPUT_ENV=${{ github.event.inputs.env }}
            echo AWS_APPENV="$AWS_APP_NAME"-$INPUT_ENV >> $GITHUB_OUTPUT
            echo IMAGE_TAG=$SHORT_SHA >> $GITHUB_OUTPUT
          fi
    outputs:
      AWS_APPENV: ${{ steps.set-env.outputs.AWS_APPENV }}
      IMAGE_TAG: ${{ steps.set-env.outputs.IMAGE_TAG }}
  build:
    name: Build & Push Docker Image
    runs-on: ubuntu-latest
    needs: [setup_env]
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          ref: ${{ github.event.inputs.ref }}
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Login to ECR
        uses: docker/login-action@v3
        with:
          registry: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Build and push
        uses: docker/build-push-action@v6
        with:
          context: .
          platforms: linux/amd64,linux/arm64
          push: true
          tags: |
            ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com/${{ needs.setup_env.outputs.AWS_APPENV }}:${{ needs.setup_env.outputs.IMAGE_TAG }}
            ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com/${{ needs.setup_env.outputs.AWS_APPENV }}:latest
  deploy:
    name: Deploy to AWS ECS
    runs-on: ubuntu-latest
    needs: [setup_env, build]
    steps:
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v2
      - name: Get Network Config & write to file
        id: aws-network-config
        run: |
          aws ecs describe-services --cluster infra-prod --services police-data-trust-backend-dev | jq '.services[0].networkConfiguration' > network-config-scraper.json
      - name: Pull Task Definition & write to file
        id: aws-task-definition
        run: |
          aws ecs describe-task-definition \
            --task-definition ${{ needs.setup_env.outputs.AWS_APPENV }} \
            --query taskDefinition | \
            jq 'del(.taskDefinitionArn,.revision,.status,.registeredBy,.registeredAt,.compatibilities,.requiresAttributes)' > task-def-scraper.json
      - name: Interpolate new Docker Image into Task Definition
        id: task-definition
        uses: aws-actions/[email protected]
        with:
          task-definition: task-def-scraper.json
          container-name: ${{ needs.setup_env.outputs.AWS_APPENV }}
          image: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com/${{ needs.setup_env.outputs.AWS_APPENV }}:${{ needs.setup_env.outputs.IMAGE_TAG }}
      - name: Deploy to Amazon ECS Scheduled Tasks
        uses: mikeyavorsky/[email protected]
        with:
          cluster: infra-prod
          task-definition: task-def-scraper.json
21 changes: 21 additions & 0 deletions .github/workflows/checks.yml
@@ -0,0 +1,21 @@
name: Checks

on:
  push:

jobs:
  pre-commit-and-coverage:
    runs-on: ubuntu-latest
    env:
      PYTHONPATH: /home/runner/work/police-data-trust-scrapers/police-data-trust-scrapers:$PYTHONPATH
    steps:
      - uses: actions/checkout@v4
      - name: Setup python
        uses: actions/setup-python@v5
        with:
          python-version: "3.13"
      - name: Install dependencies
        run: pip install -r requirements_dev.txt
      - uses: pre-commit/[email protected]
      - name: Check coverage
        run: pytest --cov=scrapers
24 changes: 24 additions & 0 deletions .github/workflows/python-tests.yml
@@ -0,0 +1,24 @@
name: Python tests

on:
  - push
  - pull_request

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Python 3.13 Setup
        uses: actions/setup-python@v5
        with:
          python-version: "3.13"
      - name: Install dependencies
        run: |
          sudo apt-get update
          sudo apt-get install python3-dev
          python -m pip install --upgrade pip
          python -m pip install -r requirements.txt
          python -m pip install -r requirements_dev.txt
      - name: Run tests
        run: python -m pytest
30 changes: 30 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,30 @@
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v5.0.0
    hooks:
      - id: check-yaml
      - id: end-of-file-fixer
      - id: trailing-whitespace
      - id: detect-private-key
      - id: detect-aws-credentials
        args: [--allow-missing-credentials]
      - id: check-added-large-files
  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.10
    hooks:
      - id: bandit
        additional_dependencies: ["bandit[toml]"]
        args: ["-c", "pyproject.toml"]
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.6.9
    hooks:
      - id: ruff
        args: ["--preview", "--fix"]
      - id: ruff-format
  - repo: https://github.com/thlorenz/doctoc.git
    rev: v2.2.0
    hooks:
      - id: doctoc
        name: Add TOC for md files
        files: ^README\.md$
16 changes: 16 additions & 0 deletions Dockerfile
@@ -0,0 +1,16 @@
# docker build command:
# docker build -t police-data-trust-transformer -f Dockerfile .
FROM python:3-alpine

WORKDIR /app/

ENV PYTHONPATH=/app/:$PYTHONPATH

COPY ./requirements.txt .
RUN pip install -r requirements.txt

# keep the package layout (/app/src/...) so the relative imports in
# src/main.py resolve when it is run as a module
COPY ./src ./src

CMD ["python3", "-m", "src.main"]
21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 National Police Data Coalition

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
13 changes: 13 additions & 0 deletions pyproject.toml
@@ -0,0 +1,13 @@
[tool.coverage.run]
omit = ["settings.py"]

[tool.coverage.report]
include_namespace_packages = true
skip_covered = true
show_missing = true

[tool.bandit]
exclude_dirs = ["tests"]

[tool.ruff.lint]
extend-select = ["I"]
2 changes: 2 additions & 0 deletions requirements.txt
@@ -0,0 +1,2 @@
boto3
ujson
5 changes: 5 additions & 0 deletions requirements_dev.txt
@@ -0,0 +1,5 @@
-r requirements.txt

pre-commit==4.0.1
pytest==8.3.3
pytest-cov==5.0.0
Empty file added src/__init__.py
Empty file added src/listener/__init__.py
98 changes: 98 additions & 0 deletions src/listener/listener.py
@@ -0,0 +1,98 @@
from datetime import datetime, UTC
from io import BytesIO
from logging import getLogger
from threading import Thread
from time import sleep

import ujson
from boto3 import Session

from ..transformers.transformer import Transformer


class Listener(Thread):
    def __init__(
        self,
        queue_name: str,
        table_name: str,
        output_bucket_name: str,
        region: str = "us-east-1",
    ):
        # TODO: ideally we would have a function on the app to catch shutdown
        # events and close gracefully, but until then daemon it is.
        super().__init__(daemon=True)
        self.queue_name = queue_name
        self.session = Session(region_name=region)
        self.sqs_client = self.session.client("sqs")
        self.s3_client = self.session.client("s3")
        self.dynamodb_client = self.session.client("dynamodb")
        # get_queue_url returns a dict; keep just the URL string.
        self.sqs_queue_url = self.sqs_client.get_queue_url(
            QueueName=self.queue_name)["QueueUrl"]
        self.dynamodb_table_name = table_name
        self.output_bucket_name = output_bucket_name
        self.logger = getLogger(self.__class__.__name__)

        # maps an s3 key prefix ("subject/source") to the transformer
        # class that handles objects under that prefix.
        self.transformer_map: dict[str, type[Transformer]] = {}

    def run(self):
        while True:
            try:
                resp = self.sqs_client.receive_message(
                    QueueUrl=self.sqs_queue_url,
                    # retrieve one message at a time - we could up this
                    # and parallelize, but there is no point until we
                    # handle far more files.
                    MaxNumberOfMessages=1,
                    # 10 minutes to process the message before it becomes
                    # visible to another consumer.
                    VisibilityTimeout=600,
                )
                # "Messages" is omitted from the response entirely when the
                # queue is empty; wait 10 minutes before the next poll.
                if not resp.get("Messages"):
                    sleep(600)
                    continue

                for message in resp["Messages"]:
                    sqs_body = ujson.loads(message["Body"])
                    # this comes through as a list, but we expect one object
                    for record in sqs_body["Records"]:
                        bucket_name = record["s3"]["bucket"]["name"]
                        key = record["s3"]["object"]["key"]

                        # TODO: check dynamodb table to reduce data duplication

                        with BytesIO() as fileobj:
                            self.s3_client.download_fileobj(
                                bucket_name, key, fileobj)
                            fileobj.seek(0)
                            content = fileobj.read()
                            _ = content  # for linting.

                        # TODO: we now have an in-memory copy of the s3 file
                        # content. This is where we would run the importer.
                        # We want a standardized importer class; use like:
                        # transformer = self.get_transformer_for_content_type(key)
                        # transformed_content = transformer(content).transform()

                        self.logger.info(f"Transformed s3://{bucket_name}/{key}")

                        # TODO: temporary until we have a converter
                        transformed_content = [{"test": "converted"}]

                        output_key = self.generate_target_s3_path(key)
                        # upload_fileobj needs a binary file object, so
                        # encode the jsonl payload (one object per line).
                        with BytesIO() as fileobj:
                            fileobj.write("\n".join(
                                ujson.dumps(item)
                                for item in transformed_content).encode("utf-8"))
                            fileobj.seek(0)
                            self.s3_client.upload_fileobj(
                                fileobj, self.output_bucket_name, output_key)

                        self.logger.info(
                            "Uploaded transformed content to "
                            f"s3://{self.output_bucket_name}/{output_key}")

                        # TODO: update dynamodb cache to store processed
                        # files to reduce duplication.
            except Exception as e:
                self.logger.error(
                    f"Failed to process scraper events sqs queue: {e}")
                sleep(600)

    def get_transformer_for_content_type(self, s3_key: str) -> type[Transformer]:
        # s3 keys should be of format subject/source/time.jsonl; dispatch
        # on everything before the final path component.
        prefix = "/".join(s3_key.split("/")[:-1])
        return self.transformer_map[prefix]

    @staticmethod
    def generate_target_s3_path(input_s3_key: str) -> str:
        # keep the subject/source prefix, replace the filename with a
        # fresh UTC timestamp.
        components = input_s3_key.split("/")[:-1]
        components.append(datetime.now(tz=UTC).isoformat())
        return "/".join(components)
13 changes: 13 additions & 0 deletions src/main.py
@@ -0,0 +1,13 @@
from os import environ

from .listener.listener import Listener

if __name__ == '__main__':
    listener = Listener(
        queue_name=environ["SQS_QUEUE_NAME"],
        table_name=environ["DYNAMODB_TABLE_NAME"],
        output_bucket_name=environ["OUTPUT_S3_BUCKET_NAME"],
        region=environ["AWS_REGION"],
    )
    listener.start()
    listener.join()
Empty file added src/transformers/__init__.py
6 changes: 6 additions & 0 deletions src/transformers/transformer.py
@@ -0,0 +1,6 @@
class Transformer:
    def __init__(self, content: bytes):
        self.content = content

    def transform(self) -> list[dict]:
        raise NotImplementedError("extend this class to write a transformer")
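Since `transform` is deliberately left unimplemented, each data source gets its own subclass. Below is a hedged sketch of what one might look like, assuming jsonl input; the class name and field mapping are invented and not part of this PR:

```python
import ujson

from src.transformers.transformer import Transformer


class RosterTransformer(Transformer):  # hypothetical example
    """Turn scraped roster records (jsonl bytes) into plain dicts."""

    def transform(self) -> list[dict]:
        # the contract: bytes in via __init__, list of plain dicts out
        return [
            {"name": rec.get("officer_name"), "agency": rec.get("agency")}
            for rec in (
                ujson.loads(line)
                for line in self.content.splitlines()
                if line.strip()
            )
        ]
```

A `Listener` would then register it under its prefix, e.g. `self.transformer_map["officers/example-source"] = RosterTransformer`.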