From 34e09549dd2892d78e787f21eabd50c153348316 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 8 Dec 2023 15:19:41 +0100 Subject: [PATCH 1/4] support reading akka-persistence snapshots --- .../persistence/serialization/SnapshotSerializer.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala index d189c77b45c..e8e1c11927f 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala @@ -112,7 +112,15 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer val (serializerId, manifest) = headerFromBinary(headerBytes) - serialization.deserialize(snapshotBytes, serializerId, manifest).get + // suggested in https://github.com/scullxbones/pekko-persistence-mongo/pull/14#issuecomment-1847223850 + serialization + .deserialize(snapshotBytes, serializerId, manifest) + .recover { + case _: NotSerializableException if manifest.startsWith("akka") => + serialization + .deserialize(snapshotBytes, serializerId, manifest.replaceFirst("akka", "org.apache.pekko")) + } + .get } private def writeInt(out: OutputStream, i: Int): Unit = { From 80e682b39ecf7d852e64a716eec95567b751f5fb Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 8 Dec 2023 22:01:13 +0100 Subject: [PATCH 2/4] add test --- .../SnapshotSerializerSpec.scala | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala new file mode 100644 index 00000000000..377b9d847f1 --- /dev/null +++ b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.serialization + +import org.apache.pekko +import pekko.actor.ActorSystem +import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot +import pekko.serialization.SerializationExtension +import pekko.testkit.PekkoSpec + +import java.util.Base64 +import scala.util.Success + +class SnapshotSerializerSpec extends PekkoSpec { + + "Snapshot serializer" should { + "deserialize akka snapshots" in { + val system = ActorSystem() + val serialization = SerializationExtension(system) + // https://github.com/apache/incubator-pekko/pull/837#issuecomment-1847320309 + val data = "PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh" + val bytes = Base64.getDecoder.decode(data) + val result = serialization.deserialize(bytes, classOf[Snapshot]) + result.isSuccess shouldBe true + val deserialized = result.get.data + deserialized shouldBe a[Success[_]] + val innerResult = deserialized.asInstanceOf[Success[_]].get + innerResult shouldBe a[PersistentFSMSnapshot[_]] + val persistentFSMSnapshot = innerResult.asInstanceOf[PersistentFSMSnapshot[_]] + persistentFSMSnapshot shouldEqual PersistentFSMSnapshot[String]("test-identifier", "test-data", None) + } + } +} From 7caf4b4f2c751070ad8996b29154c9b6527befba Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 8 Dec 2023 22:10:46 +0100 Subject: [PATCH 3/4] Update SnapshotSerializerSpec.scala --- .../persistence/serialization/SnapshotSerializerSpec.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala index 377b9d847f1..1e81c1aeab2 100644 --- a/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala +++ b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala @@ -33,7 +33,8 @@ class SnapshotSerializerSpec extends PekkoSpec { val system = ActorSystem() val serialization = SerializationExtension(system) // https://github.com/apache/incubator-pekko/pull/837#issuecomment-1847320309 - val data = "PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh" + val data = + "PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh" val bytes = Base64.getDecoder.decode(data) val result = serialization.deserialize(bytes, classOf[Snapshot]) result.isSuccess shouldBe true From 78ba31a5f6c361cd5fe30d3544c9f974c2d722c9 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 9 Dec 2023 11:04:23 +0100 Subject: [PATCH 4/4] Update SnapshotSerializerSpec.scala --- .../persistence/serialization/SnapshotSerializerSpec.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala index 1e81c1aeab2..824ff9c51f4 100644 --- a/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala +++ b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala @@ -36,9 +36,8 @@ class SnapshotSerializerSpec extends PekkoSpec { val data = "PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh" val bytes = Base64.getDecoder.decode(data) - val result = serialization.deserialize(bytes, classOf[Snapshot]) - result.isSuccess shouldBe true - val deserialized = result.get.data + val result = serialization.deserialize(bytes, classOf[Snapshot]).get + val deserialized = result.data deserialized shouldBe a[Success[_]] val innerResult = deserialized.asInstanceOf[Success[_]].get innerResult shouldBe a[PersistentFSMSnapshot[_]]