From 7d4bdd4f2bd5f6eef52ddf9865caf7403357cc96 Mon Sep 17 00:00:00 2001
From: JingZhang Chen
Date: Mon, 27 May 2024 15:39:46 +0800
Subject: [PATCH 1/5] force a valid range in limit windowing

---
 .../journal/dao/BaseJournalDaoWithReadMessages.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 629ac0c4..5caea607 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -51,11 +51,16 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
       toSequenceNr: Long,
       batchSize: Int,
       refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
+    val initializedQueryState: Long = Math.max(1, fromSequenceNr)
     Source
-      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
+      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((initializedQueryState, Continue)) {
         case (from, control) =>
           def limitWindow(from: Long): Long = {
-            math.min(from + batchSize, toSequenceNr)
+            if (batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
+              toSequenceNr
+            } else {
+              Math.min(from + batchSize, toSequenceNr)
+            }
           }
 
           def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
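Patch 1 guards the window's upper bound against signed Long overflow: from + batchSize wraps negative once from is within batchSize of Long.MaxValue, which would collapse the query window. A minimal standalone sketch of the failure and the guard (the concrete values below are illustrative, not taken from the patch):

object LimitWindowOverflowDemo extends App {
  val batchSize = 500
  val toSequenceNr = Long.MaxValue

  // Naive upper bound, as before the patch: wraps negative near Long.MaxValue.
  def naiveWindow(from: Long): Long =
    math.min(from + batchSize, toSequenceNr)

  // Guarded upper bound, mirroring the patched limitWindow.
  def guardedWindow(from: Long): Long =
    if (batchSize <= 0 || (Long.MaxValue - batchSize) < from) toSequenceNr
    else math.min(from + batchSize, toSequenceNr)

  val from = Long.MaxValue - 10L
  println(naiveWindow(from))   // a large negative number: the window is empty
  println(guardedWindow(from)) // 9223372036854775807, i.e. toSequenceNr
}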
From a40d4fbe4d12fbe60bfb028cc8e6a9c60af613c3 Mon Sep 17 00:00:00 2001
From: JingZhang Chen
Date: Mon, 27 May 2024 16:46:04 +0800
Subject: [PATCH 2/5] fix snapshotted events case

---
 .../dao/BaseJournalDaoWithReadMessages.scala  | 16 +++++---
 .../jdbc/journal/dao/FlowControl.scala        |  5 +++
 .../LimitWindowingStreamTest.scala            | 39 +++++++++++++++++++
 3 files changed, 54 insertions(+), 6 deletions(-)
 create mode 100644 integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala

diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 5caea607..ef07b223 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -19,7 +19,7 @@ import pekko.NotUsed
 import pekko.actor.Scheduler
 import pekko.annotation.InternalApi
 import pekko.persistence.PersistentRepr
-import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed, Stop }
+import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed, Fallback, Stop }
 import pekko.stream.Materializer
 import pekko.stream.scaladsl.{ Sink, Source }
 
@@ -51,22 +51,24 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
       toSequenceNr: Long,
       batchSize: Int,
       refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
-    val initializedQueryState: Long = Math.max(1, fromSequenceNr)
+    val firstSequenceVer: Long = Math.max(1, fromSequenceNr)
     Source
-      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((initializedQueryState, Continue)) {
+      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceVer, Continue)) {
         case (from, control) =>
-          def limitWindow(from: Long): Long = {
-            if (batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
+          def limitWindow(from: Long)(implicit fallback: Boolean): Long = {
+            if (fallback || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
               toSequenceNr
             } else {
               Math.min(from + batchSize, toSequenceNr)
             }
           }
 
-          def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
+          def retrieveNextBatch(implicit fallback: Boolean = false)
+              : Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
             for {
               xs <- messages(persistenceId, from, limitWindow(from), batchSize).runWith(Sink.seq)
             } yield {
+              val isEmptyEvents = xs.isEmpty
               val hasMoreEvents = xs.size == batchSize
               // Events are ordered by sequence number, therefore the last one is the largest
               val lastSeqNrInBatch: Option[Long] = xs.lastOption match {
@@ -78,6 +80,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
               val nextControl: FlowControl =
                 if (hasLastEvent || from > toSequenceNr) Stop
                 else if (hasMoreEvents) Continue
+                else if (isEmptyEvents && from == firstSequenceVer) Fallback
                 else if (refreshInterval.isEmpty) Stop
                 else ContinueDelayed
 
@@ -96,6 +99,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
             case ContinueDelayed =>
               val (delay, scheduler) = refreshInterval.get
               pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
+            case Fallback => retrieveNextBatch(fallback = true)
           }
       }
   }
diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/FlowControl.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/FlowControl.scala
index dac08875..088addc2 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/FlowControl.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/FlowControl.scala
@@ -27,6 +27,11 @@ private[jdbc] object FlowControl {
    */
   case object ContinueDelayed extends FlowControl
 
+  /**
+   * If the limited window is unable to return anything, fall back to the full window.
+   */
+  case object Fallback extends FlowControl
+
   /** Stop querying - used when we reach the desired offset */
   case object Stop extends FlowControl
 }
diff --git a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala
new file mode 100644
index 00000000..6ffcbfbc
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import org.apache.pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest
+import org.apache.pekko.persistence.jdbc.query.{ MysqlCleaner, OracleCleaner, PostgresCleaner, SqlServerCleaner }
+
+class PostgresLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("postgres-application.conf")
+    with PostgresCleaner
+
+class MySQLLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("mysql-application.conf")
+    with MysqlCleaner
+
+class OracleLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("oracle-application.conf")
+    with OracleCleaner
+
+class SqlServerLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("sqlserver-application.conf")
+    with SqlServerCleaner
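Patch 2 handles journals whose leading events were deleted after snapshotting: if the very first limited window comes back empty, the stream now retries with the full window (the new Fallback state) instead of terminating early. A pure-function sketch of the resulting decision table, using a local stand-in for the library's private FlowControl trait (parameter names here are illustrative):

object FlowControlDemo extends App {
  // Local stand-in for pekko's private[jdbc] FlowControl, for illustration only.
  sealed trait FlowControl
  case object Continue extends FlowControl
  case object ContinueDelayed extends FlowControl
  case object Fallback extends FlowControl
  case object Stop extends FlowControl

  // Mirrors the nextControl decision introduced by this patch.
  def nextControl(
      returned: Int,             // how many events the current window produced
      batchSize: Int,
      from: Long,                // lower bound of the current window
      firstSequenceNr: Long,     // lower bound of the whole query
      toSequenceNr: Long,
      hasLastEvent: Boolean,     // the requested upper bound was reached
      hasRefreshInterval: Boolean): FlowControl =
    if (hasLastEvent || from > toSequenceNr) Stop
    else if (returned == batchSize) Continue
    else if (returned == 0 && from == firstSequenceNr) Fallback // empty first window: retry unbounded
    else if (!hasRefreshInterval) Stop
    else ContinueDelayed

  // First window finds nothing because its events were deleted after a
  // snapshot: the stream falls back to the full window instead of stopping.
  assert(nextControl(0, 100, 1L, 1L, Long.MaxValue, hasLastEvent = false, hasRefreshInterval = false) == Fallback)
}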
From 114c25df547f5773467ef3ac0608c195e9623698 Mon Sep 17 00:00:00 2001
From: JingZhang Chen
Date: Mon, 27 May 2024 17:08:44 +0800
Subject: [PATCH 3/5] use a full window for the first query

---
 .../journal/dao/BaseJournalDaoWithReadMessages.scala | 12 ++++--------
 .../persistence/jdbc/journal/dao/FlowControl.scala   |  5 -----
 2 files changed, 4 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index ef07b223..6683e4e5 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -19,7 +19,7 @@ import pekko.NotUsed
 import pekko.actor.Scheduler
 import pekko.annotation.InternalApi
 import pekko.persistence.PersistentRepr
-import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed, Fallback, Stop }
+import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed, Stop }
 import pekko.stream.Materializer
 import pekko.stream.scaladsl.{ Sink, Source }
 
@@ -55,20 +55,18 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
     Source
       .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceVer, Continue)) {
         case (from, control) =>
-          def limitWindow(from: Long)(implicit fallback: Boolean): Long = {
-            if (fallback || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
+          def limitWindow(from: Long): Long = {
+            if (from == firstSequenceVer || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
               toSequenceNr
             } else {
               Math.min(from + batchSize, toSequenceNr)
             }
           }
 
-          def retrieveNextBatch(implicit fallback: Boolean = false)
-              : Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
+          def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
             for {
               xs <- messages(persistenceId, from, limitWindow(from), batchSize).runWith(Sink.seq)
             } yield {
-              val isEmptyEvents = xs.isEmpty
               val hasMoreEvents = xs.size == batchSize
               // Events are ordered by sequence number, therefore the last one is the largest
               val lastSeqNrInBatch: Option[Long] = xs.lastOption match {
@@ -78,7 +78,6 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
               val nextControl: FlowControl =
                 if (hasLastEvent || from > toSequenceNr) Stop
                 else if (hasMoreEvents) Continue
-                else if (isEmptyEvents && from == firstSequenceVer) Fallback
                 else if (refreshInterval.isEmpty) Stop
                 else ContinueDelayed
 
@@ -96,7 +96,6 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
             case ContinueDelayed =>
               val (delay, scheduler) = refreshInterval.get
               pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
-            case Fallback => retrieveNextBatch(fallback = true)
           }
       }
   }
diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/FlowControl.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/FlowControl.scala
index 088addc2..dac08875 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/FlowControl.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/FlowControl.scala
@@ -27,11 +27,6 @@ private[jdbc] object FlowControl {
    */
   case object ContinueDelayed extends FlowControl
 
-  /**
-   * If the limited window is unable to return anything, fall back to the full window.
-   */
-  case object Fallback extends FlowControl
-
   /** Stop querying - used when we reach the desired offset */
   case object Stop extends FlowControl
 }
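Patch 3 then simplifies the same fix: instead of a dedicated Fallback state, the first query is simply left unbounded (from == firstSequenceVer), so a deleted prefix of any length is skipped in a single pass. A quick sanity check of the resulting limitWindow, using sample bounds (the surrounding values are illustrative, not from the patch):

object LimitWindowDemo extends App {
  // Sample values; in the real code these come from the enclosing method's parameters.
  val firstSequenceVer = 1L
  val batchSize = 100
  val toSequenceNr = 10000L

  // Mirrors limitWindow as of this patch.
  def limitWindow(from: Long): Long =
    if (from == firstSequenceVer || batchSize <= 0 || (Long.MaxValue - batchSize) < from) toSequenceNr
    else math.min(from + batchSize, toSequenceNr)

  assert(limitWindow(1L) == 10000L)    // first query: full window, skips any deleted prefix
  assert(limitWindow(101L) == 201L)    // later queries: bounded window again
  assert(limitWindow(9950L) == 10000L) // window clamped at toSequenceNr near the end
}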
From 08f3b4d1999d4dca56f865c969675f97f554aed7 Mon Sep 17 00:00:00 2001
From: JingZhang Chen
Date: Mon, 27 May 2024 18:38:09 +0800
Subject: [PATCH 4/5] fix Oracle insert issue

---
 .../journal/dao/BaseJournalDaoWithReadMessages.scala | 6 +++---
 .../jdbc/journal/dao/LimitWindowingStreamTest.scala  | 9 +++++----
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 6683e4e5..30f35697 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -51,12 +51,12 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
       toSequenceNr: Long,
       batchSize: Int,
       refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
-    val firstSequenceVer: Long = Math.max(1, fromSequenceNr)
+    val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
     Source
-      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceVer, Continue)) {
+      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceNr, Continue)) {
         case (from, control) =>
           def limitWindow(from: Long): Long = {
-            if (from == firstSequenceVer || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
+            if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
               toSequenceNr
             } else {
               Math.min(from + batchSize, toSequenceNr)
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
index e9e8ccdb..be2679fc 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -50,7 +50,8 @@ abstract class LimitWindowingStreamTest(configFile: String)
     implicit val mat: Materializer = SystemMaterializer(system).materializer
 
     val persistenceId = UUID.randomUUID().toString
-    val payload = 'a'.toByte
+    val writerUuid = UUID.randomUUID().toString
+    val payload = Array.fill(16)('a'.toByte)
     val eventsPerBatch = 1000
     val numberOfInsertBatches = 16
     val totalMessages = numberOfInsertBatches * eventsPerBatch
@@ -58,14 +59,14 @@ abstract class LimitWindowingStreamTest(configFile: String)
 
     withDao { dao =>
      val lastInsert = Source
-        .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
+        .fromIterator(() => (1 to numberOfInsertBatches).iterator)
         .mapAsync(1) { i =>
           val end = i * eventsPerBatch
           val start = end - (eventsPerBatch - 1)
-          log.info(s"batch $i (events from $start to $end")
+          log.info(s"batch $i - events from $start to $end")
           val atomicWrites = (start to end).map { j =>
-            AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
+            AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid)))
           }
           dao.asyncWriteMessages(atomicWrites).map(_ => i)
         }
 
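Patch 4 makes the test fixture Oracle-friendly: a 16-byte payload instead of a lone byte, and an explicit writerUuid stamped on every PersistentRepr, since the original fixture's inserts failed on Oracle. A hedged sketch of the corrected write construction (assumes the pekko-persistence API on the classpath; the object name and sequence range are illustrative):

import java.util.UUID

import scala.collection.immutable

import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }

object OracleFriendlyFixture {
  val persistenceId: String = UUID.randomUUID().toString
  val writerUuid: String = UUID.randomUUID().toString
  val payload: Array[Byte] = Array.fill(16)('a'.toByte) // non-trivial payload instead of a single byte

  // One single-event AtomicWrite per sequence number, as in the patched test.
  def writesFor(seqNrs: Range): immutable.Seq[AtomicWrite] =
    seqNrs.map { seqNr =>
      AtomicWrite(immutable.Seq(PersistentRepr(payload, seqNr, persistenceId, writerUuid = writerUuid)))
    }

  def main(args: Array[String]): Unit =
    println(writesFor(1 to 10).size) // 10
}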
From 72483a060e6da8c0bab95ae3bf942e3ab2dec550 Mon Sep 17 00:00:00 2001
From: JingZhang Chen
Date: Mon, 27 May 2024 21:58:53 +0800
Subject: [PATCH 5/5] add tests to verify the fix

---
 .../query/CurrentEventsByPersistenceIdTest.scala | 38 +++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
index 11242419..efc45ca4 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
@@ -22,6 +22,8 @@ import pekko.persistence.query.Offset
 import pekko.persistence.query.{ EventEnvelope, Sequence }
 import pekko.testkit.TestProbe
 
+import scala.concurrent.Future
+
 abstract class CurrentEventsByPersistenceIdTest(config: String) extends QueryTestSpec(config) {
   import QueryTestSpec.EventEnvelopeProbeOps
 
@@ -219,6 +221,42 @@ abstract class CurrentEventsByPersistenceIdTest(config: String) extends QueryTes
       }
     }
   }
+
+  it should "return events when more events than the batch size have been deleted" in withActorSystem { implicit system =>
+    import pekko.pattern.ask
+    import system.dispatcher
+    import scala.concurrent.duration._
+
+    val journalOps = new JavaDslJdbcReadJournalOperations(system)
+    val batchSize = readJournalConfig.maxBufferSize
+    withTestActors(replyToMessages = true) { (actor1, _, _) =>
+      def sendMessages(numberOfMessages: Int): Future[Done] = {
+        val futures = for (i <- 1 to numberOfMessages) yield {
+          actor1 ? i
+        }
+        Future.sequence(futures).map(_ => Done)
+      }
+
+      val numberOfEvents = batchSize << 2
+      val archiveEventSum = numberOfEvents >> 1
+      val batch = sendMessages(numberOfEvents)
+
+      // wait for acknowledgement of the batch
+      batch.futureValue
+
+      // and then archive (delete) some of the events
+      val deleteBatch = for (i <- 1 to archiveEventSum) yield {
+        actor1 ? DeleteCmd(i)
+      }
+      // block until all delete commands are processed
+      Future.sequence(deleteBatch).futureValue
+
+      journalOps.withCurrentEventsByPersistenceId()("my-1", 0, Long.MaxValue) { tp =>
+        val allEvents = tp.toStrict(atMost = 10.seconds)
+        allEvents.size shouldBe (numberOfEvents - archiveEventSum)
+      }
+    }
+  }
 }
 
 // Note: these tests use the shared-db configs, the test for all (so not only current) events uses the regular db config
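For reference, the query path the new test exercises can also be driven directly through Pekko Persistence Query. A hedged sketch (the actor-system wiring and persistence id are illustrative, and an application.conf configuring the jdbc journal and read journal is assumed):

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.persistence.jdbc.query.scaladsl.JdbcReadJournal
import org.apache.pekko.persistence.query.PersistenceQuery
import org.apache.pekko.stream.scaladsl.Sink

object ReplayAfterDeletes extends App {
  implicit val system: ActorSystem = ActorSystem("replay-demo")
  import system.dispatcher

  val readJournal =
    PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)

  // A "current" (completed) stream: replays whatever is still in the journal,
  // even when more than maxBufferSize leading events have been deleted.
  readJournal
    .currentEventsByPersistenceId("my-1", 0L, Long.MaxValue)
    .runWith(Sink.seq)
    .map(envelopes => println(s"replayed ${envelopes.size} events"))
    .andThen { case _ => system.terminate() }
}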