diff --git a/.cloudbuild/docker-build-dev.cloudbuild.yaml b/.cloudbuild/docker-build-dev.cloudbuild.yaml index 1737c7d..f72ee0a 100644 --- a/.cloudbuild/docker-build-dev.cloudbuild.yaml +++ b/.cloudbuild/docker-build-dev.cloudbuild.yaml @@ -10,26 +10,6 @@ steps: args: [ 'build', '.', '-t', 'clinvar-ftp-watcher:$COMMIT_SHA'] - name: 'gcr.io/cloud-builders/docker' args: [ 'tag', 'clinvar-ftp-watcher:$COMMIT_SHA', 'gcr.io/clingen-dev/clinvar-ftp-watcher:$COMMIT_SHA'] -# - name: 'gcr.io/clingen-dev/git-image-updater' -# secretEnv: ["GITHUB_TOKEN"] -# args: -# - '-c' -# - | -# git clone https://clingen-ci:$$GITHUB_TOKEN@github.com/clingen-data-model/architecture \ -# && cd architecture \ -# && git checkout -b image-update-$SHORT_SHA \ -# && /usr/bin/yq eval -i ".genegraph_docker_image_tag = \"$COMMIT_SHA\"" ./helm/values/genegraph/values-dev.yaml \ -# && date "+%Y-%m-%dT%H%M" > /workspace/DATETIME.txt \ -# && /usr/bin/yq eval -i ".genegraph_data_version = \"$(tr -d '\n' < /workspace/DATETIME.txt):$COMMIT_SHA\"" ./helm/values/genegraph/values-dev.yaml \ -# && git add -u \ -# && git -c user.name="Clingen CI Automation" -c user.email="clingendevs@broadinstitute.org" commit -m "bumping docker image for genegraph" \ -# && git push origin image-update-$SHORT_SHA \ -# && gh pr create --fill -l automation - -#availableSecrets: -# secretManager: -# - versionName: projects/clingen-dev/secrets/clingen-ci-github-token/versions/1 -# env: GITHUB_TOKEN # push the images images: diff --git a/.github/workflows/deploy-job-main.yaml b/.github/workflows/deploy-job-main.yaml new file mode 100644 index 0000000..80a652e --- /dev/null +++ b/.github/workflows/deploy-job-main.yaml @@ -0,0 +1,47 @@ +name: Deploy Job +run-name: Deploy ${{ github.event.workflow_run.head_branch }} by @${{ github.actor }} + +on: + workflow_run: + workflows: ["CI"] + types: + - completed + branches: + - main + - cloud-run-job-invocation + +permissions: + id-token: write + +jobs: + release: + runs-on: ubuntu-latest + if: ${{ github.event.workflow_run.conclusion == 'success' }} + steps: + - name: checkout + uses: actions/checkout@v3 + with: + ref: ${{ github.event.workflow_run.head_branch }} + + - name: authenticate to google cloud + id: "auth" + uses: google-github-actions/auth@v2 + with: + workload_identity_provider: projects/522856288592/locations/global/workloadIdentityPools/clingen-actions-pool/providers/clingen-github-actions + service_account: clinvar-ftp-watcher-deployment@clingen-dev.iam.gserviceaccount.com + + - name: setup gcloud sdk + uses: google-github-actions/setup-gcloud@v2 + + - name: set env vars + run: | + export branch=${{ github.event.workflow_run.head_branch }} + export commit=${{ github.event.workflow_run.head_sha }} + echo "branch=$branch" >> $GITHUB_ENV + echo "commit=$commit" >> $GITHUB_ENV + echo "instance_name=clinvar-ftp-watcher-$branch" >> $GITHUB_ENV + + - name: build and deploy + run: | + bash misc/bin/deploy-job.sh + diff --git a/deps.edn b/deps.edn index b4f7054..111d0a6 100644 --- a/deps.edn +++ b/deps.edn @@ -18,6 +18,7 @@ org.apache.kafka/kafka-clients {:mvn/version "3.1.0"} com.google.auth/google-auth-library-oauth2-http {:mvn/version "1.20.0"} com.google.cloud.functions/functions-framework-api {:mvn/version "1.0.4"} + com.google.cloud/google-cloud-run {:mvn/version "0.35.0"} com.google.cloud/google-cloud-workflows {:mvn/version "2.31.0"} com.google.cloud/google-cloud-workflow-executions {:mvn/version "2.31.0"} com.taoensso/timbre {:mvn/version "6.3.1"}} diff --git a/misc/bin/deploy-job.sh b/misc/bin/deploy-job.sh new file mode 100644 index 0000000..17011d4 --- /dev/null +++ b/misc/bin/deploy-job.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash + +set -xeo pipefail + +if [ -z "$branch" ]; then + branch=$(git rev-parse --abbrev-ref HEAD) +else + echo "branch set in environment" +fi +if [ -z "$commit" ]; then + commit=$(git rev-parse HEAD) +else + echo "commit set in environment" +fi + +echo "Branch: $branch" +echo "Commit: $commit" + +set -u + +if [ "$branch" == "main" ]; then + instance_name="clinvar-ftp-watcher" +else + instance_name="clinvar-ftp-watcher-${branch}" +fi +clinvar_ftp_watcher_bucket="clinvar-ftp-watcher" + +region="us-central1" +# project=$(gcloud config get project) +image=gcr.io/clingen-dev/clinvar-ftp-watcher:$commit +deployment_service_account=clinvar-ftp-watcher-deployment@clingen-dev.iam.gserviceaccount.com + + +if gcloud run jobs list --region us-central1 | awk '{print $2}' | grep "^$instance_name$" ; then + echo "Cloud Run Job $instance_name already exists" + echo "Deleting Cloud Run Job" + gcloud run jobs delete $instance_name --region $region --quiet +fi + +################################################################ +# Build the image +cloudbuild=.cloudbuild/docker-build-dev.cloudbuild.yaml + +tar --no-xattrs -c \ + Dockerfile \ + build.clj \ + deps.edn \ + misc \ + src \ + .cloudbuild \ + | gzip --fast > archive.tar.gz + +gcloud builds submit \ + --substitutions="COMMIT_SHA=${commit}" \ + --config .cloudbuild/docker-build-dev.cloudbuild.yaml \ + --gcs-log-dir=gs://${clinvar_ftp_watcher_bucket}/build/logs \ + archive.tar.gz + +################################################################ +# Deploy job + +gcloud run jobs create $instance_name \ + --cpu=1 \ + --image=$image \ + --max-retries=0 \ + --region=$region \ + --service-account=$deployment_service_account \ + --set-secrets=DX_JAAS_CONFIG=dx-prod-jaas:latest + + diff --git a/src/watcher/cloudrunjob.clj b/src/watcher/cloudrunjob.clj new file mode 100644 index 0000000..0fde7ca --- /dev/null +++ b/src/watcher/cloudrunjob.clj @@ -0,0 +1,92 @@ +(ns watcher.cloudrunjob + "A place for GCP Cloud Run Job related artifacts." + (:require [clojure.data.json :as json] + [clojure.string :as str] + [taoensso.timbre :refer [log info warn error]]) + (:import [com.google.cloud.run.v2 JobsClient Execution JobName EnvVar + RunJobRequest RunJobRequest$Overrides RunJobRequest$Overrides$ContainerOverride])) + + +(def DEFAULT_GCP_PROJECT_ID "clingen-dev") +(def DEFAULT_GCP_LOCATION "us-central1") +(def DEFAULT_GCP_JOB_NAME "clinvar-ingest-main") + +;; TODO s/b a macro? +(defn gcp-project-id [] + (let [project-id (System/getenv "GCP_PROJECT_ID")] + (if (nil? project-id) + DEFAULT_GCP_PROJECT_ID + project-id))) + +(defn gcp-location [] + (let [location (System/getenv "GCP_LOCATION")] + (if (nil? location) + DEFAULT_GCP_LOCATION + location))) + +(defn gcp-job-name [] + (let [job-name (System/getenv "GCP_JOB_NAME")] + (if (nil? job-name) + DEFAULT_GCP_JOB_NAME + job-name))) + + +(defn normalize-keys [^clojure.lang.IPersistentMap payload] + "Normalize the keys in payload map to be lower case and replace spaces with underscore" + (reduce (fn [m [k v]] + (assoc m (-> k + str/lower-case + (str/replace #" " "_")) + v)) + {} + payload)) + + +(defn envvar [k v] + "Create EnvVars to use to pass as overrides to cloud run job. Note that all values are strings." + (-> (EnvVar/newBuilder) + (.setName k) + (.setValue (if (= "java.lang.String" (type v)) v (str v))) + .build)) + + +(defn envars [^clojure.lang.IPersistentMap payload] + "Creates normalized EnvVars for each argument in the payload map." + (reduce (fn [l [k v]] + (conj l (envvar k v))) + [] + (normalize-keys payload))) + + +(defn overrides [^clojure.lang.IPersistentMap payload] + "Create and populate the RunJobRequest with environment variable overrides." + (-> (RunJobRequest$Overrides/newBuilder) + (.addContainerOverrides (-> (RunJobRequest$Overrides$ContainerOverride/newBuilder) + (.addAllEnv (envars payload)) + .build)) + .build)) + +(defn initiate-cloud-run-job [^clojure.lang.IPersistentMap payload] + "Pass a ClinVar release payload map to initiate the google cloud run job to ingest." + (let [jobs-client (JobsClient/create)] + (try + (let [job-name (-> (JobName/of (gcp-project-id) (gcp-location) (gcp-job-name)) + .toString) + overrides (overrides payload) + run-job-request (-> (RunJobRequest/newBuilder) + (.setName job-name) + (.setOverrides overrides) + .build)] + (.runJobAsync jobs-client run-job-request)) + (catch Exception e (throw e)) + (finally (.close jobs-client))))) + + +#_ (initiate-cloud-run-job + {"Name" "ClinVarVariationRelease_2023-1209.xml.gz", + "Size" 3192148421, + "Released" "2023-12-10 01:49:23", + "Last Modified" "2023-12-10 01:49:23", + "Directory" "http://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/clinvar_variation/weekly_release", + "Release Date" "2023-12-09"}) + diff --git a/src/watcher/watcher.clj b/src/watcher/watcher.clj index 4e046fe..0a9bfe6 100644 --- a/src/watcher/watcher.clj +++ b/src/watcher/watcher.clj @@ -1,14 +1,14 @@ (ns watcher.watcher "Source for cloud function that periodicly watches the clivar ftp dir for new files. State is maintained in a kafka topic." - (:require [watcher.ftpparse :as ftpparse] - [watcher.stream :as stream] - [watcher.util :as util] - [watcher.workflow :as workflow] - [clojure.data.json :as json] - [clojure.instant :refer [read-instant-date]] - [clojure.pprint :refer [pprint]] - [taoensso.timbre :refer [log info warn error]]) + (:require [watcher.ftpparse :as ftpparse] + [watcher.stream :as stream] + [watcher.util :as util] + [watcher.cloudrunjob :as job] + [clojure.data.json :as json] + [clojure.instant :refer [read-instant-date]] + [clojure.pprint :refer [pprint]] + [taoensso.timbre :refer [log info warn error]]) (:import [java.util Date]) (:gen-class)) @@ -65,15 +65,15 @@ (defn -main "Main processing point: reads the last message from the clinvar-ftp-watcher topic, gets the last dated file, reads the clinvar ftp site looking for files with newer dates, - and writes the newer files to the clinvar-ftp-watcher topic and initiates the google workflow - for each found file. + and writes the newer files to the clinvar-ftp-watcher topic and initiates the google cloud + run job for each found file. Args: --kafka - will not write newly found files to the kafka stream - --workflow - will not initiate the google cloud workflow to process the files + --job - will not initiate the google cloud job to process the files " [& args] (let [write-to-kafka (= -1 (if (some? args) (.indexOf args "--kafka") -1)) - initiate-workflow (= -1 (if (some? args) (.indexOf args "--workflow") -1)) + initiate-job (= -1 (if (some? args) (.indexOf args "--job") -1)) files (-> (stream/get-last-processed) get-last-processed-date get-latest-files-since) @@ -86,12 +86,18 @@ (stream/save-to-topic date-processed (json/write-str file-details)) (info "Updated kafka topic with new file details.")) (info "No new file information written to kafka.")) - (if initiate-workflow - (doseq [release file-details] - (let [payload (json/write-str release) - initiated-workflow (workflow/initiate-workflow payload)] - (info "Initiated workflow " initiated-workflow " with payload " payload))) - (info "No workflows initiated."))))) + (if initiate-job + (doseq [release-map file-details] + ;; Dereferencing this future will cause this process to wait for future completion. + ;; since this is running as a cloud run job if we wait, gcp will kill this process with: + ;; "WARNING: The task has been cancelled. Please refer to + ;; https://github.com/googleapis/google-cloud-java#lro-timeouts for more information" + ;; Waiting for the future to complete will require adding JobsSettings + ;; https://cloud.google.com/java/docs/reference/google-cloud-run/latest/com.google.cloud.run.v2.JobsClient#com_google_cloud_run_v2_JobsClient_JobsClient_com_google_cloud_run_v2_JobsSettings_ + ;; + (let [initiated-job (future (job/initiate-cloud-run-job release-map))] + (info "Initiated cloud run job " (job/gcp-job-name) " with payload " release-map))) + (info "Cloud run job not initiated."))))) (comment