Skip to content

Commit

Permalink
chore: print time taken for building the list of actions to process
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Nov 6, 2024
1 parent 100f869 commit 8a5cb34
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
26 changes: 13 additions & 13 deletions packages/talos_messenger_actions/src/kafka/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ impl ProducerContext for MessengerProducerContext {
match result {
Ok(msg) => {
info!("Message {:?} {:?}", msg.key(), msg.offset());
if let Some(headers) = msg.headers() {
if let Some(h1) = headers.iter().find(|x| x.key.eq("messengerEndOnCommitActionsTimestamp")) {
let t1 = String::from_utf8_lossy(h1.value.unwrap()).to_string();
let ts1 = OffsetDateTime::parse(&t1, &Rfc3339).unwrap();
let ts2 = OffsetDateTime::now_utc();
let diff_time = ts2 - ts1;
warn!(
"Time taken between publish version={:?} from start to end is {:?}ms",
msg.offset(),
diff_time.as_seconds_f64() * 1_000_f64
);
};
};
// if let Some(headers) = msg.headers() {
// if let Some(h1) = headers.iter().find(|x| x.key.eq("messengerEndOnCommitActionsTimestamp")) {
// let t1 = String::from_utf8_lossy(h1.value.unwrap()).to_string();
// let ts1 = OffsetDateTime::parse(&t1, &Rfc3339).unwrap();
// let ts2 = OffsetDateTime::now_utc();
// let diff_time = ts2 - ts1;
// warn!(
// "Time taken between publish version={:?} from start to end is {:?}ms",
// msg.offset(),
// diff_time.as_seconds_f64() / 1_000_f64
// );
// };
// };

// Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown.
if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) {
Expand Down
13 changes: 11 additions & 2 deletions packages/talos_messenger_core/src/suffix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::{HashMap, HashMapExt};
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt::Debug;
use std::{fmt::Debug, time::Instant};
use strum::{Display, EnumString};
use talos_certifier::model::{CandidateMessage, Decision, DecisionMessageTrait};
use talos_suffix::{core::SuffixResult, Suffix, SuffixTrait};
Expand Down Expand Up @@ -265,6 +265,7 @@ where

let start_index = current_prune_index.unwrap_or(0);

let start_ms = Instant::now();
let items: Vec<ActionsMapWithVersion> = self
.messages
.range(start_index..)
Expand All @@ -273,7 +274,7 @@ where
// Filter only the items awaiting to be processed.
.filter(|&x| x.item.get_state().eq(&SuffixItemState::ReadyToProcess))
// Take while contiguous ones, whose safepoint is already processed.
.filter(|&x| {
.take_while(|&x| {
let Some(safepoint) = x.item.get_safepoint() else {
return false;
};
Expand Down Expand Up @@ -308,6 +309,14 @@ where
})
.collect();

warn!(
"Total items in suffix ={} | Current_prune_index = {} | total items to process ={} | time taken to build the list ={}ms",
self.suffix_length(),
start_index,
items.len(),
start_ms.elapsed().as_millis()
);

items
}

Expand Down

0 comments on commit 8a5cb34

Please sign in to comment.