diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/UnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java similarity index 91% rename from pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/UnsignedCounterBarrier.java rename to pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java index 0f9e21e..1336ac3 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/UnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java @@ -9,18 +9,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -class UnsignedCounterBarrier { +class IncomingMessagesUnsignedCounterBarrier { private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1) private final Lock awaitLock = new ReentrantLock(); private final Condition isZero = awaitLock.newCondition(); - private static final Log logger = LogFactory.getLog(UnsignedCounterBarrier.class); + private static final Log logger = LogFactory.getLog(IncomingMessagesUnsignedCounterBarrier.class); - public UnsignedCounterBarrier(long initialValue) { + public IncomingMessagesUnsignedCounterBarrier(long initialValue) { counter = new AtomicLong(initialValue); } - public UnsignedCounterBarrier() { + public IncomingMessagesUnsignedCounterBarrier() { this(0); } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceFailureHandler.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceFailureHandler.java index 1f4cb09..64d39b0 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceFailureHandler.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceFailureHandler.java @@ -30,8 +30,7 @@ public CompletionStage handle(SolaceInboundMessage msg, Throwable reaso if (metadata != null) { outcome = metadata.get(SettleMetadata.class) .map(SettleMetadata::getOutcome) - .orElseGet(() -> messageOutCome != null ? messageOutCome - : MessageAcknowledgementConfiguration.Outcome.FAILED /* TODO get outcome from reason */); + .orElseGet(() -> messageOutCome /* TODO get outcome from reason */); } else { outcome = messageOutCome != null ? messageOutCome : MessageAcknowledgementConfiguration.Outcome.FAILED; diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java index 712a8ea..b8c58f5 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java @@ -27,13 +27,13 @@ public class SolaceInboundMessage implements ContextAwareMessage, Metadata private final SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler; private final SolaceConnectorIncomingConfiguration ic; private final T payload; - private final UnsignedCounterBarrier unacknowledgedMessageTracker; + private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker; private Metadata metadata; public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler, SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler, - SolaceConnectorIncomingConfiguration ic, UnsignedCounterBarrier unacknowledgedMessageTracker) { + SolaceConnectorIncomingConfiguration ic, IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) { this.msg = message; this.unacknowledgedMessageTracker = unacknowledgedMessageTracker; this.payload = (T) convertPayload(); @@ -103,7 +103,7 @@ public CompletionStage nack(Throwable reason, Metadata nackMetadata) { .atMost(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts()) .onFailure().transform((throwable -> { SolaceLogging.log.unsuccessfulToTopic(ic.getConsumerQueueErrorTopic().get(), ic.getChannel()); - throw new RuntimeException(throwable); + throw new RuntimeException(throwable); // TODO How to catch this exception in tests })) .await().atMost(Duration.ofSeconds(30)); @@ -115,14 +115,14 @@ public CompletionStage nack(Throwable reason, Metadata nackMetadata) { MessageAcknowledgementConfiguration.Outcome outcome = ic.getConsumerQueueEnableNacks() && ic.getConsumerQueueDiscardMessagesOnFailure() && solaceErrorTopicPublisherHandler == null - ? MessageAcknowledgementConfiguration.Outcome.REJECTED - : MessageAcknowledgementConfiguration.Outcome.FAILED; + ? MessageAcknowledgementConfiguration.Outcome.REJECTED // will move message to DMQ is enabled on queue & message + : MessageAcknowledgementConfiguration.Outcome.FAILED; // will redeliver the message if (outcome == MessageAcknowledgementConfiguration.Outcome.REJECTED) { this.unacknowledgedMessageTracker.decrement(); } return ic.getConsumerQueueEnableNacks() ? nackHandler.handle(this, reason, nackMetadata, outcome) - : Uni.createFrom().voidItem().subscribeAsCompletionStage(); + : Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO Disconnect and reconnect the receiver in order to redeliver the message. Required when nacks are not supported by the broker version. } @Override diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index 65075bc..0301670 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -46,7 +46,7 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi private long waitTimeout = -1; // Assuming we won't ever exceed the limit of an unsigned long... - private final UnsignedCounterBarrier unacknowledgedMessageTracker = new UnsignedCounterBarrier(); + private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier(); public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) { this.channel = ic.getChannel(); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/UnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java similarity index 91% rename from pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/UnsignedCounterBarrier.java rename to pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java index 7afc29e..6855941 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/UnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java @@ -9,18 +9,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -class UnsignedCounterBarrier { +class OutgoingMessagesUnsignedCounterBarrier { private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1) private final Lock awaitLock = new ReentrantLock(); private final Condition isZero = awaitLock.newCondition(); - private static final Log logger = LogFactory.getLog(UnsignedCounterBarrier.class); + private static final Log logger = LogFactory.getLog(OutgoingMessagesUnsignedCounterBarrier.class); - public UnsignedCounterBarrier(long initialValue) { + public OutgoingMessagesUnsignedCounterBarrier(long initialValue) { counter = new AtomicLong(initialValue); } - public UnsignedCounterBarrier() { + public OutgoingMessagesUnsignedCounterBarrier() { this(0); } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index 008e6df..8909afd 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -40,7 +40,7 @@ public class SolaceOutgoingChannel private long waitTimeout = -1; // Assuming we won't ever exceed the limit of an unsigned long... - private final UnsignedCounterBarrier publishedMessagesTracker = new UnsignedCounterBarrier(); + private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier(); public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration oc, MessagingService solace) { this.channel = oc.getChannel(); diff --git a/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java index 8ab752e..9124b09 100644 --- a/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java +++ b/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java @@ -45,26 +45,26 @@ Message consumeAndPublish(SolaceInboundMessage p) { * * @param p */ - @Incoming("dynamic-destination-in") - @Outgoing("dynamic-destination-out") - @Acknowledgment(Acknowledgment.Strategy.MANUAL) - Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { - Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8)); - SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() - .setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message - .createPubSubOutboundMetadata(); - Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> { - CompletableFuture completableFuture = new CompletableFuture(); - p.ack(); - completableFuture.complete(null); - return completableFuture; - }, (throwable) -> { - CompletableFuture completableFuture = new CompletableFuture(); - p.nack(throwable, p.getMetadata()); - completableFuture.complete(null); - return completableFuture; - }); - return outboundMessage; - } + // @Incoming("dynamic-destination-in") + // @Outgoing("dynamic-destination-out") + // @Acknowledgment(Acknowledgment.Strategy.MANUAL) + // Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { + // Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8)); + // SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() + // .setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message + // .createPubSubOutboundMetadata(); + // Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> { + // CompletableFuture completableFuture = new CompletableFuture(); + // p.ack(); + // completableFuture.complete(null); + // return completableFuture; + // }, (throwable) -> { + // CompletableFuture completableFuture = new CompletableFuture(); + // p.nack(throwable, p.getMetadata()); + // completableFuture.complete(null); + // return completableFuture; + // }); + // return outboundMessage; + // } } diff --git a/samples/hello-connector-solace/src/main/resources/application.properties b/samples/hello-connector-solace/src/main/resources/application.properties index 7f1fcf2..de604fe 100644 --- a/samples/hello-connector-solace/src/main/resources/application.properties +++ b/samples/hello-connector-solace/src/main/resources/application.properties @@ -1,17 +1,17 @@ -quarkus.solace.host= -quarkus.solace.vpn= -quarkus.solace.authentication.basic.username= -quarkus.solace.authentication.basic.password= +quarkus.solace.host=tcps://mr-connection-h0zr2jc6v7f.messaging.solace.cloud:55443 +quarkus.solace.vpn=sjthotak_solace +quarkus.solace.authentication.basic.username=solace-cloud-client +quarkus.solace.authentication.basic.password=qu03808nfjfprlk3ck458u7bv4 mp.messaging.outgoing.hello-out.connector=quarkus-solace -mp.messaging.outgoing.hello-out.producer.topic= +mp.messaging.outgoing.hello-out.producer.topic=quarkus/static/topic #mp.messaging.outgoing.hello-out.producer.back-pressure.strategy=wait #mp.messaging.outgoing.hello-out.producer.back-pressure.buffer-capacity=1 #mp.messaging.outgoing.hello-out.producer.waitForPublishReceipt=false mp.messaging.incoming.hello-in.connector=quarkus-solace mp.messaging.incoming.hello-in.consumer.queue.enable-nacks=true -mp.messaging.incoming.hello-in.consumer.queue.name= +mp.messaging.incoming.hello-in.consumer.queue.name=queue.orders.outgoing mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive mp.messaging.incoming.hello-in.consumer.queue.discard-messages-on-failure=false mp.messaging.incoming.hello-in.consumer.queue.publish-to-error-topic-on-failure=true @@ -19,11 +19,11 @@ mp.messaging.incoming.hello-in.consumer.queue.error.topic=solace/quarkus/error mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace mp.messaging.incoming.dynamic-destination-in.consumer.queue.enable-nacks=true -mp.messaging.incoming.dynamic-destination-in.consumer.queue.name= +mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=queue.orders.outgoing mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive mp.messaging.incoming.dynamic-destination-in.consumer.queue.discard-messages-on-failure=false mp.messaging.incoming.dynamic-destination-in.consumer.queue.publish-to-error-topic-on-failure=true mp.messaging.incoming.dynamic-destination-in.consumer.queue.error.topic=solace/quarkus/error mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace -mp.messaging.outgoing.dynamic-destination-out.producer.topic= +mp.messaging.outgoing.dynamic-destination-out.producer.topic=quarkus/dynamic/topic