Skip to content

Commit

Permalink
feat: add support for prefix_{}_suffix syntax
Browse files Browse the repository at this point in the history
test

test
  • Loading branch information
suxb201 committed Aug 13, 2024
1 parent fbff187 commit f13fd6a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 20 deletions.
24 changes: 14 additions & 10 deletions src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ async fn run_commands_on_single_thread(limiter: Arc<ConnLimiter>, config: Client
// prepare pipeline
let mut p = Vec::new();
for _ in 0..pipeline_cnt {
p.push(cmd.gen_cmd());
if context.is_loading {
p.push(cmd.gen_cmd_with_lock());
} else {
p.push(cmd.gen_cmd());
}
}
let instant = std::time::Instant::now();
client.run_commands(p).await;
Expand All @@ -60,7 +64,7 @@ async fn run_commands_on_single_thread(limiter: Arc<ConnLimiter>, config: Client
local.await;
}

fn wait_finish(case: &Case, mut auto_connection: AutoConnection, mut context: SharedContext, mut wg: WaitGroup, load: bool, quiet: bool) -> BenchmarkResult {
fn wait_finish(case: &Case, mut auto_connection: AutoConnection, mut context: SharedContext, mut wg: WaitGroup, quiet: bool) -> BenchmarkResult {
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let mut result = BenchmarkResult::default();

Expand Down Expand Up @@ -93,10 +97,10 @@ fn wait_finish(case: &Case, mut auto_connection: AutoConnection, mut context: Sh
result.qps = (cnt - overall_cnt_overhead) as f64 / overall_time.elapsed().as_secs_f64();
}
if !quiet {
if load {
print!("\r\x1B[2KData loading qps: {:.0}, {:.2}%", qps, histogram.cnt() as f64 / case.count as f64 * 100f64);
if context.is_loading {
println!("\x1B[F\x1B[2KData loading qps: {:.0}, {:.2}%", qps, histogram.cnt() as f64 / case.count as f64 * 100f64);
} else {
print!("\r\x1B[2Kqps: {:.0}(overall {:.0}), conn: {}, {}", qps, result.qps, conn, histogram);
println!("\x1B[F\x1B[2Kqps: {:.0}(overall {:.0}), conn: {}, {}", qps, result.qps, conn, histogram);
}
}
std::io::stdout().flush().unwrap();
Expand All @@ -113,10 +117,10 @@ fn wait_finish(case: &Case, mut auto_connection: AutoConnection, mut context: Sh
}
}
let conn: u64 = auto_connection.active_conn();
if load {
print!("\r\x1B[2KData loaded, qps: {:.0}, time elapsed: {:.2}s\n", result.qps, overall_time.elapsed().as_secs_f64());
if context.is_loading {
println!("\x1B[F\x1B[2KData loaded, qps: {:.0}, time elapsed: {:.2}s\n", result.qps, overall_time.elapsed().as_secs_f64());
} else {
print!("\r\x1B[2Kqps: {:.0}, conn: {}, {}\n", result.qps, conn, histogram)
println!("\x1B[F\x1B[2Kqps: {:.0}, conn: {}, {}\n", result.qps, conn, histogram)
};
result.avg_latency_ms = histogram.avg() as f64 / 1_000.0;
result.p99_latency_ms = histogram.percentile(0.99) as f64 / 1_000.0;
Expand All @@ -139,7 +143,7 @@ pub fn do_benchmark(client_config: ClientConfig, cores: Vec<u16>, case: Case, lo
let mut thread_handlers = Vec::new();
let wg = WaitGroup::new();
let core_ids = core_affinity::get_core_ids().unwrap();
let context = SharedContext::new(case.count, case.seconds);
let context = SharedContext::new(case.count, case.seconds, load);
for inx in 0..cores.len() {
let client_config = client_config.clone();
let case = case.clone();
Expand All @@ -159,7 +163,7 @@ pub fn do_benchmark(client_config: ClientConfig, cores: Vec<u16>, case: Case, lo
}

// log thread
let result = wait_finish(&case, auto_connection, context, wg, load, quiet);
let result = wait_finish(&case, auto_connection, context, wg, quiet);

// join all threads
for thread_handler in thread_handlers {
Expand Down
14 changes: 11 additions & 3 deletions src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,30 @@ impl Command {
}
pub fn gen_cmd(&mut self) -> redis::Cmd {
let mut cmd = redis::Cmd::new();
let mut cmd_str = String::new();
for ph in self.argv.iter_mut() {
for arg in ph.gen() {
cmd.arg(arg);
cmd_str.push_str(&arg);
}
}
for word in cmd_str.split_whitespace() {
cmd.arg(word);
}
cmd
}
#[allow(dead_code)]
pub fn gen_cmd_with_lock(&mut self) -> redis::Cmd {
let mut cmd = redis::Cmd::new();
let _lock = self.lock.lock().unwrap();
let mut cmd = redis::Cmd::new();
let mut cmd_str = String::new();
for ph in self.argv.iter_mut() {
for arg in ph.gen() {
cmd.arg(arg);
cmd_str.push_str(&arg);
}
}
for word in cmd_str.split_whitespace() {
cmd.arg(word);
}
cmd
}
pub fn to_string(&self) -> String {
Expand Down
10 changes: 4 additions & 6 deletions src/command/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use nom::{
sequence::delimited,
branch::alt,
bytes::complete::{is_not, tag},
character::complete::multispace0,
multi::many0,
combinator::{map, all_consuming},
};
Expand All @@ -13,16 +12,15 @@ fn parse_string(input: &str) -> IResult<&str, PlaceholderEnum> {
let s = alt((
delimited(tag("\""), is_not("\""), tag("\"")),
delimited(tag("\'"), is_not("\'"), tag("\'")),
delimited(multispace0, is_not("{ "), multispace0)
));
is_not("{")
));
map(s, PlaceholderEnum::new_string)(input)
}


fn parse_placeholder(input: &str) -> IResult<&str, PlaceholderEnum> {
let inner = delimited(tag("{"), is_not("}"), tag("}"));
let eat_whitespace = delimited(multispace0, inner, multispace0);
map(eat_whitespace, PlaceholderEnum::new)(input)
map(inner, PlaceholderEnum::new)(input)
}


Expand All @@ -36,7 +34,7 @@ mod tests {

#[test]
fn test_root() {
let (nm, args) = match parse_all("aa {key sequence 100} bbb") {
let (nm, args) = match parse_all("aa test_{key sequence 100} bbb") {
Ok((nm, args)) => (nm, args),
Err(e) => {
println!("Error: {:?}", e);
Expand Down
4 changes: 3 additions & 1 deletion src/shared_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::time::Instant;

#[derive(Clone)]
pub struct SharedContext {
pub is_loading: bool,
// limit by max_count
current_count: Arc<AtomicU64>,
max_count: u64,
Expand All @@ -24,8 +25,9 @@ pub struct SharedContext {
}

impl SharedContext {
pub fn new(max_count: u64, max_seconds: u64) -> Self {
pub fn new(max_count: u64, max_seconds: u64, is_loading: bool) -> Self {
SharedContext {
is_loading,
current_count: Arc::new(AtomicU64::new(0)),
max_count,
instant: Arc::new(RwLock::new(None)),
Expand Down

0 comments on commit f13fd6a

Please sign in to comment.