scala3 support (#85)
* scala3 support

* Update unit-tests.yml
pjfanning authored Aug 19, 2023
1 parent f170d7e · commit 8a10b77
Showing 17 changed files with 39 additions and 32 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/unit-tests.yml
@@ -21,8 +21,10 @@ jobs:
      matrix:
        include:
          - { javaVersion: '8', container: "cassandra-latest", scalaVersion: "++2.13.11", test: "test" }
+         - { javaVersion: '8', container: "cassandra-latest", scalaVersion: "++3.3.0", test: "test" }
          - { javaVersion: '11', container: "cassandra-latest", scalaVersion: "++2.12.18", test: "test" }
          - { javaVersion: '11', container: "cassandra-latest", scalaVersion: "++2.13.11", test: "test" }
+         - { javaVersion: '8', container: "cassandra-latest", scalaVersion: "++3.3.0", test: "test" }
          - { javaVersion: '11', container: "cassandra2", scalaVersion: "++2.13.11", test: "'testOnly -- -l RequiresCassandraThree'"}
          - { javaVersion: '11', container: "cassandra3", scalaVersion: "++2.13.11", test: "test" }

@@ -84,7 +84,7 @@ class EventsByTagMigration(
systemProvider: ClassicActorSystemProvider,
pluginConfigPath: String = "pekko.persistence.cassandra") {
private val system = systemProvider.classicSystem
- private[pekko] val log = Logging.getLogger(system, getClass)
+ private[pekko] val log = Logging.getLogger(system, classOf[EventsByTagMigration])
private lazy val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](pluginConfigPath + ".query")
private implicit val sys: ActorSystem = system
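A note on the logger change above, which recurs throughout this commit: getClass is replaced with an explicit classOf[...] wherever a LoggingAdapter is created, presumably because getClass returns a Class[_ <: X] and Scala 3 resolves the Logging overloads and their implicit LogSource differently for that type, while a class literal behaves the same on Scala 2 and Scala 3. A minimal sketch of the resulting pattern; the class name is hypothetical and only the logger construction is taken from the diff:

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.event.{ Logging, LoggingAdapter }

// Hypothetical class; only the logger-construction style mirrors the commit.
class MigrationRunner(system: ActorSystem) {
  // Scala 2 code commonly wrote Logging.getLogger(system, getClass); passing the
  // class literal compiles and resolves identically on Scala 2.12, 2.13 and 3.
  private val log: LoggingAdapter = Logging.getLogger(system, classOf[MigrationRunner])

  def run(): Unit = log.info("starting migration")
}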

@@ -62,7 +62,7 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
import settings._
import system.dispatcher

- private val log = Logging(system, getClass)
+ private val log = Logging(system, classOf[Cleanup])

// operations on journal, snapshotStore and tagViews should be only be done when dry-run = false
private val journal: ActorRef = Persistence(system).journalFor(pluginLocation + ".journal")
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal

final class CassandraHealthCheck(system: ActorSystem) extends (() => Future[Boolean]) {

- private val log = Logging.getLogger(system, getClass)
+ private val log = Logging.getLogger(system, classOf[CassandraHealthCheck])

private val settings = new PluginSettings(system, system.settings.config.getConfig("pekko.persistence.cassandra"))
private val healthCheckSettings = settings.healthCheckSettings
@@ -75,7 +75,7 @@ import scala.util.{ Failure, Success, Try }
private val statements: CassandraStatements = new CassandraStatements(settings)
private val healthCheckCql = settings.healthCheckSettings.healthCheckCql
private val serialization = SerializationExtension(context.system)
- private val log: LoggingAdapter = Logging(context.system, getClass)
+ private val log: LoggingAdapter = Logging(context.system, classOf[CassandraJournal])

private implicit val ec: ExecutionContext = context.dispatcher

@@ -266,7 +266,7 @@ import scala.util.{ Failure, Success, Try }
writeInProgress.put(pid, writeInProgressForPersistentId.future)

val toReturn: Future[Nil.type] = Future.sequence(writesWithUuids.map(w => serialize(w))).flatMap {
-   serialized: Seq[SerializedAtomicWrite] =>
+   (serialized: Seq[SerializedAtomicWrite]) =>
val result: Future[Any] =
if (messages.map(_.payload.size).sum <= journalSettings.maxMessageBatchSize) {
// optimize for the common case
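The lambda rewrite above, and the matching ones in the hunks and test sources below, follow a Scala 3 syntax rule: a function-literal parameter that carries an explicit type must be parenthesised, so a parameter written as x: T must become (x: T). A standalone illustration:

// Scala 2 accepted an unparenthesised typed parameter inside braces:
//   Seq(1, 2, 3).map { n: Int => n * 2 }
// Scala 3 requires parentheses around the typed parameter:
val doubled: Seq[Int] = Seq(1, 2, 3).map { (n: Int) => n * 2 }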
@@ -397,7 +397,7 @@ import scala.util.{ Failure, Success, Try }
maxPnr - minPnr <= 1,
"Do not support AtomicWrites that span 3 partitions. Keep AtomicWrites <= max partition size.")

- val writes: Seq[Future[BoundStatement]] = all.map { m: Serialized =>
+ val writes: Seq[Future[BoundStatement]] = all.map { (m: Serialized) =>
// using two separate statements with or without the meta data columns because
// then users doesn't have to alter table and add the new columns if they don't use
// the meta data feature
@@ -868,7 +868,7 @@ import scala.util.{ Failure, Success, Try }

class EventDeserializer(system: ActorSystem) {

- private val log = Logging(system, this.getClass)
+ private val log = Logging(system, classOf[CassandraJournal])

private val serialization = SerializationExtension(system)
val columnDefinitionCache = new ColumnDefinitionCache
@@ -57,7 +57,7 @@ import pekko.stream.connectors.cassandra.scaladsl.CassandraSession
private val serialization = SerializationExtension(system)

// used for local asks
- private implicit val timeout = Timeout(10.second)
+ private implicit val timeout: Timeout = Timeout(10.second)

import statements._
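The timeout annotation above, like the ec: MessageDispatcher annotation added in CassandraReadJournal further down, follows another migration pattern: implicit definitions get explicit result types. Recent Scala 2.13 compilers flag implicit definitions whose type is left to inference under their Scala 3 migration settings, and the Scala 3 migration guidance recommends spelling the type out. A small self-contained sketch; the object and value names are assumed:

import scala.concurrent.duration._
import org.apache.pekko.util.Timeout

object AskTimeouts {
  // Before the change the type was inferred (implicit val timeout = Timeout(10.second));
  // the explicit annotation satisfies newer compilers and documents the intent.
  implicit val askTimeout: Timeout = Timeout(10.seconds)
}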

@@ -14,11 +14,10 @@
package org.apache.pekko.persistence.cassandra.journal

import scala.collection.immutable
+ import scala.concurrent.Promise
import java.lang.{ Integer => JInt, Long => JLong }
import java.net.URLEncoder
import java.util.UUID

- import scala.concurrent.Promise
import org.apache.pekko
import pekko.Done
import pekko.actor.SupervisorStrategy.Escalate
@@ -37,7 +36,6 @@ import pekko.dispatch.ExecutionContexts
import pekko.event.LoggingAdapter
import pekko.persistence.cassandra.journal.CassandraJournal._
import pekko.persistence.cassandra.journal.TagWriter._
- import pekko.persistence.cassandra.journal.TagWriters._
import pekko.stream.connectors.cassandra.scaladsl.CassandraSession
import pekko.util.ByteString
import pekko.util.Timeout
@@ -180,11 +178,13 @@ import scala.util.Try
* INTERNAL API
* Manages all the tag writers.
*/
- @InternalApi private[pekko] class TagWriters(settings: TagWriterSettings, tagWriterSession: TagWritersSession)
+ @InternalApi private[pekko] class TagWriters(settings: TagWriterSettings,
+     tagWriterSession: TagWriters.TagWritersSession)
extends Actor
with Timers
with ActorLogging {

+ import TagWriters._
import context.dispatcher

// eager init and val because used from Future callbacks
@@ -210,7 +210,7 @@ import scala.util.Try
scheduleWriteTagScanningTick()

def receive: Receive = {
- case FlushAllTagWriters(t) =>
+ case FlushAllTagWriters(t: Timeout) =>
implicit val timeout: Timeout = t
if (log.isDebugEnabled)
log.debug("Flushing all tag writers [{}]", tagActors.keySet.mkString(", "))
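The TagWriters hunks show a further pattern: the file-level wildcard import from the companion object is removed, companion types used in the class signature are written in qualified form (TagWriters.TagWritersSession), and the wildcard import moves inside the class body. The sketch below mirrors that layout with hypothetical names; the commit does not state the exact Scala 3 issue this works around:

// Hypothetical class and companion; only the import/qualification layout mirrors the diff.
class Worker(config: Worker.Config) {
  import Worker._ // companion members imported inside the body, not at file level

  def describe(): String = render(config)
}

object Worker {
  final case class Config(name: String)

  def render(c: Config): String = s"worker:${c.name}"
}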
@@ -21,10 +21,14 @@ import com.typesafe.config.Config
class CassandraReadJournalProvider(system: ExtendedActorSystem, config: Config, configPath: String)
extends ReadJournalProvider {

- override val scaladslReadJournal: scaladsl.CassandraReadJournal =
+ private val readJournalScala: scaladsl.CassandraReadJournal =
new scaladsl.CassandraReadJournal(system, config, configPath)

- override val javadslReadJournal: javadsl.CassandraReadJournal =
-   new javadsl.CassandraReadJournal(scaladslReadJournal)
+ private val readJournalJava: javadsl.CassandraReadJournal =
+   new javadsl.CassandraReadJournal(readJournalScala)

+ override def scaladslReadJournal(): scaladsl.CassandraReadJournal = readJournalScala

+ override def javadslReadJournal(): javadsl.CassandraReadJournal = readJournalJava

}
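The new overrides add empty parameter lists, which suggests ReadJournalProvider declares these accessors as empty-paren methods; Scala 3 enforces the distinction between parameterless and empty-paren members more strictly than Scala 2 did, so the override val members are replaced by cached private vals exposed through def ...(). A minimal sketch of the same override pattern against a hypothetical SPI trait:

// Hypothetical SPI trait with a Java-style empty-paren accessor.
trait GreeterProvider {
  def greeter(): String
}

// Scala 2 tolerated overriding greeter() with a val; Scala 3 expects the override to keep
// the empty parameter list, so the value is cached once and exposed through a def.
class CachedGreeterProvider extends GreeterProvider {
  private val cachedGreeter: String = "hello"

  override def greeter(): String = cachedGreeter
}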
@@ -24,7 +24,6 @@ import pekko.persistence.cassandra.EventsByTagSettings.RetrySettings
import pekko.persistence.cassandra._
import pekko.persistence.cassandra.journal.CassandraJournal._
import pekko.persistence.cassandra.journal.TimeBucket
- import pekko.persistence.cassandra.query.EventsByTagStage._
import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal.EventByTagStatements
import pekko.stream.stage._
import pekko.stream.{ Attributes, Outlet, SourceShape }
@@ -212,7 +211,7 @@ import scala.util.{ Failure, Success, Try }

/** INTERNAL API */
@InternalApi private[pekko] class EventsByTagStage(
-   session: TagStageSession,
+   session: EventsByTagStage.TagStageSession,
initialQueryOffset: UUID,
toOffset: Option[UUID],
settings: PluginSettings,
@@ -221,8 +220,9 @@ import scala.util.{ Failure, Success, Try }
usingOffset: Boolean,
initialTagPidSequenceNrs: Map[PersistenceId, (TagPidSequenceNr, UUID)],
scanner: TagViewSequenceNumberScanner)
- extends GraphStage[SourceShape[UUIDRow]] {
+ extends GraphStage[SourceShape[EventsByTagStage.UUIDRow]] {

+ import EventsByTagStage._
import settings.{ eventsByTagSettings, querySettings }

private val out: Outlet[UUIDRow] = Outlet("event.out")
@@ -62,7 +62,7 @@ import pekko.stream.scaladsl.Sink
@InternalApi private[pekko] class TagViewSequenceNumberScanner(session: Session, pluginDispatcher: String)(
implicit materializer: Materializer,
@nowarn("msg=never used") ec: ExecutionContext) {
- private val log = Logging(materializer.system, getClass)
+ private val log = Logging(materializer.system, classOf[TagViewSequenceNumberScanner])

/**
* This could be its own stage and return half way through a query to better meet the deadline
@@ -20,6 +20,7 @@ import com.typesafe.config.Config
import org.apache.pekko
import pekko.actor.{ ActorSystem, ExtendedActorSystem }
import pekko.annotation.InternalApi
+ import pekko.dispatch.MessageDispatcher
import pekko.event.Logging
import pekko.persistence.cassandra.Extractors.Extractor
import pekko.persistence.cassandra.{ CassandraStatements, Extractors, PluginSettings }
@@ -113,7 +114,7 @@ class CassandraReadJournal protected (

import CassandraReadJournal.CombinedEventsByPersistenceIdStmts

- private val log = Logging.getLogger(system, getClass)
+ private val log = Logging.getLogger(system, classOf[CassandraReadJournal])

private val settings = new PluginSettings(system, sharedConfig)
private val statements = new CassandraStatements(settings)
@@ -139,7 +140,7 @@
new CassandraJournal.EventDeserializer(system)

private val serialization = SerializationExtension(system)
- implicit private val ec =
+ implicit private val ec: MessageDispatcher =
system.dispatchers.lookup(querySettings.pluginDispatcher)
implicit private val sys: ActorSystem = system

@@ -215,7 +215,7 @@ import scala.util.{ Failure, Success }
// this meta query gets slower than slower if snapshots are deleted without a criteria.minSequenceNr as
// all previous tombstones are scanned in the meta data query
metadata(snapshotMetaPs, persistenceId, criteria, limit = None).flatMap {
-   mds: immutable.Seq[SnapshotMetadata] =>
+   (mds: immutable.Seq[SnapshotMetadata]) =>
val boundStatementBatches = mds
.map(md =>
preparedDeleteSnapshot.map(_.bind(md.persistenceId, md.sequenceNr: JLong)
@@ -306,7 +306,7 @@ import scala.util.{ Failure, Success }
@InternalApi
private[pekko] class SnapshotSerialization(system: ActorSystem)(implicit val ec: ExecutionContext) {

- private val log = Logging(system, this.getClass)
+ private val log = Logging(system, classOf[SnapshotSerialization])

private val serialization = SerializationExtension(system)

@@ -87,7 +87,7 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
var allReceived: Map[String, List[Long]] = Map.empty.withDefaultValue(List.empty)
probe.request(messagesPerPersistenceId * nrPersistenceIds)

- (1L to (messagesPerPersistenceId * nrPersistenceIds)).foreach { i: Long =>
+ (1L to (messagesPerPersistenceId * nrPersistenceIds)).foreach { (i: Long) =>
val event =
try {
probe.expectNext(veryLongWait)
@@ -38,7 +38,7 @@ object CassandraEventUpdateSpec {

class CassandraEventUpdateSpec extends CassandraSpec(CassandraEventUpdateSpec.config) { s =>

- private[pekko] val log = Logging(system, getClass)
+ private[pekko] val log = Logging(system, classOf[CassandraEventUpdateSpec])
private val serialization = SerializationExtension(system)

val updater = new CassandraEventUpdate {
@@ -64,8 +64,8 @@ object CassandraLoadSpec {
override def receiveRecover: Receive = onEvent

override def receiveCommand: Receive = {
- case c @ "start" => onStart(c)
- case c @ "stop" => onStop(c)
+ case "start" => onStart("start")
+ case "stop" => onStop("stop")
case payload: String => onCommand(payload)
}

@@ -52,7 +52,7 @@ class TestActor(override val persistenceId: String, override val journalPluginId
val size = events.size
val handler = {
var count = 0
-   evt: String => {
+   (_: String) => {
count += 1
if (count == size)
sender() ! "PersistAll-done"
6 changes: 3 additions & 3 deletions project/Dependencies.scala
@@ -13,8 +13,8 @@ object Dependencies {
// keep in sync with .github/workflows/unit-tests.yml
val scala212Version = "2.12.18"
val scala213Version = "2.13.11"
- val scala3Version = "3.1.2" // not yet enabled - missing pekko-http/pekko-management Scala 3 artifacts
- val scalaVersions = Seq(scala212Version, scala213Version)
+ val scala3Version = "3.3.0"
+ val scalaVersions = Seq(scala212Version, scala213Version, scala3Version)

val pekkoVersion = System.getProperty("override.pekko.version", "1.0.1")
val pekkoVersionInDocs = "current"
@@ -24,7 +24,7 @@ object Dependencies {
val driverVersion = "4.15.0"
val driverVersionInDocs = "4.14"

- val pekkoConnectorsVersion = "0.0.0+144-703e9cca-SNAPSHOT"
+ val pekkoConnectorsVersion = "0.0.0+172-784827a8-SNAPSHOT"
val pekkoConnectorsVersionInDocs = "current"
// for example
val pekkoManagementVersion = "1.0.0"
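With scalaVersions now including 3.3.0, the build presumably feeds these values into sbt cross-building, which is what makes the ++3.3.0 switch in the workflow matrix above valid. A build.sbt-style sketch of that wiring, using standard sbt settings rather than anything copied from this repository's build:

// crossScalaVersions lets CI pick a compiler with, for example, sbt "++3.3.0" test.
ThisBuild / scalaVersion       := "2.13.11"
ThisBuild / crossScalaVersions := Seq("2.12.18", "2.13.11", "3.3.0")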