From 0fabb0686fd654b7c2db52b84bc3d5e8c5967876 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Wed, 13 Dec 2023 19:40:24 +0100 Subject: [PATCH] fix: use pod's IP instead of kube DNS fqdn (#2868) Co-authored-by: Henry Fontanier --- core/bin/dust_api.rs | 17 ++++++--- core/bin/sqlite_worker.rs | 27 ++++++++++--- core/src/sqlite_workers/client.rs | 35 +++++------------ core/src/stores/postgres.rs | 38 +++++++++---------- core/src/stores/store.rs | 8 ++-- k8s/configmaps/core-configmap.yaml | 2 - .../core-sqlite-worker-deployment.yaml | 23 ++++------- 7 files changed, 72 insertions(+), 78 deletions(-) diff --git a/core/bin/dust_api.rs b/core/bin/dust_api.rs index c2ee3bba4733..1327a4386aa4 100644 --- a/core/bin/dust_api.rs +++ b/core/bin/dust_api.rs @@ -2228,11 +2228,16 @@ async fn databases_query_run( // SQLite Workers +#[derive(serde::Deserialize)] +struct SQLiteWorkersUpsertOrDeletePayload { + url: String, +} + async fn sqlite_workers_heartbeat( - extract::Path(pod_name): extract::Path, + extract::Json(payload): extract::Json, extract::Extension(state): extract::Extension>, ) -> (StatusCode, Json) { - match state.store.sqlite_workers_upsert(&pod_name).await { + match state.store.sqlite_workers_upsert(&payload.url).await { Err(e) => error_response( StatusCode::INTERNAL_SERVER_ERROR, "internal_server_error", @@ -2250,10 +2255,10 @@ async fn sqlite_workers_heartbeat( } async fn sqlite_workers_delete( - extract::Path(pod_name): extract::Path, + extract::Json(payload): extract::Json, extract::Extension(state): extract::Extension>, ) -> (StatusCode, Json) { - match state.store.sqlite_workers_delete(&pod_name).await { + match state.store.sqlite_workers_delete(&payload.url).await { Err(e) => error_response( StatusCode::INTERNAL_SERVER_ERROR, "internal_server_error", @@ -2483,7 +2488,7 @@ fn main() { "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/query", post(databases_query_run), ) - .route("/sqlite_workers/:pod_name", delete(sqlite_workers_delete)) + .route("/sqlite_workers", delete(sqlite_workers_delete)) // Misc .route("/tokenize", post(tokenize)) @@ -2498,7 +2503,7 @@ fn main() { // In a separate router, to avoid noisy tracing. let sqlite_heartbeat_router = Router::new() - .route("/sqlite_workers/:pod_name", post(sqlite_workers_heartbeat)) + .route("/sqlite_workers", post(sqlite_workers_heartbeat)) .layer(extract::Extension(state.clone())); let app = Router::new().merge(router).merge(sqlite_heartbeat_router); diff --git a/core/bin/sqlite_worker.rs b/core/bin/sqlite_worker.rs index f458242bb5ea..43f65f031167 100644 --- a/core/bin/sqlite_worker.rs +++ b/core/bin/sqlite_worker.rs @@ -96,9 +96,20 @@ impl WorkerState { } async fn _core_request(&self, method: &str) -> Result<()> { - let hostname = match std::env::var("HOSTNAME") { - Ok(hostname) => hostname, - Err(_) => Err(anyhow!("HOSTNAME not set."))?, + let worker_url = match std::env::var("IS_LOCAL_DEV") { + Ok(_) => "http://localhost:3005".to_string(), + _ => { + let port = match std::env::var("POD_PORT") { + Ok(port) => port, + Err(_) => Err(anyhow!("PORT not set."))?, + }; + let ip = match std::env::var("POD_IP") { + Ok(ip) => ip, + Err(_) => Err(anyhow!("IP not set."))?, + }; + + format!("http://{}:{}", ip, port) + } }; let core_api = match std::env::var("CORE_API") { @@ -108,8 +119,14 @@ impl WorkerState { let req = Request::builder() .method(method) - .uri(format!("{}/sqlite_workers/{}", core_api, hostname)) - .body(Body::empty())?; + .uri(format!("{}/sqlite_workers", core_api)) + .header("Content-Type", "application/json") + .body(Body::from( + json!({ + "url": worker_url, + }) + .to_string(), + ))?; let res = Client::new().request(req).await?; diff --git a/core/src/sqlite_workers/client.rs b/core/src/sqlite_workers/client.rs index 309118133cd3..a1be4e5eb902 100644 --- a/core/src/sqlite_workers/client.rs +++ b/core/src/sqlite_workers/client.rs @@ -12,14 +12,14 @@ pub const HEARTBEAT_INTERVAL_MS: u64 = 3_000; pub struct SqliteWorker { last_heartbeat: u64, - pod_name: String, + url: String, } impl SqliteWorker { - pub fn new(pod_name: String, last_heartbeat: u64) -> Self { + pub fn new(url: String, last_heartbeat: u64) -> Self { Self { last_heartbeat: last_heartbeat, - pod_name, + url, } } @@ -37,7 +37,7 @@ impl SqliteWorker { rows: Vec, truncate: bool, ) -> Result<()> { - let url = self.url()?; + let url = self.url(); let req = Request::builder() .method("POST") .uri(format!( @@ -70,7 +70,7 @@ impl SqliteWorker { table_id: &str, limit_offset: Option<(usize, usize)>, ) -> Result<(Vec, usize)> { - let worker_url = self.url()?; + let worker_url = self.url(); let mut uri = format!( "{}/databases/{}/tables/{}/rows", @@ -127,7 +127,7 @@ impl SqliteWorker { table_id: &str, row_id: &str, ) -> Result> { - let worker_url = self.url()?; + let worker_url = self.url(); let uri = format!( "{}/databases/{}/tables/{}/rows/{}", @@ -173,7 +173,7 @@ impl SqliteWorker { tables: Vec, query: &str, ) -> Result> { - let worker_url = self.url()?; + let worker_url = self.url(); let req = Request::builder() .method("POST") @@ -214,24 +214,7 @@ impl SqliteWorker { } } - pub fn url(&self) -> Result { - match std::env::var("IS_LOCAL_DEV") { - Ok(_) => return Ok("http://localhost:3005".to_string()), - Err(_) => (), - } - let cluster_namespace = match std::env::var("CLUSTER_NAMESPACE") { - Ok(n) => n, - Err(_) => Err(anyhow!("CLUSTER_NAMESPACE env var not set"))?, - }; - let core_sqlite_headless_service_name = - match std::env::var("CORE_SQLITE_HEADLESS_SERVICE_NAME") { - Ok(s) => s, - Err(_) => Err(anyhow!("CORE_SQLITE_HEADLESS_SERVICE_NAME env var not set"))?, - }; - - Ok(format!( - "http://{}.{}.{}.svc.cluster.local", - self.pod_name, core_sqlite_headless_service_name, cluster_namespace - )) + pub fn url(&self) -> &str { + &self.url } } diff --git a/core/src/stores/postgres.rs b/core/src/stores/postgres.rs index b6ec93d6a5fe..c8f872957a4a 100644 --- a/core/src/stores/postgres.rs +++ b/core/src/stores/postgres.rs @@ -2037,7 +2037,7 @@ impl Store for PostgresStore { // Check if there is already an assigned live worker. let stmt = tx .prepare( - "SELECT pod_name, last_heartbeat + "SELECT url, last_heartbeat FROM sqlite_workers WHERE id IN ( SELECT sqlite_worker @@ -2060,8 +2060,8 @@ impl Store for PostgresStore { let worker: Option = match r.len() { 0 => None, 1 => { - let (pod_name, last_heartbeat): (String, i64) = (r[0].get(0), r[0].get(1)); - Some(SqliteWorker::new(pod_name, last_heartbeat as u64)) + let (url, last_heartbeat): (String, i64) = (r[0].get(0), r[0].get(1)); + Some(SqliteWorker::new(url, last_heartbeat as u64)) } _ => unreachable!(), }; @@ -2076,7 +2076,7 @@ impl Store for PostgresStore { // Pick a random live worker. let stmt = tx .prepare( - "SELECT id, pod_name, last_heartbeat + "SELECT id, url, last_heartbeat FROM sqlite_workers WHERE last_heartbeat > $1 ORDER BY RANDOM() LIMIT 1", ) @@ -2086,7 +2086,7 @@ impl Store for PostgresStore { match r.len() { 0 => Err(anyhow!("No live workers found"))?, 1 => { - let (sqlite_worker_row_id, pod_name, last_heartbeat): (i64, String, i64) = + let (sqlite_worker_row_id, url, last_heartbeat): (i64, String, i64) = (r[0].get(0), r[0].get(1), r[0].get(2)); // Update the database row to assign the worker. @@ -2109,7 +2109,7 @@ impl Store for PostgresStore { // Release the lock. tx.commit().await?; - Ok(SqliteWorker::new(pod_name, last_heartbeat as u64)) + Ok(SqliteWorker::new(url, last_heartbeat as u64)) } _ => unreachable!(), } @@ -2758,20 +2758,20 @@ impl Store for PostgresStore { let c = pool.get().await?; let stmt = c - .prepare("SELECT pod_name, last_heartbeat FROM sqlite_workers") + .prepare("SELECT url, last_heartbeat FROM sqlite_workers") .await?; let rows = c.query(&stmt, &[]).await?; rows.iter() .map(|row| { - let pod_name: String = row.get(0); + let url: String = row.get(0); let last_heartbeat: i64 = row.get(1); - Ok(SqliteWorker::new(pod_name, last_heartbeat as u64)) + Ok(SqliteWorker::new(url, last_heartbeat as u64)) }) .collect::>>() } - async fn sqlite_workers_upsert(&self, pod_name: &str) -> Result { + async fn sqlite_workers_upsert(&self, url: &str) -> Result { let pool = self.pool.clone(); let c = pool.get().await?; @@ -2779,9 +2779,9 @@ impl Store for PostgresStore { let stmt = c .prepare( - "INSERT INTO sqlite_workers (id, created, pod_name, last_heartbeat) \ + "INSERT INTO sqlite_workers (id, created, url, last_heartbeat) \ VALUES (DEFAULT, $1, $2, $3) \ - ON CONFLICT (pod_name) DO UPDATE \ + ON CONFLICT (url) DO UPDATE \ SET last_heartbeat = EXCLUDED.last_heartbeat RETURNING id", ) .await?; @@ -2790,16 +2790,16 @@ impl Store for PostgresStore { &stmt, &[ &(utils::now() as i64), - &pod_name.to_string(), + &url.to_string(), &(last_heartbeat as i64), ], ) .await?; - Ok(SqliteWorker::new(pod_name.to_string(), last_heartbeat)) + Ok(SqliteWorker::new(url.to_string(), last_heartbeat)) } - async fn sqlite_workers_delete(&self, pod_name: &str) -> Result<()> { + async fn sqlite_workers_delete(&self, url: &str) -> Result<()> { let pool = self.pool.clone(); let c = pool.get().await?; @@ -2810,18 +2810,18 @@ impl Store for PostgresStore { WHERE sqlite_worker IN ( SELECT id FROM sqlite_workers - WHERE pod_name = $1 + WHERE url = $1 )", ) .await?; - c.execute(&stmt, &[&pod_name.to_string()]).await?; + c.execute(&stmt, &[&url.to_string()]).await?; // Delete the worker. let stmt = c - .prepare("DELETE FROM sqlite_workers WHERE pod_name = $1") + .prepare("DELETE FROM sqlite_workers WHERE url = $1") .await?; - c.execute(&stmt, &[&pod_name.to_string()]).await?; + c.execute(&stmt, &[&url.to_string()]).await?; Ok(()) } diff --git a/core/src/stores/store.rs b/core/src/stores/store.rs index b9da5f00d3a0..0fe90eb1bc4b 100644 --- a/core/src/stores/store.rs +++ b/core/src/stores/store.rs @@ -276,8 +276,8 @@ pub trait Store { // SQLite Workers async fn sqlite_workers_list(&self) -> Result>; - async fn sqlite_workers_upsert(&self, pod_name: &str) -> Result; - async fn sqlite_workers_delete(&self, pod_name: &str) -> Result<()>; + async fn sqlite_workers_upsert(&self, url: &str) -> Result; + async fn sqlite_workers_delete(&self, url: &str) -> Result<()>; async fn sqlite_workers_cleanup(&self, ttl: u64) -> Result<()>; // Cloning @@ -399,7 +399,7 @@ pub const POSTGRES_TABLES: [&'static str; 14] = [ CREATE TABLE IF NOT EXISTS sqlite_workers ( id BIGSERIAL PRIMARY KEY, created BIGINT NOT NULL, - pod_name TEXT NOT NULL, + url TEXT NOT NULL, last_heartbeat BIGINT NOT NULL );", "-- database @@ -478,7 +478,7 @@ pub const SQL_INDEXES: [&'static str; 23] = [ "CREATE UNIQUE INDEX IF NOT EXISTS idx_databases_tables_database_table_name ON databases_tables (database, name);", "CREATE UNIQUE INDEX IF NOT EXISTS - idx_sqlite_workers_pod_name ON sqlite_workers (pod_name);", + idx_sqlite_workers_url ON sqlite_workers (url);", ]; pub const SQL_FUNCTIONS: [&'static str; 3] = [ diff --git a/k8s/configmaps/core-configmap.yaml b/k8s/configmaps/core-configmap.yaml index b6a2801120b0..cf9296dcb13d 100644 --- a/k8s/configmaps/core-configmap.yaml +++ b/k8s/configmaps/core-configmap.yaml @@ -7,5 +7,3 @@ data: DD_SERVICE: "core" DD_LOGS_INJECTION: "true" DD_RUNTIME_METRICS_ENABLED: "true" - CORE_SQLITE_HEADLESS_SERVICE_NAME: "core-sqlite-worker-headless-service" - CLUSTER_NAMESPACE: "default" diff --git a/k8s/deployments/core-sqlite-worker-deployment.yaml b/k8s/deployments/core-sqlite-worker-deployment.yaml index 7a598082b43c..9211758ebced 100644 --- a/k8s/deployments/core-sqlite-worker-deployment.yaml +++ b/k8s/deployments/core-sqlite-worker-deployment.yaml @@ -24,12 +24,6 @@ spec: imagePullPolicy: Always ports: - containerPort: 3005 - readinessProbe: - httpGet: - path: / - port: 3005 - initialDelaySeconds: 5 - periodSeconds: 5 envFrom: - configMapRef: @@ -42,10 +36,13 @@ spec: fieldRef: fieldPath: status.hostIP - # TODO: Uncomment this when we have service accounts - # volumeMounts: - # - name: service-account-volume - # mountPath: /etc/service-accounts + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + + - name: POD_PORT + value: "3005" resources: requests: @@ -54,9 +51,3 @@ spec: limits: cpu: 500m memory: 250Mi - - # TODO: Uncomment this when we have service accounts - # volumes: - # - name: service-account-volume - # secret: - # secretName: gcp-service-account-secret