Skip to content

Commit

Permalink
feat: loki lokating
Browse files Browse the repository at this point in the history
Signed-off-by: Zander Franks <[email protected]>
  • Loading branch information
voximity committed Apr 10, 2024
1 parent f0ab418 commit 40eecdb
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/snops-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ async fn main() {
cli: args,
endpoint,
jwt: Mutex::new(jwt),
loki: Default::default(),
env_to_storage: Default::default(),
agent_state: Default::default(),
reconcilation_handle: Default::default(),
Expand Down
27 changes: 21 additions & 6 deletions crates/snops-agent/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct AgentRpcServer {

impl AgentService for AgentRpcServer {
async fn handshake(self, _: context::Context, handshake: Handshake) {
// store and cache JWT
if let Some(token) = handshake.jwt {
// cache the JWT in the state JWT mutex
self.state
Expand All @@ -57,6 +58,15 @@ impl AgentService for AgentRpcServer {
.await
.expect("failed to write jwt file");
}

// store loki server URL
if let Some(loki) = handshake.loki {
self.state
.loki
.lock()
.expect("failed to acquire loki URL lock")
.replace(loki);
}
}

async fn reconcile(
Expand Down Expand Up @@ -186,16 +196,21 @@ impl AgentService for AgentRpcServer {
state.cli.path.join(LEDGER_BASE_DIR)
};

// add loki URL if one is set
if let Some(loki) = &*state.loki.lock().unwrap() {
command
.env(
"SNOPS_LOKI_LABELS",
format!("env_id={},node_key={}", env_id, node.node_key),
)
.arg("--loki")
.arg(loki.as_str());
}

command
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.envs(&node.env)
.env(
"SNOPS_LOKI_LABELS",
format!("env_id={},node_key={}", env_id, node.node_key),
)
.arg("--loki")
.arg("http://127.0.0.1:3100/")
.arg("--log")
.arg(state.cli.path.join(SNARKOS_LOG_FILE))
.arg("run")
Expand Down
2 changes: 2 additions & 0 deletions crates/snops-agent/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
sync::{Arc, Mutex},
};

use reqwest::Url;
use snops_common::{
api::StorageInfo,
rpc::control::ControlServiceClient,
Expand All @@ -28,6 +29,7 @@ pub struct GlobalState {
pub cli: Cli,
pub endpoint: SocketAddr,
pub jwt: Mutex<Option<String>>,
pub loki: Mutex<Option<Url>>,
pub agent_state: RwLock<AgentState>,
pub env_to_storage: RwLock<HashMap<usize, StorageInfo>>,
pub reconcilation_handle: AsyncMutex<Option<AbortHandle>>,
Expand Down
1 change: 1 addition & 0 deletions crates/snops-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ strum_macros.workspace = true
tarpc.workspace = true
thiserror.workspace = true
tokio.workspace = true
url.workspace = true
2 changes: 2 additions & 0 deletions crates/snops-common/src/rpc/agent.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::net::IpAddr;

use serde::{Deserialize, Serialize};
use url::Url;

use super::error::*;
use crate::state::{AgentState, PortConfig};

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Handshake {
pub jwt: Option<String>,
pub loki: Option<Url>,
}

/// The RPC service that agents implement as a server.
Expand Down
13 changes: 10 additions & 3 deletions crates/snops/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use std::{fmt::Display, net::SocketAddr, path::PathBuf, str::FromStr};
use std::{fmt::Display, path::PathBuf, str::FromStr};

use clap::Parser;
use url::Url;

#[derive(Debug, Parser)]
pub struct Cli {
/// Control plane server port
#[arg(long, default_value_t = 1234)]
pub port: u16,

/// Optional IP:port that a Prometheus server is running on
// TODO: store services in a file config or something?
/// Optional URL referencing a Prometheus server
#[arg(long)]
pub prometheus: Option<SocketAddr>,
pub prometheus: Option<Url>,

// TODO: clarify that this needs to be an IP that agents can reach (handle external/internal?)
/// Optional URL referencing a Loki server
#[arg(long)]
pub loki: Option<Url>,

#[arg(long, default_value_t = PrometheusLocation::Docker)]
pub prometheus_location: PrometheusLocation,
Expand Down
9 changes: 7 additions & 2 deletions crates/snops/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use axum::{
Router,
};
use futures_util::stream::StreamExt;
use prometheus_http_query::Client as PrometheusClient;
use serde::Deserialize;
use snops_common::{
prelude::*,
Expand Down Expand Up @@ -52,7 +53,8 @@ pub async fn start(cli: Cli) -> Result<(), StartError> {

let prometheus = cli
.prometheus
.and_then(|p| prometheus_http_query::Client::try_from(format!("http://{p}")).ok()); // TODO: https
.as_ref()
.and_then(|p| PrometheusClient::try_from(p.as_str()).ok());

let state = GlobalState {
cli,
Expand Down Expand Up @@ -152,7 +154,10 @@ async fn handle_socket(
let id: AgentId = 'insertion: {
let client = client.clone();
let mut pool = state.pool.write().await;
let mut handshake = Handshake::default();
let mut handshake = Handshake {
loki: state.cli.loki.clone(),
..Default::default()
};

// attempt to reconnect if claims were passed
'reconnect: {
Expand Down

0 comments on commit 40eecdb

Please sign in to comment.