Skip to content

Commit

Permalink
perf(memory): use thread-local sequence-based memory eviction policy (#…
Browse files Browse the repository at this point in the history
…16087)

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored May 27, 2024
1 parent 3893df6 commit 240f0b9
Show file tree
Hide file tree
Showing 33 changed files with 1,098 additions and 426 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

18 changes: 11 additions & 7 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3697,22 +3697,26 @@ def section_memory_manager(outer_panels):
),
],
),
panels.timeseries_count(
"LRU manager watermark steps",
panels.timeseries(
"LRU manager eviction policy",
"",
[
panels.target(
f"{metric('lru_watermark_step')}",
f"{metric('lru_eviction_policy')}",
"",
),
],
),
panels.timeseries_ms(
"LRU manager diff between watermark_time and now (ms)",
"watermark_time is the current lower watermark of cached data. physical_now is the current time of the machine. The diff (physical_now - watermark_time) shows how much data is cached.",
panels.timeseries(
"LRU manager sequence",
"",
[
panels.target(
f"{metric('lru_physical_now_ms')} - {metric('lru_current_watermark_time_ms')}",
f"{metric('lru_latest_sequence')}",
"",
),
panels.target(
f"{metric('lru_watermark_sequence')}",
"",
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
ahash = "0.8"
anyhow = "1"
arc-swap = "1"
arrow-array = { workspace = true }
Expand Down Expand Up @@ -48,6 +49,7 @@ fixedbitset = "0.5"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hashbrown = "0.14"
hex = "0.4.3"
http = "0.2"
humantime = "2.1"
Expand Down Expand Up @@ -135,6 +137,7 @@ libc = "0.2"
mach2 = "0.4"

[dev-dependencies]
coarsetime = "0.1"
criterion = { workspace = true }
expect-test = "1"
more-asserts = "0.3"
Expand Down Expand Up @@ -167,5 +170,13 @@ harness = false
name = "bench_array"
harness = false

[[bench]]
name = "bench_sequencer"
harness = false

[[bench]]
name = "bench_lru"
harness = false

[lints]
workspace = true
88 changes: 88 additions & 0 deletions src/common/benches/bench_lru.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hint::black_box;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};

use itertools::Itertools;
use lru::LruCache;
use risingwave_common::lru::LruCache as RwLruCache;
use risingwave_common::sequence::SEQUENCE_GLOBAL;

/// Drives the `lru` crate's `LruCache` with a put-heavy workload and periodic eviction.
///
/// The keys `0..loops` are inserted one at a time. Before inserting every key that is a
/// non-zero multiple of `evict_ratio`, the cache epoch is updated to that key and entries are
/// popped with `pop_lru_by_epoch` until it returns `None`.
///
/// Returns the number of popped entries and the wall-clock time of the whole loop.
///
/// # Panics
///
/// Panics with a division by zero if `evict_ratio` is `0` and `loops` is non-zero.
fn lru(loops: usize, evict_ratio: u64) -> (usize, Duration) {
    let mut cache = LruCache::unbounded();
    let mut evicted = 0;
    let started = Instant::now();
    for key in 0..loops as u64 {
        // At key 0 nothing has been inserted yet, so that round is skipped.
        if key % evict_ratio == 0 && key != 0 {
            cache.update_epoch(key);
            loop {
                if cache.pop_lru_by_epoch(key).is_none() {
                    break;
                }
                evicted += 1;
            }
        }
        cache.put(key, key);
    }
    let elapsed = started.elapsed();

    (evicted, elapsed)
}

/// Drives RisingWave's sequence-based `LruCache` with the same put-heavy workload as [`lru`].
///
/// The keys `0..loops` are inserted one at a time. Before inserting every key that is a
/// multiple of `evict_ratio` (including key 0), `SEQUENCE_GLOBAL` is read once and entries are
/// popped with `pop_with_sequence` until it returns `None`.
///
/// Returns the number of popped entries and the wall-clock time of the whole loop.
///
/// # Panics
///
/// Panics with a division by zero if `evict_ratio` is `0` and `loops` is non-zero.
fn rw_lru(loops: usize, evict_ratio: u64) -> (usize, Duration) {
    let mut cache = RwLruCache::unbounded();
    let mut evicted = 0;
    let started = Instant::now();
    for key in 0..loops as u64 {
        if key % evict_ratio == 0 {
            // One snapshot of the global sequence bounds the whole eviction round.
            let watermark = SEQUENCE_GLOBAL.load(Ordering::Relaxed);
            loop {
                if cache.pop_with_sequence(watermark).is_none() {
                    break;
                }
                evicted += 1;
            }
        }
        cache.put(key, key);
    }
    let elapsed = started.elapsed();

    (evicted, elapsed)
}

/// Runs `f` once on each of `threads` freshly spawned OS threads and prints a summary line.
///
/// Every thread gets its own clone of `f`. All threads are spawned before any is joined. The
/// durations and eviction counts returned by the workers are summed, and the reported
/// per-iteration time is `total duration / threads / loops`.
///
/// `loops` is only used for the report; the closure decides how many iterations it really runs.
///
/// # Panics
///
/// Panics if a worker thread panicked, or if `threads` or `loops` is `0` (division by zero).
fn benchmark<F>(name: &str, threads: usize, loops: usize, f: F)
where
    F: Fn() -> (usize, Duration) + Clone + Send + 'static,
{
    let workers = (0..threads)
        .map(|_| std::thread::spawn(black_box(f.clone())))
        .collect_vec();

    let (evicted, total) = workers.into_iter().fold(
        (0usize, Duration::from_nanos(0)),
        |(evicted, total), worker| {
            let (e, d) = worker.join().unwrap();
            (evicted + e, total + d)
        },
    );

    let per_iter_nanos = total.as_nanos() / threads as u128 / loops as u128;
    println!(
        "{:20} {} threads {} loops: {:?} per iter, total evicted: {}",
        name,
        threads,
        loops,
        Duration::from_nanos(per_iter_nanos as u64),
        evicted,
    );
}

/// Runs the `lru` and `rw_lru` benchmarks for a range of thread counts.
///
/// Each worker performs one million insertions with an eviction round every 1024 keys.
fn main() {
    const LOOPS: usize = 1000000;
    const EVICT_RATIO: u64 = 1024;

    for threads in [1, 4, 8, 16, 32, 64] {
        // Blank line between the groups of results for each thread count.
        println!();
        benchmark("lru - 1024", threads, LOOPS, || lru(LOOPS, EVICT_RATIO));
        benchmark("rw - 1024", threads, LOOPS, || {
            rw_lru(LOOPS, EVICT_RATIO)
        });
    }
}
170 changes: 170 additions & 0 deletions src/common/benches/bench_sequencer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]

use std::cell::RefCell;
use std::hint::black_box;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use itertools::Itertools;
use risingwave_common::sequence::*;

// Per-thread `Sequencer` instances exercised by the `sequencer` benchmark in this file.
//
// Each static is named `SEQUENCER_<step>_<lag_amp>` and is constructed as
// `Sequencer::new(step, step * lag_amp)`; `sequencer()` selects one of them from its
// `(step, lag_amp)` arguments. Being thread-local, every benchmark thread works on its own
// `RefCell<Sequencer>`.
//
// NOTE(review): presumably the second constructor argument is the maximum lag allowed behind
// the global sequence — confirm against `risingwave_common::sequence::Sequencer::new`.
thread_local! {
pub static SEQUENCER_64_8: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(64, 64 * 8)) };
pub static SEQUENCER_64_16: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(64, 64 * 16)) };
pub static SEQUENCER_64_32: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(64, 64 * 32)) };
pub static SEQUENCER_128_8: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(128, 128 * 8)) };
pub static SEQUENCER_128_16: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(128, 128 * 16)) };
pub static SEQUENCER_128_32: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(128, 128 * 32)) };
}

/// Times `loops` calls to `coarsetime::Instant::now()`, discarding each result.
///
/// Returns the wall-clock time spent in the loop, measured with `std::time::Instant`.
fn coarse(loops: usize) -> Duration {
    let started = Instant::now();
    (0..loops).for_each(|_| {
        let _ = coarsetime::Instant::now();
    });
    started.elapsed()
}

/// Baseline benchmark: times `loops` increments of a plain local `usize` counter.
///
/// Every incremented value is passed through [`black_box`] so the optimizer has to
/// materialize it. Without that the counter is never observed, and an optimized build is free
/// to delete the whole loop, leaving a baseline that measures nothing.
///
/// Returns the wall-clock time spent in the loop.
// `allow` instead of `expect`: the hand-written counter is the thing being measured, and
// whether clippy reports `explicit_counter_loop` depends on how the counter is read, so an
// expectation could be left unfulfilled.
#[allow(clippy::explicit_counter_loop)]
fn primitive(loops: usize) -> Duration {
    let mut cnt = 0usize;
    let now = Instant::now();
    for _ in 0..loops {
        cnt += 1;
        // Keep the increment observable so the loop survives optimization.
        let _ = black_box(cnt);
    }
    now.elapsed()
}

/// Times `loops` relaxed `fetch_add(1)` operations on the shared counter `atomic`.
///
/// When several threads run this with clones of the same `Arc`, they all update the same
/// counter. Returns the wall-clock time spent in the loop.
fn atomic(loops: usize, atomic: Arc<AtomicUsize>) -> Duration {
    let started = Instant::now();
    (0..loops).for_each(|_| {
        let _ = atomic.fetch_add(1, Ordering::Relaxed);
    });
    started.elapsed()
}

/// Times a batched-counter scheme: a local counter is bumped on every iteration, and the
/// shared `atomic` is only written once every `skip` iterations.
///
/// On iterations where the local counter is a multiple of `skip`, `skip` is added to `atomic`
/// with a relaxed `fetch_add`; on all other iterations `atomic` is only read with a relaxed
/// load. Returns the wall-clock time spent in the loop.
///
/// # Panics
///
/// Panics with a division by zero if `skip` is `0` and `loops` is non-zero.
fn atomic_skip(loops: usize, atomic: Arc<AtomicUsize>, skip: usize) -> Duration {
    let mut local = 0usize;
    let started = Instant::now();
    for _ in 0..loops {
        local += 1;
        match local % skip {
            // A full batch has accumulated locally: publish it in one write.
            0 => {
                let _ = atomic.fetch_add(skip, Ordering::Relaxed);
            }
            // Otherwise only observe the shared value.
            _ => {
                let _ = atomic.load(Ordering::Relaxed);
            }
        }
    }
    started.elapsed()
}

/// Times `loops` calls to `Sequencer::alloc` on the calling thread's sequencer.
///
/// `(step, lag_amp)` selects one of the thread-local `SEQUENCER_<step>_<lag_amp>` statics
/// declared in this file. Each iteration goes through `LocalKey::with` and `RefCell::borrow_mut`,
/// so that access cost is part of the measurement.
///
/// Returns the wall-clock time spent in the loop.
///
/// # Panics
///
/// Panics via `unimplemented!()` for any `(step, lag_amp)` pair without a matching static.
fn sequencer(loops: usize, step: Sequence, lag_amp: Sequence) -> Duration {
    let slot = match (step, lag_amp) {
        (64, 8) => &SEQUENCER_64_8,
        (64, 16) => &SEQUENCER_64_16,
        (64, 32) => &SEQUENCER_64_32,
        (128, 8) => &SEQUENCER_128_8,
        (128, 16) => &SEQUENCER_128_16,
        (128, 32) => &SEQUENCER_128_32,
        _ => unimplemented!(),
    };

    let started = Instant::now();
    (0..loops).for_each(|_| {
        let _ = slot.with(|cell| cell.borrow_mut().alloc());
    });
    started.elapsed()
}

/// Runs `f` once on each of `threads` freshly spawned OS threads and prints a summary line.
///
/// Every thread gets its own clone of `f`. All threads are spawned before any is joined. The
/// durations returned by the workers are summed, and the reported per-iteration time is
/// `total duration / threads / loops`.
///
/// `loops` is only used for the report; the closure decides how many iterations it really runs.
///
/// # Panics
///
/// Panics if a worker thread panicked, or if `threads` or `loops` is `0` (division by zero).
fn benchmark<F>(name: &str, threads: usize, loops: usize, f: F)
where
    F: Fn() -> Duration + Clone + Send + 'static,
{
    let workers = (0..threads)
        .map(|_| std::thread::spawn(black_box(f.clone())))
        .collect_vec();

    let total = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .fold(Duration::from_nanos(0), |acc, spent| acc + spent);

    let per_iter_nanos = total.as_nanos() / threads as u128 / loops as u128;
    println!(
        "{:20} {} threads {} loops: {:?} per iter",
        name,
        threads,
        loops,
        Duration::from_nanos(per_iter_nanos as u64)
    );
}

/// Runs every counter/sequencer micro-benchmark for a range of thread counts.
///
/// For each thread count the order is: `primitive`, `atomic`, `atomic skip {8,16,32,64}`,
/// `sequencer(step,lag_amp)` for steps 64 and 128 with lag amplifications 8, 16 and 32, and
/// finally `coarse`. Every benchmark runs ten million iterations per thread, and each
/// atomic-based benchmark starts from a fresh zeroed counter shared by its threads.
fn main() {
    const LOOPS: usize = 10_000_000;

    for threads in [1, 4, 8, 16, 32] {
        let loops = LOOPS;
        // Blank line between the groups of results for each thread count.
        println!();

        benchmark("primitive", threads, loops, move || primitive(loops));

        let counter = Arc::new(AtomicUsize::new(0));
        benchmark("atomic", threads, loops, move || {
            atomic(loops, counter.clone())
        });

        for skip in [8, 16, 32, 64] {
            let counter = Arc::new(AtomicUsize::new(0));
            benchmark(&format!("atomic skip {skip}"), threads, loops, move || {
                atomic_skip(loops, counter.clone(), skip)
            });
        }

        for step in [64, 128] {
            for lag_amp in [8, 16, 32] {
                benchmark(
                    &format!("sequencer({step},{lag_amp})"),
                    threads,
                    loops,
                    move || sequencer(loops, step, lag_amp),
                );
            }
        }

        benchmark("coarse", threads, loops, move || coarse(loops));
    }
}
Loading

0 comments on commit 240f0b9

Please sign in to comment.