fix: Timestamp validation of previous seqNr
* base the validation time window for the previous sequence number
  on the start timestamp and the backtracking window
patriknw committed Nov 25, 2024
1 parent b22a6d6 commit fa3ecfc
Showing 6 changed files with 125 additions and 82 deletions.
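
In short: when an envelope arrives with an unknown sequence number, the offset store asks the source for the timestamp of the previous sequence number and now compares it against the start timestamp minus the backtracking window, rather than against the latest timestamp minus the time window. A minimal sketch of that rule, reusing names from the diff below (startTimestamp, backtrackingWindow, the timestampOf lookup) but not the real R2dbcOffsetStore method signature:

import java.time.{ Duration, Instant }

// Hedged sketch of the acceptance rule for an unknown seqNr; not the real implementation.
def acceptUnknownSeqNr(
    previousTimestamp: Instant, // result of timestampOf(pid, seqNr - 1)
    startTimestamp: Instant, // set by readOffset: stored offset timestamp, or clock.instant() for an empty store
    backtrackingWindow: Duration): Boolean = {
  val acceptBefore = startTimestamp.minus(backtrackingWindow)
  // accept only if the previous event is older than anything backtracking can still replay
  previousTimestamp.isBefore(acceptBefore)
}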
@@ -73,13 +73,15 @@ class R2dbcTimestampOffsetStoreSpec
private def createOffsetStore(
projectionId: ProjectionId,
customSettings: R2dbcProjectionSettings = settings,
offsetStoreClock: TestClock = clock,
eventTimestampQueryClock: TestClock = clock) =
new R2dbcOffsetStore(
projectionId,
Some(new TestTimestampSourceProvider(0, persistenceExt.numberOfSlices - 1, eventTimestampQueryClock)),
system,
customSettings,
r2dbcExecutor)
r2dbcExecutor,
offsetStoreClock)

def createEnvelope(pid: Pid, seqNr: SeqNr, timestamp: Instant, event: String): EventEnvelope[String] = {
val entityType = PersistenceId.extractEntityType(pid)
@@ -455,6 +457,11 @@ class R2dbcTimestampOffsetStoreSpec
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

// some validations require the startTimestamp, which is set from readOffset
offsetStore.getState().startTimestamp shouldBe Instant.EPOCH
offsetStore.readOffset().futureValue
offsetStore.getState().startTimestamp shouldBe clock.instant()

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
@@ -1559,7 +1566,7 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore3.deleteOldTimestampOffsets().futureValue shouldBe 17
}

"test window validation" in {
"validate timestamp of previous sequence number" in {
import R2dbcOffsetStore.Validation._

val projectionName = UUID.randomUUID().toString
@@ -1592,18 +1599,19 @@ class R2dbcTimestampOffsetStoreSpec
// scaled up to 4 projections, testing 512-767
val startOffset2 = TimestampOffset.toTimestampOffset(offsetStore2.readOffset().futureValue.get)
startOffset2.timestamp shouldBe time(2)
val latestTime = time(3)
offsetStore2.getState().startTimestamp shouldBe time(2)
val latestTime = time(10)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime

// clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr.
// rejected if timestamp of previous seqNr within offset store time window
clock.setInstant(latestTime.minus(settings.timeWindow.minusSeconds(1)))
// rejected if timestamp of previous seqNr is after start timestamp minus backtracking window
clock.setInstant(startOffset2.timestamp.minus(settings.backtrackingWindow.minusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
// accepted if timestamp of previous seqNr before offset store time window
clock.setInstant(latestTime.minus(settings.timeWindow.plusSeconds(1)))
// accepted if timestamp of previous seqNr is before start timestamp minus backtracking window
clock.setInstant(startOffset2.timestamp.minus(settings.timeWindow.plusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted
@@ -0,0 +1,4 @@
# internal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#State.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#State.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#State.apply")
6 changes: 6 additions & 0 deletions akka-projection-r2dbc/src/main/resources/reference.conf
@@ -21,6 +21,12 @@ akka.projection.r2dbc {
# within this time window from latest offset.
time-window = 5 minutes

# Backtracking window of the source (query). Should be equal to
# the akka.persistence.r2dbc.query.backtracking.window that is used for the
# SourceProvider.
# It should not be larger than the akka.projection.r2dbc.offset-store.time-window.
backtracking-window = ${akka.persistence.r2dbc.query.backtracking.window}

# Keep this number of entries. Don't evict old entries until this threshold
# has been reached.
keep-number-of-entries = 10000
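
If the default (inherited from akka.persistence.r2dbc.query.backtracking.window) needs to be changed, the new key sits next to time-window in the offset-store block. A hypothetical override from Scala using Typesafe Config; the 2-minute value is purely illustrative:

import com.typesafe.config.ConfigFactory

// Illustrative override; keep backtracking-window aligned with the
// akka.persistence.r2dbc.query.backtracking.window used by the SourceProvider,
// and not larger than time-window.
val config = ConfigFactory
  .parseString("""
    akka.projection.r2dbc.offset-store {
      time-window = 5 minutes
      backtracking-window = 2 minutes
    }
  """)
  .withFallback(ConfigFactory.load())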
@@ -57,6 +57,7 @@ object R2dbcProjectionSettings {
managementTable = config.getString("offset-store.management-table"),
useConnectionFactory = config.getString("use-connection-factory"),
timeWindow = config.getDuration("offset-store.time-window"),
backtrackingWindow = config.getDuration("offset-store.backtracking-window"),
keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"),
evictInterval = config.getDuration("offset-store.evict-interval"),
deleteInterval,
@@ -82,6 +83,7 @@ final class R2dbcProjectionSettings private (
val managementTable: String,
val useConnectionFactory: String,
val timeWindow: JDuration,
val backtrackingWindow: JDuration,
val keepNumberOfEntries: Int,
val evictInterval: JDuration,
val deleteInterval: JDuration,
@@ -116,6 +118,12 @@ final class R2dbcProjectionSettings private (
def withTimeWindow(timeWindow: JDuration): R2dbcProjectionSettings =
copy(timeWindow = timeWindow)

def withBacktrackingWindow(backtrackingWindow: FiniteDuration): R2dbcProjectionSettings =
copy(backtrackingWindow = backtrackingWindow.toJava)

def withBacktrackingWindow(backtrackingWindow: JDuration): R2dbcProjectionSettings =
copy(backtrackingWindow = backtrackingWindow)

def withKeepNumberOfEntries(keepNumberOfEntries: Int): R2dbcProjectionSettings =
copy(keepNumberOfEntries = keepNumberOfEntries)

@@ -159,6 +167,7 @@ final class R2dbcProjectionSettings private (
managementTable: String = managementTable,
useConnectionFactory: String = useConnectionFactory,
timeWindow: JDuration = timeWindow,
backtrackingWindow: JDuration = backtrackingWindow,
keepNumberOfEntries: Int = keepNumberOfEntries,
evictInterval: JDuration = evictInterval,
deleteInterval: JDuration = deleteInterval,
@@ -174,6 +183,7 @@
managementTable,
useConnectionFactory,
timeWindow,
backtrackingWindow,
keepNumberOfEntries,
evictInterval,
deleteInterval,
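
The same value can also be set programmatically with the with-method added above. A brief usage sketch; it assumes an ActorSystem[_] named system is in scope and uses the existing R2dbcProjectionSettings(system) factory, and the duration is only an example:

import scala.concurrent.duration._
import akka.projection.r2dbc.R2dbcProjectionSettings

// Example only: load settings from config, then override the backtracking window.
val projectionSettings = R2dbcProjectionSettings(system)
  .withBacktrackingWindow(2.minutes)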
@@ -57,7 +57,7 @@ private[projection] object R2dbcOffsetStore {
final case class RecordWithProjectionKey(record: Record, projectionKey: String)

object State {
val empty: State = State(Map.empty, Vector.empty, Instant.EPOCH, 0)
val empty: State = State(Map.empty, Vector.empty, Instant.EPOCH, 0, Instant.EPOCH)

def apply(records: immutable.IndexedSeq[Record]): State = {
if (records.isEmpty) empty
@@ -69,7 +69,8 @@ private[projection] object R2dbcOffsetStore {
byPid: Map[Pid, Record],
latest: immutable.IndexedSeq[Record],
oldestTimestamp: Instant,
sizeAfterEvict: Int) {
sizeAfterEvict: Int,
startTimestamp: Instant) {

def size: Int = byPid.size

@@ -315,54 +316,69 @@ private[projection] class R2dbcOffsetStore(
private def readTimestampOffset(): Future[Option[TimestampOffset]] = {
idle.set(false)
val oldState = state.get()

dao.readTimestampOffset().map { recordsWithKey =>
val newState = State(recordsWithKey.map(_.record))
logger.debug(
"readTimestampOffset state with [{}] persistenceIds, oldest [{}], latest [{}]",
newState.byPid.size,
newState.oldestTimestamp,
newState.latestTimestamp)
if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
clearInflight()
clearForeignOffsets()
clearLatestSeen()
if (newState == State.empty) {
None
} else if (moreThanOneProjectionKey(recordsWithKey)) {
// When downscaling projection instances (changing slice distribution) there
// is a possibility that one of the previous projection instances was further behind than the backtracking
// window, which would cause missed events if we started from latest. In that case we use the latest
// offset of the earliest slice range (distinct projection key).
val latestBySliceWithKey = recordsWithKey
.groupBy(_.record.slice)
.map {
case (_, records) => records.maxBy(_.record.timestamp)
}
.toVector
// Only needed if there's more than one projection key within the latest offsets by slice.
// To handle restarts after previous downscaling, and all latest are from the same instance.
if (moreThanOneProjectionKey(latestBySliceWithKey)) {
if (adoptingForeignOffsets) {
val foreignOffsets = latestBySliceWithKey
.filter(_.projectionKey != projectionId.key)
.sortBy(_.record.timestamp)
setForeignOffsets(foreignOffsets)
}
// Use the earliest of the latest from each projection instance (distinct projection key).
val latestByKey =
latestBySliceWithKey.groupBy(_.projectionKey).map {

val newState = State(recordsWithKey.map(_.record))

val startOffset =
if (newState == State.empty) {
None
} else if (moreThanOneProjectionKey(recordsWithKey)) {
// When downscaling projection instances (changing slice distribution) there
// is a possibility that one of the previous projection instances was further behind than the backtracking
// window, which would cause missed events if we started from latest. In that case we use the latest
// offset of the earliest slice range (distinct projection key).
val latestBySliceWithKey = recordsWithKey
.groupBy(_.record.slice)
.map {
case (_, records) => records.maxBy(_.record.timestamp)
}
val earliest = latestByKey.minBy(_.record.timestamp).record
// there could be other with same timestamp, but not important to reconstruct exactly the right `seen`
Some(TimestampOffset(earliest.timestamp, Map(earliest.pid -> earliest.seqNr)))
.toVector
// Only needed if there's more than one projection key within the latest offsets by slice.
// To handle restarts after previous downscaling, and all latest are from the same instance.
if (moreThanOneProjectionKey(latestBySliceWithKey)) {
if (adoptingForeignOffsets) {
val foreignOffsets = latestBySliceWithKey
.filter(_.projectionKey != projectionId.key)
.sortBy(_.record.timestamp)
setForeignOffsets(foreignOffsets)
}
// Use the earliest of the latest from each projection instance (distinct projection key).
val latestByKey =
latestBySliceWithKey.groupBy(_.projectionKey).map {
case (_, records) => records.maxBy(_.record.timestamp)
}
val earliest = latestByKey.minBy(_.record.timestamp).record
// there could be other with same timestamp, but not important to reconstruct exactly the right `seen`
Some(TimestampOffset(earliest.timestamp, Map(earliest.pid -> earliest.seqNr)))
} else {
newState.latestOffset
}
} else {
newState.latestOffset
}
} else {
newState.latestOffset

logger.debug(
"readTimestampOffset state with [{}] persistenceIds, oldest [{}], latest [{}], start offset [{}]",
newState.byPid.size,
newState.oldestTimestamp,
newState.latestTimestamp,
startOffset)

val startTimestamp = startOffset match {
case None => clock.instant()
case Some(offset) => offset.timestamp
}
val newStateWithStartOffset = newState.copy(startTimestamp = startTimestamp)

if (!state.compareAndSet(oldState, newStateWithStartOffset))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")

startOffset
}
}

@@ -701,56 +717,55 @@ private[projection] class R2dbcOffsetStore(
val pid = recordWithOffset.record.pid
val seqNr = recordWithOffset.record.seqNr

def logUnknown(previousTimestamp: Instant): Unit = {
if (recordWithOffset.fromPubSub) {
logger.debug(
"Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr,
pid,
recordWithOffset.offset)
} else if (!recordWithOffset.fromBacktracking) {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
logger.debug(
"Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr,
pid,
recordWithOffset.offset)
} else {
logger.warn(
"Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, Previous timestamp [{}]",
seqNr,
pid,
recordWithOffset.offset,
previousTimestamp)
}
}

// Haven't seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the time window of the offset store.
// Backtracking will emit missed event again.
// before the startTimestamp minus backtracking window
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val before = currentState.latestTimestamp.minus(settings.timeWindow)
if (previousTimestamp.isBefore(before)) {
val acceptBefore = currentState.startTimestamp.minus(settings.backtrackingWindow)

if (previousTimestamp.isBefore(acceptBefore)) {
logger.debug(
"Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before time window [{}].",
"is before start timestamp [{}] minus backtracking window [{}].",
pid,
seqNr,
previousTimestamp,
before)
currentState.startTimestamp,
settings.backtrackingWindow)
Accepted
} else if (!recordWithOffset.fromBacktracking) {
logUnknown(previousTimestamp)
} else if (recordWithOffset.fromPubSub) {
logger.debug(
"Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr,
pid,
recordWithOffset.offset)
RejectedSeqNr
} else {
logUnknown(previousTimestamp)
} else if (recordWithOffset.fromBacktracking) {
// This will result in projection restart (with normal configuration)
logger.warn(
"Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
"is after start timestamp [{}] minus backtracking window [{}].",
seqNr,
pid,
recordWithOffset.offset,
previousTimestamp,
currentState.startTimestamp,
settings.backtrackingWindow)
RejectedBacktrackingSeqNr
} else {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
logger.debug(
"Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr,
pid,
recordWithOffset.offset)
// Backtracking will emit missed event again.
RejectedSeqNr
}
case None =>
// previous not found, could have been deleted
logger.debug("Accepting envelope with pid [{}], seqNr [{}], where previous event not found.", pid, seqNr)
Accepted
}
}
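
Putting the branches above together: for an unknown sequence number whose previous timestamp is not old enough to accept, the rejection type depends on where the envelope came from. A simplified, self-contained condensation (plain booleans instead of the real RecordWithOffset; the real code returns R2dbcOffsetStore.Validation values):

// Self-contained sketch of the branch selection above; not the real validate method.
sealed trait Validation
case object Accepted extends Validation
case object RejectedSeqNr extends Validation
case object RejectedBacktrackingSeqNr extends Validation

def outcomeForUnknownSeqNr(
    previousIsBeforeAcceptWindow: Boolean, // previousTimestamp isBefore startTimestamp minus backtrackingWindow
    fromPubSub: Boolean,
    fromBacktracking: Boolean): Validation =
  if (previousIsBeforeAcceptWindow) Accepted
  else if (fromPubSub) RejectedSeqNr // might be accepted later, once backtracking re-emits it
  else if (fromBacktracking) RejectedBacktrackingSeqNr // results in projection restart with normal configuration
  else RejectedSeqNr // e.g. publish-events after reconnect; backtracking will emit the missed event again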
2 changes: 1 addition & 1 deletion docs/src/main/paradox/r2dbc.md
@@ -319,4 +319,4 @@ Scala
: @@snip [Example.scala](/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#customConnectionFactory}

Java
: @@snip [Example.java](/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#customConnectionFactory}
: @@snip [Example.java](/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#customConnectionFactory}
