Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logs): Loki integration #108

Merged
merged 8 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/aot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ snarkos-node = { workspace = true, optional = true }
snarkvm.workspace = true
snops-common.workspace = true
tokio.workspace = true
tracing-flame = { workspace = true, optional = true }
tracing.workspace = true
tracing-appender.workspace = true
tracing-flame = { workspace = true, optional = true }
tracing-loki = "0.2.4"
tracing-subscriber.workspace = true
tracing.workspace = true
rocksdb = { workspace = true, features = ["lz4"] }

[target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies]
Expand Down
48 changes: 39 additions & 9 deletions crates/aot/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
use anyhow::Result;
use clap::Parser;
use crossterm::tty::IsTty;
use reqwest::Url;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{layer::SubscriberExt, Layer};

Expand All @@ -28,6 +29,9 @@ pub struct Cli {
#[arg(long, default_value_t = 4)]
pub verbosity: u8,

#[arg(long)]
pub loki: Option<Url>,

#[clap(subcommand)]
pub command: Command,
}
Expand Down Expand Up @@ -125,6 +129,13 @@ impl Cli {
let mut layers = vec![];
let mut guards = vec![];

macro_rules! non_blocking_appender {
($name:ident = ( $args:expr )) => {
let ($name, guard) = tracing_appender::non_blocking($args);
guards.push(guard);
};
}

if cfg!(not(feature = "flame")) && self.enable_profiling {
// TODO should be an error
panic!("Flame feature is not enabled");
Expand Down Expand Up @@ -154,14 +165,13 @@ impl Cli {
}

let file_appender = tracing_appender::rolling::daily(logfile_dir, logfile);
let (non_blocking, file_guard) = tracing_appender::non_blocking(file_appender);
guards.push(file_guard);
non_blocking_appender!(log_writer = (file_appender));

// Add layer redirecting logs to the file
layers.push(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(non_blocking)
.with_writer(log_writer)
.with_filter(filter2)
.boxed(),
);
Expand All @@ -170,8 +180,7 @@ impl Cli {
// Initialize tracing.
// Add layer using LogWriter for stdout / terminal
if matches!(self.command, Command::Run(_)) {
let (stdout, g) = tracing_appender::non_blocking(io::stdout());
guards.push(g);
non_blocking_appender!(stdout = (io::stdout()));

layers.push(
tracing_subscriber::fmt::layer()
Expand All @@ -181,25 +190,46 @@ impl Cli {
.boxed(),
);
} else {
let (stderr, g) = tracing_appender::non_blocking(io::stderr());
guards.push(g);
non_blocking_appender!(stderr = (io::stderr()));
layers.push(tracing_subscriber::fmt::layer().with_writer(stderr).boxed());
};

if let Some(loki) = &self.loki {
let mut builder = tracing_loki::builder();

let env_var = std::env::var("SNOPS_LOKI_LABELS").ok();
let fields = match &env_var {
Some(var) => var
.split(',')
.map(|item| item.split_once('=').unwrap_or((item, "")))
.collect(),
None => vec![],
};

for (key, value) in fields {
builder = builder.label(key, value).expect("bad loki label");
}

let (layer, task) = builder.build_url(loki.to_owned()).expect("bad loki url");
tokio::task::spawn(task);
layers.push(Box::new(layer));
}

let subscriber = tracing_subscriber::registry::Registry::default().with(layers);

tracing::subscriber::set_global_default(subscriber).unwrap();
(guard, guards)
}

pub fn run(self) -> Result<()> {
#[tokio::main]
pub async fn run(self) -> Result<()> {
let _guards = self.init_logger();

match self.command {
Command::Genesis(command) => command.parse(),
Command::Ledger(command) => command.parse(),
#[cfg(feature = "node")]
Command::Run(command) => command.parse(),
Command::Run(command) => command.parse().await,
Command::Execute(command) => command.parse(),
Command::Authorize(command) => {
println!("{}", serde_json::to_string(&command.parse()?)?);
Expand Down
1 change: 0 additions & 1 deletion crates/aot/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ pub struct Runner {
}

impl Runner {
#[tokio::main]
pub async fn parse(self) -> Result<()> {
let bind_addr = self.bind_addr;
let node_ip = SocketAddr::new(bind_addr, self.node);
Expand Down
1 change: 1 addition & 0 deletions crates/snops-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ async fn main() {
cli: args,
endpoint,
jwt: Mutex::new(jwt),
loki: Default::default(),
env_to_storage: Default::default(),
agent_state: Default::default(),
reconcilation_handle: Default::default(),
Expand Down
21 changes: 21 additions & 0 deletions crates/snops-agent/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct AgentRpcServer {

impl AgentService for AgentRpcServer {
async fn handshake(self, _: context::Context, handshake: Handshake) {
// store and cache JWT
if let Some(token) = handshake.jwt {
// cache the JWT in the state JWT mutex
self.state
Expand All @@ -57,6 +58,15 @@ impl AgentService for AgentRpcServer {
.await
.expect("failed to write jwt file");
}

// store loki server URL
if let Some(loki) = handshake.loki {
self.state
.loki
.lock()
.expect("failed to acquire loki URL lock")
.replace(loki);
}
}

async fn reconcile(
Expand Down Expand Up @@ -186,6 +196,17 @@ impl AgentService for AgentRpcServer {
state.cli.path.join(LEDGER_BASE_DIR)
};

// add loki URL if one is set
if let Some(loki) = &*state.loki.lock().unwrap() {
command
.env(
"SNOPS_LOKI_LABELS",
format!("env_id={},node_key={}", env_id, node.node_key),
)
.arg("--loki")
.arg(loki.as_str());
}

command
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand Down
2 changes: 2 additions & 0 deletions crates/snops-agent/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
sync::{Arc, Mutex},
};

use reqwest::Url;
use snops_common::{
api::StorageInfo,
rpc::control::ControlServiceClient,
Expand All @@ -28,6 +29,7 @@ pub struct GlobalState {
pub cli: Cli,
pub endpoint: SocketAddr,
pub jwt: Mutex<Option<String>>,
pub loki: Mutex<Option<Url>>,
pub agent_state: RwLock<AgentState>,
pub env_to_storage: RwLock<HashMap<usize, StorageInfo>>,
pub reconcilation_handle: AsyncMutex<Option<AbortHandle>>,
Expand Down
1 change: 1 addition & 0 deletions crates/snops-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ strum_macros.workspace = true
tarpc.workspace = true
thiserror.workspace = true
tokio.workspace = true
url.workspace = true
2 changes: 2 additions & 0 deletions crates/snops-common/src/rpc/agent.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::net::IpAddr;

use serde::{Deserialize, Serialize};
use url::Url;

use super::error::*;
use crate::state::{AgentState, PortConfig};

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Handshake {
pub jwt: Option<String>,
pub loki: Option<Url>,
}

/// The RPC service that agents implement as a server.
Expand Down
13 changes: 10 additions & 3 deletions crates/snops/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use std::{fmt::Display, net::SocketAddr, path::PathBuf, str::FromStr};
use std::{fmt::Display, path::PathBuf, str::FromStr};

use clap::Parser;
use url::Url;

#[derive(Debug, Parser)]
pub struct Cli {
/// Control plane server port
#[arg(long, default_value_t = 1234)]
pub port: u16,

/// Optional IP:port that a Prometheus server is running on
// TODO: store services in a file config or something?
/// Optional URL referencing a Prometheus server
#[arg(long)]
pub prometheus: Option<SocketAddr>,
pub prometheus: Option<Url>,

// TODO: clarify that this needs to be an IP that agents can reach (handle external/internal?)
/// Optional URL referencing a Loki server
#[arg(long)]
pub loki: Option<Url>,

#[arg(long, default_value_t = PrometheusLocation::Docker)]
pub prometheus_location: PrometheusLocation,
Expand Down
Loading