Skip to content

Commit

Permalink
Merge pull request #11 from clingen-data-model/cloud-run-job-invocation
Browse files Browse the repository at this point in the history
Cloud run job deploy and invocation
  • Loading branch information
toneillbroad authored Mar 8, 2024
2 parents f64ccd5 + e121589 commit cd76c79
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 38 deletions.
20 changes: 0 additions & 20 deletions .cloudbuild/docker-build-dev.cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,6 @@ steps:
args: [ 'build', '.', '-t', 'clinvar-ftp-watcher:$COMMIT_SHA']
- name: 'gcr.io/cloud-builders/docker'
args: [ 'tag', 'clinvar-ftp-watcher:$COMMIT_SHA', 'gcr.io/clingen-dev/clinvar-ftp-watcher:$COMMIT_SHA']
# - name: 'gcr.io/clingen-dev/git-image-updater'
# secretEnv: ["GITHUB_TOKEN"]
# args:
# - '-c'
# - |
# git clone https://clingen-ci:[email protected]/clingen-data-model/architecture \
# && cd architecture \
# && git checkout -b image-update-$SHORT_SHA \
# && /usr/bin/yq eval -i ".genegraph_docker_image_tag = \"$COMMIT_SHA\"" ./helm/values/genegraph/values-dev.yaml \
# && date "+%Y-%m-%dT%H%M" > /workspace/DATETIME.txt \
# && /usr/bin/yq eval -i ".genegraph_data_version = \"$(tr -d '\n' < /workspace/DATETIME.txt):$COMMIT_SHA\"" ./helm/values/genegraph/values-dev.yaml \
# && git add -u \
# && git -c user.name="Clingen CI Automation" -c user.email="[email protected]" commit -m "bumping docker image for genegraph" \
# && git push origin image-update-$SHORT_SHA \
# && gh pr create --fill -l automation

#availableSecrets:
# secretManager:
# - versionName: projects/clingen-dev/secrets/clingen-ci-github-token/versions/1
# env: GITHUB_TOKEN

# push the images
images:
Expand Down
47 changes: 47 additions & 0 deletions .github/workflows/deploy-job-main.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: Deploy Job
run-name: Deploy ${{ github.event.workflow_run.head_branch }} by @${{ github.actor }}

# Triggered when the "CI" workflow completes on one of the listed branches.
on:
  workflow_run:
    workflows: ["CI"]
    types:
      - completed
    branches:
      - main
      - cloud-run-job-invocation

# Declaring any permission sets every unlisted permission to "none", so the
# read access needed by actions/checkout has to be granted explicitly.
permissions:
  contents: read
  id-token: write # needed to authenticate via workload identity federation

jobs:
  release:
    runs-on: ubuntu-latest
    # Deploy only when the triggering CI run succeeded.
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    steps:
      - name: checkout
        uses: actions/checkout@v3
        with:
          # Check out the exact commit CI validated. The image is tagged with
          # this SHA, so building from the branch tip could package different code.
          ref: ${{ github.event.workflow_run.head_sha }}

      - name: authenticate to google cloud
        id: "auth"
        uses: google-github-actions/auth@v2
        with:
          workload_identity_provider: projects/522856288592/locations/global/workloadIdentityPools/clingen-actions-pool/providers/clingen-github-actions
          service_account: clinvar-ftp-watcher-deployment@clingen-dev.iam.gserviceaccount.com

      - name: setup gcloud sdk
        uses: google-github-actions/setup-gcloud@v2

      - name: set env vars
        # Hand the event values to the shell through the environment instead of
        # expanding them inside the script text, so a crafted branch name cannot
        # inject shell commands.
        env:
          HEAD_BRANCH: ${{ github.event.workflow_run.head_branch }}
          HEAD_SHA: ${{ github.event.workflow_run.head_sha }}
        run: |
          echo "branch=$HEAD_BRANCH" >> "$GITHUB_ENV"
          echo "commit=$HEAD_SHA" >> "$GITHUB_ENV"
          echo "instance_name=clinvar-ftp-watcher-$HEAD_BRANCH" >> "$GITHUB_ENV"

      - name: build and deploy
        # deploy-job.sh reads `branch` and `commit` from the environment set above.
        run: |
          bash misc/bin/deploy-job.sh
1 change: 1 addition & 0 deletions deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
org.apache.kafka/kafka-clients {:mvn/version "3.1.0"}
com.google.auth/google-auth-library-oauth2-http {:mvn/version "1.20.0"}
com.google.cloud.functions/functions-framework-api {:mvn/version "1.0.4"}
com.google.cloud/google-cloud-run {:mvn/version "0.35.0"}
com.google.cloud/google-cloud-workflows {:mvn/version "2.31.0"}
com.google.cloud/google-cloud-workflow-executions {:mvn/version "2.31.0"}
com.taoensso/timbre {:mvn/version "6.3.1"}}
Expand Down
70 changes: 70 additions & 0 deletions misc/bin/deploy-job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#!/usr/bin/env bash
#
# Build the clinvar-ftp-watcher image with Cloud Build and (re)create the
# Cloud Run job that runs it.
#
# Optional environment:
#   branch - branch being deployed (default: current git branch)
#   commit - commit SHA used as the image tag (default: current git HEAD)

set -xeo pipefail

# `branch` and `commit` may be unset at this point, so nounset (-u) is only
# switched on after they have been defaulted.
if [ -z "$branch" ]; then
    branch=$(git rev-parse --abbrev-ref HEAD)
else
    echo "branch set in environment"
fi
if [ -z "$commit" ]; then
    commit=$(git rev-parse HEAD)
else
    echo "commit set in environment"
fi

echo "Branch: $branch"
echo "Commit: $commit"

set -u

# The main branch owns the unsuffixed job name; every other branch gets its own job.
if [ "$branch" == "main" ]; then
    instance_name="clinvar-ftp-watcher"
else
    instance_name="clinvar-ftp-watcher-${branch}"
fi
clinvar_ftp_watcher_bucket="clinvar-ftp-watcher"

region="us-central1"
# project=$(gcloud config get project)
image="gcr.io/clingen-dev/clinvar-ftp-watcher:${commit}"
deployment_service_account=clinvar-ftp-watcher-deployment@clingen-dev.iam.gserviceaccount.com

################################################################
# Build the image
#
# The build runs BEFORE the existing job is touched, so a failed build leaves
# the currently deployed job in place.
cloudbuild=.cloudbuild/docker-build-dev.cloudbuild.yaml
archive=archive.tar.gz

# Remove the temporary source archive however the script exits.
trap 'rm -f "$archive"' EXIT

tar --no-xattrs -c \
    Dockerfile \
    build.clj \
    deps.edn \
    misc \
    src \
    .cloudbuild \
    | gzip --fast > "$archive"

gcloud builds submit \
    --substitutions="COMMIT_SHA=${commit}" \
    --config "$cloudbuild" \
    --gcs-log-dir="gs://${clinvar_ftp_watcher_bucket}/build/logs" \
    "$archive"

################################################################
# Deploy job
#
# `describe` exits non-zero when the job cannot be described, which is treated
# here as "does not exist". This avoids parsing the table output of
# `jobs list` and matching the job name as a regular expression.
if gcloud run jobs describe "$instance_name" --region "$region" > /dev/null 2>&1; then
    echo "Cloud Run Job $instance_name already exists"
    echo "Deleting Cloud Run Job"
    gcloud run jobs delete "$instance_name" --region "$region" --quiet
fi

gcloud run jobs create "$instance_name" \
    --cpu=1 \
    --image="$image" \
    --max-retries=0 \
    --region="$region" \
    --service-account="$deployment_service_account" \
    --set-secrets=DX_JAAS_CONFIG=dx-prod-jaas:latest


92 changes: 92 additions & 0 deletions src/watcher/cloudrunjob.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
(ns watcher.cloudrunjob
"A place for GCP Cloud Run Job related artifacts."
(:require [clojure.data.json :as json]
[clojure.string :as str]
[taoensso.timbre :refer [log info warn error]])
(:import [com.google.cloud.run.v2 JobsClient Execution JobName EnvVar
RunJobRequest RunJobRequest$Overrides RunJobRequest$Overrides$ContainerOverride]))


;; Fallback values used by gcp-project-id, gcp-location and gcp-job-name when
;; the GCP_PROJECT_ID, GCP_LOCATION and GCP_JOB_NAME environment variables
;; are not set.
(def DEFAULT_GCP_PROJECT_ID "clingen-dev")
(def DEFAULT_GCP_LOCATION "us-central1")
;; Name of the Cloud Run job that initiate-cloud-run-job runs by default.
(def DEFAULT_GCP_JOB_NAME "clinvar-ingest-main")

;; TODO s/b a macro?
(defn gcp-project-id
  "Return the GCP project id: the GCP_PROJECT_ID environment variable when it
  is set, otherwise DEFAULT_GCP_PROJECT_ID."
  []
  ;; System/getenv yields nil for an unset variable, so `or` falls through
  ;; to the default exactly in that case.
  (or (System/getenv "GCP_PROJECT_ID")
      DEFAULT_GCP_PROJECT_ID))

(defn gcp-location
  "Return the GCP location: the GCP_LOCATION environment variable when it is
  set, otherwise DEFAULT_GCP_LOCATION."
  []
  ;; System/getenv yields nil for an unset variable, so `or` falls through
  ;; to the default exactly in that case.
  (or (System/getenv "GCP_LOCATION")
      DEFAULT_GCP_LOCATION))

(defn gcp-job-name
  "Return the Cloud Run job name: the GCP_JOB_NAME environment variable when
  it is set, otherwise DEFAULT_GCP_JOB_NAME."
  []
  ;; System/getenv yields nil for an unset variable, so `or` falls through
  ;; to the default exactly in that case.
  (or (System/getenv "GCP_JOB_NAME")
      DEFAULT_GCP_JOB_NAME))


(defn normalize-keys
  "Return a map with the entries of `payload` whose keys have been lower-cased
  and had every space replaced by an underscore. Values are left unchanged.

  Example: {\"Release Date\" \"2023-12-09\"} => {\"release_date\" \"2023-12-09\"}"
  ;; The docstring must precede the parameter vector; placed after it, the
  ;; string is just an expression that is evaluated and thrown away.
  [^clojure.lang.IPersistentMap payload]
  (reduce (fn [m [k v]]
            (assoc m
                   (-> k
                       str/lower-case
                       (str/replace #" " "_"))
                   v))
          {}
          payload))


(defn envvar
  "Build an EnvVar named `k` for use as a Cloud Run job override. The value is
  always a string: `v` is converted with `str`."
  [k v]
  (-> (EnvVar/newBuilder)
      (.setName k)
      ;; `str` returns a String argument unchanged, so no type check is
      ;; needed. The previous check compared a class-name string with a Class
      ;; object and could never be true.
      (.setValue (str v))
      .build))


(defn envars
  "Return a vector with one EnvVar per entry of `payload`, built after the
  keys have been passed through normalize-keys."
  [^clojure.lang.IPersistentMap payload]
  (mapv (fn [[k v]] (envvar k v))
        (normalize-keys payload)))


(defn overrides
  "Build a RunJobRequest Overrides object containing a single container
  override whose environment variables are derived from `payload` (see envars)."
  [^clojure.lang.IPersistentMap payload]
  (let [container-override (-> (RunJobRequest$Overrides$ContainerOverride/newBuilder)
                               (.addAllEnv (envars payload))
                               .build)]
    (-> (RunJobRequest$Overrides/newBuilder)
        (.addContainerOverrides container-override)
        .build)))

(defn initiate-cloud-run-job
  "Start the Cloud Run job identified by gcp-project-id, gcp-location and
  gcp-job-name, passing the entries of the ClinVar release map `payload` to it
  as environment-variable overrides.

  Returns whatever JobsClient `.runJobAsync` returns; this function does not
  wait on it. Exceptions propagate to the caller, and the JobsClient is closed
  in every case."
  [^clojure.lang.IPersistentMap payload]
  ;; with-open closes the client on both normal and exceptional exit, which is
  ;; what the earlier try/finally did; the catch-and-rethrow added nothing.
  (with-open [jobs-client (JobsClient/create)]
    (let [job-name (str (JobName/of (gcp-project-id) (gcp-location) (gcp-job-name)))
          ;; named env-overrides so the `overrides` function is not shadowed
          env-overrides (overrides payload)
          run-job-request (-> (RunJobRequest/newBuilder)
                              (.setName job-name)
                              (.setOverrides env-overrides)
                              .build)]
      (.runJobAsync jobs-client run-job-request))))


#_ (initiate-cloud-run-job
{"Name" "ClinVarVariationRelease_2023-1209.xml.gz",
"Size" 3192148421,
"Released" "2023-12-10 01:49:23",
"Last Modified" "2023-12-10 01:49:23",
"Directory" "http://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/clinvar_variation/weekly_release",
"Release Date" "2023-12-09"})

42 changes: 24 additions & 18 deletions src/watcher/watcher.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
(ns watcher.watcher
"Source for cloud function that periodicly watches the clivar ftp dir
for new files. State is maintained in a kafka topic."
(:require [watcher.ftpparse :as ftpparse]
[watcher.stream :as stream]
[watcher.util :as util]
[watcher.workflow :as workflow]
[clojure.data.json :as json]
[clojure.instant :refer [read-instant-date]]
[clojure.pprint :refer [pprint]]
[taoensso.timbre :refer [log info warn error]])
(:require [watcher.ftpparse :as ftpparse]
[watcher.stream :as stream]
[watcher.util :as util]
[watcher.cloudrunjob :as job]
[clojure.data.json :as json]
[clojure.instant :refer [read-instant-date]]
[clojure.pprint :refer [pprint]]
[taoensso.timbre :refer [log info warn error]])
(:import [java.util Date])
(:gen-class))

Expand Down Expand Up @@ -65,15 +65,15 @@
(defn -main
"Main processing point: reads the last message from the clinvar-ftp-watcher topic,
gets the last dated file, reads the clinvar ftp site looking for files with newer dates,
and writes the newer files to the clinvar-ftp-watcher topic and initiates the google workflow
for each found file.
and writes the newer files to the clinvar-ftp-watcher topic and initiates the google cloud
run job for each found file.
Args:
--kafka - will not write newly found files to the kafka stream
--workflow - will not initiate the google cloud workflow to process the files
--job - will not initiate the google cloud job to process the files
"
[& args]
(let [write-to-kafka (= -1 (if (some? args) (.indexOf args "--kafka") -1))
initiate-workflow (= -1 (if (some? args) (.indexOf args "--workflow") -1))
initiate-job (= -1 (if (some? args) (.indexOf args "--job") -1))
files (-> (stream/get-last-processed)
get-last-processed-date
get-latest-files-since)
Expand All @@ -86,12 +86,18 @@
(stream/save-to-topic date-processed (json/write-str file-details))
(info "Updated kafka topic with new file details."))
(info "No new file information written to kafka."))
(if initiate-workflow
(doseq [release file-details]
(let [payload (json/write-str release)
initiated-workflow (workflow/initiate-workflow payload)]
(info "Initiated workflow " initiated-workflow " with payload " payload)))
(info "No workflows initiated.")))))
(if initiate-job
(doseq [release-map file-details]
;; Dereferencing this future will cause this process to wait for future completion.
;; since this is running as a cloud run job if we wait, gcp will kill this process with:
;; "WARNING: The task has been cancelled. Please refer to
;; https://github.com/googleapis/google-cloud-java#lro-timeouts for more information"
;; Waiting for the future to complete will require adding JobsSettings
;; https://cloud.google.com/java/docs/reference/google-cloud-run/latest/com.google.cloud.run.v2.JobsClient#com_google_cloud_run_v2_JobsClient_JobsClient_com_google_cloud_run_v2_JobsSettings_
;;
(let [initiated-job (future (job/initiate-cloud-run-job release-map))]
(info "Initiated cloud run job " (job/gcp-job-name) " with payload " release-map)))
(info "Cloud run job not initiated.")))))


(comment
Expand Down

0 comments on commit cd76c79

Please sign in to comment.