Skip to content

Commit

Permalink
worker creates k8s objects
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Jul 31, 2024
1 parent 7710513 commit 98c24ce
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 13 deletions.
7 changes: 6 additions & 1 deletion api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ bytes = { workspace = true }
config = { workspace = true, features = ["yaml"] }
config-types = { path = "../config-types" }
k8s-openapi = { workspace = true, features = ["latest"] }
kube = { workspace = true, features = ["runtime", "derive"] }
kube = { workspace = true, features = [
"runtime",
"derive",
"client",
"rustls-tls",
] }
secrecy = { workspace = true, features = ["serde", "alloc"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = ["std"] }
Expand Down
150 changes: 150 additions & 0 deletions api/src/k8s_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use k8s_openapi::api::core::v1::{ConfigMap, Pod, Secret};
use serde_json::json;
use thiserror::Error;
use tracing::*;

use kube::{
api::{Api, PostParams, ResourceExt},
Client,
};

#[derive(Debug, Error)]
pub enum K8sError {
#[error["serde_json error: {0}"]]
Serde(#[from] serde_json::error::Error),

#[error["kube error: {0}"]]
Kube(#[from] kube::Error),
}

pub async fn create_bq_service_account_key_secret(
bq_service_account_key: &str,
) -> Result<(), K8sError> {
info!("creating BQ service account key secret");

let secret: Secret = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": "bq-service-account-key"
},
"type": "Opaque",
"stringData": {
"service-account-key": bq_service_account_key,
}
}))?;

let client = Client::try_default().await?;
let secrets: Api<Secret> = Api::default_namespaced(client);

let pp = PostParams::default();
match secrets.create(&pp, &secret).await {
Ok(o) => {
let name = o.name_any();
assert_eq!(secret.name_any(), name);
info!("created Secret {}", name);
}
Err(kube::Error::Api(ae)) => {
error!("Error: {ae}");
assert_eq!(ae.code, 409);
} // if you skipped delete, for instance
Err(e) => return Err(e.into()), // any other case is probably bad
}

Ok(())
}

pub async fn create_config_map(base_config: &str, prod_config: &str) -> Result<(), K8sError> {
info!("creating config map");

let cm: ConfigMap = serde_json::from_value(json!({
"kind": "ConfigMap",
"apiVersion": "v1",
"metadata": {
"name": "replicator-config"
},
"data": {
"base.yaml": base_config,
"prod.yaml": prod_config,
}
}))?;

let client = Client::try_default().await?;
let config_maps: Api<ConfigMap> = Api::default_namespaced(client);

let pp = PostParams::default();
match config_maps.create(&pp, &cm).await {
Ok(o) => {
let name = o.name_any();
assert_eq!(cm.name_any(), name);
info!("created ConfigMap {}", name);
}
Err(kube::Error::Api(ae)) => {
error!("Error: {ae}");
assert_eq!(ae.code, 409);
} // if you skipped delete, for instance
Err(e) => return Err(e.into()), // any other case is probably bad
}
Ok(())
}

pub async fn create_pod() -> Result<(), K8sError> {
info!("creating Pod instance replicator");

let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);

let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "replicator" },
"spec": {
"volumes": [
{
"name": "config-file",
"configMap": {
"name": "replicator-config"
}
}
],
"containers": [{
"name": "replicator",
"image": "ramsup/replicator:0.0.7",
"env": [
{
"name": "APP_ENVIRONMENT",
"value": "prod"
},
{
"name": "APP_SINK__BIGQUERY__SERVICE_ACCOUNT_KEY",
"valueFrom": {
"secretKeyRef": {
"name": "bq-service-account-key",
"key": "service-account-key"
}
}
}
],
"volumeMounts": [{
"name": "config-file",
"mountPath": "/app/configuration"
}]
}],
}
}))?;

let pp = PostParams::default();
match pods.create(&pp, &p).await {
Ok(o) => {
let name = o.name_any();
assert_eq!(p.name_any(), name);
info!("created Pod {}", name);
}
Err(kube::Error::Api(ae)) => {
error!("Error: {ae}");
assert_eq!(ae.code, 409);
} // if you skipped delete, for instance
Err(e) => return Err(e.into()), // any other case is probably bad
}
Ok(())
}
1 change: 1 addition & 0 deletions api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod configuration;
pub mod k8s_client;
pub mod queue;
pub mod startup;
pub mod telemetry;
Expand Down
7 changes: 0 additions & 7 deletions api/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use config_types::Settings;
use sqlx::{PgPool, Postgres, Transaction};

pub async fn enqueue_task(
Expand Down Expand Up @@ -29,12 +28,6 @@ pub struct Task {
pub data: serde_json::Value,
}

#[allow(clippy::large_enum_variant)]
pub enum Request {
CreateOrUpdate(Settings),
Delete(String),
}

pub async fn dequeue_task(pool: &PgPool) -> Result<Option<(PgTransaction, Task)>, anyhow::Error> {
let mut txn = pool.begin().await?;

Expand Down
52 changes: 47 additions & 5 deletions api/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::time::Duration;

use config_types::SinkSettings;
use sqlx::PgPool;
use tracing::debug;
use tracing::{debug, info};
use tracing_log::log::error;

use crate::{
configuration::Settings,
k8s_client::{create_bq_service_account_key_secret, create_config_map, create_pod},
queue::{delete_task, dequeue_task},
startup::get_connection_pool,
};
Expand All @@ -26,21 +28,61 @@ async fn worker_loop(pool: PgPool, poll_duration: Duration) -> Result<(), anyhow
debug!("successfully executed task");
}
Err(e) => {
error!("error while executing task: {e}");
error!("error while executing task: {e:#?}");
}
}
tokio::time::sleep(poll_duration).await;
}
}

#[allow(clippy::large_enum_variant)]
#[derive(serde::Serialize, serde::Deserialize)]
pub enum Request {
CreateOrUpdate {
project_ref: String,
settings: config_types::Settings,
},
Delete {
project_ref: String,
},
}

pub async fn try_execute_task(pool: &PgPool) -> Result<ExecutionOutcome, anyhow::Error> {
let task = dequeue_task(pool).await?;
if task.is_none() {
let Some((transaction, task)) = task else {
return Ok(ExecutionOutcome::EmptyQueue);
};

let request = serde_json::from_value::<Request>(task.data)?;

match request {
Request::CreateOrUpdate {
project_ref,
settings,
} => {
info!(
"creating or updating k8s objects for project ref: {}",
project_ref
);

let SinkSettings::BigQuery {
project_id: _,
dataset_id: _,
service_account_key,
} = &settings.sink;
create_bq_service_account_key_secret(service_account_key).await?;
let base_config = "";
let prod_config = serde_json::to_string(&settings)?;
create_config_map(base_config, &prod_config).await?;
create_pod().await?;
}
Request::Delete { project_ref } => {
info!("deleting project ref: {}", project_ref);
}
}
let (transaction, task) = task.unwrap();
//TODO: perform task here

delete_task(transaction, task.id).await?;

Ok(ExecutionOutcome::TaskCompleted)
}

Expand Down

0 comments on commit 98c24ce

Please sign in to comment.