From 40eecdba81e00ec7718615ad3b618019f39282b5 Mon Sep 17 00:00:00 2001 From: Zander Franks Date: Wed, 10 Apr 2024 01:08:56 -0500 Subject: [PATCH] feat: loki lokating Signed-off-by: Zander Franks --- Cargo.lock | 1 + crates/snops-agent/src/main.rs | 1 + crates/snops-agent/src/rpc.rs | 27 +++++++++++++++++++++------ crates/snops-agent/src/state.rs | 2 ++ crates/snops-common/Cargo.toml | 1 + crates/snops-common/src/rpc/agent.rs | 2 ++ crates/snops/src/cli.rs | 13 ++++++++++--- crates/snops/src/server/mod.rs | 9 +++++++-- 8 files changed, 45 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d93f807c..ddc1a55c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5984,6 +5984,7 @@ dependencies = [ "tarpc", "thiserror", "tokio", + "url", ] [[package]] diff --git a/crates/snops-agent/src/main.rs b/crates/snops-agent/src/main.rs index b619fb1f..02f54162 100644 --- a/crates/snops-agent/src/main.rs +++ b/crates/snops-agent/src/main.rs @@ -112,6 +112,7 @@ async fn main() { cli: args, endpoint, jwt: Mutex::new(jwt), + loki: Default::default(), env_to_storage: Default::default(), agent_state: Default::default(), reconcilation_handle: Default::default(), diff --git a/crates/snops-agent/src/rpc.rs b/crates/snops-agent/src/rpc.rs index 7004acd5..59a39560 100644 --- a/crates/snops-agent/src/rpc.rs +++ b/crates/snops-agent/src/rpc.rs @@ -45,6 +45,7 @@ pub struct AgentRpcServer { impl AgentService for AgentRpcServer { async fn handshake(self, _: context::Context, handshake: Handshake) { + // store and cache JWT if let Some(token) = handshake.jwt { // cache the JWT in the state JWT mutex self.state @@ -57,6 +58,15 @@ impl AgentService for AgentRpcServer { .await .expect("failed to write jwt file"); } + + // store loki server URL + if let Some(loki) = handshake.loki { + self.state + .loki + .lock() + .expect("failed to acquire loki URL lock") + .replace(loki); + } } async fn reconcile( @@ -186,16 +196,21 @@ impl AgentService for AgentRpcServer { state.cli.path.join(LEDGER_BASE_DIR) }; + // add loki URL if one is set + if let Some(loki) = &*state.loki.lock().unwrap() { + command + .env( + "SNOPS_LOKI_LABELS", + format!("env_id={},node_key={}", env_id, node.node_key), + ) + .arg("--loki") + .arg(loki.as_str()); + } + command .stdout(Stdio::piped()) .stderr(Stdio::piped()) .envs(&node.env) - .env( - "SNOPS_LOKI_LABELS", - format!("env_id={},node_key={}", env_id, node.node_key), - ) - .arg("--loki") - .arg("http://127.0.0.1:3100/") .arg("--log") .arg(state.cli.path.join(SNARKOS_LOG_FILE)) .arg("run") diff --git a/crates/snops-agent/src/state.rs b/crates/snops-agent/src/state.rs index a47fcda1..374cbcfb 100644 --- a/crates/snops-agent/src/state.rs +++ b/crates/snops-agent/src/state.rs @@ -4,6 +4,7 @@ use std::{ sync::{Arc, Mutex}, }; +use reqwest::Url; use snops_common::{ api::StorageInfo, rpc::control::ControlServiceClient, @@ -28,6 +29,7 @@ pub struct GlobalState { pub cli: Cli, pub endpoint: SocketAddr, pub jwt: Mutex>, + pub loki: Mutex>, pub agent_state: RwLock, pub env_to_storage: RwLock>, pub reconcilation_handle: AsyncMutex>, diff --git a/crates/snops-common/Cargo.toml b/crates/snops-common/Cargo.toml index 73db0f30..eeeae45a 100644 --- a/crates/snops-common/Cargo.toml +++ b/crates/snops-common/Cargo.toml @@ -16,3 +16,4 @@ strum_macros.workspace = true tarpc.workspace = true thiserror.workspace = true tokio.workspace = true +url.workspace = true diff --git a/crates/snops-common/src/rpc/agent.rs b/crates/snops-common/src/rpc/agent.rs index e8a671e5..2bf4a9a3 100644 --- a/crates/snops-common/src/rpc/agent.rs +++ b/crates/snops-common/src/rpc/agent.rs @@ -1,6 +1,7 @@ use std::net::IpAddr; use serde::{Deserialize, Serialize}; +use url::Url; use super::error::*; use crate::state::{AgentState, PortConfig}; @@ -8,6 +9,7 @@ use crate::state::{AgentState, PortConfig}; #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Handshake { pub jwt: Option, + pub loki: Option, } /// The RPC service that agents implement as a server. diff --git a/crates/snops/src/cli.rs b/crates/snops/src/cli.rs index 52902b0f..81c6276d 100644 --- a/crates/snops/src/cli.rs +++ b/crates/snops/src/cli.rs @@ -1,6 +1,7 @@ -use std::{fmt::Display, net::SocketAddr, path::PathBuf, str::FromStr}; +use std::{fmt::Display, path::PathBuf, str::FromStr}; use clap::Parser; +use url::Url; #[derive(Debug, Parser)] pub struct Cli { @@ -8,9 +9,15 @@ pub struct Cli { #[arg(long, default_value_t = 1234)] pub port: u16, - /// Optional IP:port that a Prometheus server is running on + // TODO: store services in a file config or something? + /// Optional URL referencing a Prometheus server #[arg(long)] - pub prometheus: Option, + pub prometheus: Option, + + // TODO: clarify that this needs to be an IP that agents can reach (handle external/internal?) + /// Optional URL referencing a Loki server + #[arg(long)] + pub loki: Option, #[arg(long, default_value_t = PrometheusLocation::Docker)] pub prometheus_location: PrometheusLocation, diff --git a/crates/snops/src/server/mod.rs b/crates/snops/src/server/mod.rs index 2fc99eb6..365a4dcc 100644 --- a/crates/snops/src/server/mod.rs +++ b/crates/snops/src/server/mod.rs @@ -13,6 +13,7 @@ use axum::{ Router, }; use futures_util::stream::StreamExt; +use prometheus_http_query::Client as PrometheusClient; use serde::Deserialize; use snops_common::{ prelude::*, @@ -52,7 +53,8 @@ pub async fn start(cli: Cli) -> Result<(), StartError> { let prometheus = cli .prometheus - .and_then(|p| prometheus_http_query::Client::try_from(format!("http://{p}")).ok()); // TODO: https + .as_ref() + .and_then(|p| PrometheusClient::try_from(p.as_str()).ok()); let state = GlobalState { cli, @@ -152,7 +154,10 @@ async fn handle_socket( let id: AgentId = 'insertion: { let client = client.clone(); let mut pool = state.pool.write().await; - let mut handshake = Handshake::default(); + let mut handshake = Handshake { + loki: state.cli.loki.clone(), + ..Default::default() + }; // attempt to reconnect if claims were passed 'reconnect: {