Skip to content

Commit

Permalink
fix: use pod's IP instead of kube DNS fqdn (#2868)
Browse files Browse the repository at this point in the history
Co-authored-by: Henry Fontanier <[email protected]>
  • Loading branch information
fontanierh and Henry Fontanier authored Dec 13, 2023
1 parent e754f91 commit 0fabb06
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 78 deletions.
17 changes: 11 additions & 6 deletions core/bin/dust_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2228,11 +2228,16 @@ async fn databases_query_run(

// SQLite Workers

/// JSON request body shared by the SQLite worker heartbeat (upsert) and delete endpoints.
///
/// Both handlers extract this payload with `extract::Json` and forward `url` to the store
/// (`sqlite_workers_upsert` / `sqlite_workers_delete`), where the URL is the key used to
/// upsert or delete the worker row.
#[derive(serde::Deserialize)]
struct SQLiteWorkersUpsertOrDeletePayload {
/// Base URL of the worker as reported by the worker itself, e.g. `http://<ip>:<port>`
/// (or `http://localhost:3005` when the worker runs with `IS_LOCAL_DEV` set).
url: String,
}

async fn sqlite_workers_heartbeat(
extract::Path(pod_name): extract::Path<String>,
extract::Json(payload): extract::Json<SQLiteWorkersUpsertOrDeletePayload>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
match state.store.sqlite_workers_upsert(&pod_name).await {
match state.store.sqlite_workers_upsert(&payload.url).await {
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
Expand All @@ -2250,10 +2255,10 @@ async fn sqlite_workers_heartbeat(
}

async fn sqlite_workers_delete(
extract::Path(pod_name): extract::Path<String>,
extract::Json(payload): extract::Json<SQLiteWorkersUpsertOrDeletePayload>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
match state.store.sqlite_workers_delete(&pod_name).await {
match state.store.sqlite_workers_delete(&payload.url).await {
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
Expand Down Expand Up @@ -2483,7 +2488,7 @@ fn main() {
"/projects/:project_id/data_sources/:data_source_id/databases/:database_id/query",
post(databases_query_run),
)
.route("/sqlite_workers/:pod_name", delete(sqlite_workers_delete))
.route("/sqlite_workers", delete(sqlite_workers_delete))
// Misc
.route("/tokenize", post(tokenize))

Expand All @@ -2498,7 +2503,7 @@ fn main() {

// In a separate router, to avoid noisy tracing.
let sqlite_heartbeat_router = Router::new()
.route("/sqlite_workers/:pod_name", post(sqlite_workers_heartbeat))
.route("/sqlite_workers", post(sqlite_workers_heartbeat))
.layer(extract::Extension(state.clone()));

let app = Router::new().merge(router).merge(sqlite_heartbeat_router);
Expand Down
27 changes: 22 additions & 5 deletions core/bin/sqlite_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,20 @@ impl WorkerState {
}

async fn _core_request(&self, method: &str) -> Result<()> {
let hostname = match std::env::var("HOSTNAME") {
Ok(hostname) => hostname,
Err(_) => Err(anyhow!("HOSTNAME not set."))?,
let worker_url = match std::env::var("IS_LOCAL_DEV") {
Ok(_) => "http://localhost:3005".to_string(),
_ => {
let port = match std::env::var("POD_PORT") {
Ok(port) => port,
Err(_) => Err(anyhow!("PORT not set."))?,
};
let ip = match std::env::var("POD_IP") {
Ok(ip) => ip,
Err(_) => Err(anyhow!("IP not set."))?,
};

format!("http://{}:{}", ip, port)
}
};

let core_api = match std::env::var("CORE_API") {
Expand All @@ -108,8 +119,14 @@ impl WorkerState {

let req = Request::builder()
.method(method)
.uri(format!("{}/sqlite_workers/{}", core_api, hostname))
.body(Body::empty())?;
.uri(format!("{}/sqlite_workers", core_api))
.header("Content-Type", "application/json")
.body(Body::from(
json!({
"url": worker_url,
})
.to_string(),
))?;

let res = Client::new().request(req).await?;

Expand Down
35 changes: 9 additions & 26 deletions core/src/sqlite_workers/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ pub const HEARTBEAT_INTERVAL_MS: u64 = 3_000;

pub struct SqliteWorker {
last_heartbeat: u64,
pod_name: String,
url: String,
}

impl SqliteWorker {
pub fn new(pod_name: String, last_heartbeat: u64) -> Self {
pub fn new(url: String, last_heartbeat: u64) -> Self {
Self {
last_heartbeat: last_heartbeat,
pod_name,
url,
}
}

Expand All @@ -37,7 +37,7 @@ impl SqliteWorker {
rows: Vec<DatabaseRow>,
truncate: bool,
) -> Result<()> {
let url = self.url()?;
let url = self.url();
let req = Request::builder()
.method("POST")
.uri(format!(
Expand Down Expand Up @@ -70,7 +70,7 @@ impl SqliteWorker {
table_id: &str,
limit_offset: Option<(usize, usize)>,
) -> Result<(Vec<DatabaseRow>, usize)> {
let worker_url = self.url()?;
let worker_url = self.url();

let mut uri = format!(
"{}/databases/{}/tables/{}/rows",
Expand Down Expand Up @@ -127,7 +127,7 @@ impl SqliteWorker {
table_id: &str,
row_id: &str,
) -> Result<Option<DatabaseRow>> {
let worker_url = self.url()?;
let worker_url = self.url();

let uri = format!(
"{}/databases/{}/tables/{}/rows/{}",
Expand Down Expand Up @@ -173,7 +173,7 @@ impl SqliteWorker {
tables: Vec<DatabaseTable>,
query: &str,
) -> Result<Vec<DatabaseResult>> {
let worker_url = self.url()?;
let worker_url = self.url();

let req = Request::builder()
.method("POST")
Expand Down Expand Up @@ -214,24 +214,7 @@ impl SqliteWorker {
}
}

pub fn url(&self) -> Result<String> {
match std::env::var("IS_LOCAL_DEV") {
Ok(_) => return Ok("http://localhost:3005".to_string()),
Err(_) => (),
}
let cluster_namespace = match std::env::var("CLUSTER_NAMESPACE") {
Ok(n) => n,
Err(_) => Err(anyhow!("CLUSTER_NAMESPACE env var not set"))?,
};
let core_sqlite_headless_service_name =
match std::env::var("CORE_SQLITE_HEADLESS_SERVICE_NAME") {
Ok(s) => s,
Err(_) => Err(anyhow!("CORE_SQLITE_HEADLESS_SERVICE_NAME env var not set"))?,
};

Ok(format!(
"http://{}.{}.{}.svc.cluster.local",
self.pod_name, core_sqlite_headless_service_name, cluster_namespace
))
pub fn url(&self) -> &str {
&self.url
}
}
38 changes: 19 additions & 19 deletions core/src/stores/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2037,7 +2037,7 @@ impl Store for PostgresStore {
// Check if there is already an assigned live worker.
let stmt = tx
.prepare(
"SELECT pod_name, last_heartbeat
"SELECT url, last_heartbeat
FROM sqlite_workers
WHERE id IN (
SELECT sqlite_worker
Expand All @@ -2060,8 +2060,8 @@ impl Store for PostgresStore {
let worker: Option<SqliteWorker> = match r.len() {
0 => None,
1 => {
let (pod_name, last_heartbeat): (String, i64) = (r[0].get(0), r[0].get(1));
Some(SqliteWorker::new(pod_name, last_heartbeat as u64))
let (url, last_heartbeat): (String, i64) = (r[0].get(0), r[0].get(1));
Some(SqliteWorker::new(url, last_heartbeat as u64))
}
_ => unreachable!(),
};
Expand All @@ -2076,7 +2076,7 @@ impl Store for PostgresStore {
// Pick a random live worker.
let stmt = tx
.prepare(
"SELECT id, pod_name, last_heartbeat
"SELECT id, url, last_heartbeat
FROM sqlite_workers
WHERE last_heartbeat > $1 ORDER BY RANDOM() LIMIT 1",
)
Expand All @@ -2086,7 +2086,7 @@ impl Store for PostgresStore {
match r.len() {
0 => Err(anyhow!("No live workers found"))?,
1 => {
let (sqlite_worker_row_id, pod_name, last_heartbeat): (i64, String, i64) =
let (sqlite_worker_row_id, url, last_heartbeat): (i64, String, i64) =
(r[0].get(0), r[0].get(1), r[0].get(2));

// Update the database row to assign the worker.
Expand All @@ -2109,7 +2109,7 @@ impl Store for PostgresStore {
// Release the lock.
tx.commit().await?;

Ok(SqliteWorker::new(pod_name, last_heartbeat as u64))
Ok(SqliteWorker::new(url, last_heartbeat as u64))
}
_ => unreachable!(),
}
Expand Down Expand Up @@ -2758,30 +2758,30 @@ impl Store for PostgresStore {
let c = pool.get().await?;

let stmt = c
.prepare("SELECT pod_name, last_heartbeat FROM sqlite_workers")
.prepare("SELECT url, last_heartbeat FROM sqlite_workers")
.await?;
let rows = c.query(&stmt, &[]).await?;

rows.iter()
.map(|row| {
let pod_name: String = row.get(0);
let url: String = row.get(0);
let last_heartbeat: i64 = row.get(1);
Ok(SqliteWorker::new(pod_name, last_heartbeat as u64))
Ok(SqliteWorker::new(url, last_heartbeat as u64))
})
.collect::<Result<Vec<_>>>()
}

async fn sqlite_workers_upsert(&self, pod_name: &str) -> Result<SqliteWorker> {
async fn sqlite_workers_upsert(&self, url: &str) -> Result<SqliteWorker> {
let pool = self.pool.clone();
let c = pool.get().await?;

let last_heartbeat = utils::now();

let stmt = c
.prepare(
"INSERT INTO sqlite_workers (id, created, pod_name, last_heartbeat) \
"INSERT INTO sqlite_workers (id, created, url, last_heartbeat) \
VALUES (DEFAULT, $1, $2, $3) \
ON CONFLICT (pod_name) DO UPDATE \
ON CONFLICT (url) DO UPDATE \
SET last_heartbeat = EXCLUDED.last_heartbeat RETURNING id",
)
.await?;
Expand All @@ -2790,16 +2790,16 @@ impl Store for PostgresStore {
&stmt,
&[
&(utils::now() as i64),
&pod_name.to_string(),
&url.to_string(),
&(last_heartbeat as i64),
],
)
.await?;

Ok(SqliteWorker::new(pod_name.to_string(), last_heartbeat))
Ok(SqliteWorker::new(url.to_string(), last_heartbeat))
}

async fn sqlite_workers_delete(&self, pod_name: &str) -> Result<()> {
async fn sqlite_workers_delete(&self, url: &str) -> Result<()> {
let pool = self.pool.clone();
let c = pool.get().await?;

Expand All @@ -2810,18 +2810,18 @@ impl Store for PostgresStore {
WHERE sqlite_worker IN (
SELECT id
FROM sqlite_workers
WHERE pod_name = $1
WHERE url = $1
)",
)
.await?;
c.execute(&stmt, &[&pod_name.to_string()]).await?;
c.execute(&stmt, &[&url.to_string()]).await?;

// Delete the worker.
let stmt = c
.prepare("DELETE FROM sqlite_workers WHERE pod_name = $1")
.prepare("DELETE FROM sqlite_workers WHERE url = $1")
.await?;

c.execute(&stmt, &[&pod_name.to_string()]).await?;
c.execute(&stmt, &[&url.to_string()]).await?;

Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/stores/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ pub trait Store {

// SQLite Workers
async fn sqlite_workers_list(&self) -> Result<Vec<SqliteWorker>>;
async fn sqlite_workers_upsert(&self, pod_name: &str) -> Result<SqliteWorker>;
async fn sqlite_workers_delete(&self, pod_name: &str) -> Result<()>;
async fn sqlite_workers_upsert(&self, url: &str) -> Result<SqliteWorker>;
async fn sqlite_workers_delete(&self, url: &str) -> Result<()>;
async fn sqlite_workers_cleanup(&self, ttl: u64) -> Result<()>;

// Cloning
Expand Down Expand Up @@ -399,7 +399,7 @@ pub const POSTGRES_TABLES: [&'static str; 14] = [
CREATE TABLE IF NOT EXISTS sqlite_workers (
id BIGSERIAL PRIMARY KEY,
created BIGINT NOT NULL,
pod_name TEXT NOT NULL,
url TEXT NOT NULL,
last_heartbeat BIGINT NOT NULL
);",
"-- database
Expand Down Expand Up @@ -478,7 +478,7 @@ pub const SQL_INDEXES: [&'static str; 23] = [
"CREATE UNIQUE INDEX IF NOT EXISTS
idx_databases_tables_database_table_name ON databases_tables (database, name);",
"CREATE UNIQUE INDEX IF NOT EXISTS
idx_sqlite_workers_pod_name ON sqlite_workers (pod_name);",
idx_sqlite_workers_url ON sqlite_workers (url);",
];

pub const SQL_FUNCTIONS: [&'static str; 3] = [
Expand Down
2 changes: 0 additions & 2 deletions k8s/configmaps/core-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,3 @@ data:
DD_SERVICE: "core"
DD_LOGS_INJECTION: "true"
DD_RUNTIME_METRICS_ENABLED: "true"
CORE_SQLITE_HEADLESS_SERVICE_NAME: "core-sqlite-worker-headless-service"
CLUSTER_NAMESPACE: "default"
23 changes: 7 additions & 16 deletions k8s/deployments/core-sqlite-worker-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ spec:
imagePullPolicy: Always
ports:
- containerPort: 3005
readinessProbe:
httpGet:
path: /
port: 3005
initialDelaySeconds: 5
periodSeconds: 5

envFrom:
- configMapRef:
Expand All @@ -42,10 +36,13 @@ spec:
fieldRef:
fieldPath: status.hostIP

# TODO: Uncomment this when we have service accounts
# volumeMounts:
# - name: service-account-volume
# mountPath: /etc/service-accounts
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP

- name: POD_PORT
value: "3005"

resources:
requests:
Expand All @@ -54,9 +51,3 @@ spec:
limits:
cpu: 500m
memory: 250Mi

# TODO: Uncomment this when we have service accounts
# volumes:
# - name: service-account-volume
# secret:
# secretName: gcp-service-account-secret

0 comments on commit 0fabb06

Please sign in to comment.