Skip to content

Commit

Permalink
improved log messages (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
bwsw authored Apr 18, 2024
1 parent f34c2f4 commit b2bd381
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ savant_core = { path = "savant_core" }
savant_core_py = { path = "savant_core_py" }

[workspace.package]
version = "0.2.25"
version = "0.2.26"
edition = "2021"
authors = ["Ivan Kudriavtsev <[email protected]>"]
description = "Savant Rust core functions library"
Expand Down
4 changes: 4 additions & 0 deletions python/zmq/zmq_reqrep_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from savant_rs.utils import gen_frame
from savant_rs.utils.serialization import Message
from savant_rs.zmq import WriterConfigBuilder, ReaderConfigBuilder, BlockingWriter, BlockingReader
from savant_rs.logging import set_log_level
from savant_rs.logging import LogLevel

set_log_level(LogLevel.Info)

socket_name = "tcp://127.0.0.1:3333"

Expand Down
26 changes: 20 additions & 6 deletions savant_core/src/transport/zeromq/reader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::bail;
use log::{debug, error, info, warn};
use std::str::from_utf8;
use std::time::Duration;
use zmq::Context;

Expand All @@ -8,6 +9,7 @@ use crate::transport::zeromq::{
create_ipc_dirs, set_ipc_permissions, MockSocketResponder, ReaderConfig, ReaderSocketType,
RoutingIdFilter, Socket, SocketProvider, CONFIRMATION_MESSAGE, ZMQ_LINGER,
};
use crate::utils::bytes_to_hex_string;
use mini_moka::sync::{Cache, CacheBuilder};

pub struct Reader<R: MockSocketResponder, P: SocketProvider<R>> {
Expand Down Expand Up @@ -143,10 +145,22 @@ impl<R: MockSocketResponder, P: SocketProvider<R> + Default> Reader<R, P> {
}

pub fn blacklist_source(&mut self, source: &[u8]) {
info!(
target: "savant_rs::zeromq::reader",
"Blacklisting source '{}' for endpoint '{}'",
from_utf8(source).unwrap_or(&bytes_to_hex_string(source)),
self.config.endpoint()
);
self.source_blacklist_cache.insert(source.to_vec(), ());
}

pub fn is_blacklisted(&self, source: &[u8]) -> bool {
debug!(
target: "savant_rs::zeromq::reader",
"Checking if source '{}' is blacklisted for endpoint '{}'",
from_utf8(source).unwrap_or(&bytes_to_hex_string(source)),
self.config.endpoint()
);
self.source_blacklist_cache.contains_key(&source.to_vec())
}

Expand Down Expand Up @@ -222,7 +236,7 @@ impl<R: MockSocketResponder, P: SocketProvider<R> + Default> Reader<R, P> {
debug!(
target: "savant_rs::zeromq::reader",
"Received message from blacklisted source {:?} from ZeroMQ socket for endpoint {}",
topic,
from_utf8(topic).unwrap_or(&bytes_to_hex_string(topic)),
self.config.endpoint()
);

Expand Down Expand Up @@ -260,10 +274,10 @@ impl<R: MockSocketResponder, P: SocketProvider<R> + Default> Reader<R, P> {
if !self.config.topic_prefix_spec().matches(topic) {
debug!(
target: "savant_rs::zeromq::reader",
"Received message with invalid topic from ZeroMQ socket for endpoint {}. Expected topic to match spec {:?}, but got {:?}",
"Received message with invalid topic from ZeroMQ socket for endpoint {}. Expected topic to match spec {:?}, but got {}",
self.config.endpoint(),
self.config.topic_prefix_spec(),
topic
from_utf8(topic).unwrap_or(&bytes_to_hex_string(topic))
);

if self.config.socket_type() == &ReaderSocketType::Rep {
Expand All @@ -290,10 +304,10 @@ impl<R: MockSocketResponder, P: SocketProvider<R> + Default> Reader<R, P> {
} else {
debug!(
target: "savant_rs::zeromq::reader",
"Received message with invalid routing ID from ZeroMQ socket for endpoint {}. Got topic = {:?}, routing_id = {:?}",
"Received message with invalid routing ID from ZeroMQ socket for endpoint {}. Got topic = {}, routing_id = {}",
self.config.endpoint(),
topic,
routing_id
from_utf8(topic).unwrap_or(&bytes_to_hex_string(topic)),
routing_id.map(|r| bytes_to_hex_string(r)).unwrap_or(String::new())
);

Ok(ReaderResult::routing_id_mismatch(topic, &routing_id))
Expand Down
6 changes: 5 additions & 1 deletion savant_core/src/transport/zeromq/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use crate::transport::zeromq::{
create_ipc_dirs, set_ipc_permissions, MockSocketResponder, Socket, SocketProvider,
WriterConfig, WriterSocketType, CONFIRMATION_MESSAGE, ZMQ_LINGER,
};
use crate::utils::bytes_to_hex_string;
use anyhow::bail;
use log::{debug, info, warn};
use std::str::from_utf8;

pub struct Writer<R: MockSocketResponder, P: SocketProvider<R>> {
context: Option<zmq::Context>,
Expand Down Expand Up @@ -151,7 +153,9 @@ impl<R: MockSocketResponder, P: SocketProvider<R> + Default> Writer<R, P> {
.collect::<Vec<_>>();
debug!(
target: "savant_rs::zeromq::writer",
"Sending message to ZeroMQ socket: {:?} {:?}", topic, m);
"Sending message to ZeroMQ socket: {} {:?}",
from_utf8(topic).unwrap_or(&bytes_to_hex_string(topic)),
m);
let mut send_retries = *self.config.send_retries();
while send_retries >= 0 {
let res = socket.send_multipart(&parts, 0);
Expand Down
8 changes: 8 additions & 0 deletions savant_core/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
pub mod default_once;
pub mod iter;
use std::fmt::Write;

pub fn bytes_to_hex_string(bytes: &[u8]) -> String {
bytes.iter().fold(String::new(), |mut output, b| {
let _ = write!(output, "{b:02X}");
output
})
}

0 comments on commit b2bd381

Please sign in to comment.