Skip to content

Commit

Permalink
fix: log flood "hit rate limit snoozing" (#550)
Browse files Browse the repository at this point in the history
* feat: print env config to log at startup (#548)
* fix:  missing _REGEX suffix in envvar names:
LINE_EXCLUSION_REGEX,
LINE_INCLUSION_REGEX,
REDACT_REGEX,
* fix: log flood "hit rate limit, snoozing"

Ref: LOG-11879, LOG-18170
  • Loading branch information
dkhokhlov authored Sep 19, 2023
1 parent d42c9cc commit d28f51a
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 60 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions common/config/src/argv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ pub struct ArgumentOptions {

/// List of regex patterns to exclude log lines.
/// When set, the Agent will NOT send log lines that match any of these patterns.
#[structopt(long, env = env_vars::LINE_EXCLUSION)]
#[structopt(long, env = env_vars::LINE_EXCLUSION_REGEX)]
line_exclusion: Vec<String>,

/// List of regex patterns to include log lines.
/// When set, the Agent will send ONLY log lines that match any of these patterns.
#[structopt(long, env = env_vars::LINE_INCLUSION)]
#[structopt(long, env = env_vars::LINE_INCLUSION_REGEX)]
line_inclusion: Vec<String>,

/// List of Kubernetes pod metadata to include in log lines.
Expand All @@ -189,7 +189,7 @@ pub struct ArgumentOptions {

/// List of regex patterns used to mask matching sensitive information (such as PII) before
/// sending it in the log line.
#[structopt(long, env = env_vars::REDACT)]
#[structopt(long, env = env_vars::REDACT_REGEX)]
line_redact: Vec<String>,

/// Show the current agent settings from the configuration sources (default config file
Expand Down
125 changes: 79 additions & 46 deletions common/config/src/env_vars.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,68 @@
pub const INGESTION_KEY: &str = "MZ_INGESTION_KEY";
pub const CONFIG_FILE: &str = "MZ_CONFIG_FILE";
pub const LOG_DIRS: &str = "MZ_LOG_DIRS";
pub const TAGS: &str = "MZ_TAGS";
pub const HOST: &str = "MZ_HOST";
pub const ENDPOINT: &str = "MZ_ENDPOINT";
pub const USE_SSL: &str = "MZ_USE_SSL";
pub const USE_COMPRESSION: &str = "MZ_USE_COMPRESSION";
pub const GZIP_LEVEL: &str = "MZ_GZIP_LEVEL";
pub const EXCLUSION_RULES: &str = "MZ_EXCLUSION_RULES";
pub const EXCLUSION_REGEX_RULES: &str = "MZ_EXCLUSION_REGEX_RULES";
pub const INCLUSION_RULES: &str = "MZ_INCLUSION_RULES";
pub const INCLUSION_REGEX_RULES: &str = "MZ_INCLUSION_REGEX_RULES";
pub const K8S_METADATA_LINE_INCLUSION: &str = "MZ_K8S_METADATA_LINE_INCLUSION";
pub const K8S_METADATA_LINE_EXCLUSION: &str = "MZ_K8S_METADATA_LINE_EXCLUSION";
pub const HOSTNAME: &str = "MZ_HOSTNAME";
pub const IP: &str = "MZ_IP";
pub const MAC: &str = "MZ_MAC";
pub const SYSTEMD_JOURNAL_TAILER: &str = "MZ_SYSTEMD_JOURNAL_TAILER";
pub const JOURNALD_PATHS: &str = "MZ_JOURNALD_PATHS";
pub const LOOKBACK: &str = "MZ_LOOKBACK";
pub const DB_PATH: &str = "MZ_DB_PATH";
pub const METRICS_PORT: &str = "MZ_METRICS_PORT";
pub const USE_K8S_LOG_ENRICHMENT: &str = "MZ_USE_K8S_LOG_ENRICHMENT";
pub const LOG_K8S_EVENTS: &str = "MZ_LOG_K8S_EVENTS";
pub const LOG_METRIC_SERVER_STATS: &str = "MZ_LOG_METRIC_SERVER_STATS";
pub const K8S_STARTUP_LEASE: &str = "MZ_K8S_STARTUP_LEASE";
pub const LINE_EXCLUSION: &str = "MZ_LINE_EXCLUSION_REGEX";
pub const LINE_INCLUSION: &str = "MZ_LINE_INCLUSION_REGEX";
pub const REDACT: &str = "MZ_REDACT_REGEX";
pub const INGEST_TIMEOUT: &str = "MZ_INGEST_TIMEOUT";
pub const INGEST_BUFFER_SIZE: &str = "MZ_INGEST_BUFFER_SIZE";
pub const RETRY_DIR: &str = "MZ_RETRY_DIR";
pub const RETRY_DISK_LIMIT: &str = "MZ_RETRY_DISK_LIMIT";
pub const INTERNAL_FS_DELAY: &str = "MZ_INTERNAL_FS_DELAY";
pub const CLEAR_CACHE_INTERVAL: &str = "MZ_CLEAR_CACHE_INTERVAL";
macro_rules! define_env_var {
($name:ident) => {
pub const $name: &str = concat!("MZ_", stringify!($name));
};
}

macro_rules! define_env_vars {
($($name:ident),+ $(,)? ) => {
$(
define_env_var!($name);
)*
pub const ENV_VARS_LIST: &'static [&'static str] = &[$($name),*];
};
}

// env vars names prefixed with "MZ_"
define_env_vars!(
INGESTION_KEY,
CONFIG_FILE,
LOG_DIRS,
TAGS,
HOST,
ENDPOINT,
USE_SSL,
USE_COMPRESSION,
GZIP_LEVEL,
EXCLUSION_RULES,
EXCLUSION_REGEX_RULES,
INCLUSION_RULES,
INCLUSION_REGEX_RULES,
K8S_METADATA_LINE_INCLUSION,
K8S_METADATA_LINE_EXCLUSION,
HOSTNAME,
IP,
MAC,
SYSTEMD_JOURNAL_TAILER,
JOURNALD_PATHS,
LOOKBACK,
DB_PATH,
METRICS_PORT,
USE_K8S_LOG_ENRICHMENT,
LOG_K8S_EVENTS,
LOG_METRIC_SERVER_STATS,
K8S_STARTUP_LEASE,
LINE_EXCLUSION_REGEX,
LINE_INCLUSION_REGEX,
REDACT_REGEX,
INGEST_TIMEOUT,
INGEST_BUFFER_SIZE,
RETRY_DIR,
RETRY_DISK_LIMIT,
INTERNAL_FS_DELAY,
CLEAR_CACHE_INTERVAL,
METADATA_RETRY_DELAY,
META_APP,
META_HOST,
META_ENV,
META_FILE,
META_K8S_FILE,
META_JSON,
META_ANNOTATIONS,
META_LABELS,
NO_CAP,
MOCK_NO_PODS
);

// unused or deprecated
pub const INGESTION_KEY_ALTERNATE: &str = "LOGDNA_AGENT_KEY";
Expand All @@ -50,13 +79,17 @@ pub const EXCLUSION_REGEX_RULES_DEPRECATED: &str = "LOGDNA_EXCLUDE_REGEX";
pub const INCLUSION_RULES_DEPRECATED: &str = "LOGDNA_INCLUDE";
pub const INCLUSION_REGEX_RULES_DEPRECATED: &str = "LOGDNA_INCLUDE_REGEX";

pub const META_APP: &str = "MZ_META_APP";
pub const META_HOST: &str = "MZ_META_HOST";
pub const META_ENV: &str = "MZ_META_ENV";
pub const META_FILE: &str = "MZ_META_FILE";
pub const META_K8S_FILE: &str = "MZ_META_K8S_FILE";
pub const META_JSON: &str = "MZ_META_JSON";
pub const META_ANNOTATIONS: &str = "MZ_META_ANNOTATIONS";
pub const META_LABELS: &str = "MZ_META_LABELS";

pub const NO_CAP: &str = "MZ_NO_CAP";
pub(crate) const DEPRECATED_ENV_VARS_LIST: &[&str] = &[
INGESTION_KEY_ALTERNATE,
CONFIG_FILE_DEPRECATED,
HOST_DEPRECATED,
IBM_HOST_DEPRECATED,
ENDPOINT_DEPRECATED,
USE_SSL_DEPRECATED,
GZIP_LEVEL_DEPRECATED,
LOG_DIRS_DEPRECATED,
EXCLUSION_RULES_DEPRECATED,
EXCLUSION_REGEX_RULES_DEPRECATED,
INCLUSION_RULES_DEPRECATED,
INCLUSION_REGEX_RULES_DEPRECATED,
];
20 changes: 18 additions & 2 deletions common/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,24 @@ impl Config {
print_settings(&yaml_str, &config_path);
}

let env_config: String = env_vars::ENV_VARS_LIST
.iter()
.chain(env_vars::DEPRECATED_ENV_VARS_LIST.iter())
.filter_map(|&key| {
std::env::var(key).ok().map(|value| {
if key.contains("KEY") || key.contains("PIN") {
format!("{}: REDACTED", key)
} else {
format!("{}: {}", key, value)
}
})
})
.collect::<Vec<String>>()
.join("\n");
info!("env config: \n{}", env_config);

info!(
"read the following options from cli, env and config: \n{}",
"effective configuration collected from cli, env and config:\n{}",
yaml_str
);

Expand Down Expand Up @@ -670,7 +686,7 @@ mod tests {
.open(&path)
.unwrap();

guard(file, |mut file| {
let _ = guard(file, |mut file| {
let args = vec![OsString::new()];
serde_yaml::to_writer(&mut file, &RawConfig::default()).unwrap();
file.flush().unwrap();
Expand Down
12 changes: 6 additions & 6 deletions common/config/src/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ from_env_name!(K8S_METADATA_LINE_INCLUSION);
from_env_name!(K8S_METADATA_LINE_EXCLUSION);
from_env_name!(LOG_METRIC_SERVER_STATS);
from_env_name!(K8S_STARTUP_LEASE);
from_env_name!(LINE_EXCLUSION);
from_env_name!(LINE_INCLUSION);
from_env_name!(REDACT);
from_env_name!(LINE_EXCLUSION_REGEX);
from_env_name!(LINE_INCLUSION_REGEX);
from_env_name!(REDACT_REGEX);
from_env_name!(INGEST_TIMEOUT);
from_env_name!(INGEST_BUFFER_SIZE);
from_env_name!(RETRY_DIR);
Expand Down Expand Up @@ -270,14 +270,14 @@ fn from_property_map(map: HashMap<String, String>) -> Result<Config, ConfigError
result.log.db_path = map.get(&DB_PATH).map(PathBuf::from);
result.startup.option = map.get_string(&K8S_STARTUP_LEASE);

if let Some(value) = map.get(&LINE_EXCLUSION) {
if let Some(value) = map.get(&LINE_EXCLUSION_REGEX) {
let regex_rules = result.log.line_exclusion_regex.get_or_insert(Vec::new());
argv::split_by_comma(value)
.iter()
.for_each(|v| regex_rules.push(v.to_string()));
}

if let Some(value) = map.get(&LINE_INCLUSION) {
if let Some(value) = map.get(&LINE_INCLUSION_REGEX) {
let regex_rules = result.log.line_inclusion_regex.get_or_insert(Vec::new());
argv::split_by_comma(value)
.iter()
Expand All @@ -298,7 +298,7 @@ fn from_property_map(map: HashMap<String, String>) -> Result<Config, ConfigError
.for_each(|v| k8s_rules.push(v.to_string()));
}

if let Some(value) = map.get(&REDACT) {
if let Some(value) = map.get(&REDACT_REGEX) {
let regex_rules = result.log.line_redact_regex.get_or_insert(Vec::new());
argv::split_by_comma(value)
.iter()
Expand Down
1 change: 1 addition & 0 deletions common/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ futures = "0.3"
futures-timer = "3"
prometheus = { version = "0.13", features = ["process"] }
async-compression = { version = "0.3.8", features = ["tokio"] }
rate-limit-macro = { version = "1" }

[dev-dependencies]
tempfile = "3"
Expand Down
7 changes: 4 additions & 3 deletions common/http/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use metrics::Metrics;
use serde::{Serialize, Serializer};
use std::time::Duration;
use tracing::info;
use tracing::debug;

pub struct RateLimiter {
pub slots: Arc<AtomicUsize>,
Expand Down Expand Up @@ -128,8 +128,9 @@ impl Backoff {

pub async fn snooze(&self) {
let step = self.step.load(Ordering::SeqCst);
// TODO make debug
info!("hit rate limit, snoozing");
rate_limit_macro::rate_limit!(rate = 1, interval = 5, {
debug!("hit rate limit, snoozing");
});
tokio::time::sleep(Duration::from_millis(self.base.pow(step) * self.multipler)).await;
self.step.fetch_add(1, Ordering::SeqCst);
}
Expand Down

0 comments on commit d28f51a

Please sign in to comment.