Skip to content

Commit

Permalink
test of existing validation
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 25, 2024
1 parent a35d083 commit b22a6d6
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1559,5 +1559,56 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore3.deleteOldTimestampOffsets().futureValue shouldBe 17
}

"test window validation" in {
import R2dbcOffsetStore.Validation._

val projectionName = UUID.randomUUID().toString

def offsetStore(minSlice: Int, maxSlice: Int) =
new R2dbcOffsetStore(
ProjectionId(projectionName, s"$minSlice-$maxSlice"),
Some(new TestTimestampSourceProvider(minSlice, maxSlice, clock)),
system,
settings,
r2dbcExecutor)

// one projection at lower scale
val offsetStore1 = offsetStore(512, 1023)

// two projections at higher scale
val offsetStore2 = offsetStore(512, 767)

val p1 = "p-0960" // slice 576
val p2 = "p-6009" // slice 640
val p3 = "p-3039" // slice 832

val t0 = clock.instant().minusSeconds(100)
def time(step: Int) = t0.plusSeconds(step)

// starting with 2 projections, testing 512-1023
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 1L)), p3, 1L)).futureValue

// scaled up to 4 projections, testing 512-767
val startOffset2 = TimestampOffset.toTimestampOffset(offsetStore2.readOffset().futureValue.get)
startOffset2.timestamp shouldBe time(2)
val latestTime = time(3)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime

// clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr.
// rejected if timestamp of previous seqNr within offset store time window
clock.setInstant(latestTime.minus(settings.timeWindow.minusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
// accepted if timestamp of previous seqNr before offset store time window
clock.setInstant(latestTime.minus(settings.timeWindow.plusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ private[projection] class R2dbcOffsetStore(
val pid = recordWithOffset.record.pid
val seqNr = recordWithOffset.record.seqNr

def logUnknown(): Unit = {
def logUnknown(previousTimestamp: Instant): Unit = {
if (recordWithOffset.fromPubSub) {
logger.debug(
"Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
Expand All @@ -717,10 +717,11 @@ private[projection] class R2dbcOffsetStore(
recordWithOffset.offset)
} else {
logger.warn(
"Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}",
"Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, Previous timestamp [{}]",
seqNr,
pid,
recordWithOffset.offset)
recordWithOffset.offset,
previousTimestamp)
}
}

Expand All @@ -741,10 +742,10 @@ private[projection] class R2dbcOffsetStore(
before)
Accepted
} else if (!recordWithOffset.fromBacktracking) {
logUnknown()
logUnknown(previousTimestamp)
RejectedSeqNr
} else {
logUnknown()
logUnknown(previousTimestamp)
// This will result in projection restart (with normal configuration)
RejectedBacktrackingSeqNr
}
Expand Down

0 comments on commit b22a6d6

Please sign in to comment.