From 14c42ceac6bb2fb3c66871b4406222b4f9ee4f6f Mon Sep 17 00:00:00 2001 From: Terry ONeill <47833589+toneillbroad@users.noreply.github.com> Date: Fri, 16 Feb 2024 15:02:46 -0500 Subject: [PATCH 1/8] Added cloud run job invocation instead of workflow --- deps.edn | 1 + src/watcher/cloudrunjob.clj | 100 ++++++++++++++++++++++++++++++++++++ src/watcher/watcher.clj | 35 ++++++------- 3 files changed, 118 insertions(+), 18 deletions(-) create mode 100644 src/watcher/cloudrunjob.clj diff --git a/deps.edn b/deps.edn index b4f7054..111d0a6 100644 --- a/deps.edn +++ b/deps.edn @@ -18,6 +18,7 @@ org.apache.kafka/kafka-clients {:mvn/version "3.1.0"} com.google.auth/google-auth-library-oauth2-http {:mvn/version "1.20.0"} com.google.cloud.functions/functions-framework-api {:mvn/version "1.0.4"} + com.google.cloud/google-cloud-run {:mvn/version "0.35.0"} com.google.cloud/google-cloud-workflows {:mvn/version "2.31.0"} com.google.cloud/google-cloud-workflow-executions {:mvn/version "2.31.0"} com.taoensso/timbre {:mvn/version "6.3.1"}} diff --git a/src/watcher/cloudrunjob.clj b/src/watcher/cloudrunjob.clj new file mode 100644 index 0000000..532a7ba --- /dev/null +++ b/src/watcher/cloudrunjob.clj @@ -0,0 +1,100 @@ +(ns watcher.cloudrunjob + "A place for GCP Cloud Run Job related artifacts." + (:require [clojure.data.json :as json] + [clojure.string :as str] + [taoensso.timbre :refer [log info warn error]]) + (:import [com.google.cloud.run.v2 JobsClient Execution JobName EnvVar + RunJobRequest RunJobRequest$Overrides RunJobRequest$Overrides$ContainerOverride])) + + +(def DEFAULT_GCP_PROJECT_ID "clingen-dev") +(def DEFAULT_GCP_LOCATION "us-central1") +(def DEFAULT_GCP_JOB_NAME "clinvar-ftp-watcher") + +;; TODO s/b a macro? +(defn gcp-project-id [] + (let [project-id (System/getenv "GCP_PROJECT_ID")] + (if (nil? project-id) + DEFAULT_GCP_PROJECT_ID + project-id))) + +(defn gcp-location [] + (let [location (System/getenv "GCP_LOCATION")] + (if (nil? 
location) + DEFAULT_GCP_LOCATION + location))) + +(defn gcp-job-name [] + (let [job-name (System/getenv "GCP_JOB_NAME")] + (if (nil? job-name) + DEFAULT_GCP_JOB_NAME + job-name))) + + +(defn normalize-keys [^clojure.lang.IPersistentMap payload] + "Normalize the keys in payload map to be lower case and replace spaces with underscore" + (reduce (fn [m [k v]] + (assoc m (-> k + str/lower-case + (str/replace #" " "_")) + v)) + {} + payload)) + + +(defn envvar [k v] + "Create EnvVars to use to pass as overrides to cloud run job. Note that all values are strings." + (-> (EnvVar/newBuilder) + (.setName k) + (.setValue (if (= "java.lang.String" (type v)) v (str v))) + .build)) + + +(defn envars [^clojure.lang.IPersistentMap payload] + "Creates normalized EnvVars for each argument in the payload map." + (reduce (fn [l [k v]] + (conj l (envvar k v))) + [] + (normalize-keys payload))) + + +(defn overrides [^clojure.lang.IPersistentMap payload] + "Create and populate the RunJobRequest with environment variable overrides." + (-> (RunJobRequest$Overrides/newBuilder) + (.addContainerOverrides (-> (RunJobRequest$Overrides$ContainerOverride/newBuilder) + (.addAllEnv (envars payload)) + .build)) + .build)) + + +(defn initiate-cloud-run-job [^clojure.lang.IPersistentMap payload] + "Pass a ClinVar release payload map to initiate the google cloud run job to ingest." + (let [jobs-client (JobsClient/create)] + (try + (let [job-name (-> (JobName/of (gcp-project-id) (gcp-location) (gcp-job-name)) + .toString) + overrides (overrides payload) + run-job-request (-> (RunJobRequest/newBuilder) + (.setName job-name) + (.setOverrides overrides) + .build) + execution (-> (.runJobAsync jobs-client run-job-request) + .get) + success-count (.getSucceededCount execution) + start-time (.getSeconds (.getStartTime execution)) + end-time (.getSeconds (.getCompletionTime execution)) + tot-time (- end-time start-time)] + (info (str "Cloud Run Job: " job-name " executed in " tot-time " seconds. 
Result: " + (if success-count "SUCCEEDED" "FAILED")))) + (catch Exception e (throw e)) + (finally (.close jobs-client))))) + + +#_ (initiate-cloud-run-job + {"Name" "ClinVarVariationRelease_2023-1209.xml.gz", + "Size" 3192148421, + "Released" "2023-12-10 01:49:23", + "Last Modified" "2023-12-10 01:49:23", + "Directory" "http://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/clinvar_variation/weekly_release", + "Release Date" "2023-12-09"}) + diff --git a/src/watcher/watcher.clj b/src/watcher/watcher.clj index 4e046fe..f03a7a3 100644 --- a/src/watcher/watcher.clj +++ b/src/watcher/watcher.clj @@ -1,14 +1,14 @@ (ns watcher.watcher "Source for cloud function that periodicly watches the clivar ftp dir for new files. State is maintained in a kafka topic." - (:require [watcher.ftpparse :as ftpparse] - [watcher.stream :as stream] - [watcher.util :as util] - [watcher.workflow :as workflow] - [clojure.data.json :as json] - [clojure.instant :refer [read-instant-date]] - [clojure.pprint :refer [pprint]] - [taoensso.timbre :refer [log info warn error]]) + (:require [watcher.ftpparse :as ftpparse] + [watcher.stream :as stream] + [watcher.util :as util] + [watcher.cloudrunjob :as job] + [clojure.data.json :as json] + [clojure.instant :refer [read-instant-date]] + [clojure.pprint :refer [pprint]] + [taoensso.timbre :refer [log info warn error]]) (:import [java.util Date]) (:gen-class)) @@ -65,15 +65,15 @@ (defn -main "Main processing point: reads the last message from the clinvar-ftp-watcher topic, gets the last dated file, reads the clinvar ftp site looking for files with newer dates, - and writes the newer files to the clinvar-ftp-watcher topic and initiates the google workflow - for each found file. + and writes the newer files to the clinvar-ftp-watcher topic and initiates the google cloud + run job for each found file. 
Args: --kafka - will not write newly found files to the kafka stream - --workflow - will not initiate the google cloud workflow to process the files + --job - will not initiate the google cloud job to process the files " [& args] (let [write-to-kafka (= -1 (if (some? args) (.indexOf args "--kafka") -1)) - initiate-workflow (= -1 (if (some? args) (.indexOf args "--workflow") -1)) + initiate-job (= -1 (if (some? args) (.indexOf args "--job") -1)) files (-> (stream/get-last-processed) get-last-processed-date get-latest-files-since) @@ -86,12 +86,11 @@ (stream/save-to-topic date-processed (json/write-str file-details)) (info "Updated kafka topic with new file details.")) (info "No new file information written to kafka.")) - (if initiate-workflow - (doseq [release file-details] - (let [payload (json/write-str release) - initiated-workflow (workflow/initiate-workflow payload)] - (info "Initiated workflow " initiated-workflow " with payload " payload))) - (info "No workflows initiated."))))) + (if initiate-job + (doseq [release-map file-details] + (let [initiated-job (job/initiate-cloud-run-job release-map)] + (info "Initiated cloud run job " initiated-job " with payload " release-map))) + (info "Cloud run job not initiated."))))) (comment From f9be7463c3671be2babcca1c6edd010d3c999482 Mon Sep 17 00:00:00 2001 From: Terry ONeill Date: Mon, 4 Mar 2024 16:53:34 -0500 Subject: [PATCH 2/8] Invoking cloud run job in a future w/o waiting for completion --- src/watcher/cloudrunjob.clj | 14 +++----------- src/watcher/watcher.clj | 4 ++-- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/src/watcher/cloudrunjob.clj b/src/watcher/cloudrunjob.clj index 532a7ba..0fde7ca 100644 --- a/src/watcher/cloudrunjob.clj +++ b/src/watcher/cloudrunjob.clj @@ -9,7 +9,7 @@ (def DEFAULT_GCP_PROJECT_ID "clingen-dev") (def DEFAULT_GCP_LOCATION "us-central1") -(def DEFAULT_GCP_JOB_NAME "clinvar-ftp-watcher") +(def DEFAULT_GCP_JOB_NAME "clinvar-ingest-main") ;; TODO s/b a macro? 
(defn gcp-project-id [] @@ -66,7 +66,6 @@ .build)) .build)) - (defn initiate-cloud-run-job [^clojure.lang.IPersistentMap payload] "Pass a ClinVar release payload map to initiate the google cloud run job to ingest." (let [jobs-client (JobsClient/create)] @@ -77,15 +76,8 @@ run-job-request (-> (RunJobRequest/newBuilder) (.setName job-name) (.setOverrides overrides) - .build) - execution (-> (.runJobAsync jobs-client run-job-request) - .get) - success-count (.getSucceededCount execution) - start-time (.getSeconds (.getStartTime execution)) - end-time (.getSeconds (.getCompletionTime execution)) - tot-time (- end-time start-time)] - (info (str "Cloud Run Job: " job-name " executed in " tot-time " seconds. Result: " - (if success-count "SUCCEEDED" "FAILED")))) + .build)] + (.runJobAsync jobs-client run-job-request)) (catch Exception e (throw e)) (finally (.close jobs-client))))) diff --git a/src/watcher/watcher.clj b/src/watcher/watcher.clj index f03a7a3..bb10d57 100644 --- a/src/watcher/watcher.clj +++ b/src/watcher/watcher.clj @@ -88,8 +88,8 @@ (info "No new file information written to kafka.")) (if initiate-job (doseq [release-map file-details] - (let [initiated-job (job/initiate-cloud-run-job release-map)] - (info "Initiated cloud run job " initiated-job " with payload " release-map))) + (let [initiated-job (future (job/initiate-cloud-run-job release-map))] + (info "Initiated cloud run job " (job/ gcp-job-name) " with payload " release-map))) (info "Cloud run job not initiated."))))) From 2690de08328f9cd444a91043693d5199442a62af Mon Sep 17 00:00:00 2001 From: Terry ONeill Date: Mon, 4 Mar 2024 17:24:00 -0500 Subject: [PATCH 3/8] Added a comment on deref of the futures in cloud run job context --- src/watcher/watcher.clj | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/watcher/watcher.clj b/src/watcher/watcher.clj index bb10d57..743b2a1 100644 --- a/src/watcher/watcher.clj +++ b/src/watcher/watcher.clj @@ -88,6 +88,13 @@ (info "No new file information 
written to kafka.")) (if initiate-job (doseq [release-map file-details] + ;; Dereferencing this future will cause this process to wait for future completion. + ;; Since this is running as a cloud run job, if we wait, GCP will kill this process with: + ;; "WARNING: The task has been cancelled. Please refer to + ;; https://github.com/googleapis/google-cloud-java#lro-timeouts for more information" + ;; Waiting for the future to complete will require adding JobsSettings + ;; https://cloud.google.com/java/docs/reference/google-cloud-run/latest/com.google.cloud.run.v2.JobsClient#com_google_cloud_run_v2_JobsClient_JobsClient_com_google_cloud_run_v2_JobsSettings_ + ;; (let [initiated-job (future (job/initiate-cloud-run-job release-map))] (info "Initiated cloud run job " (job/ gcp-job-name) " with payload " release-map))) (info "Cloud run job not initiated."))))) From 151498522f4e1eb89b36d78d6a755b00401ddf51 Mon Sep 17 00:00:00 2001 From: Terry ONeill Date: Mon, 4 Mar 2024 17:35:29 -0500 Subject: [PATCH 4/8] Fix typo --- src/watcher/watcher.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/watcher/watcher.clj b/src/watcher/watcher.clj index 743b2a1..0a9bfe6 100644 --- a/src/watcher/watcher.clj +++ b/src/watcher/watcher.clj @@ -96,7 +96,7 @@ ;; https://cloud.google.com/java/docs/reference/google-cloud-run/latest/com.google.cloud.run.v2.JobsClient#com_google_cloud_run_v2_JobsClient_JobsClient_com_google_cloud_run_v2_JobsSettings_ ;; (let [initiated-job (future (job/initiate-cloud-run-job release-map))] - (info "Initiated cloud run job " (job/ gcp-job-name) " with payload " release-map))) + (info "Initiated cloud run job " (job/gcp-job-name) " with payload " release-map))) (info "Cloud run job not initiated."))))) From 493d039bbc1724a652960178d915b466a586458e Mon Sep 17 00:00:00 2001 From: Terry ONeill Date: Fri, 8 Mar 2024 11:02:58 -0500 Subject: [PATCH 5/8] Added deploy script and github action for deploying watcher as cloud run job.
--- .cloudbuild/docker-build-dev.cloudbuild.yaml | 20 ------- .github/workflows/deploy-job-main.yaml | 47 +++++++++++++++ misc/bin/deploy-job.sh | 63 ++++++++++++++++++++ 3 files changed, 110 insertions(+), 20 deletions(-) create mode 100644 .github/workflows/deploy-job-main.yaml create mode 100644 misc/bin/deploy-job.sh diff --git a/.cloudbuild/docker-build-dev.cloudbuild.yaml b/.cloudbuild/docker-build-dev.cloudbuild.yaml index 1737c7d..f72ee0a 100644 --- a/.cloudbuild/docker-build-dev.cloudbuild.yaml +++ b/.cloudbuild/docker-build-dev.cloudbuild.yaml @@ -10,26 +10,6 @@ steps: args: [ 'build', '.', '-t', 'clinvar-ftp-watcher:$COMMIT_SHA'] - name: 'gcr.io/cloud-builders/docker' args: [ 'tag', 'clinvar-ftp-watcher:$COMMIT_SHA', 'gcr.io/clingen-dev/clinvar-ftp-watcher:$COMMIT_SHA'] -# - name: 'gcr.io/clingen-dev/git-image-updater' -# secretEnv: ["GITHUB_TOKEN"] -# args: -# - '-c' -# - | -# git clone https://clingen-ci:$$GITHUB_TOKEN@github.com/clingen-data-model/architecture \ -# && cd architecture \ -# && git checkout -b image-update-$SHORT_SHA \ -# && /usr/bin/yq eval -i ".genegraph_docker_image_tag = \"$COMMIT_SHA\"" ./helm/values/genegraph/values-dev.yaml \ -# && date "+%Y-%m-%dT%H%M" > /workspace/DATETIME.txt \ -# && /usr/bin/yq eval -i ".genegraph_data_version = \"$(tr -d '\n' < /workspace/DATETIME.txt):$COMMIT_SHA\"" ./helm/values/genegraph/values-dev.yaml \ -# && git add -u \ -# && git -c user.name="Clingen CI Automation" -c user.email="clingendevs@broadinstitute.org" commit -m "bumping docker image for genegraph" \ -# && git push origin image-update-$SHORT_SHA \ -# && gh pr create --fill -l automation - -#availableSecrets: -# secretManager: -# - versionName: projects/clingen-dev/secrets/clingen-ci-github-token/versions/1 -# env: GITHUB_TOKEN # push the images images: diff --git a/.github/workflows/deploy-job-main.yaml b/.github/workflows/deploy-job-main.yaml new file mode 100644 index 0000000..80a652e --- /dev/null +++ 
b/.github/workflows/deploy-job-main.yaml @@ -0,0 +1,47 @@ +name: Deploy Job +run-name: Deploy ${{ github.event.workflow_run.head_branch }} by @${{ github.actor }} + +on: + workflow_run: + workflows: ["CI"] + types: + - completed + branches: + - main + - cloud-run-job-invocation + +permissions: + id-token: write + +jobs: + release: + runs-on: ubuntu-latest + if: ${{ github.event.workflow_run.conclusion == 'success' }} + steps: + - name: checkout + uses: actions/checkout@v3 + with: + ref: ${{ github.event.workflow_run.head_branch }} + + - name: authenticate to google cloud + id: "auth" + uses: google-github-actions/auth@v2 + with: + workload_identity_provider: projects/522856288592/locations/global/workloadIdentityPools/clingen-actions-pool/providers/clingen-github-actions + service_account: clinvar-ftp-watcher-deployment@clingen-dev.iam.gserviceaccount.com + + - name: setup gcloud sdk + uses: google-github-actions/setup-gcloud@v2 + + - name: set env vars + run: | + export branch=${{ github.event.workflow_run.head_branch }} + export commit=${{ github.event.workflow_run.head_sha }} + echo "branch=$branch" >> $GITHUB_ENV + echo "commit=$commit" >> $GITHUB_ENV + echo "instance_name=clinvar-ftp-watcher-$branch" >> $GITHUB_ENV + + - name: build and deploy + run: | + bash misc/bin/deploy-job.sh + diff --git a/misc/bin/deploy-job.sh b/misc/bin/deploy-job.sh new file mode 100644 index 0000000..043ebe8 --- /dev/null +++ b/misc/bin/deploy-job.sh @@ -0,0 +1,63 @@ +#!/usr/bin/env bash + +set -xeo pipefail + +if [ -z "$branch" ]; then + branch=$(git rev-parse --abbrev-ref HEAD) +else + echo "branch set in environment" +fi +if [ -z "$commit" ]; then + commit=$(git rev-parse HEAD) +else + echo "commit set in environment" +fi + +echo "Branch: $branch" +echo "Commit: $commit" + +set -u + +instance_name="clinvar-ftp-watcher-${branch}" +clinvar_ftp_watcher_bucket="clinvar-ftp-watcher" + +region="us-central1" +# project=$(gcloud config get project) +image_tag=workflow-py-$commit 
+image=gcr.io/clingen-dev/clinvar-ftp-watcher:$image_tag +deployment_service_account=clinvar-ftp-watcher-deployment@clingen-dev.iam.gserviceaccount.com + + +if gcloud run jobs list --region us-central1 | awk '{print $2}' | grep "^$instance_name$" ; then + echo "Cloud Run Job $instance_name already exists" + echo "Deleting Cloud Run Job" + gcloud run jobs delete $instance_name --region $region --quiet +fi + +################################################################ +# Build the image +cloudbuild=.cloudbuild/docker-build-dev.cloudbuild.yaml + +tar --no-xattrs -c \ + Dockerfile \ + build.clj \ + deps.edn \ + misc \ + src \ + .cloudbuild \ + | gzip --fast > archive.tar.gz + +gcloud builds submit \ + --substitutions="COMMIT_SHA=${image_tag}" \ + --config .cloudbuild/docker-build-dev.cloudbuild.yaml \ + --gcs-log-dir=gs://${clinvar_ftp_watcher_bucket}/build/logs \ + archive.tar.gz + +################################################################ +# Deploy job + +gcloud run jobs create $instance_name \ + --image=$image \ + --region=$region \ + --service-account=$pipeline_service_account \ + --set-env-vars=CLINVAR_FTP_WATCHER_BUCKET=$clinvar_ingest_bucket From b11a7a71d03315823b26a1b8840bc886ac14f92d Mon Sep 17 00:00:00 2001 From: Terry ONeill Date: Fri, 8 Mar 2024 13:26:30 -0500 Subject: [PATCH 6/8] A little bit of cleanup --- misc/bin/deploy-job.sh | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/misc/bin/deploy-job.sh b/misc/bin/deploy-job.sh index 043ebe8..9ee525f 100644 --- a/misc/bin/deploy-job.sh +++ b/misc/bin/deploy-job.sh @@ -18,13 +18,16 @@ echo "Commit: $commit" set -u -instance_name="clinvar-ftp-watcher-${branch}" +if [ "$branch" == "main" ]; then + instance_name="clinvar-ftp-watcher" +else + instance_name="clinvar-ftp-watcher-${branch}" +fi clinvar_ftp_watcher_bucket="clinvar-ftp-watcher" region="us-central1" # project=$(gcloud config get project) -image_tag=workflow-py-$commit 
-image=gcr.io/clingen-dev/clinvar-ftp-watcher:$image_tag +image=gcr.io/clingen-dev/clinvar-ftp-watcher:$commit deployment_service_account=clinvar-ftp-watcher-deployment@clingen-dev.iam.gserviceaccount.com @@ -48,7 +51,7 @@ tar --no-xattrs -c \ | gzip --fast > archive.tar.gz gcloud builds submit \ - --substitutions="COMMIT_SHA=${image_tag}" \ + --substitutions="COMMIT_SHA=${commit}" \ --config .cloudbuild/docker-build-dev.cloudbuild.yaml \ --gcs-log-dir=gs://${clinvar_ftp_watcher_bucket}/build/logs \ archive.tar.gz @@ -59,5 +62,5 @@ gcloud builds submit \ gcloud run jobs create $instance_name \ --image=$image \ --region=$region \ - --service-account=$pipeline_service_account \ - --set-env-vars=CLINVAR_FTP_WATCHER_BUCKET=$clinvar_ingest_bucket + --service-account=$deployment_service_account #\ + # --set-env-vars=CLINVAR_FTP_WATCHER_BUCKET=$clinvar_ftp_watcher_bucket From 46629e5a0f6795b79d9e775c51098a06ed78abe1 Mon Sep 17 00:00:00 2001 From: Terry ONeill Date: Fri, 8 Mar 2024 15:53:15 -0500 Subject: [PATCH 7/8] Added env var for DX_JAAS_CONFIG --- misc/bin/deploy-job.sh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/misc/bin/deploy-job.sh b/misc/bin/deploy-job.sh index 9ee525f..60031a3 100644 --- a/misc/bin/deploy-job.sh +++ b/misc/bin/deploy-job.sh @@ -62,5 +62,7 @@ gcloud builds submit \ gcloud run jobs create $instance_name \ --image=$image \ --region=$region \ - --service-account=$deployment_service_account #\ - # --set-env-vars=CLINVAR_FTP_WATCHER_BUCKET=$clinvar_ftp_watcher_bucket + --service-account=$deployment_service_account \ + --set-secrets=DX_JAAS_CONFIG=dx-prod-jaas:latest + + From e121589627cea7049a76d6996ff388aee6214b92 Mon Sep 17 00:00:00 2001 From: Terry ONeill Date: Fri, 8 Mar 2024 16:10:30 -0500 Subject: [PATCH 8/8] Added max-retries and cpu specs to gcloud run jobs create --- misc/bin/deploy-job.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/misc/bin/deploy-job.sh b/misc/bin/deploy-job.sh index 60031a3..17011d4
100644 --- a/misc/bin/deploy-job.sh +++ b/misc/bin/deploy-job.sh @@ -60,7 +60,9 @@ gcloud builds submit \ # Deploy job gcloud run jobs create $instance_name \ + --cpu=1 \ --image=$image \ + --max-retries=0 \ --region=$region \ --service-account=$deployment_service_account \ --set-secrets=DX_JAAS_CONFIG=dx-prod-jaas:latest