Return error instead of panicking if rewriting fails #343

Merged: 25 commits, Dec 7, 2023. Showing changes from 13 commits.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
@@ -60,7 +60,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.66.0
toolchain: 1.67.1
override: true
components: rustfmt, clippy, rust-src
- uses: Swatinem/rust-cache@v1
4 changes: 1 addition & 3 deletions Cargo.toml
@@ -3,7 +3,7 @@ name = "raft-engine"
version = "0.4.1"
authors = ["The TiKV Project Developers"]
edition = "2018"
rust-version = "1.66.0"
rust-version = "1.67.1"
description = "A persistent storage engine for Multi-Raft logs"
readme = "README.md"
repository = "https://github.com/tikv/raft-engine"
@@ -95,8 +95,6 @@ nightly_group = ["nightly", "swap"]
raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" }
protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
# TODO: Use official grpc-rs once https://github.com/tikv/grpc-rs/pull/622 is merged.
grpcio = { git = "https://github.com/tabokie/grpc-rs", branch = "v0.10.x-win" }

[workspace]
members = ["stress", "ctl"]
6 changes: 3 additions & 3 deletions src/engine.rs
@@ -172,9 +172,9 @@ where
}
perf_context!(log_write_duration).observe_since(now);
if sync {
// As per trait protocol, this error should be retriable. But we panic anyway to
// As per trait protocol, sync error should be retriable. But we panic anyway to
// save the trouble of propagating it to other group members.
self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
self.pipe_log.sync(LogQueue::Append);
}
// Pass the perf context diff to all the writers.
let diff = get_perf_context();
@@ -2576,7 +2576,7 @@ pub(crate) mod tests {
flush(&mut log_batch);
engine.pipe_log.rotate(LogQueue::Rewrite).unwrap();
}
engine.pipe_log.sync(LogQueue::Rewrite).unwrap();
engine.pipe_log.sync(LogQueue::Rewrite);

let engine = engine.reopen();
for rid in engine.raft_groups() {
24 changes: 11 additions & 13 deletions src/file_pipe_log/log_file.rs
@@ -67,28 +67,26 @@ impl<F: FileSystem> LogFileWriter<F> {
}
Review comment (Member): Add a comment to this struct stating that it should be fail-safe, i.e. the user can still use the writer without breaking data consistency if any operation has failed.
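A possible shape for such a comment (suggested wording only, not necessarily what was committed):

/// Writer for a single log file.
///
/// Operations are intended to be fail-safe: if any call on this writer
/// returns an error, the caller can keep using it (or close it) without
/// breaking the consistency of data that was already written.
pub struct LogFileWriter<F: FileSystem> { /* fields unchanged */ }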
fn write_header(&mut self, format: LogFileFormat) -> IoResult<()> {
self.writer.seek(SeekFrom::Start(0))?;
self.writer.rewind()?;
self.written = 0;
let mut buf = Vec::with_capacity(LogFileFormat::encoded_len(format.version));
format.encode(&mut buf).unwrap();
self.write(&buf, 0)
}

pub fn close(&mut self) -> IoResult<()> {
pub fn close(&mut self) {
// Necessary to truncate extra zeros from fallocate().
self.truncate()?;
self.sync()
self.truncate();
self.sync();
}

pub fn truncate(&mut self) -> IoResult<()> {
pub fn truncate(&mut self) {
if self.written < self.capacity {
fail_point!("file_pipe_log::log_file_writer::skip_truncate", |_| {
Ok(())
});
self.writer.truncate(self.written)?;
fail_point!("file_pipe_log::log_file_writer::skip_truncate", |_| {});
// Panic if truncate fails, in case of data loss.
self.writer.truncate(self.written).unwrap();
self.capacity = self.written;
}
Ok(())
}

pub fn write(&mut self, buf: &[u8], target_size_hint: usize) -> IoResult<()> {
@@ -117,10 +115,10 @@ impl<F: FileSystem> LogFileWriter<F> {
Ok(())
}

pub fn sync(&mut self) -> IoResult<()> {
pub fn sync(&mut self) {
let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
self.handle.sync()?;
Ok(())
// Panic if sync fails, in case of data loss.
self.handle.sync().unwrap();
}

#[inline]
8 changes: 4 additions & 4 deletions src/file_pipe_log/mod.rs
@@ -231,7 +231,7 @@ pub mod debug {
len,
});
}
writer.close().unwrap();
writer.close();
// Read and verify.
let mut reader =
LogItemReader::new_file_reader(file_system.clone(), &file_path).unwrap();
@@ -280,7 +280,7 @@ pub mod debug {
true, /* create */
)
.unwrap();
writer.close().unwrap();
writer.close();

assert!(LogItemReader::new_file_reader(file_system.clone(), dir.path()).is_err());
assert!(
@@ -330,7 +330,7 @@ pub mod debug {
.unwrap();
let f = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
let len = writer.offset();
writer.close().unwrap();
writer.close();
if shorter {
f.set_len(len as u64 - 1).unwrap();
}
@@ -341,7 +341,7 @@
false, /* create */
)
.unwrap();
writer.close().unwrap();
writer.close();
let mut reader = build_file_reader(file_system.as_ref(), &path).unwrap();
assert_eq!(reader.parse_format().unwrap(), to);
std::fs::remove_file(&path).unwrap();
47 changes: 13 additions & 34 deletions src/file_pipe_log/pipe.rs
@@ -72,9 +72,7 @@ pub(super) struct SinglePipe<F: FileSystem> {
impl<F: FileSystem> Drop for SinglePipe<F> {
fn drop(&mut self) {
let mut writable_file = self.writable_file.lock();
if let Err(e) = writable_file.writer.close() {
error!("error while closing the active writer: {e}");
}
writable_file.writer.close();
let mut recycled_files = self.recycled_files.write();
let mut next_reserved_seq = recycled_files
.iter()
@@ -248,7 +246,7 @@ impl<F: FileSystem> SinglePipe<F> {
let new_seq = writable_file.seq + 1;
debug_assert!(new_seq > DEFAULT_FIRST_FILE_SEQ);

writable_file.writer.close()?;
writable_file.writer.close();

let (path_id, handle) = self
.recycle_file(new_seq)
Expand All @@ -272,7 +270,7 @@ impl<F: FileSystem> SinglePipe<F> {
};
// File header must be persisted. This way we can recover gracefully if power
// loss before a new entry is written.
new_file.writer.sync()?;
new_file.writer.sync();
self.sync_dir(path_id)?;
Review comment (Member): This error needs to be handled carefully now (e.g. remove the newly created file and make sure the old writer is okay to write again). Better to just unwrap it as well.

Review comment (Member): build_file_writer above is the same.

Reply (Contributor, author): Made sync_dir panic if it fails.

But build_file_writer should be fine, right? It is the kind of panic this PR is trying to avoid (which can be confirmed by test_no_space_write_error). If it fails, the new file won't be used for writing and will be recycled the next time rotate_impl is called. So it already meets your expectation?

Review comment (Member): Probably. I suggest adding a few restarts in test_file_rotate_error.

Reply (Contributor, author): Added a few more verifications in the test_file_rotate_error test; that should address your concern. PTAL.

**writable_file = new_file;
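For reference, a minimal sketch of the sync_dir resolution mentioned in the thread above (the PathId type and the paths field are assumed here; only the panic-on-failure behavior is the point):

fn sync_dir(&self, path_id: PathId) {
    // A failed directory fsync right after creating a file cannot be
    // recovered by the caller, so panic instead of returning an error.
    let path = &self.paths[path_id];
    std::fs::File::open(path)
        .and_then(|dir| dir.sync_all())
        .expect("sync dir");
}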
@@ -321,12 +319,7 @@ impl<F: FileSystem> SinglePipe<F> {
fail_point!("file_pipe_log::append");
let mut writable_file = self.writable_file.lock();
if writable_file.writer.offset() >= self.target_file_size {
if let Err(e) = self.rotate_imp(&mut writable_file) {
panic!(
"error when rotate [{:?}:{}]: {e}",
self.queue, writable_file.seq,
);
}
self.rotate_imp(&mut writable_file)?;
}

let seq = writable_file.seq;
@@ -359,9 +352,7 @@ impl<F: FileSystem> SinglePipe<F> {
}
let start_offset = writer.offset();
if let Err(e) = writer.write(bytes.as_bytes(&ctx), self.target_file_size) {
if let Err(te) = writer.truncate() {
panic!("error when truncate {seq} after error: {e}, get: {}", te);
}
writer.truncate();
if is_no_space_err(&e) {
// TODO: There exists several corner cases should be tackled if
// `bytes.len()` > `target_file_size`. For example,
Expand All @@ -372,12 +363,7 @@ impl<F: FileSystem> SinglePipe<F> {
// - [3] Both main-dir and spill-dir have several recycled logs.
// But as `bytes.len()` is always smaller than `target_file_size` in common
// cases, this issue will be ignored temprorarily.
if let Err(e) = self.rotate_imp(&mut writable_file) {
panic!(
"error when rotate [{:?}:{}]: {e}",
self.queue, writable_file.seq
);
}
self.rotate_imp(&mut writable_file)?;
// If there still exists free space for this record, rotate the file
// and return a special TryAgain Err (for retry) to the caller.
return Err(Error::TryAgain(format!(
Expand All @@ -401,18 +387,11 @@ impl<F: FileSystem> SinglePipe<F> {
Ok(handle)
}

fn sync(&self) -> Result<()> {
fn sync(&self) {
let mut writable_file = self.writable_file.lock();
let seq = writable_file.seq;
let writer = &mut writable_file.writer;
{
let _t = StopWatch::new(perf_context!(log_sync_duration));
if let Err(e) = writer.sync() {
panic!("error when sync [{:?}:{seq}]: {e}", self.queue);
}
}

Ok(())
let _t = StopWatch::new(perf_context!(log_sync_duration));
writer.sync();
}

fn file_span(&self) -> (FileSeq, FileSeq) {
@@ -520,8 +499,8 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
}

#[inline]
fn sync(&self, queue: LogQueue) -> Result<()> {
self.pipes[queue as usize].sync()
fn sync(&self, queue: LogQueue) {
self.pipes[queue as usize].sync();
}

#[inline]
@@ -716,7 +695,7 @@ mod tests {
let mut handles = Vec::new();
for i in 0..10 {
handles.push(pipe_log.append(&mut &content(i)).unwrap());
pipe_log.sync().unwrap();
pipe_log.sync();
}
pipe_log.rotate().unwrap();
let (first, last) = pipe_log.file_span();
@@ -733,7 +712,7 @@
let mut handles = Vec::new();
for i in 0..10 {
handles.push(pipe_log.append(&mut &content(i + 1)).unwrap());
pipe_log.sync().unwrap();
pipe_log.sync();
}
// Verify the data.
for (i, handle) in handles.into_iter().enumerate() {
2 changes: 1 addition & 1 deletion src/filter.rs
@@ -333,7 +333,7 @@ impl RhaiFilterMachine {
)?;
log_batch.drain();
}
writer.close()?;
writer.close();
}
}
// Delete backup file and defuse the guard.
2 changes: 1 addition & 1 deletion src/pipe_log.rs
@@ -179,7 +179,7 @@ pub trait PipeLog: Sized {
///
/// This operation might incurs a great latency overhead. It's advised to
/// call it once every batch of writes.
fn sync(&self, queue: LogQueue) -> Result<()>;
fn sync(&self, queue: LogQueue);

/// Returns the smallest and largest file sequence number, still in use,
/// of the specified log queue.
6 changes: 3 additions & 3 deletions src/purge.rs
@@ -273,7 +273,7 @@ where
// Rewrites the entire rewrite queue into new log files.
fn rewrite_rewrite_queue(&self) -> Result<Vec<u64>> {
let _t = StopWatch::new(&*ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM);
self.pipe_log.rotate(LogQueue::Rewrite)?;
self.pipe_log.rotate(LogQueue::Rewrite).unwrap();
Review comment (Member): Why unwrap this?

let mut force_compact_regions = vec![];
let memtables = self.memtables.collect(|t| {
@@ -430,7 +430,7 @@ where
) -> Result<Option<FileBlockHandle>> {
if log_batch.is_empty() {
debug_assert!(sync);
self.pipe_log.sync(LogQueue::Rewrite)?;
self.pipe_log.sync(LogQueue::Rewrite);
return Ok(None);
}
log_batch.finish_populate(
@@ -439,7 +439,7 @@
)?;
let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
if sync {
self.pipe_log.sync(LogQueue::Rewrite)?
self.pipe_log.sync(LogQueue::Rewrite);
}
log_batch.finish_write(file_handle);
self.memtables.apply_rewrite_writes(
34 changes: 13 additions & 21 deletions tests/failpoints/test_io_error.rs
@@ -165,19 +165,17 @@ fn test_file_rotate_error() {
{
Review comment (Member): Make two versions of this test: fn test_file_rotate_error(restart: bool)

// case 1
if restart {
  let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
}
// case 2
// ...
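A rough sketch of that suggestion (a sketch only, not the committed change): FailGuard, generate_batch and file_span are the helpers already used in this file, while the config values, ObfuscatedFileSystem and the initial write are assumptions borrowed from the surrounding tests.

fn test_file_rotate_error(restart: bool) {
    let dir = tempfile::Builder::new()
        .prefix("test_file_rotate_error")
        .tempdir()
        .unwrap();
    let mut cfg = Config::default();
    cfg.dir = dir.path().to_str().unwrap().to_owned();
    cfg.target_file_size = ReadableSize(1); // force a rotate attempt on the next write
    let fs = Arc::new(ObfuscatedFileSystem::default()); // assumed file system
    let entry = vec![b'x'; 1024]; // assumed payload

    let mut engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
    engine
        .write(&mut generate_batch(1, 1, 4, Some(&entry)), true)
        .unwrap();
    {
        // Creating the next log file fails: the write must return an error
        // instead of panicking, and the active file must stay the same.
        let _f = FailGuard::new("default_fs::create::err", "return");
        assert!(engine
            .write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
            .is_err());
        assert_eq!(engine.file_span(LogQueue::Append).1, 1);
    }
    if restart {
        // Reopen to check that the failed rotate left the engine recoverable.
        drop(engine);
        engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
        assert_eq!(engine.file_span(LogQueue::Append).1, 1);
    }
    // ... remaining failure cases from the original test ...
}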
// Fail to create new log file.
let _f = FailGuard::new("default_fs::create::err", "return");
assert!(catch_unwind_silent(|| {
let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
})
.is_err());
assert!(engine
.write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
.is_err());
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
}
{
// Fail to write header of new log file.
let _f = FailGuard::new("log_file::write::err", "1*off->return");
assert!(catch_unwind_silent(|| {
let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
})
.is_err());
assert!(engine
.write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
.is_err());
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
}
{
@@ -527,20 +525,17 @@ fn test_no_space_write_error() {
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.spill_dir = Some(spill_dir.path().to_str().unwrap().to_owned());
{
// Case 1: `Write` is abnormal for no space left, Engine should panic at
// Case 1: `Write` is abnormal for no space left, Engine should fail at
// `rotate`.
let cfg_err = Config {
target_file_size: ReadableSize(1),
..cfg.clone()
};
let engine = Engine::open(cfg_err).unwrap();
let _f = FailGuard::new("log_fd::write::no_space_err", "return");
assert!(catch_unwind_silent(|| {
engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.unwrap_err();
})
.is_err());
assert!(engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.is_err());
assert_eq!(
0,
engine
Expand All @@ -554,12 +549,9 @@ fn test_no_space_write_error() {
let _f1 = FailGuard::new("log_fd::write::no_space_err", "2*return->off");
let _f2 = FailGuard::new("file_pipe_log::force_choose_dir", "return");
// The first write should fail, because all dirs run out of space for writing.
assert!(catch_unwind_silent(|| {
engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.unwrap_err();
})
.is_err());
assert!(engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.is_err());
assert_eq!(
0,
engine