Skip to content

Commit

Permalink
feat: WIP prometheus config generation
Browse files Browse the repository at this point in the history
Signed-off-by: Zander Franks <[email protected]>
  • Loading branch information
voximity committed Mar 30, 2024
1 parent d83d773 commit fd711f1
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 16 deletions.
26 changes: 26 additions & 0 deletions crates/snot-agent/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,32 @@ impl AgentService for AgentRpcServer {
}
}

/// Fetch the raw metrics text from the locally running node.
///
/// Issues an HTTP GET against `http://127.0.0.1:<ports.metrics>/` and
/// returns the response body unchanged.
///
/// # Errors
///
/// * [`AgentError::InvalidState`] when the agent state is not
///   `AgentState::Node`.
/// * [`AgentError::FailedToMakeRequest`] when the request cannot be sent,
///   the response status is not a success code, or the body cannot be read.
async fn get_metrics(self, _: context::Context) -> Result<String, AgentError> {
    // The read guard is a temporary of this statement, so the lock is
    // released before any network I/O happens.
    let is_node = matches!(
        self.state.agent_state.read().await.deref(),
        AgentState::Node(_, _)
    );
    if !is_node {
        return Err(AgentError::InvalidState);
    }

    let endpoint = format!("http://127.0.0.1:{}/", self.state.cli.ports.metrics);
    let http = reqwest::Client::new();

    // Transport failures and non-success statuses are reported identically.
    let reply = match http.get(endpoint).send().await {
        Ok(reply) if reply.status().is_success() => reply,
        _ => return Err(AgentError::FailedToMakeRequest),
    };

    let body = reply.text().await;
    body.map_err(|_| AgentError::FailedToMakeRequest)
}

async fn get_metric(self, _: context::Context, metric: AgentMetric) -> f64 {
let metrics = self.state.metrics.read().await;

Expand Down
5 changes: 4 additions & 1 deletion crates/snot-common/src/rpc/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ pub trait AgentService {
async fn broadcast_tx(tx: String) -> Result<(), AgentError>;

/// Locally execute an authorization, using the given query
/// environment id is passed so the agent can determine which aot binary to use
/// environment id is passed so the agent can determine which aot binary to
/// use
async fn execute_authorization(
env_id: usize,
query: String,
auth: String,
) -> Result<(), AgentError>;

async fn get_metrics() -> Result<String, AgentError>;

async fn get_metric(metric: AgentMetric) -> f64;
}

Expand Down
2 changes: 0 additions & 2 deletions crates/snot-common/src/rpc/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use crate::state::AgentId;

#[tarpc::service]
pub trait ControlService {
async fn placeholder() -> String;

/// Resolve the addresses of the given agents.
async fn resolve_addrs(
peers: HashSet<AgentId>,
Expand Down
47 changes: 47 additions & 0 deletions crates/snot/src/prometheus/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use indexmap::IndexMap;
use serde::Serialize;

// TODO: we could probably clean this up or make it look a little bit prettier
// later

/// Top-level Prometheus configuration document: the global settings plus the
/// list of scrape jobs. Only `Serialize` is derived, so this type is meant to
/// be written out, not parsed.
#[derive(Debug, Clone, Serialize)]
pub struct PrometheusConfig {
/// Settings emitted under the `global` key.
pub global: GlobalConfig,
/// Scrape jobs emitted under the `scrape_configs` key, in vector order.
pub scrape_configs: Vec<ScrapeConfig>,
}

/// Values for the `global` section of the Prometheus configuration.
///
/// Every field is stored as a plain string and serialized as-is; no
/// validation of the contents happens in this type.
/// NOTE(review): presumably these are Prometheus duration strings such as
/// `"15s"` (see the `Default` impl) — confirm against the Prometheus docs.
#[derive(Debug, Clone, Serialize)]
pub struct GlobalConfig {
/// Emitted as `scrape_interval`.
pub scrape_interval: String,
/// Emitted as `scrape_timeout`.
pub scrape_timeout: String,
/// Emitted as `evaluation_interval`.
pub evaluation_interval: String,
}

impl Default for GlobalConfig {
    /// Build the default global section: a `15s` scrape interval, a `10s`
    /// scrape timeout and a `1m` evaluation interval.
    fn default() -> Self {
        let scrape_interval = String::from("15s");
        let scrape_timeout = String::from("10s");
        let evaluation_interval = String::from("1m");

        Self {
            scrape_interval,
            scrape_timeout,
            evaluation_interval,
        }
    }
}

/// One entry of the `scrape_configs` list in the Prometheus configuration.
///
/// Optional settings are modelled as `Option`; what a `None` looks like in
/// the output depends on the serializer used.
/// NOTE(review): with the derived impl `None` is presumably written as an
/// explicit null rather than omitted — confirm this is what Prometheus
/// should receive.
#[derive(Debug, Clone, Serialize)]
pub struct ScrapeConfig {
/// Name of the scrape job.
pub job_name: String,
/// Emitted as `honor_timestamps` when set.
pub honor_timestamps: Option<bool>,
/// Per-job `scrape_interval` when set.
pub scrape_interval: Option<String>,
/// Per-job `scrape_timeout` when set.
pub scrape_timeout: Option<String>,
/// HTTP path the metrics are fetched from when set.
pub metrics_path: Option<String>,
/// URL scheme used for the scrape (e.g. `"http"`) when set.
pub scheme: Option<String>,
/// Emitted as `follow_redirects` when set.
pub follow_redirects: Option<bool>,
// NOTE(review): `default` is a deserialization attribute and only
// `Serialize` is derived here, so it appears to have no effect — confirm
// and drop it, or derive `Deserialize` if parsing is intended.
#[serde(default)]
/// Statically configured targets for this job.
pub static_configs: Vec<StaticConfig>,
}

/// One entry of a scrape job's `static_configs` list: a set of targets and
/// the labels attached to them.
#[derive(Debug, Clone, Serialize)]
pub struct StaticConfig {
/// Target addresses, e.g. `"localhost:9090"`.
pub targets: Vec<String>,
/// Label name to label value; `IndexMap` keeps insertion order when
/// serialized.
pub labels: IndexMap<String, String>,
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod config;

use anyhow::bail;
use bollard::{
container::{self, ListContainersOptions},
Expand All @@ -6,10 +8,12 @@ use bollard::{
Docker,
};
use futures_util::TryStreamExt;
use indexmap::IndexMap;
use serde_json::json;
use tracing::info;

use crate::state::GlobalState;
use self::config::{PrometheusConfig, ScrapeConfig, StaticConfig};
use crate::{env::EnvPeer, state::GlobalState};

const PROMETHEUS_IMAGE: &str = "prom/prometheus:latest";

Expand All @@ -21,9 +25,58 @@ const PROMETHEUS_CTR_DATA: &str = "/prometheus";
const PROMETHEUS_CTR_LABEL: &str = "snops_prometheus";
const PROMETHEUS_CTR_LABEL_VALUE: &str = "snops_prometheus=control_plane";

// TODO: clean this function up, possibly make config zero-copy, or use json!
// macro or something
/// Save Prometheus config based on the current state of the control plane.
pub fn save_prometheus_config() {
// ...
/// Build a [`PrometheusConfig`] from the environments currently held in
/// `state`.
///
/// The result always starts with a `prometheus` job targeting
/// `localhost:9090`, followed by one job per [`EnvPeer::Internal`] peer of
/// every environment. Each peer job targets
/// `host.docker.internal:<state.cli.port>` with the metrics path
/// `/api/v1/agents/<agent id>/metrics`, and carries `env` and `agent` labels.
/// The `global` section uses its `Default` values.
pub async fn generate_prometheus_config(state: &GlobalState) -> PrometheusConfig {
    let envs = state.envs.read().await;

    // Job for the `localhost:9090` target, always listed first.
    let self_scrape = ScrapeConfig {
        job_name: "prometheus".into(),
        metrics_path: Some("/metrics".into()),
        scheme: Some("http".into()),
        honor_timestamps: Some(true),
        follow_redirects: Some(true),
        scrape_interval: None,
        scrape_timeout: None,
        static_configs: vec![StaticConfig {
            targets: vec!["localhost:9090".into()],
            labels: IndexMap::new(),
        }],
    };
    let mut scrape_configs = vec![self_scrape];

    for (env_id, env) in envs.iter() {
        let agent_jobs = env.node_map.iter().filter_map(|(key, peer)| {
            // TODO: support scraping from external peers
            let agent_id = match peer {
                EnvPeer::Internal(id) => id,
                _ => return None,
            };

            // Insertion order is preserved by `IndexMap`: `env`, then `agent`.
            let mut labels = IndexMap::new();
            labels.insert("env".to_owned(), env_id.to_string());
            labels.insert("agent".to_owned(), agent_id.to_string());

            // TODO: CLEANUP, possibly zero copy config
            Some(ScrapeConfig {
                job_name: format!("snarkos_env{}_{}", env_id, key),
                metrics_path: Some(format!("/api/v1/agents/{}/metrics", agent_id)),
                scheme: Some("http".into()),
                honor_timestamps: Some(true),
                follow_redirects: Some(true),
                scrape_interval: None,
                scrape_timeout: None,
                static_configs: vec![StaticConfig {
                    targets: vec![format!("host.docker.internal:{}", state.cli.port)],
                    labels,
                }],
            })
        });

        scrape_configs.extend(agent_jobs);
    }

    PrometheusConfig {
        global: Default::default(),
        scrape_configs,
    }
}

/// Initialize the Prometheus container.
Expand Down Expand Up @@ -58,8 +111,19 @@ pub async fn init(state: &GlobalState) -> anyhow::Result<()> {

info!("found an existing prometheus container");

let id = container.id.unwrap_or_default();

// start the container if it is not already running
match &container.state {
Some(state) if state == "Running" => (),
_ => {
docker.start_container::<&str>(&id, None).await?;
info!("started the matching prometheus container");
}
}

// save the container ID to state
*state.prom_ctr.lock().unwrap() = container.id.unwrap_or_default();
*state.prom_ctr.lock().unwrap() = id;

return Ok(());
}
Expand Down
33 changes: 33 additions & 0 deletions crates/snot/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub(super) fn routes() -> Router<AppState> {
Router::new()
.route("/agents", get(get_agents))
.route("/agents/:id/tps", get(get_agent_tps))
.route("/agents/:id/metrics", get(get_agent_metrics))
.route("/env/prepare", post(post_env_prepare))
.route("/env/:env_id/storage/:ty", get(redirect_storage))
.nest("/env/:env_id/cannons", redirect_cannon_routes())
Expand Down Expand Up @@ -53,6 +54,38 @@ async fn get_agents(state: State<AppState>) -> impl IntoResponse {
Json(json!({ "count": state.pool.read().await.len() }))
}

async fn get_agent_metrics(state: State<AppState>, Path(id): Path<AgentId>) -> Response {
let client = {
let pool = state.pool.read().await;
let Some(agent) = pool.get(&id) else {
return StatusCode::NOT_FOUND.into_response();
};

let Some(client) = agent.client_owned() else {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "agent is not an active node"})),
)
.into_response();
};

client
};

match client
.into_inner()
.get_metrics(tarpc::context::current())
.await
{
Ok(Ok(body)) => body.into_response(),
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "could not fetch metrics"})),
)
.into_response(),
}
}

async fn get_agent_tps(state: State<AppState>, Path(id): Path<AgentId>) -> Response {
let pool = state.pool.read().await;
let Some(agent) = pool.get(&id) else {
Expand Down
4 changes: 0 additions & 4 deletions crates/snot/src/server/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ pub struct ControlRpcServer {
}

impl ControlService for ControlRpcServer {
/// Placeholder RPC: ignores the request context and returns a fixed
/// greeting string.
async fn placeholder(self, _: context::Context) -> String {
    String::from("Hello, world")
}

async fn resolve_addrs(
self,
_: context::Context,
Expand Down
10 changes: 5 additions & 5 deletions crates/snot/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use std::{
collections::{HashMap, HashSet},
net::IpAddr,
sync::Arc,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
sync::{Arc, Mutex},
time::Instant,
};

Expand Down Expand Up @@ -246,6 +242,10 @@ impl Agent {
}

impl AgentClient {
pub fn into_inner(self) -> AgentServiceClient {
self.0
}

pub async fn reconcile(
&self,
to: AgentState,
Expand Down

0 comments on commit fd711f1

Please sign in to comment.