diff --git a/.gitignore b/.gitignore
index 079702c..0eb95e5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,11 @@
-env
+# Python
 __pycache__
-terraform*
-.terraform*
+env
+venv
+temp_venv
+
+# Terraform files
+.terraform
+*.tfstate
+*.tfstate.*
+.terraform.lock.hcl
diff --git a/infra/s3_to_prefect.tf b/infra/s3/s3_to_prefect.tf
similarity index 100%
rename from infra/s3_to_prefect.tf
rename to infra/s3/s3_to_prefect.tf
diff --git a/infra/workspaces/modules/workspace/main.tf b/infra/workspaces/modules/workspace/main.tf
new file mode 100644
index 0000000..d9ec739
--- /dev/null
+++ b/infra/workspaces/modules/workspace/main.tf
@@ -0,0 +1,19 @@
+terraform {
+  required_providers {
+    prefect = {
+      source = "PrefectHQ/prefect"
+    }
+  }
+}
+
+# Module for creating a Prefect workspace and its default work pool
+resource "prefect_workspace" "workspace" {
+  name   = var.workspace_name
+  handle = var.workspace_handle
+}
+
+resource "prefect_work_pool" "default" {
+  name         = var.work_pool_name
+  workspace_id = prefect_workspace.workspace.id
+  type         = var.work_pool_type
+}
diff --git a/infra/workspaces/modules/workspace/outputs.tf b/infra/workspaces/modules/workspace/outputs.tf
new file mode 100644
index 0000000..81f7cc3
--- /dev/null
+++ b/infra/workspaces/modules/workspace/outputs.tf
@@ -0,0 +1,14 @@
+output "workspace_id" {
+  value       = prefect_workspace.workspace.id
+  description = "ID of the created workspace"
+}
+
+output "workspace_handle" {
+  value       = prefect_workspace.workspace.handle
+  description = "Handle of the created workspace"
+}
+
+output "work_pool_id" {
+  value       = prefect_work_pool.default.id
+  description = "ID of the created work pool"
+}
diff --git a/infra/workspaces/modules/workspace/variables.tf b/infra/workspaces/modules/workspace/variables.tf
new file mode 100644
index 0000000..0fd0cb6
--- /dev/null
+++ b/infra/workspaces/modules/workspace/variables.tf
@@ -0,0 +1,21 @@
+variable "workspace_name" {
+  description = "Name of the workspace to create"
+  type        = string
+}
+
+variable "workspace_handle" {
+  type        = string
+  description = "Handle (slug) for the Prefect workspace"
+}
+
+variable "work_pool_name" {
+  type        = string
+  description = "Name of the default work pool"
+  default     = "my-work-pool"
+}
+
+variable "work_pool_type" {
+  type        = string
+  description = "Type of the work pool"
+  default     = "docker"
+}
diff --git a/infra/workspaces/provision.tf b/infra/workspaces/provision.tf
new file mode 100644
index 0000000..8d5bbda
--- /dev/null
+++ b/infra/workspaces/provision.tf
@@ -0,0 +1,49 @@
+terraform {
+  required_providers {
+    prefect = {
+      source = "PrefectHQ/prefect"
+    }
+  }
+}
+
+provider "prefect" {
+  api_key    = var.prefect_api_key
+  account_id = var.prefect_account_id
+}
+
+# Create staging environment
+module "staging" {
+  source           = "./modules/workspace"
+  workspace_name   = "Staging"
+  workspace_handle = var.staging_workspace
+}
+
+# Create production environment
+module "production" {
+  source           = "./modules/workspace"
+  workspace_name   = "Production"
+  workspace_handle = var.prod_workspace
+}
+
+variable "prefect_api_key" {
+  description = "Prefect Cloud API key"
+  type        = string
+  sensitive   = true
+}
+
+variable "prefect_account_id" {
+  description = "Prefect Cloud Account ID"
+  type        = string
+}
+
+variable "prod_workspace" {
+  description = "Handle (slug) of the production workspace"
+  type        = string
+  default     = "production"
+}
+
+variable "staging_workspace" {
+  description = "Handle (slug) of the staging workspace"
+  type        = string
+  default     = "staging"
+}
diff --git a/setup_env.sh b/setup_env.sh
new file mode 100755
index 0000000..f8f885f
--- /dev/null
+++ b/setup_env.sh
@@ -0,0 +1,200 @@
+#!/bin/bash
+
+###############################################################################
+# This script sets up a _paid_ Prefect Cloud account with resources:          #
+#                                                                             #
+# 1. Two workspaces: `production` and `staging` (customizable via env vars)   #
+# 2. A Docker work pool in each workspace                                     #
+# 3. A flow in each workspace                                                 #
+# 4. The flow in each workspace is run multiple times                         #
+# 5. The flow in `staging` has failures to demonstrate debugging              #
+#                                                                             #
+# NOTE: You must have Docker, Terraform, Python 3.9+, jq and curl installed   #
+###############################################################################
+
+# Exit on any error
+set -e
+
+cleanup() {
+
+    # Kill any remaining worker processes (the PIDs are unset on an early exit)
+    if [ -n "${PROD_WORKER_PID:-}" ]; then
+        kill "$PROD_WORKER_PID" 2>/dev/null || true
+    fi
+    if [ -n "${STAGING_WORKER_PID:-}" ]; then
+        kill "$STAGING_WORKER_PID" 2>/dev/null || true
+    fi
+
+    # Deactivate and remove virtual environment
+    if [ -d "temp_venv" ]; then
+        deactivate 2>/dev/null || true
+        rm -rf temp_venv
+    fi
+
+    echo "๐งน Cleanup completed"
+
+}
+
+# Set up trap to call cleanup function on script exit (success or failure)
+trap cleanup EXIT
+
+###############################################################################
+# Check for dependencies
+###############################################################################
+
+# Check if Docker is running
+echo "๐ณ Checking if Docker is running..."
+if ! docker info > /dev/null 2>&1; then
+    echo "โ Error: Docker is not running. Please start Docker and try again."
+    exit 1
+fi
+
+echo "โ Docker is running"
+
+# Check if Terraform is installed
+echo "๐ง Checking if Terraform is installed..."
+if ! command -v terraform &> /dev/null; then
+    echo "โ Error: Terraform is not installed. Please install Terraform and try again."
+    exit 1
+fi
+
+echo "โ Terraform is installed"
+
+# Check if Python is installed and determine the Python command
+echo "๐ Checking if Python is installed..."
+if command -v python3 &> /dev/null; then
+    PYTHON_CMD="python3"
+elif command -v python &> /dev/null; then
+    PYTHON_CMD="python"
+else
+    echo "โ Error: Python is not installed. Please install Python 3.9 or higher and try again."
+    exit 1
+fi
+
+# Verify Python version is 3.9 or higher
+if ! $PYTHON_CMD -c "import sys; assert sys.version_info >= (3, 9), 'Python 3.9 or higher is required'" &> /dev/null; then
+    echo "โ Error: Python 3.9 or higher is required. Found $($PYTHON_CMD --version)"
+    exit 1
+fi
+
+echo "โ $(${PYTHON_CMD} --version) is installed"
+
+# Check if jq is installed
+echo "๐ง Checking if jq is installed..."
+if ! command -v jq &> /dev/null; then
+    echo "โ Error: jq is not installed. Please install jq and try again."
+    exit 1
+fi
+
+echo "โ jq is installed"
+
+###############################################################################
+# Establish account and workspace details
+###############################################################################
+
+echo "๐ Fetching Prefect account details..."
+
+# Must have set TF_VAR_prefect_api_key and TF_VAR_prefect_account_id environment variables
+if [ -z "$TF_VAR_prefect_api_key" ]; then
+    echo "โ Error: TF_VAR_prefect_api_key environment variable is not set"
+    exit 1
+fi
+
+if [ -z "$TF_VAR_prefect_account_id" ]; then
+    echo "โ Error: TF_VAR_prefect_account_id environment variable is not set"
+    exit 1
+fi
+
+# Set default workspace names if not provided via environment variables
+PROD_WORKSPACE=${TF_VAR_prod_workspace:-"production"}
+STAGING_WORKSPACE=${TF_VAR_staging_workspace:-"staging"}
+
+# Export for Terraform to use
+export TF_VAR_prod_workspace=$PROD_WORKSPACE
+export TF_VAR_staging_workspace=$STAGING_WORKSPACE
+
+# Account details (-f makes curl exit non-zero on HTTP errors such as 401/404)
+if ! ACCOUNT_DETAILS=$(curl -sf "https://api.prefect.cloud/api/accounts/$TF_VAR_prefect_account_id" \
+    -H "Authorization: Bearer $TF_VAR_prefect_api_key"); then
+    echo "โ Error: Could not fetch Prefect account details. Check your API key and account ID."
+    exit 1
+fi
+
+# Get account handle and plan type using jq ("// empty" maps a missing field to "")
+ACCOUNT_HANDLE=$(echo "$ACCOUNT_DETAILS" | jq -r '.handle // empty')
+PLAN_TYPE=$(echo "$ACCOUNT_DETAILS" | jq -r '.plan_type // empty')
+
+if [ -z "$ACCOUNT_HANDLE" ]; then
+    echo "โ Error: The Prefect account response did not include an account handle."
+    exit 1
+fi
+
+if [[ $PLAN_TYPE == "PERSONAL" ]]; then
+    echo "โ Error: This script requires a paid Prefect Cloud account with support for multiple workspaces."
+    exit 1
+fi
+
+###############################################################################
+# Set up virtual environment
+###############################################################################
+
+# Create and activate virtual environment
+echo "๐ Setting up Python virtual environment..."
+$PYTHON_CMD -m venv temp_venv
+source temp_venv/bin/activate
+
+# Install requirements
+echo "๐ฆ Installing Python packages..."
+pip install -r ./requirements.txt
+
+###############################################################################
+# Provision Prefect Cloud resources
+###############################################################################
+
+echo "๐๏ธ Running Terraform to provision infrastructure..."
+terraform -chdir=infra/workspaces init
+terraform -chdir=infra/workspaces apply -auto-approve
+
+###############################################################################
+# Run flows in production
+###############################################################################
+
+echo "๐ Populate $PROD_WORKSPACE workspace..."
+
+# Start worker for production workspace with suppressed output
+prefect cloud workspace set --workspace "$ACCOUNT_HANDLE/$PROD_WORKSPACE"
+prefect worker start --pool "my-work-pool" > /dev/null 2>&1 &
+PROD_WORKER_PID=$!
+
+# Give workers time to start
+sleep 5
+
+# Run in production workspace
+python ./simulate_failures.py &
+PROD_SIM_PID=$!
+
+# Wait for simulations to complete
+wait "$PROD_SIM_PID"
+
+###############################################################################
+# Run flows in staging
+###############################################################################
+
+echo "๐ Populate $STAGING_WORKSPACE workspace..."
+
+# Start worker for staging workspace with suppressed output
+prefect cloud workspace set --workspace "$ACCOUNT_HANDLE/$STAGING_WORKSPACE"
+prefect worker start --pool "my-work-pool" > /dev/null 2>&1 &
+STAGING_WORKER_PID=$!
+
+# Give workers time to start
+sleep 5
+
+# Run in staging workspace
+python ./simulate_failures.py --fail-at-run 3 &
+STAGING_SIM_PID=$!
+
+# Wait for simulations to complete
+wait "$STAGING_SIM_PID"
+
+echo "โ All done!"
diff --git a/simulate_failures.py b/simulate_failures.py
index 02cafb7..ba55cf6 100644
--- a/simulate_failures.py
+++ b/simulate_failures.py
@@ -50,7 +50,7 @@ async def create_runs(deployment_id: str, num_runs: int, fail_at_run: int | None
     # Deploy the flow
     deployment_id = data_pipeline.deploy(
         name=args.name,
-        work_pool_name="default-work-pool",
+        work_pool_name="my-work-pool",
         image="prefecthq/prefect:3-latest",
         push=False,
         tags=args.tags.split(',')