From 4bbbd929d0e1aea45f61c5709a5d36a1e760ed72 Mon Sep 17 00:00:00 2001 From: "Cai Yufei (INST/ECS1)" Date: Mon, 11 Jun 2018 16:34:53 +0200 Subject: [PATCH] make props of AMQP client actors serializable Because they are started possibly on a remote node by ClusterRouterPool. Signed-off-by: Cai Yufei (INST/ECS1) --- .../messaging/amqp/AmqpClientActor.java | 18 ++- .../rabbitmq/RabbitMQClientActor.java | 19 ++- ...efaultConnectionActorPropsFactoryTest.java | 111 ++++++++++++++++++ 3 files changed, 142 insertions(+), 6 deletions(-) create mode 100644 services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/DefaultConnectionActorPropsFactoryTest.java diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.java index a9bbd0128a..82954d0fd9 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.java @@ -82,14 +82,26 @@ public final class AmqpClientActor extends BaseClientActor implements ExceptionL @Nullable private Session jmsSession; @Nullable private ActorRef amqpPublisherActor; + /* + * This constructor is called via reflection by the static method propsForTest. + */ private AmqpClientActor(final Connection connection, final ConnectionStatus connectionStatus, - final JmsConnectionFactory jmsConnectionFactory, final ActorRef conciergeForwarder) { + final JmsConnectionFactory jmsConnectionFactory, + final ActorRef conciergeForwarder) { super(connection, connectionStatus, conciergeForwarder); this.jmsConnectionFactory = jmsConnectionFactory; connectionListener = new ConnectionListener(); consumerMap = new HashMap<>(); } + /* + * This constructor is called via reflection by the static method props(Connection, ActorRef). + */ + private AmqpClientActor(final Connection connection, final ConnectionStatus connectionStatus, + final ActorRef conciergeForwarder) { + this(connection, connectionStatus, ConnectionBasedJmsConnectionFactory.getInstance(), conciergeForwarder); + } + /** * Creates Akka configuration object for this actor. * @@ -99,7 +111,7 @@ private AmqpClientActor(final Connection connection, final ConnectionStatus conn */ public static Props props(final Connection connection, final ActorRef conciergeForwarder) { return Props.create(AmqpClientActor.class, validateConnection(connection), connection.getConnectionStatus(), - ConnectionBasedJmsConnectionFactory.getInstance(), conciergeForwarder); + conciergeForwarder); } /** @@ -151,7 +163,7 @@ protected CompletionStage doTestConnection(final Connection conne ex.getClass().getSimpleName() + ": " + ex.getMessage() + "'") .cause(ex).build(); return new Status.Failure(failedException); - } else if (response instanceof ConnectionFailure){ + } else if (response instanceof ConnectionFailure) { return ((ConnectionFailure) response).getFailure(); } else { return new Status.Success(response); diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/rabbitmq/RabbitMQClientActor.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/rabbitmq/RabbitMQClientActor.java index fcdf1065e0..a705ec29d0 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/rabbitmq/RabbitMQClientActor.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/rabbitmq/RabbitMQClientActor.java @@ -85,15 +85,28 @@ public final class RabbitMQClientActor extends BaseClientActor { private final Map consumedTagsToAddresses; + /* + * This constructor is called via reflection by the static method propsForTest. + */ private RabbitMQClientActor(final Connection connection, final ConnectionStatus connectionStatus, - final RabbitConnectionFactoryFactory rabbitConnectionFactoryFactory, - final ActorRef conciergeForwarder) { + final RabbitConnectionFactoryFactory rabbitConnectionFactoryFactory, + final ActorRef conciergeForwarder) { super(connection, connectionStatus, conciergeForwarder); this.rabbitConnectionFactoryFactory = rabbitConnectionFactoryFactory; consumedTagsToAddresses = new HashMap<>(); } + /* + * This constructor is called via reflection by the static method props(Connection, ActorRef). + */ + private RabbitMQClientActor(final Connection connection, final ConnectionStatus connectionStatus, + final ActorRef conciergeForwarder) { + this(connection, connectionStatus, ConnectionBasedRabbitConnectionFactoryFactory.getInstance(), + conciergeForwarder); + + } + /** * Creates Akka configuration object for this actor. * @@ -103,7 +116,7 @@ private RabbitMQClientActor(final Connection connection, final ConnectionStatus */ public static Props props(final Connection connection, final ActorRef conciergeForwarder) { return Props.create(RabbitMQClientActor.class, validateConnection(connection), connection.getConnectionStatus(), - ConnectionBasedRabbitConnectionFactoryFactory.getInstance(), conciergeForwarder); + conciergeForwarder); } /** diff --git a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/DefaultConnectionActorPropsFactoryTest.java b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/DefaultConnectionActorPropsFactoryTest.java new file mode 100644 index 0000000000..757a80c8d2 --- /dev/null +++ b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/DefaultConnectionActorPropsFactoryTest.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2017 Bosch Software Innovations GmbH. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * which accompanies this distribution, and is available at + * https://www.eclipse.org/org/documents/epl-2.0/index.php + * + * Contributors: + * Bosch Software Innovations GmbH - initial contribution + */ +package org.eclipse.ditto.services.connectivity.messaging; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.ditto.model.connectivity.ConnectionType.AMQP_091; +import static org.eclipse.ditto.model.connectivity.ConnectionType.AMQP_10; + +import java.util.concurrent.TimeUnit; + +import org.eclipse.ditto.model.connectivity.Connection; +import org.eclipse.ditto.model.connectivity.ConnectionType; +import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.remote.DaemonMsgCreate; +import akka.serialization.Serialization; +import akka.serialization.SerializationExtension; +import akka.serialization.Serializer; +import akka.testkit.javadsl.TestKit; + +/** + * Unit tests for {@link DefaultConnectionActorPropsFactory}. + */ +public class DefaultConnectionActorPropsFactoryTest { + + private ActorSystem actorSystem; + private Serialization serialization; + private ConnectionActorPropsFactory underTest; + + @Before + public void setUp() throws java.io.NotSerializableException { + actorSystem = ActorSystem.create("AkkaTestSystem", TestConstants.CONFIG); + serialization = SerializationExtension.get(actorSystem); + underTest = DefaultConnectionActorPropsFactory.getInstance(); + } + + @After + public void tearDown() { + if (actorSystem != null) { + TestKit.shutdownActorSystem(actorSystem, scala.concurrent.duration.Duration.apply(5, TimeUnit.SECONDS), + false); + } + } + + /** + * Tests serialization of props of AMQP_091 client actor. The props needs to be serializable because client actors + * may be created on a different connectivity service instance using a local connection object. + */ + @Test + public void amqp091ActorPropsIsSerializable() { + final Props props = underTest.getActorPropsForType(randomConnection(AMQP_091), actorSystem.deadLetters()); + final Object objectToSerialize = wrapForSerialization(props); + final byte[] bytes = serialization.findSerializerFor(objectToSerialize).toBinary(objectToSerialize); + final Object deserializedObject = serialization.deserialize(bytes, objectToSerialize.getClass()).get(); + + assertThat(deserializedObject).isEqualTo(objectToSerialize); + } + + /** + * Tests serialization of props of AMQP_10 client actor. The props needs to be serializable because client actors + * may be created on a different connectivity service instance using a local connection object. + */ + @Test + public void amqp10ActorPropsIsSerializable() { + final Props props = underTest.getActorPropsForType(randomConnection(AMQP_10), actorSystem.deadLetters()); + final Object objectToSerialize = wrapForSerialization(props); + final byte[] bytes = serialization.findSerializerFor(objectToSerialize).toBinary(objectToSerialize); + final Object deserializedObject = serialization.deserialize(bytes, objectToSerialize.getClass()).get(); + + assertThat(deserializedObject).isEqualTo(objectToSerialize); + } + + /** + * Wrap Props in an object with a reasonable Akka serializer, namely one that applies our configured + * serializer on each argument of Props. For Akka 2.5.13, that object belongs to the Akka-internal class + * DaemonMsgCreate. The class may change in future versions of Akka. + */ + private Object wrapForSerialization(final Props props) { + final String actorClassNameAsPath = props.actorClass().getSimpleName(); + return DaemonMsgCreate.apply(props, props.deploy(), actorClassNameAsPath, actorSystem.deadLetters()); + } + + private Connection randomConnection(final ConnectionType connectionType) { + final Connection template = + TestConstants.createConnection(TestConstants.createRandomConnectionId(), actorSystem); + + return ConnectivityModelFactory + .newConnectionBuilder(template.getId(), + connectionType, + template.getConnectionStatus(), + template.getUri(), + template.getAuthorizationContext()) + .sources(template.getSources()) + .targets(template.getTargets()) + .build(); + } +}