From fa3ecfc73fbd8a0b06287f7e42ae4b060bdd70f8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 15 Nov 2024 16:08:51 +0100 Subject: [PATCH] fix: Timestamp validation of previous seqNr * base the validaiton time window for the previous sequence number on start timestamp and backtracking window --- .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 22 ++- .../r2dbc-offset-store.excludes | 4 + .../src/main/resources/reference.conf | 6 + .../r2dbc/R2dbcProjectionSettings.scala | 10 ++ .../r2dbc/internal/R2dbcOffsetStore.scala | 163 ++++++++++-------- docs/src/main/paradox/r2dbc.md | 2 +- 6 files changed, 125 insertions(+), 82 deletions(-) create mode 100644 akka-projection-r2dbc/src/main/mima-filters/1.6.3.backwards.excludes/r2dbc-offset-store.excludes diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index 232620929..b22201013 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -73,13 +73,15 @@ class R2dbcTimestampOffsetStoreSpec private def createOffsetStore( projectionId: ProjectionId, customSettings: R2dbcProjectionSettings = settings, + offsetStoreClock: TestClock = clock, eventTimestampQueryClock: TestClock = clock) = new R2dbcOffsetStore( projectionId, Some(new TestTimestampSourceProvider(0, persistenceExt.numberOfSlices - 1, eventTimestampQueryClock)), system, customSettings, - r2dbcExecutor) + r2dbcExecutor, + offsetStoreClock) def createEnvelope(pid: Pid, seqNr: SeqNr, timestamp: Instant, event: String): EventEnvelope[String] = { val entityType = PersistenceId.extractEntityType(pid) @@ -455,6 +457,11 @@ class R2dbcTimestampOffsetStoreSpec val eventTimestampQueryClock = TestClock.nowMicros() val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock) + // some validation require the startTimestamp, which is set from readOffset + offsetStore.getState().startTimestamp shouldBe Instant.EPOCH + offsetStore.readOffset().futureValue + offsetStore.getState().startTimestamp shouldBe clock.instant() + val startTime = TestClock.nowMicros().instant() val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L)) offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue @@ -1559,7 +1566,7 @@ class R2dbcTimestampOffsetStoreSpec offsetStore3.deleteOldTimestampOffsets().futureValue shouldBe 17 } - "test window validation" in { + "validate timestamp of previous sequence number" in { import R2dbcOffsetStore.Validation._ val projectionName = UUID.randomUUID().toString @@ -1592,18 +1599,19 @@ class R2dbcTimestampOffsetStoreSpec // scaled up to 4 projections, testing 512-767 val startOffset2 = TimestampOffset.toTimestampOffset(offsetStore2.readOffset().futureValue.get) startOffset2.timestamp shouldBe time(2) - val latestTime = time(3) + offsetStore2.getState().startTimestamp shouldBe time(2) + val latestTime = time(10) offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue offsetStore2.getState().latestTimestamp shouldBe latestTime // clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr. - // rejected if timestamp of previous seqNr within offset store time window - clock.setInstant(latestTime.minus(settings.timeWindow.minusSeconds(1))) + // rejected if timestamp of previous seqNr is after start timestamp minus backtracking window + clock.setInstant(startOffset2.timestamp.minus(settings.backtrackingWindow.minusSeconds(1))) offsetStore2 .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) .futureValue shouldBe RejectedBacktrackingSeqNr - // accepted if timestamp of previous seqNr before offset store time window - clock.setInstant(latestTime.minus(settings.timeWindow.plusSeconds(1))) + // accepted if timestamp of previous seqNr is before start timestamp minus backtracking window + clock.setInstant(startOffset2.timestamp.minus(settings.timeWindow.plusSeconds(1))) offsetStore2 .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) .futureValue shouldBe Accepted diff --git a/akka-projection-r2dbc/src/main/mima-filters/1.6.3.backwards.excludes/r2dbc-offset-store.excludes b/akka-projection-r2dbc/src/main/mima-filters/1.6.3.backwards.excludes/r2dbc-offset-store.excludes new file mode 100644 index 000000000..68c5c72e9 --- /dev/null +++ b/akka-projection-r2dbc/src/main/mima-filters/1.6.3.backwards.excludes/r2dbc-offset-store.excludes @@ -0,0 +1,4 @@ +# internal +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#State.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#State.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#State.apply") diff --git a/akka-projection-r2dbc/src/main/resources/reference.conf b/akka-projection-r2dbc/src/main/resources/reference.conf index cdb91a62a..e87a15504 100644 --- a/akka-projection-r2dbc/src/main/resources/reference.conf +++ b/akka-projection-r2dbc/src/main/resources/reference.conf @@ -21,6 +21,12 @@ akka.projection.r2dbc { # within this time window from latest offset. time-window = 5 minutes + # Backtracking window of the source (query). Should be equal to + # the akka.projection.r2dbc.offset-store.time-window that is used for the + # SourceProvider. + # It should not be larger than the akka.projection.r2dbc.offset-store.time-window. + backtracking-window = ${akka.persistence.r2dbc.query.backtracking.window} + # Keep this number of entries. Don't evict old entries until this threshold # has been reached. keep-number-of-entries = 10000 diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala index ed88f31a8..d4403af21 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala @@ -57,6 +57,7 @@ object R2dbcProjectionSettings { managementTable = config.getString("offset-store.management-table"), useConnectionFactory = config.getString("use-connection-factory"), timeWindow = config.getDuration("offset-store.time-window"), + backtrackingWindow = config.getDuration("offset-store.backtracking-window"), keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"), evictInterval = config.getDuration("offset-store.evict-interval"), deleteInterval, @@ -82,6 +83,7 @@ final class R2dbcProjectionSettings private ( val managementTable: String, val useConnectionFactory: String, val timeWindow: JDuration, + val backtrackingWindow: JDuration, val keepNumberOfEntries: Int, val evictInterval: JDuration, val deleteInterval: JDuration, @@ -116,6 +118,12 @@ final class R2dbcProjectionSettings private ( def withTimeWindow(timeWindow: JDuration): R2dbcProjectionSettings = copy(timeWindow = timeWindow) + def withBacktrackingWindow(backtrackingWindow: FiniteDuration): R2dbcProjectionSettings = + copy(backtrackingWindow = backtrackingWindow.toJava) + + def withBacktrackingWindow(backtrackingWindow: JDuration): R2dbcProjectionSettings = + copy(backtrackingWindow = backtrackingWindow) + def withKeepNumberOfEntries(keepNumberOfEntries: Int): R2dbcProjectionSettings = copy(keepNumberOfEntries = keepNumberOfEntries) @@ -159,6 +167,7 @@ final class R2dbcProjectionSettings private ( managementTable: String = managementTable, useConnectionFactory: String = useConnectionFactory, timeWindow: JDuration = timeWindow, + backtrackingWindow: JDuration = backtrackingWindow, keepNumberOfEntries: Int = keepNumberOfEntries, evictInterval: JDuration = evictInterval, deleteInterval: JDuration = deleteInterval, @@ -174,6 +183,7 @@ final class R2dbcProjectionSettings private ( managementTable, useConnectionFactory, timeWindow, + backtrackingWindow, keepNumberOfEntries, evictInterval, deleteInterval, diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index 520b1cec5..cb69c93bc 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -57,7 +57,7 @@ private[projection] object R2dbcOffsetStore { final case class RecordWithProjectionKey(record: Record, projectionKey: String) object State { - val empty: State = State(Map.empty, Vector.empty, Instant.EPOCH, 0) + val empty: State = State(Map.empty, Vector.empty, Instant.EPOCH, 0, Instant.EPOCH) def apply(records: immutable.IndexedSeq[Record]): State = { if (records.isEmpty) empty @@ -69,7 +69,8 @@ private[projection] object R2dbcOffsetStore { byPid: Map[Pid, Record], latest: immutable.IndexedSeq[Record], oldestTimestamp: Instant, - sizeAfterEvict: Int) { + sizeAfterEvict: Int, + startTimestamp: Instant) { def size: Int = byPid.size @@ -315,54 +316,69 @@ private[projection] class R2dbcOffsetStore( private def readTimestampOffset(): Future[Option[TimestampOffset]] = { idle.set(false) val oldState = state.get() + dao.readTimestampOffset().map { recordsWithKey => - val newState = State(recordsWithKey.map(_.record)) - logger.debug( - "readTimestampOffset state with [{}] persistenceIds, oldest [{}], latest [{}]", - newState.byPid.size, - newState.oldestTimestamp, - newState.latestTimestamp) - if (!state.compareAndSet(oldState, newState)) - throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.") clearInflight() clearForeignOffsets() clearLatestSeen() - if (newState == State.empty) { - None - } else if (moreThanOneProjectionKey(recordsWithKey)) { - // When downscaling projection instances (changing slice distribution) there - // is a possibility that one of the previous projection instances was further behind than the backtracking - // window, which would cause missed events if we started from latest. In that case we use the latest - // offset of the earliest slice range (distinct projection key). - val latestBySliceWithKey = recordsWithKey - .groupBy(_.record.slice) - .map { - case (_, records) => records.maxBy(_.record.timestamp) - } - .toVector - // Only needed if there's more than one projection key within the latest offsets by slice. - // To handle restarts after previous downscaling, and all latest are from the same instance. - if (moreThanOneProjectionKey(latestBySliceWithKey)) { - if (adoptingForeignOffsets) { - val foreignOffsets = latestBySliceWithKey - .filter(_.projectionKey != projectionId.key) - .sortBy(_.record.timestamp) - setForeignOffsets(foreignOffsets) - } - // Use the earliest of the latest from each projection instance (distinct projection key). - val latestByKey = - latestBySliceWithKey.groupBy(_.projectionKey).map { + + val newState = State(recordsWithKey.map(_.record)) + + val startOffset = + if (newState == State.empty) { + None + } else if (moreThanOneProjectionKey(recordsWithKey)) { + // When downscaling projection instances (changing slice distribution) there + // is a possibility that one of the previous projection instances was further behind than the backtracking + // window, which would cause missed events if we started from latest. In that case we use the latest + // offset of the earliest slice range (distinct projection key). + val latestBySliceWithKey = recordsWithKey + .groupBy(_.record.slice) + .map { case (_, records) => records.maxBy(_.record.timestamp) } - val earliest = latestByKey.minBy(_.record.timestamp).record - // there could be other with same timestamp, but not important to reconstruct exactly the right `seen` - Some(TimestampOffset(earliest.timestamp, Map(earliest.pid -> earliest.seqNr))) + .toVector + // Only needed if there's more than one projection key within the latest offsets by slice. + // To handle restarts after previous downscaling, and all latest are from the same instance. + if (moreThanOneProjectionKey(latestBySliceWithKey)) { + if (adoptingForeignOffsets) { + val foreignOffsets = latestBySliceWithKey + .filter(_.projectionKey != projectionId.key) + .sortBy(_.record.timestamp) + setForeignOffsets(foreignOffsets) + } + // Use the earliest of the latest from each projection instance (distinct projection key). + val latestByKey = + latestBySliceWithKey.groupBy(_.projectionKey).map { + case (_, records) => records.maxBy(_.record.timestamp) + } + val earliest = latestByKey.minBy(_.record.timestamp).record + // there could be other with same timestamp, but not important to reconstruct exactly the right `seen` + Some(TimestampOffset(earliest.timestamp, Map(earliest.pid -> earliest.seqNr))) + } else { + newState.latestOffset + } } else { newState.latestOffset } - } else { - newState.latestOffset + + logger.debug( + "readTimestampOffset state with [{}] persistenceIds, oldest [{}], latest [{}], start offset [{}]", + newState.byPid.size, + newState.oldestTimestamp, + newState.latestTimestamp, + startOffset) + + val startTimestamp = startOffset match { + case None => clock.instant() + case Some(offset) => offset.timestamp } + val newStateWithStartOffset = newState.copy(startTimestamp = startTimestamp) + + if (!state.compareAndSet(oldState, newStateWithStartOffset)) + throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.") + + startOffset } } @@ -701,56 +717,55 @@ private[projection] class R2dbcOffsetStore( val pid = recordWithOffset.record.pid val seqNr = recordWithOffset.record.seqNr - def logUnknown(previousTimestamp: Instant): Unit = { - if (recordWithOffset.fromPubSub) { - logger.debug( - "Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}", - seqNr, - pid, - recordWithOffset.offset) - } else if (!recordWithOffset.fromBacktracking) { - // This may happen rather frequently when using `publish-events`, after reconnecting and such. - logger.debug( - "Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}", - seqNr, - pid, - recordWithOffset.offset) - } else { - logger.warn( - "Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, Previous timestamp [{}]", - seqNr, - pid, - recordWithOffset.offset, - previousTimestamp) - } - } - // Haven't see seen this pid within the time window. Since events can be missed // when read at the tail we will only accept it if the event with previous seqNr has timestamp - // before the time window of the offset store. - // Backtracking will emit missed event again. + // before the startTimestamp minus backtracking window timestampOf(pid, seqNr - 1).map { case Some(previousTimestamp) => - val before = currentState.latestTimestamp.minus(settings.timeWindow) - if (previousTimestamp.isBefore(before)) { + val acceptBefore = currentState.startTimestamp.minus(settings.backtrackingWindow) + + if (previousTimestamp.isBefore(acceptBefore)) { logger.debug( "Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " + - "is before time window [{}].", + "is before start timestamp [{}] minus backtracking window [{}].", pid, seqNr, previousTimestamp, - before) + currentState.startTimestamp, + settings.backtrackingWindow) Accepted - } else if (!recordWithOffset.fromBacktracking) { - logUnknown(previousTimestamp) + } else if (recordWithOffset.fromPubSub) { + logger.debug( + "Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}", + seqNr, + pid, + recordWithOffset.offset) RejectedSeqNr - } else { - logUnknown(previousTimestamp) + } else if (recordWithOffset.fromBacktracking) { // This will result in projection restart (with normal configuration) + logger.warn( + "Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " + + "is after start timestamp [{}] minus backtracking window [{}].", + seqNr, + pid, + recordWithOffset.offset, + previousTimestamp, + currentState.startTimestamp, + settings.backtrackingWindow) RejectedBacktrackingSeqNr + } else { + // This may happen rather frequently when using `publish-events`, after reconnecting and such. + logger.debug( + "Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}", + seqNr, + pid, + recordWithOffset.offset) + // Backtracking will emit missed event again. + RejectedSeqNr } case None => // previous not found, could have been deleted + logger.debug("Accepting envelope with pid [{}], seqNr [{}], where previous event not found.", pid, seqNr) Accepted } } diff --git a/docs/src/main/paradox/r2dbc.md b/docs/src/main/paradox/r2dbc.md index b6566e7aa..5acae383c 100644 --- a/docs/src/main/paradox/r2dbc.md +++ b/docs/src/main/paradox/r2dbc.md @@ -319,4 +319,4 @@ Scala : @@snip [Example.scala](/akka-projection-r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#customConnectionFactory} Java -: @@snip [Example.java](/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#customConnectionFactory} \ No newline at end of file +: @@snip [Example.java](/akka-projection-r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#customConnectionFactory}