perf: avoid large offset query via limit windowing (#180)
* perf: avoid large offset query via limit windowing

* add unit tests, simplify legacy

* make legacy it works, new dao it

* fix config

* use single unit test

* optimized it

* fix

* scala 2.12 build, oracle npe

* fix scala 2.12 build, oracle npe

* optimized imports

* optimized import
Roiocam authored May 27, 2024
1 parent 17252d7 commit ab2f6ce
Showing 4 changed files with 231 additions and 116 deletions.
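
The core of the change is the new limitWindow helper in the first file below: each replay query now covers at most batchSize sequence numbers (from up to from + batchSize, capped at toSequenceNr) instead of spanning the whole fromSequenceNr..toSequenceNr range, which is what avoids the large-offset query named in the commit title. Here is a minimal, self-contained sketch of that windowing arithmetic; the LimitWindowSketch object and the windows helper are illustrative names, not part of the patch.

object LimitWindowSketch {
  // Mirrors the limitWindow helper added in BaseJournalDaoWithReadMessages:
  // cap each batch's upper bound at from + batchSize, never past toSequenceNr.
  def limitWindow(from: Long, batchSize: Int, toSequenceNr: Long): Long =
    math.min(from + batchSize, toSequenceNr)

  // Enumerate the (from, to) ranges a full replay would issue, assuming every
  // batch comes back full and gap-free so the next window starts at from + batchSize.
  // Adjacent windows overlap by one sequence number; the per-query row limit
  // (batchSize) prevents that overlap from producing duplicate rows.
  def windows(fromSequenceNr: Long, toSequenceNr: Long, batchSize: Int): List[(Long, Long)] = {
    @annotation.tailrec
    def loop(from: Long, acc: List[(Long, Long)]): List[(Long, Long)] =
      if (from > toSequenceNr) acc.reverse
      else loop(from + batchSize, (from, limitWindow(from, batchSize, toSequenceNr)) :: acc)
    loop(math.max(1, fromSequenceNr), Nil)
  }

  def main(args: Array[String]): Unit =
    // prints (1,101), (101,201), ..., (901,1000) for 1000 events and batchSize 100
    windows(1, 1000, 100).foreach(println)
}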
@@ -17,14 +17,14 @@ package org.apache.pekko.persistence.jdbc.journal.dao
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.Scheduler
import pekko.annotation.InternalApi
import pekko.persistence.PersistentRepr
import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed, Stop }
import pekko.stream.Materializer
import pekko.stream.scaladsl.{ Sink, Source }

import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
@@ -38,13 +38,29 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
toSequenceNr: Long,
batchSize: Int,
refreshInterval: Option[(FiniteDuration, Scheduler)]): Source[Try[(PersistentRepr, Long)], NotUsed] = {
internalBatchStream(persistenceId, fromSequenceNr, toSequenceNr, batchSize, refreshInterval).mapConcat(identity)
}

/**
* Separated into its own method to make unit testing easier.
*/
@InternalApi
private[dao] def internalBatchStream(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
batchSize: Int,
refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
Source
.unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
case (from, control) =>
def limitWindow(from: Long): Long = {
math.min(from + batchSize, toSequenceNr)
}

def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
for {
xs <- messages(persistenceId, from, toSequenceNr, batchSize).runWith(Sink.seq)
xs <- messages(persistenceId, from, limitWindow(from), batchSize).runWith(Sink.seq)
} yield {
val hasMoreEvents = xs.size == batchSize
// Events are ordered by sequence number, therefore the last one is the largest
@@ -77,7 +93,6 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
}
}
.mapConcat(identity(_))
}

}
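
For orientation: the public entry point wrapping internalBatchStream is messagesWithBatch, which the memory test further down calls directly. Below is a hedged usage sketch, assuming an already-configured JournalDaoWithReadMessages instance and an implicit ActorSystem; the ReplaySketch object and replayAll helper are illustrative only, not part of the patch.

import org.apache.pekko
import pekko.Done
import pekko.actor.ActorSystem
import pekko.persistence.jdbc.journal.dao.JournalDaoWithReadMessages

import scala.concurrent.Future
import scala.util.{ Failure, Success }

object ReplaySketch {
  // Replays every event of one persistence id in bounded batches. Passing None
  // for the refresh interval lets the stream complete once it has caught up
  // instead of polling for new events.
  def replayAll(dao: JournalDaoWithReadMessages, persistenceId: String, upTo: Long)(
      implicit system: ActorSystem): Future[Done] =
    dao
      .messagesWithBatch(persistenceId, 0L, upTo, batchSize = 100, None)
      .runForeach {
        case Success((repr, _)) => println(s"replayed ${repr.persistenceId}/${repr.sequenceNr}")
        case Failure(ex)        => println(s"failed to read an event: ${ex.getMessage}")
      }
}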
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pekko.persistence.jdbc.journal.dao

import org.apache.pekko
import pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest.fetchSize
import pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
import pekko.persistence.{ AtomicWrite, PersistentRepr }
import pekko.stream.scaladsl.{ Keep, Sink, Source }
import pekko.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.slf4j.LoggerFactory

import java.util.UUID
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }

object LimitWindowingStreamTest {
val fetchSize = 100
val configOverrides: Map[String, ConfigValue] =
Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef(fetchSize))
}

abstract class LimitWindowingStreamTest(configFile: String)
extends QueryTestSpec(configFile, LimitWindowingStreamTest.configOverrides) {

private val log = LoggerFactory.getLogger(this.getClass)

it should "stream events with limit windowing" in withActorSystem { implicit system =>
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer

val persistenceId = UUID.randomUUID().toString
val payload = 'a'.toByte
val eventsPerBatch = 1000
val numberOfInsertBatches = 16
val totalMessages = numberOfInsertBatches * eventsPerBatch

withDao { dao =>
val lastInsert =
Source
.fromIterator(() => (1 to numberOfInsertBatches).toIterator)
.mapAsync(1) { i =>
val end = i * eventsPerBatch
val start = end - (eventsPerBatch - 1)
log.info(s"batch $i (events from $start to $end")
val atomicWrites =
(start to end).map { j =>
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
}
dao.asyncWriteMessages(atomicWrites).map(_ => i)
}
.runWith(Sink.last)

lastInsert.futureValue(Timeout(totalMessages.seconds))
val readMessagesDao = dao.asInstanceOf[BaseJournalDaoWithReadMessages]
val messagesSrc =
readMessagesDao.internalBatchStream(persistenceId, 0, totalMessages, batchSize = fetchSize, None)

val eventualSum: Future[(Int, Int)] = messagesSrc.toMat(Sink.fold((0, 0)) { case ((accBatch, accTotal), seq) =>
(accBatch + 1, accTotal + seq.size)
})(Keep.right).run()

val (batchCount, totalCount) = Await.result(eventualSum, Duration.Inf)
val totalBatch = totalMessages / fetchSize
batchCount shouldBe totalBatch
totalCount shouldBe totalMessages
}
}
}

class H2LimitWindowingStreamTest extends LimitWindowingStreamTest("h2-application.conf") with H2Cleaner
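
The suite lowers jdbc-journal.fetch-size to 100 via configOverrides so that the 16,000 inserted events are replayed in many small batches. A hedged sketch of applying an equivalent override with the plain Typesafe Config API (assuming a default application.conf on the classpath; QueryTestSpec may wire its overrides differently):

import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory }

object FetchSizeOverrideSketch {
  // Load the default configuration and override the journal fetch size,
  // mirroring LimitWindowingStreamTest.configOverrides.
  val config: Config = ConfigFactory
    .load()
    .withValue("jdbc-journal.fetch-size", ConfigValueFactory.fromAnyRef(100))

  def main(args: Array[String]): Unit =
    println(config.getInt("jdbc-journal.fetch-size")) // 100
}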
@@ -14,16 +14,15 @@

package org.apache.pekko.persistence.jdbc.query

import java.lang.management.ManagementFactory
import java.lang.management.MemoryMXBean
import java.lang.management.{ ManagementFactory, MemoryMXBean }
import java.util.UUID

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.jdbc.query.JournalDaoStreamMessagesMemoryTest.fetchSize
import pekko.persistence.{ AtomicWrite, PersistentRepr }
import pekko.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao, JournalTables }
import pekko.serialization.SerializationExtension
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.slf4j.LoggerFactory
@@ -32,120 +31,110 @@ import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.matchers.should.Matchers

object JournalDaoStreamMessagesMemoryTest {

val configOverrides: Map[String, ConfigValue] = Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"))
val fetchSize: Int = 100
val MB: Int = 1024 * 1024

val MB = 1024 * 1024
val configOverrides: Map[String, ConfigValue] = Map(
"jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"))
}

abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides)
with JournalTables
with Matchers {
extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) {

import JournalDaoStreamMessagesMemoryTest.MB

private val log = LoggerFactory.getLogger(this.getClass)

val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration
val journalTableCfg = journalConfig.journalTableConfiguration
val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean

implicit val askTimeout: FiniteDuration = 50.millis
it should "stream events" in withActorSystem { implicit system =>
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer

def generateId: Int = 0
withDao { dao =>
val persistenceId = UUID.randomUUID().toString

val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
val writerUuid = UUID.randomUUID().toString

val payloadSize = 5000 // 5000 bytes
val eventsPerBatch = 1000

behavior.of("Replaying Persistence Actor")

it should "stream events" in {
if (newDao)
pending
withActorSystem { implicit system: ActorSystem =>
withDatabase { db =>
implicit val ec: ExecutionContext = system.dispatcher

val persistenceId = UUID.randomUUID().toString
val dao = new ByteArrayJournalDao(db, profile, journalConfig, SerializationExtension(system))

val payloadSize = 5000 // 5000 bytes
val eventsPerBatch = 1000

val maxMem = 64 * MB

val numberOfInsertBatches = {
// calculate the number of batches using a factor to make sure we go a little bit over the limit
(maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt
}
val totalMessages = numberOfInsertBatches * eventsPerBatch
val totalMessagePayload = totalMessages * payloadSize
log.info(
s"batches: $numberOfInsertBatches (with $eventsPerBatch events), total messages: $totalMessages, total msgs size: $totalMessagePayload")

// payload can be the same when inserting to avoid unnecessary memory usage
val payload = Array.fill(payloadSize)('a'.toByte)

val lastInsert =
Source
.fromIterator(() => (1 to numberOfInsertBatches).toIterator)
.mapAsync(1) { i =>
val end = i * eventsPerBatch
val start = end - (eventsPerBatch - 1)
log.info(s"batch $i - events from $start to $end")
val atomicWrites =
(start to end).map { j =>
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
}.toSeq

dao.asyncWriteMessages(atomicWrites).map(_ => i)
}
.runWith(Sink.last)

// wait until we write all messages
// being very generous, 1 second per message
lastInsert.futureValue(Timeout(totalMessages.seconds))

log.info("Events written, starting replay")

// sleep and gc to have some kind of stable measurement of current heap usage
Thread.sleep(1000)
System.gc()
Thread.sleep(1000)
val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed

val messagesSrc =
dao.messagesWithBatch(persistenceId, 0, totalMessages, batchSize = 100, None)
val probe =
messagesSrc
.map {
case Success((repr, _)) =>
if (repr.sequenceNr % 100 == 0)
log.info(s"fetched: ${repr.persistenceId} - ${repr.sequenceNr}/$totalMessages")
case Failure(exception) =>
log.error("Failure when reading messages.", exception)
}
.runWith(TestSink.probe)

probe.request(10)
probe.within(20.seconds) {
probe.expectNextN(10)
}

// sleep and gc to have some kind of stable measurement of current heap usage
Thread.sleep(2000)
System.gc()
Thread.sleep(1000)
val usedAfter = memoryMBean.getHeapMemoryUsage.getUsed

log.info(s"Used heap before ${usedBefore / MB} MB, after ${usedAfter / MB} MB")
// actual usage is much less than 10 MB
(usedAfter - usedBefore) should be <= (10L * MB)

probe.cancel()
val maxMem = 64 * MB

val numberOfInsertBatches = {
// calculate the number of batches using a factor to make sure we go a little bit over the limit
(maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt
}
val totalMessages = numberOfInsertBatches * eventsPerBatch
val totalMessagePayload = totalMessages * payloadSize
log.info(
s"batches: $numberOfInsertBatches (with $eventsPerBatch events), total messages: $totalMessages, total msgs size: $totalMessagePayload")

// payload can be the same when inserting to avoid unnecessary memory usage
val payload = Array.fill(payloadSize)('a'.toByte)

val lastInsert =
Source
.fromIterator(() => (1 to numberOfInsertBatches).iterator)
.mapAsync(1) { i =>
val end = i * eventsPerBatch
val start = end - (eventsPerBatch - 1)
log.info(s"batch $i - events from $start to $end")
val atomicWrites =
(start to end).map { j =>
AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid)))
}
dao.asyncWriteMessages(atomicWrites).map(_ => i)
}
.runWith(Sink.last)

// wait until we write all messages
// being very generous, 1 second per message
lastInsert.futureValue(Timeout(totalMessages.seconds))

log.info("Events written, starting replay")

// sleep and gc to have some kind of stable measurement of current heap usage
Thread.sleep(1000)
System.gc()
Thread.sleep(1000)
val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed

val messagesSrc =
dao.messagesWithBatch(persistenceId, 0, totalMessages, batchSize = fetchSize, None)
val probe =
messagesSrc
.map {
case Success((repr, _)) =>
if (repr.sequenceNr % 100 == 0)
log.info(s"fetched: ${repr.persistenceId} - ${repr.sequenceNr}/$totalMessages")
case Failure(exception) =>
log.error("Failure when reading messages.", exception)
}
.runWith(TestSink.probe)

probe.request(10)
probe.within(20.seconds) {
probe.expectNextN(10)
}

// sleep and gc to have some kind of stable measurement of current heap usage
Thread.sleep(2000)
System.gc()
Thread.sleep(1000)
val usedAfter = memoryMBean.getHeapMemoryUsage.getUsed

log.info(s"Used heap before ${usedBefore / MB} MB, after ${usedAfter / MB} MB")
// actual usage is much less than 10 MB
(usedAfter - usedBefore) should be <= (10L * MB)

probe.cancel()
}
}
}

class H2JournalDaoStreamMessagesMemoryTest extends JournalDaoStreamMessagesMemoryTest("h2-application.conf")
with H2Cleaner
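
The memory assertion compares MemoryMXBean heap readings taken before and after the replay, with explicit GC calls and sleeps to make the numbers reasonably stable. A minimal sketch of that measurement pattern as a standalone helper; HeapDelta, measure, and runReplay are illustrative names, and the result is only ever an approximation.

import java.lang.management.ManagementFactory

object HeapDelta {
  private val memoryMBean = ManagementFactory.getMemoryMXBean

  // Returns the approximate change in used heap, in bytes, caused by `action`.
  // GC plus a short pause before each reading, as the test above does.
  def measure(action: => Unit): Long = {
    System.gc(); Thread.sleep(1000)
    val before = memoryMBean.getHeapMemoryUsage.getUsed
    action
    System.gc(); Thread.sleep(1000)
    val after = memoryMBean.getHeapMemoryUsage.getUsed
    after - before
  }
}

// Usage (runReplay is a placeholder for draining the replay stream):
//   HeapDelta.measure(runReplay()) should stay well under 10 MB for the windowed
//   stream, which is what the test asserts.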
