-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Bifrost trim gap handling support by fast-forwarding to the latest partition snapshot #2456
base: main
Are you sure you want to change the base?
Conversation
This allows us to signal the PPM about log trim gaps that the PP may encounter at runtime, which require special handling.
8a82f9e
to
140e7bf
Compare
unimplemented!("Handling trim gap is currently not supported") | ||
}; | ||
anyhow::Ok((lsn, envelope?)) | ||
if entry.is_data_record() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment, LogEntry.record
is not public, and neither are bifrost::{MaybeRecord, TrimGap}
- would we prefer to make those public and use pattern-matching directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can, but it doesn't sound like you need that. See my comments below.
let snapshot = match snapshot_repository { | ||
Some(repository) => { | ||
debug!("Looking for partition snapshot from which to bootstrap partition store"); | ||
// todo(pavel): pass target LSN to repository |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can optimize this by not downloading a snapshot that's older than the target LSN; I'll tackle this as a separate follow-up PR.
); | ||
} | ||
|
||
// We expect the processor startup attempt will fail, avoid spinning too fast. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems reasonable to me. I chose to rather delay and try start again, just in case something has changed in the log - but at this point we're unlikely to get this processor going again by following the log. What's a good way to post a metric that we're spinning?
crates/worker/src/partition/mod.rs
Outdated
Ok(stopped) => { | ||
match stopped { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tip: You can remove one level of nesting:
Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => ....
Ok(_) => warn...
Err(err) => warn...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much better, thank you! <3
unimplemented!("Handling trim gap is currently not supported") | ||
}; | ||
anyhow::Ok((lsn, envelope?)) | ||
if entry.is_data_record() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can, but it doesn't sound like you need that. See my comments below.
anyhow::Ok(Record::TrimGap( | ||
entry | ||
.trim_gap_to_sequence_number() | ||
.expect("trim gap has to-LSN"), | ||
)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to stop the read stream at the first gap and return Err instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand this to mean the map_ok
function translates a trim-gap into an Err(TrimGap {..})
instead? Maybe! The way we use anyhow::Result
pervasively makes this a deeper change than I wanted to tackle right away; but it also makes more sense to treat trim gaps as just another record in the stream, with errors reserved for actual failure conditions.
Zooming out a bit, modeling the Partition Processor overall outcome as Result<Canceled | StoppedAtTrimGap, ProcessingError>
seems accurate: the Ok / left path is an expected if rare reason to halt; the Err / right path is an exceptional failure condition.
If you have a few minutes, I'd love to hear more about how you'd solve this? I'm certain I am also missing some subtlety around properly consuming the log stream!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We discussed offline and agreed that it's best to represent this case as an error case.
pub fn start_runtime<F, R>( | ||
self: &Arc<Self>, | ||
root_task_kind: TaskKind, | ||
runtime_name: &'static str, | ||
partition_id: Option<PartitionId>, | ||
root_future: impl FnOnce() -> F + Send + 'static, | ||
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError> | ||
) -> Result<RuntimeTaskHandle<R>, RuntimeError> | ||
where | ||
F: Future<Output = anyhow::Result<()>> + 'static, | ||
F: Future<Output = R> + 'static, | ||
R: Send + 'static, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, it seems more than you want to have control over the error type rather than make the runtime behave like an async task with a return value.
In that case, your PartitionProcessorStopReason becomes the error type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe! We use anyhow::Error
quite a bit in the PP now, so it would be difficult to disentangle the errors I care about, from other failure conditions. That aside, I still like modeling this as an outcome of either a known stop reason, or some other failure condition. I am treating PartitionProcessorStopReason
as a normal return since both canceling the PP, or encountering a trim gap, are expected over a long enough timeline.
.await | ||
&& fast_forward_lsn.is_none() | ||
{ | ||
return Ok(partition_store_manager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tip: remove return
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without the return statement, I need to pull the rest of the method body into an else
arm - and I specifically wanted to keep it this way. I find it easier to read without the extra nesting. Open to change it back if you feel about using the if expression as the returned value of course :-)
tokio::time::sleep(Duration::from_millis( | ||
10_000 + rand::random::<u64>() % 10_000, | ||
)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would RetryPolicy and its internal jitter logic work for you here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely! I didn't want to plumb a retry count through just yet but maybe even without it, we can leverage the retry policy already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remembered why I didn't want to tackle this just yet - right now the way to get consecutive retry decisions is to get an iterator. I want to introduce an alternative API to RetryPolicy
which will make this more suitable for use cases like this one, but let me rather do that as a follow up PR!
warn!( | ||
partition_id = %partition_id, | ||
?snapshot_path, | ||
"Failed to remove local snapshot directory, continuing with startup: {:?}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's try and avoid using Debug
values in log messages higher than debug!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very insightful rule of thumb to keep in mind, thank you!
One bit of offline feedback from @AhmedSoliman was to think edge cases around partition leadership, when an active leader encounters a trim gap. Pushed an updated version which addresses the trim gap stop-reason as an explicit error, rather than an "ok" variant. This is because we really want to minimize the chances of misinterpreting a trim-gap record and accidentally consuming past it. One deviation from the previous behavior with this latest revision is that Also addressed most of the remaining smaller comments, with the major exception of not using retry policy just yet - I plan to, but let's do it as a follow-up as I want to make some changes to make it easier to use here. |
e2e487e
to
b76f3f9
Compare
b76f3f9
to
87a876d
Compare
} | ||
for record in command_buffer.drain(..) { | ||
match record { | ||
Record::Envelope(lsn, envelope) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Envelope
handling logic is unchanged, this block is just indented due to the match expression needed to handle gaps.
Closes: #2247