fix: R2dbcOffsetStore evict and delete per slice #1255
base: main
Conversation
patriknw commented on Nov 19, 2024 (edited)
- evict time window for each slice (a sketch of the per-slice eviction idea follows below)
- remove keep-number-of-entries and evict-interval
- lazy loading of offsets
- delete detached from the time window, with a separate delete-after config
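As an illustration of the first bullet, a minimal sketch of per-slice eviction, assuming a simplified `Record`/`State` model (hypothetical names, not the actual `R2dbcOffsetStore` internals): each slice keeps its own sorted set and is evicted against its own latest timestamp, so at least the latest record per slice is always retained.

```scala
import java.time.{ Duration, Instant }
import scala.collection.immutable.TreeSet

object PerSliceEvictionSketch {

  // Hypothetical, simplified model; not the real R2dbcOffsetStore.State.
  final case class Record(slice: Int, pid: String, seqNr: Long, timestamp: Instant)

  // order by timestamp, with pid as tie-breaker, so TreeSet.last is the latest record
  implicit val recordOrdering: Ordering[Record] =
    Ordering.fromLessThan { (a, b) =>
      if (a.timestamp != b.timestamp) a.timestamp.isBefore(b.timestamp) else a.pid < b.pid
    }

  final case class State(bySliceSorted: Map[Int, TreeSet[Record]]) {

    // Evict entries older than (latest of this slice) minus the time window.
    // Comparing against the slice's own latest always keeps at least that latest record.
    def evict(slice: Int, timeWindow: Duration): State =
      bySliceSorted.get(slice) match {
        case Some(records) if records.nonEmpty =>
          val until = records.last.timestamp.minus(timeWindow)
          val kept = records.filter(r => !r.timestamp.isBefore(until))
          copy(bySliceSorted = bySliceSorted.updated(slice, kept))
        case _ =>
          this
      }
  }
}
```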
akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala (outdated review thread, resolved)
@@ -269,6 +291,12 @@ private[projection] class R2dbcOffsetStore(
          system.executionContext))
      else None

  private def scheduleNextDelete(): Unit = {
    if (!settings.deleteInterval.isZero && !settings.deleteInterval.isNegative)
      system.scheduler.scheduleOnce(settings.deleteInterval, () => deleteOldTimestampOffsets(), system.executionContext)
Those scheduled tasks are something I want to revisit in a separate PR. It feels wrong that we never stop them. Probably doesn't matter much due to the idle flag, but anyway.
👍🏼 good to make sure these are not continuing
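A possible direction for that follow-up, sketched as a small self-contained wrapper (the class, the `stop()` hook, and rescheduling inside the runnable are assumptions, not the PR's actual code): keep the `Cancellable` returned by `scheduleOnce` so the pending delete can be cancelled when the projection stops.

```scala
import java.time.Duration
import java.util.concurrent.atomic.AtomicReference
import akka.actor.Cancellable
import akka.actor.typed.ActorSystem

// Hypothetical wrapper illustrating the idea only; the real R2dbcOffsetStore differs.
final class ScheduledDeletes(system: ActorSystem[_], deleteInterval: Duration)(
    deleteOldTimestampOffsets: () => Unit) {

  private val scheduled = new AtomicReference[Option[Cancellable]](None)

  def scheduleNextDelete(): Unit =
    if (!deleteInterval.isZero && !deleteInterval.isNegative) {
      val task = system.scheduler.scheduleOnce(
        deleteInterval,
        () => {
          deleteOldTimestampOffsets()
          scheduleNextDelete() // schedule the next run (the real store's rescheduling may differ)
        },
        system.executionContext)
      scheduled.set(Some(task))
    }

  // cancel the pending task, e.g. when the projection/offset store is stopped
  def stop(): Unit =
    scheduled.getAndSet(None).foreach(_.cancel())
}
```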
        this
      } else {
        // this will always keep at least one, latest per slice
        val until = recordsSortedByTimestamp.last.timestamp.minus(timeWindow)
This idea will not fly. After scaling, it may start at an earlier offset for some other slice than what is evicted/deleted.
The good news is that I now see where the problems are coming from. I think we should change to the lazy loading approach for offsets, as we did for DynamoDB, and decouple deletes from the time window.
Incorporated true lazy loading of offsets, in the same way as for DynamoDB, so it's no problem if we evict too much.
@@ -414,6 +451,46 @@ private[projection] class R2dbcOffsetStore(
      }
    }

  def load(pid: Pid): Future[State] = {
Incorporated lazy loading of offsets, in the same way as for DynamoDB.
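A rough sketch of what lazy loading per persistence id could look like; the `Dao` trait, method names, and `State` helpers here are hypothetical stand-ins, not the actual R2dbcOffsetStore API. If the pid is already in the in-memory state it is returned as-is, otherwise the stored offset record for that pid is read and merged in, which is also why evicting too much from memory is harmless.

```scala
import java.time.Instant
import scala.concurrent.{ ExecutionContext, Future }

object LazyOffsetLoadingSketch {

  // Hypothetical types, only to illustrate the lazy loading idea.
  final case class Record(slice: Int, pid: String, seqNr: Long, timestamp: Instant)

  final case class State(byPid: Map[String, Record]) {
    def contains(pid: String): Boolean = byPid.contains(pid)
    def add(record: Record): State = copy(byPid = byPid.updated(record.pid, record))
  }

  trait Dao {
    // hypothetical: read the stored offset record for one persistence id, if any
    def readTimestampOffsetFor(pid: String): Future[Option[Record]]
  }

  final class LazyLoading(dao: Dao)(implicit ec: ExecutionContext) {
    // Load the offset for a pid on demand instead of keeping the full time window in memory.
    def load(state: State, pid: String): Future[State] =
      if (state.contains(pid)) Future.successful(state)
      else
        dao.readTimestampOffsetFor(pid).map {
          case Some(record) => state.add(record)
          case None         => state // unknown pid, validated as a first event elsewhere
        }
  }
}
```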
    val recordsWithKeyFut =
      Source(minSlice to maxSlice)
        .mapAsyncUnordered(offsetSliceReadParallelism) { slice =>
          dao.readTimestampOffset(slice)
We need to read offsets to find the start offset, and it's also good to pre-populate the state with the latest offsets for each slice. The rest will be lazy loaded when needed.
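For illustration, a self-contained sketch of that idea with hypothetical names (the real code streams the per-slice reads with Akka Streams as in the diff above, and its start-offset rules are more involved): read the latest records per slice in parallel to pre-populate the state, and derive a start timestamp that is no later than the oldest of the per-slice latest offsets.

```scala
import java.time.Instant
import scala.concurrent.{ ExecutionContext, Future }

object StartOffsetSketch {

  // Hypothetical types and DAO, only for illustration.
  final case class Record(slice: Int, pid: String, seqNr: Long, timestamp: Instant)

  trait Dao {
    // hypothetical: latest stored records for one slice, newest first, limited
    def readTimestampOffset(slice: Int, limit: Int): Future[Seq[Record]]
  }

  final class StartOffsets(dao: Dao)(implicit ec: ExecutionContext) {

    // Read the latest offsets of each slice in parallel to pre-populate the state.
    def readInitialRecords(minSlice: Int, maxSlice: Int, limit: Int): Future[Seq[Record]] =
      Future
        .traverse((minSlice to maxSlice).toVector)(slice => dao.readTimestampOffset(slice, limit))
        .map(_.flatten)

    // One way to think about the start offset: it must not be later than the oldest of the
    // per-slice latest offsets, otherwise a lagging slice could be skipped after rescaling.
    // (The real store's logic also involves moreThanOneProjectionKey and foreignOffsets.)
    def startTimestamp(records: Seq[Record]): Option[Instant] = {
      val latestBySlice =
        records.groupBy(_.slice).values.map(_.maxBy(_.timestamp.toEpochMilli).timestamp)
      if (latestBySlice.isEmpty) None
      else Some(latestBySlice.minBy(_.toEpochMilli))
    }
  }
}
```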
-        val newState = State(recordsWithKey.map(_.record))
+        val newState = {
+          val s = State(recordsWithKey.map(_.record))
+          // FIXME shall we evict here, or how does that impact the logic for moreThanOneProjectionKey and foreignOffsets?
?
I think that should be ok, as that starting offset logic is based on latest by slice already.
    val currentState = getState()
    if ((triggerDeletion == null || triggerDeletion == TRUE) && currentState.bySliceSorted.contains(slice)) {
      val latest = currentState.bySliceSorted(slice).last
      val until = latest.timestamp.minus(settings.deleteAfter)
The deletes now happen much later than the (in-memory) time window. Not sure what would be a good/safe default; I picked one day without much thought.
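To make the new setting concrete, a hedged sketch of how such config could look and be read (the key names under `akka.projection.r2dbc.offset-store` and the values are placeholders based on this discussion, not the actual reference.conf):

```scala
import com.typesafe.config.ConfigFactory

object DeleteSettingsSketch {
  // Hypothetical config snippet: delete-after decouples deletion from the in-memory time window.
  private val config = ConfigFactory.parseString("""
    akka.projection.r2dbc.offset-store {
      # offsets older than (latest per slice) minus delete-after become eligible for deletion
      delete-after = 1 day
      # how often the scheduled deletes run (placeholder value)
      delete-interval = 10 minutes
    }
    """)

  private val offsetStoreConfig = config.getConfig("akka.projection.r2dbc.offset-store")

  val deleteAfter: java.time.Duration = offsetStoreConfig.getDuration("delete-after")
  val deleteInterval: java.time.Duration = offsetStoreConfig.getDuration("delete-interval")
}
```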
Kept the deletes per slice. Could maybe go back to deleting per slice range, but maybe it's good to have smaller transactions?
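A minimal sketch of the per-slice deletion flow, with a hypothetical `Dao` (not the actual internal DAO): each slice is deleted relative to its own latest offset, one statement per slice, so every slice keeps at least `deleteAfter` worth of offsets, also after projection rescaling.

```scala
import java.time.{ Duration, Instant }
import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical DAO, only for illustrating the per-slice delete idea.
trait DeleteDao {
  // latest stored timestamp offset for the slice, if any
  def latestTimestamp(slice: Int): Future[Option[Instant]]
  // delete offset rows for the slice older than the given timestamp, one transaction per slice
  def deleteOldTimestampOffsets(slice: Int, until: Instant): Future[Long]
}

final class DeletePerSliceSketch(dao: DeleteDao, deleteAfter: Duration)(implicit ec: ExecutionContext) {

  // Returns the total number of deleted rows across the slice range.
  def deleteOldOffsets(minSlice: Int, maxSlice: Int): Future[Long] =
    Future
      .traverse((minSlice to maxSlice).toVector) { slice =>
        dao.latestTimestamp(slice).flatMap {
          case Some(latest) => dao.deleteOldTimestampOffsets(slice, latest.minus(deleteAfter))
          case None         => Future.successful(0L)
        }
      }
      .map(_.sum)
}
```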
-      SELECT projection_key, slice, persistence_id, seq_nr, timestamp_offset
-      FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ?"""
+      SELECT projection_key, persistence_id, seq_nr, timestamp_offset
+      FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? ORDER BY timestamp_offset DESC LIMIT ?"""
The intention is that this should make use of the primary key:
PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
That, and the LIMIT, is the reason for an individual query per slice.
    val evictedNewState =
      if (newState.size > settings.keepNumberOfEntries && evictThresholdReached && newState.window
          .compareTo(evictWindow) > 0) {
        val evictUntil = newState.latestTimestamp.minus(settings.timeWindow)
This could have been a problem after scaling, where latestTimestamp could be far ahead of the received events from other slices.
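A small illustration of that scaling scenario with made-up timestamps: with a single global `latestTimestamp`, a slice that is still catching up can lose all of its entries, whereas per-slice eviction keeps them.

```scala
import java.time.{ Duration, Instant }

object EvictionAfterScalingExample {
  // Made-up numbers to illustrate the problem with a global latestTimestamp.
  val timeWindow: Duration = Duration.ofMinutes(10)

  val latestSliceA: Instant = Instant.parse("2024-11-19T12:00:00Z") // slice A is up to date
  val latestSliceB: Instant = Instant.parse("2024-11-19T10:00:00Z") // slice B lags after rescaling

  // previous approach: evict against the global latest timestamp
  val globalEvictUntil: Instant = latestSliceA.minus(timeWindow) // 11:50, would drop all of slice B

  // per-slice approach: evict each slice against its own latest timestamp
  val evictUntilA: Instant = latestSliceA.minus(timeWindow) // 11:50
  val evictUntilB: Instant = latestSliceB.minus(timeWindow) // 09:50, slice B keeps its recent entries
}
```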
      // it hasn't filled up the window yet
      Future.successful(0)
    } else {
      val until = currentState.latestTimestamp.minus(settings.timeWindow)
This could have been a problem after scaling, where latestTimestamp could be far ahead of the received events from other slices, and then too much would be deleted.
Force-pushed from bb9d82b to fa3ecfc
Force-pushed from 3cfb8e4 to 22c0c26
* evict time window for each slice
* remove keep-number-of-entries and evict-interval
* delete per slice, so that we always keep offsets within time window for each slice, also after projection scaling
* delete much later, still based on latest by slice
* delete-after config
* increase delete-interval
* read from each slice, desc timestamp and limit
Force-pushed from 22c0c26 to 9a60b78
@pvlugter I have a few FIXMEs remaining here, but before completing I'd like your first review.
LGTM. Can't think of any new issues from these approaches. But would be useful to test an application with these changes as well.
          val newerRecords = recordsSortedByTimestamp.rangeImpl(Some(untilRecord), None) // inclusive of until
          val olderRecords = recordsSortedByTimestamp.rangeImpl(None, Some(untilRecord)) // exclusive of until
Suggested change:
-          val newerRecords = recordsSortedByTimestamp.rangeImpl(Some(untilRecord), None) // inclusive of until
-          val olderRecords = recordsSortedByTimestamp.rangeImpl(None, Some(untilRecord)) // exclusive of until
+          val newerRecords = recordsSortedByTimestamp.rangeFrom(untilRecord) // inclusive of until
+          val olderRecords = recordsSortedByTimestamp.rangeUntil(untilRecord) // exclusive of until
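For reference, a quick standalone demonstration of the suggested methods' semantics on a plain `TreeSet[Int]` (not the actual Record type), showing they match the `rangeImpl` calls being replaced:

```scala
import scala.collection.immutable.TreeSet

object RangeDemo extends App {
  val records = TreeSet(1, 3, 5, 7, 9)

  // rangeFrom is inclusive of the given element
  println(records.rangeFrom(5))  // TreeSet(5, 7, 9)

  // rangeUntil is exclusive of the given element
  println(records.rangeUntil(5)) // TreeSet(1, 3)

  // equivalent to the rangeImpl calls being replaced
  println(records.rangeImpl(Some(5), None)) // TreeSet(5, 7, 9)
  println(records.rangeImpl(None, Some(5))) // TreeSet(1, 3)
}
```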
Thanks, I'll test this with a real service before we merge.