fix: accurately track offsets between filesystem restarts
Adds an in-memory cache of offsets to accurately track the current offset
state across filesystem (tailer) restarts. This cache is completely separate
from the db-persisted cache used by the client.

Ref: LOG-17217
Signed-off-by: Jacob Hull <[email protected]>
jakedipity committed Aug 19, 2023
1 parent cc2a4d2 commit f350543
Showing 1 changed file with 55 additions and 23 deletions.
78 changes: 55 additions & 23 deletions bin/src/_main.rs
@@ -31,8 +31,9 @@ use middleware::Executor;
 use pin_utils::pin_mut;
 use rand::Rng;
-use state::{AgentState, FileId, SpanVec};
+use state::{AgentState, FileId, GetOffset, Span, SpanVec};
 use std::collections::HashMap;
+use std::convert::TryFrom;
 use std::sync::Arc;
 use tokio::signal::*;
 use tokio::sync::Mutex;
@@ -312,12 +313,27 @@ pub async fn _main(
     }
     debug!("Initialised offset state");
 
+    let mut initial_offsets: Option<HashMap<FileId, SpanVec>> = None;
+
+    if let Some(offset_state) = offset_state {
+        match offset_state.offsets() {
+            Ok(offsets) => {
+                initial_offsets =
+                    Some(offsets.into_iter().map(|fo| (fo.key, fo.offsets)).collect());
+            }
+            Err(e) => warn!("couldn't retrieve offsets from agent state, {:?}", e),
+        }
+    }
+
+    let fs_offsets: Arc<Mutex<HashMap<FileId, SpanVec>>> =
+        Arc::new(Mutex::new(initial_offsets.unwrap_or_default()));
+
     let ds_source_params = (
         config.log.dirs.clone(),
         config.log.rules.clone(),
         config.log.lookback.clone(),
-        offset_state,
         fo_state_handles,
+        fs_offsets,
     );
 
     debug!("Creating fs_source");
@@ -331,28 +347,44 @@
             }
             _ => false,
         },
-        |(watched_dirs, rules, lookback, offset_state, fo_state_handles)| {
-            let mut initial_offsets: Option<HashMap<FileId, SpanVec>> = None;
-
-            if let Some(offset_state) = offset_state {
-                match offset_state.offsets() {
-                    Ok(offsets) => {
-                        initial_offsets =
-                            Some(offsets.into_iter().map(|fo| (fo.key, fo.offsets)).collect());
-                    }
-                    Err(e) => warn!("couldn't retrieve offsets from agent state, {:?}", e),
-                }
-            }
+        |(watched_dirs, rules, lookback, fo_state_handles, fs_offsets)| {
+            let watched_dirs = watched_dirs.clone();
+            let rules = rules.clone();
+            let lookback = lookback.clone();
+            let deletion_ack_sender = deletion_ack_sender.clone();
+            let fo_state_handles = fo_state_handles.clone();
+            let fs_offsets = fs_offsets.clone();
+            async move {
+                let tailer = tail::Tailer::new(
+                    watched_dirs,
+                    rules,
+                    lookback,
+                    Some(fs_offsets.lock().await.clone()),
+                    fo_state_handles,
+                    deletion_ack_sender,
+                );
+
+                tail::process(tailer)
+                    .expect("Failed to create FS Tailer")
+                    .filter(move |line| {
+                        let mut pair = (None, None);
+                        if let Ok(line) = line {
+                            pair = (line.get_key(), line.get_offset());
+                        }
 
-            let tailer = tail::Tailer::new(
-                watched_dirs.clone(),
-                rules.clone(),
-                lookback.clone(),
-                initial_offsets,
-                fo_state_handles.clone(),
-                deletion_ack_sender.clone(),
-            );
-            async move { tail::process(tailer).expect("Failed to create FS Tailer") }
+                        let fs_offsets = fs_offsets.clone();
+                        async move {
+                            if let (Some(key), Some(offsets)) = pair {
+                                let mut span_vec = SpanVec::new();
+                                if let Ok(offsets) = Span::try_from(offsets) {
+                                    span_vec.insert(offsets);
+                                    fs_offsets.lock().await.insert(FileId::from(key), span_vec);
+                                }
+                            }
+                            true
+                        }
+                    })
+            }
         },
         config.log.clear_cache_interval, // we restart tailer to clear fs cache
     )
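
For readers skimming the diff, the sketch below illustrates the pattern the commit introduces: an in-memory offsets cache, seeded once from the persisted agent state and then updated as lines are tailed, which a restarted tailer reads instead of the db-persisted offsets. It is a minimal, self-contained illustration, not the agent's code: FsOffsets and its methods are hypothetical, std::sync::Mutex stands in for tokio's async Mutex, and u64 ids with (start, end) byte spans stand in for state::FileId and SpanVec.

// Sketch only: an in-memory offsets cache shared across tailer restarts.
// Stand-in types; the real agent uses state::FileId, state::SpanVec and
// tokio::sync::Mutex instead.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type FileId = u64;      // stand-in for state::FileId
type Span = (u64, u64); // stand-in for a consumed byte range

#[derive(Clone, Default)]
struct FsOffsets {
    inner: Arc<Mutex<HashMap<FileId, Vec<Span>>>>,
}

impl FsOffsets {
    // Seed the cache once at startup from the db-persisted offsets.
    fn seed(initial: HashMap<FileId, Vec<Span>>) -> Self {
        Self {
            inner: Arc::new(Mutex::new(initial)),
        }
    }

    // Record the latest span emitted for a file (called per tailed line).
    fn record(&self, key: FileId, span: Span) {
        self.inner.lock().unwrap().insert(key, vec![span]);
    }

    // Snapshot the current state; a restarted tailer is constructed from
    // this snapshot instead of re-reading the persisted state.
    fn snapshot(&self) -> HashMap<FileId, Vec<Span>> {
        self.inner.lock().unwrap().clone()
    }
}

fn main() {
    // Seed from whatever the persisted agent state reported at startup.
    let cache = FsOffsets::seed(HashMap::from([(1, vec![(0, 128)])]));

    // The first tailer run consumes more of file 1 and a new file 2.
    cache.record(1, (0, 512));
    cache.record(2, (0, 64));

    // On restart, the new tailer starts from the cached offsets rather
    // than the db-persisted ones.
    let initial = cache.snapshot();
    assert_eq!(initial[&1], vec![(0, 512)]);
    println!("restarting tailer with offsets: {initial:?}");
}

The design point, per the commit message, is that the offsets a restarted tailer resumes from reflect what has actually been read in the current process, kept separate from (and presumably independent of when) the client persists its own offset state to the db.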