Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logs): Loki integration #108

Merged
merged 8 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/aot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ snarkos-node = { workspace = true, optional = true }
snarkvm.workspace = true
snops-common.workspace = true
tokio.workspace = true
tracing-flame = { workspace = true, optional = true }
tracing.workspace = true
tracing-appender.workspace = true
tracing-flame = { workspace = true, optional = true }
tracing-loki = "0.2.4"
tracing-subscriber.workspace = true
tracing.workspace = true
rocksdb = { workspace = true, features = ["lz4"] }

[target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies]
Expand Down
63 changes: 49 additions & 14 deletions crates/aot/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
use std::fs::File;
#[cfg(feature = "flame")]
use std::io::BufWriter;
use std::{
io::{self},
path::PathBuf,
};
use std::{io, path::PathBuf, thread};

use anyhow::Result;
use clap::Parser;
use crossterm::tty::IsTty;
use reqwest::Url;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{layer::SubscriberExt, Layer};

Expand All @@ -31,6 +29,9 @@ pub struct Cli {
#[arg(long, default_value_t = 4)]
pub verbosity: u8,

#[arg(long)]
pub loki: Option<Url>,

#[clap(subcommand)]
pub command: Command,
}
Expand Down Expand Up @@ -62,6 +63,11 @@ impl Flushable for tracing_flame::FlushGuard<BufWriter<File>> {
}
}

#[cfg(feature = "flame")]
type FlameGuard = Box<dyn Flushable>;
#[cfg(not(feature = "flame"))]
type FlameGuard = ();

impl Cli {
/// Initializes the logger.
///
Expand All @@ -74,7 +80,7 @@ impl Cli {
/// 5 => info, debug, trace, snarkos_node_router=trace
/// 6 => info, debug, trace, snarkos_node_tcp=trace
/// ```
pub fn init_logger(&self) -> (Box<dyn Flushable>, Vec<WorkerGuard>) {
pub fn init_logger(&self) -> (FlameGuard, Vec<WorkerGuard>) {
let verbosity = self.verbosity;

match verbosity {
Expand All @@ -84,7 +90,7 @@ impl Cli {
};

// Filter out undesirable logs. (unfortunately EnvFilter cannot be cloned)
let [filter, filter2] = std::array::from_fn(|_| {
let [filter, filter2, filter3] = std::array::from_fn(|_| {
let filter = tracing_subscriber::EnvFilter::from_default_env()
.add_directive("mio=off".parse().unwrap())
.add_directive("tokio_util=off".parse().unwrap())
Expand Down Expand Up @@ -129,6 +135,13 @@ impl Cli {
let mut layers = vec![];
let mut guards = vec![];

macro_rules! non_blocking_appender {
($name:ident = ( $args:expr )) => {
let ($name, guard) = tracing_appender::non_blocking($args);
guards.push(guard);
};
}

if cfg!(not(feature = "flame")) && self.enable_profiling {
// TODO should be an error
panic!("Flame feature is not enabled");
Expand All @@ -145,7 +158,7 @@ impl Cli {
};

#[cfg(not(feature = "flame"))]
let guard = Box::new(());
let guard = ();

if let Some(logfile) = self.log.as_ref() {
// Create the directories tree for a logfile if it doesn't exist.
Expand All @@ -158,14 +171,13 @@ impl Cli {
}

let file_appender = tracing_appender::rolling::daily(logfile_dir, logfile);
let (non_blocking, file_guard) = tracing_appender::non_blocking(file_appender);
guards.push(file_guard);
non_blocking_appender!(log_writer = (file_appender));

// Add layer redirecting logs to the file
layers.push(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(non_blocking)
.with_writer(log_writer)
.with_filter(filter2)
.boxed(),
);
Expand All @@ -174,8 +186,7 @@ impl Cli {
// Initialize tracing.
// Add layer using LogWriter for stdout / terminal
if matches!(self.command, Command::Run(_)) {
let (stdout, g) = tracing_appender::non_blocking(io::stdout());
guards.push(g);
non_blocking_appender!(stdout = (io::stdout()));

layers.push(
tracing_subscriber::fmt::layer()
Expand All @@ -185,11 +196,35 @@ impl Cli {
.boxed(),
);
} else {
let (stderr, g) = tracing_appender::non_blocking(io::stderr());
guards.push(g);
non_blocking_appender!(stderr = (io::stderr()));
layers.push(tracing_subscriber::fmt::layer().with_writer(stderr).boxed());
};

if let Some(loki) = &self.loki {
let mut builder = tracing_loki::builder();

let env_var = std::env::var("SNOPS_LOKI_LABELS").ok();
let fields = match &env_var {
Some(var) => var
.split(',')
.map(|item| item.split_once('=').unwrap_or((item, "")))
.collect(),
None => vec![],
};

for (key, value) in fields {
builder = builder.label(key, value).expect("bad loki label");
}

let (layer, task) = builder.build_url(loki.to_owned()).expect("bad loki url");
thread::spawn(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
let handle = rt.spawn(task);
rt.block_on(handle).unwrap();
});
Comment on lines +220 to +224
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might cause a panic. Could happen when we run the node and it delegates a tokio task to this thread, but it already has a runtime.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about how to deal with this... Might switch to futures::executor::block_on

layers.push(Box::new(layer.with_filter(filter3)));
}

let subscriber = tracing_subscriber::registry::Registry::default().with(layers);

tracing::subscriber::set_global_default(subscriber).unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/snops-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ tokio-tungstenite.workspace = true
tracing-appender.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
1 change: 1 addition & 0 deletions crates/snops-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ async fn main() {
cli: args,
endpoint,
jwt: Mutex::new(jwt),
loki: Default::default(),
env_info: Default::default(),
agent_state: Default::default(),
reconcilation_handle: Default::default(),
Expand Down
5 changes: 1 addition & 4 deletions crates/snops-agent/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use std::{
time::Duration,
};

use tracing::warn;

use self::tps::TpsMetric;
use crate::state::GlobalState;

Expand Down Expand Up @@ -47,8 +45,7 @@ pub fn init(state: Arc<GlobalState>) {
.await
{
Ok(response) => response,
Err(e) => {
warn!("failed to scrape latest metrics: {e}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer want this warning?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can re-add once we have something that checks if the node is actually running. At the moment it is always spam

Err(_e) => {
break 'metrics Default::default();
}
};
Expand Down
Loading