diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java index 9212bbabb9e3..46ac86c67c6b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java @@ -30,6 +30,8 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener { + String SN_PREFIX = "sf."; + SimpleString getName(); String getNodeID(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 9cb034c66937..d85d84889fa6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -84,7 +84,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final String SN_PREFIX = "sf."; /** * When getting member on node-up and down we have to remove the name from the transport config * as the setting we build here doesn't need to consider the name, so use the same name on all diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 4464fefdf898..fcf78aa0545c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -94,6 +95,8 @@ public Queue createQueueWith(final QueueConfiguration config, PagingManager pagi PageSubscription pageSubscription = getPageSubscription(config, pagingManager, filter); if (lastValueKey(config) != null) { queue = new LastValueQueue(config.setLastValueKey(lastValueKey(config)), filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); + } else if (isSnf(config)) { + queue = new StoreAndForwardQueue(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } else { queue = new QueueImpl(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } @@ -101,6 +104,13 @@ public Queue createQueueWith(final QueueConfiguration config, PagingManager pagi return queue; } + private boolean isSnf(QueueConfiguration config) { + if (config.isInternal()) { + return config.getName().toString().startsWith(server.getInternalNamingPrefix() + ClusterConnection.SN_PREFIX); + } + return false; + } + @Deprecated @Override public Queue createQueue(final long persistenceID, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/StoreAndForwardQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/StoreAndForwardQueue.java new file mode 100644 index 000000000000..5f202580cf8b --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/StoreAndForwardQueue.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQRoutingException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StoreAndForwardQueue extends QueueImpl { + + public StoreAndForwardQueue(QueueConfiguration queueConfiguration, + Filter filter, + PagingStore pagingStore, + PageSubscription pageSubscription, + ScheduledExecutorService scheduledExecutor, + PostOffice postOffice, + StorageManager storageManager, + HierarchicalRepository addressSettingsRepository, + ArtemisExecutor executor, + ActiveMQServer server, + QueueFactory factory) { + super(queueConfiguration, filter, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); + } + + @Override + public void route(final Message message, final RoutingContext context) throws Exception { + if (!isValidMessage(message)) { + throw ActiveMQExceptionType.ROUTING_EXCEPTION.createException("Invalid message being routed to store and forward queue"); + } + super.route(message, context); + } + + private boolean isValidMessage(final Message message) { + for (SimpleString propName : message.getPropertyNames()) { + if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) { + return true; + } + } + return false; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java index e2ac8e8aa955..18b27c0649b0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.util.ArrayList; import java.util.UUID; @@ -28,12 +29,15 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQRoutingException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge; @@ -305,6 +309,56 @@ public void testClusterBridgeAddRemoteBinding() throws Exception { stopServers(0, 1); } + @Test + public void testBadClientSendMessagesToSnFQueue() throws Exception { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + ClientSession session0 = sfs[0].createSession(); + ClientSession session1 = sfs[1].createSession(); + + session0.start(); + session1.start(); + + //sending directly to snf queues + SimpleString nodeId0 = servers[0].getNodeID(); + SimpleString nodeId1 = servers[1].getNodeID(); + ClusterConnectionImpl cc0 = (ClusterConnectionImpl) servers[0].getClusterManager().getClusterConnection("cluster0"); + SimpleString snfQueue0 = cc0.getSfQueueName(nodeId1.toString()); + ClusterConnectionImpl cc1 = (ClusterConnectionImpl) servers[1].getClusterManager().getClusterConnection("cluster1"); + SimpleString snfQueue1 = cc1.getSfQueueName(nodeId0.toString()); + + Wait.assertTrue(()->cc0.isStarted()); + Wait.assertTrue(()->cc1.isStarted()); + + try { + ClientProducer badProducer0 = session0.createProducer(snfQueue0); + Message normalMessage = session0.createMessage(false); + badProducer0.send(normalMessage); + fail("Message sent directly to snf queue should be rejected " + snfQueue0); + } catch (ActiveMQRoutingException e) { + // ok + } + try { + ClientProducer badProducer1 = session1.createProducer(snfQueue1); + Message normalMessage = session1.createMessage(false); + badProducer1.send(normalMessage); + fail("Message sent directly to snf queue should be rejected " + snfQueue1); + } catch (Exception e) { + // ok + } + + stopServers(0, 1); + } @Override @AfterEach