Skip to content

Commit

Permalink
[Cherry-pick] Fix panics on reading entries by stale index. (#370) (#…
Browse files Browse the repository at this point in the history
…373)

 

Signed-off-by: lucasliang <[email protected]>
  • Loading branch information
LykxSassinator authored Nov 8, 2024
1 parent 974bc9c commit 2a4faff
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest, macos-latest ]
os: [ ubuntu-22.04, macos-13 ]
steps:
- uses: actions/checkout@v2
with:
Expand All @@ -26,7 +26,7 @@ jobs:
with:
sharedKey: ${{ matrix.os }}
- name: Cache dependencies
if: ${{ matrix.os == 'ubuntu-latest' }}
if: ${{ matrix.os == 'ubuntu-22.04' }}
run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov --version 0.8.9; fi
- name: Format
run: |
Expand All @@ -42,7 +42,7 @@ jobs:
RUST_BACKTRACE: 1
EXTRA_CARGO_ARGS: '--verbose'
- name: Run asan tests
if: ${{ matrix.os == 'ubuntu-latest' }}
if: ${{ matrix.os == 'ubuntu-22.04' }}
run: make test
env:
RUST_BACKTRACE: 1
Expand All @@ -53,7 +53,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest ]
os: [ ubuntu-22.04 ]
steps:
- uses: actions/checkout@v2
with:
Expand All @@ -79,7 +79,7 @@ jobs:
EXTRA_CARGO_ARGS: '--verbose'
WITH_STABLE_TOOLCHAIN: 'force'
coverage:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
needs: nightly
steps:
- uses: actions/checkout@v2
Expand All @@ -94,7 +94,7 @@ jobs:
components: llvm-tools-preview
- uses: Swatinem/rust-cache@v1
with:
sharedKey: ubuntu-latest
sharedKey: ubuntu-22.04
- name: Install grcov
run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov --version 0.8.9; fi
- name: Run tests
Expand Down
60 changes: 55 additions & 5 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,12 @@ where
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity((end - begin) as usize);
memtable
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
// Ensure that the corresponding memtable is locked with a read lock before
// completing the fetching of entries from the raft logs. This
// prevents the scenario where the index could become stale while
// being concurrently updated by the `rewrite` operation.
let immutable = memtable.read();
immutable.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
for i in ents_idx.iter() {
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
}
Expand Down Expand Up @@ -632,9 +635,11 @@ pub(crate) mod tests {
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use rand::{thread_rng, Rng};
use std::collections::{BTreeSet, HashSet};
use std::fs::OpenOptions;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};

pub(crate) type RaftLogEngine<F = DefaultFileSystem> = Engine<F>;
impl<F: FileSystem> RaftLogEngine<F> {
Expand Down Expand Up @@ -1925,8 +1930,6 @@ pub(crate) mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn bench_engine_fetch_entries(b: &mut test::Bencher) {
use rand::{thread_rng, Rng};

let dir = tempfile::Builder::new()
.prefix("bench_engine_fetch_entries")
.tempdir()
Expand Down Expand Up @@ -2548,6 +2551,53 @@ pub(crate) mod tests {
assert!(data.is_empty());
}

#[test]
fn test_fetch_with_concurrently_rewrite() {
let dir = tempfile::Builder::new()
.prefix("test_fetch_with_concurrently_rewrite")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(2048),
..Default::default()
};
let fs = Arc::new(DeleteMonitoredFileSystem::new());
let engine = Arc::new(RaftLogEngine::open_with_file_system(cfg, fs).unwrap());
let entry_data = vec![b'x'; 128];
// Set up a concurrent write with purge, and fetch.
let mut vec: Vec<Entry> = Vec::new();
let fetch_engine = engine.clone();
let flag = Arc::new(AtomicBool::new(false));
let start_flag = flag.clone();
let th = std::thread::spawn(move || {
while !start_flag.load(Ordering::Acquire) {
std::thread::sleep(Duration::from_millis(10));
}
for _ in 0..10 {
let region_id = thread_rng().gen_range(1..=10);
// Should not return file seqno out of range error.
let _ = fetch_engine
.fetch_entries_to::<Entry>(region_id, 1, 101, None, &mut vec)
.map_err(|e| {
assert!(!format!("{e}").contains("file seqno out of"));
});
vec.clear();
}
});
for i in 0..10 {
for rid in 1..=10 {
engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data));
}
flag.store(true, Ordering::Release);
for rid in 1..=10 {
engine.clean(rid);
}
engine.purge_expired_files().unwrap();
}
th.join().unwrap();
}

#[test]
fn test_internal_key_filter() {
let dir = tempfile::Builder::new()
Expand Down

0 comments on commit 2a4faff

Please sign in to comment.