diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..b88a67a --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: weekly diff --git a/.github/workflows/aws-ecs-deploy.yml b/.github/workflows/aws-ecs-deploy.yml new file mode 100644 index 0000000..0919ab5 --- /dev/null +++ b/.github/workflows/aws-ecs-deploy.yml @@ -0,0 +1,121 @@ +name: Build and Deploy to AWS ECS +on: + push: + branches: + - "main" + - "update-action" + release: + types: + - created + workflow_dispatch: + inputs: + env: + description: "AWS Env" + required: true + default: "dev" + ref: + description: "Branch, Tag, or Full SHA" + required: true +env: + AWS_APP_NAME: police-data-trust-scraper + AWS_REGION: us-east-1 + DOCKERFILE: ./Dockerfile + DOCKER_PATH: ./ +jobs: + setup_env: + name: Set-up environment + runs-on: ubuntu-latest + steps: + - name: Debug Action + uses: hmarr/debug-action@v3.0.0 + - name: Checkout + uses: actions/checkout@v4 + with: + ref: ${{ github.event.inputs.ref }} + - name: Set AWS Env & Image Tag per workflow + run: | + SHORT_SHA=$(git rev-parse --short HEAD) + if [[ "$GITHUB_EVENT_NAME" == "push" ]]; then + echo AWS_APPENV="$AWS_APP_NAME"-dev >> $GITHUB_ENV + echo IMAGE_TAG=$SHORT_SHA >> $GITHUB_ENV + fi + if [[ "$GITHUB_EVENT_NAME" == "release" ]]; then + RELEASE_TAG=$(git describe --tags) + echo AWS_APPENV="$AWS_APP_NAME"-prod >> $GITHUB_ENV + echo IMAGE_TAG=$RELEASE_TAG >> $GITHUB_ENV + fi + if [[ "$GITHUB_EVENT_NAME" == "workflow_dispatch" ]]; then + INPUT_ENV=${{ github.event.inputs.env }}; INPUT_REF=${{ github.event.inputs.ref }} + echo AWS_APPENV="$AWS_APP_NAME"-$INPUT_ENV >> $GITHUB_ENV + echo IMAGE_TAG=$SHORT_SHA >> $GITHUB_ENV + fi + outputs: + AWS_APPENV: ${{ env.AWS_APPENV }} + IMAGE_TAG: ${{ env.IMAGE_TAG }} + build: + name: Build & Push Docker Image + runs-on: ubuntu-latest + needs: [setup_env] + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + ref: ${{ github.event.inputs.ref }} + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ env.AWS_REGION }} + - name: Login to ECR + uses: docker/login-action@v3 + with: + registry: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: . + platforms: linux/amd64,linux/arm64 + push: true + tags: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com/${{ needs.setup_env.outputs.AWS_APPENV }}:${{ needs.setup_env.outputs.IMAGE_TAG }}, ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com/${{ needs.setup_env.outputs.AWS_APPENV }}:latest + deploy: + name: Deploy to AWS ECS + runs-on: ubuntu-latest + needs: [setup_env, build] + steps: + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ env.AWS_REGION }} + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v2 + - name: Get Network Config & write to file + id: aws-network-config + run: | + aws ecs describe-services --cluster infra-prod --services police-data-trust-backend-dev | jq '.services[0].networkConfiguration' > network-config-scraper.json + - name: Pull Task Definition & write to file + id: aws-task-definition + run: | + aws ecs describe-task-definition \ + --task-definition ${{ needs.setup_env.outputs.AWS_APPENV }} \ + --query taskDefinition | \ + jq 'del(.taskDefinitionArn,.revision,.status,.registeredBy,.registeredAt,.compatibilities,.requiresAttributes)' > task-def-scraper.json + - name: Interpolate new Docker Image into Task Definition + id: task-definition + uses: aws-actions/amazon-ecs-render-task-definition@v1.5.0 + with: + task-definition: task-def-scraper.json + container-name: ${{ needs.setup_env.outputs.AWS_APPENV }} + image: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com/${{ needs.setup_env.outputs.AWS_APPENV }}:${{ needs.setup_env.outputs.IMAGE_TAG }} + - name: Deploy to Amazon ECS Scheduled Tasks + uses: mikeyavorsky/ecs-deploy-task-definition-to-scheduled-task@v3.1.0 + with: + cluster: infra-prod + task-definition: task-def-scraper.json diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml new file mode 100644 index 0000000..73b2c7c --- /dev/null +++ b/.github/workflows/checks.yml @@ -0,0 +1,21 @@ +name: Checks + +on: + push: + +jobs: + pre-commit-and-coverage: + runs-on: ubuntu-latest + env: + PYTHONPATH: /home/runner/work/police-data-trust-scrapers/police-data-trust-scrapers:$PYTHONPATH + steps: + - uses: actions/checkout@v4 + - name: Setup python + uses: actions/setup-python@v5 + with: + python-version: 3.13 + - name: Install dependencies + run: pip install -r requirements_dev.txt + - uses: pre-commit/action@v3.0.1 + - name: Check coverage + run: pytest --cov=scrapers diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml new file mode 100644 index 0000000..be145b1 --- /dev/null +++ b/.github/workflows/python-tests.yml @@ -0,0 +1,24 @@ +name: Python tests + +on: +- push +- pull_request + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Python 3.13 Setup + uses: actions/setup-python@v5 + with: + python-version: 3.13 + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install python3-dev + python -m pip install --upgrade pip + python -m pip install -r requirements.txt + python -m pip install -r requirements_dev.txt + - name: Run tests + run: python -m pytest diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..751b0fd --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,30 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace + - id: detect-private-key + - id: detect-aws-credentials + args: [--allow-missing-credentials] + - id: check-added-large-files + - repo: https://github.com/PyCQA/bandit + rev: 1.7.10 + hooks: + - id: bandit + additional_dependencies: ["bandit[toml]"] + args: ["-c", "pyproject.toml"] + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.6.9 + hooks: + - id: ruff + args: ["--preview", "--fix"] + - id: ruff-format + - repo: https://github.com/thlorenz/doctoc.git + rev: v2.2.0 + hooks: + - id: doctoc + name: Add TOC for md files + files: + ^README\.md$ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..18ad8b3 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +# docker build command: +# docker build -t police-data-trust-transformer -f Dockerfile . +FROM python:3-alpine + +WORKDIR /app/ + +ENV PYTHONPATH=/app/:$PYTHONPATH + +COPY ./requirements.txt . +RUN pip install -r requirements.txt + +COPY ./src . + +WORKDIR /app/src + +CMD [ "python3 main.py" ] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..2f664d5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 National Police Data Coalition + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4bb7ac1 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,13 @@ +[tool.coverage.run] +omit = ["settings.py"] + +[tool.coverage.report] +include_namespace_packages = true +skip_covered = true +show_missing = true + +[tool.bandit] +exclude_dirs = ["tests"] + +[tool.ruff.lint] +extend-select = ["I"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..687903f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +boto3 +ujson diff --git a/requirements_dev.txt b/requirements_dev.txt new file mode 100644 index 0000000..9415562 --- /dev/null +++ b/requirements_dev.txt @@ -0,0 +1,5 @@ +-r requirements.txt + +pre-commit==4.0.1 +pytest==8.3.3 +pytest-cov==5.0.0 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/listener/__init__.py b/src/listener/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/listener/listener.py b/src/listener/listener.py new file mode 100644 index 0000000..8cefeb5 --- /dev/null +++ b/src/listener/listener.py @@ -0,0 +1,98 @@ +from datetime import datetime, UTC +from io import BytesIO, StringIO +from logging import getLogger +from threading import Thread +from time import sleep + +from boto3 import Session +import ujson + +from ..transformers.transformer import Transformer + +class Listener(Thread): + def __init__(self, queue_name: str, table_name: str, output_bucket_name: str, region: str = "us-east-1"): + # TODO: ideally we would have a function on the app to catch shutdown + # events and close gracefully, but until then daemon it is. + super().__init__(daemon=True) + self.queue_name = queue_name + self.session = Session(region_name=region) + self.sqs_client = self.session.client("sqs") + self.s3_client = self.session.client("s3") + self.dynamodb_client = self.session.client("dynamodb") + self.sqs_queue_url = self.sqs_client.get_queue_url( + QueueName=self.queue_name) + self.dynamodb_table_name = table_name + self.output_bucket_name = output_bucket_name + self.logger = getLogger(self.__class__.__name__) + + self.transformer_map: dict[str, Transformer] = {} + + def run(self): + while True: + try: + resp = self.sqs_client.receive_message( + QueueUrl=self.sqs_queue_url, + # retrieve one message at a time - we could up this + # and parallelize but no point until way more files. + MaxNumberOfMessages=1, + # 10 minutes to process message before it becomes + # visible for another consumer. + VisibilityTimeout=600, + ) + # if no messages found, wait 5m for next poll + if len(resp["Messages"]) == 0: + sleep(600) + continue + + for message in resp["Messages"]: + sqs_body = ujson.loads(message["Body"]) + # this comes through as a list, but we expect one object + for record in sqs_body["Records"]: + bucket_name = record["s3"]["bucket"]["name"] + key = record["s3"]["object"]["key"] + + # TODO: check dynamodb table to reduce data duplication + + with BytesIO() as fileobj: + self.s3_client.download_fileobj( + bucket_name, key, fileobj) + fileobj.seek(0) + content = fileobj.read() + _ = content # for linting. + + # TODO: we now have an in-memory copy of s3 file content + # This is where we would run the importer. + # we want a standardized importer class; use like: + # transformer = self.get_transformer_for_content_type(key) + # transformed_content = transformer(content).transform() + + self.logger.info(f"Transformed s3://{bucket_name}/{key}") + + transformed_content = list({"test": "converted"}) # TODO: temporary until we have a converter + + output_key = self.generate_target_s3_path(key) + with StringIO() as fileobj: + fileobj.write("\n".join(ujson.dumps(content) for content in transformed_content)) + fileobj.seek(0) + self.s3_client.upload_fileobj( + fileobj, self.output_bucket_name, output_key) + + self.logger.info(f"Uploaded transformed content to s3://{self.output_bucket_name}/{output_key}") + + # TODO: update dynamodb cache to store processed files to reduce duplication. + except Exception as e: + self.logger.error( + f"Failed to process scraper events sqs queue: {e}") + sleep(600) + pass + + def get_transformer_for_content_type(self, s3_key: str) -> Transformer: + # s3 keys should be of format /subject/source/time.jsonl + prefix = "/".join(s3_key.split("/")[:-1]) + return self.transformer_map[prefix] + + @staticmethod + def generate_target_s3_path(input_s3_key: str) -> str: + components = input_s3_key.split("/")[:-1] + components.append(datetime.now(tz=UTC).isoformat()) + return "/".join(components) diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..94427c5 --- /dev/null +++ b/src/main.py @@ -0,0 +1,13 @@ +from os import environ + +from .listener.listener import Listener + +if __name__ == '__main__': + listener = Listener( + queue_name=environ["SQS_QUEUE_NAME"], + table_name=environ["DYNAMODB_TABLE_NAME"], + output_bucket_name=environ["OUTPUT_S3_BUCKET_NAME"], + region=environ["AWS_REGION"] + ) + listener.start() + listener.join() diff --git a/src/transformers/__init__.py b/src/transformers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/transformers/transformer.py b/src/transformers/transformer.py new file mode 100644 index 0000000..552bbfa --- /dev/null +++ b/src/transformers/transformer.py @@ -0,0 +1,6 @@ +class Transformer: + def __init__(self, content: bytes): + self.content = content + + def transform(self) -> list[dict]: + raise Exception("unimplemented; extend class to write a transformer")