diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java index 7b598b3..7f5baff 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java @@ -22,10 +22,8 @@ import com.solace.messaging.MessagingService; -import io.quarkiverse.solace.i18n.SolaceLogging; import io.quarkiverse.solace.incoming.SolaceIncomingChannel; import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel; -import io.quarkus.runtime.ShutdownEvent; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; @@ -40,7 +38,8 @@ // TODO only persisted is implemented //@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted") @ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false") -@ConnectorAttribute(name = "client.shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000") +@ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true") +@ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000") @ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver") @ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver", defaultValue = "durable-non-exclusive") @ConnectorAttribute(name = "consumer.queue.missing-resource-creation-strategy", type = "string", direction = INCOMING, description = "Missing resource creation strategy", defaultValue = "do-not-create") @@ -50,8 +49,6 @@ @ConnectorAttribute(name = "consumer.queue.replay.strategy", type = "string", direction = INCOMING, description = "The receiver replay strategy") @ConnectorAttribute(name = "consumer.queue.replay.timebased-start-time", type = "string", direction = INCOMING, description = "The receiver replay timebased start time") @ConnectorAttribute(name = "consumer.queue.replay.replication-group-message-id", type = "string", direction = INCOMING, description = "The receiver replay replication group message id") -// TODO implement consumer concurrency -//@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1") @ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore") @ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error") @ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false") @@ -81,18 +78,6 @@ public class SolaceConnector implements InboundConnector, OutboundConnector, Hea List incomingChannels = new CopyOnWriteArrayList<>(); List outgoingChannels = new CopyOnWriteArrayList<>(); - public void onStop(@Observes ShutdownEvent shutdownEvent) { - if (solace.isConnected()) { - SolaceLogging.log.info("Waiting incoming channel messages to be acknowledged"); - incomingChannels.forEach(SolaceIncomingChannel::waitForUnAcknowledgedMessages); - SolaceLogging.log.info("All incoming channel messages are acknowledged"); - - SolaceLogging.log.info("Waiting for outgoing messages to be published"); - outgoingChannels.forEach(SolaceOutgoingChannel::waitForPublishedMessages); - SolaceLogging.log.info("All outgoing messages are published"); - } - } - public void terminate( @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object event) { incomingChannels.forEach(SolaceIncomingChannel::close); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java index 9c8352b..53d9e57 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti return false; } isZero.await(realTimeout, TimeUnit.MILLISECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All incoming channel messages are acknowledged")); + } } return true; } else if (timeout < 0) { while (isGreaterThanZero()) { SolaceLogging.log.info(String.format("Waiting for %s items", counter.get())); isZero.await(5, TimeUnit.SECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All incoming channel messages are acknowledged")); + } } return true; } else { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index 9ebe065..21a8d19 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -45,7 +45,8 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi private final PersistentMessageReceiver receiver; private final Flow.Publisher> stream; private final ExecutorService pollerThread; - private long waitTimeout = -1; + private final boolean gracefulShutdown; + private final long gracefulShutdownWaitTimeout; // Assuming we won't ever exceed the limit of an unsigned long... private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier(); @@ -53,7 +54,8 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) { this.channel = ic.getChannel(); this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); - this.waitTimeout = ic.getClientShutdownWaitTimeout(); + this.gracefulShutdown = ic.getClientGracefulShutdown(); + this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout(); DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build(); Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED }; if (ic.getConsumerQueueSupportsNacks()) { @@ -173,7 +175,8 @@ public Flow.Publisher> getStream() { public void waitForUnAcknowledgedMessages() { try { receiver.pause(); - if (!unacknowledgedMessageTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) { + SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged"); + if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { SolaceLogging.log.info(String.format("Timed out while waiting for the" + " remaining messages to be acknowledged.")); } @@ -184,6 +187,9 @@ public void waitForUnAcknowledgedMessages() { } public void close() { + if (this.gracefulShutdown) { + waitForUnAcknowledgedMessages(); + } closed.compareAndSet(false, true); if (this.pollerThread != null) { this.pollerThread.shutdown(); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java index b38a030..e9f4f75 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti return false; } isZero.await(realTimeout, TimeUnit.MILLISECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All outgoing channel messages are published")); + } } return true; } else if (timeout < 0) { while (isGreaterThanZero()) { SolaceLogging.log.info(String.format("Waiting for %s items", counter.get())); isZero.await(5, TimeUnit.SECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All outgoing channel messages are published")); + } } return true; } else { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index ae8f794..4afb63c 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -35,9 +35,9 @@ public class SolaceOutgoingChannel private final Flow.Subscriber> subscriber; private final Topic topic; private final SenderProcessor processor; - private boolean isPublisherReady = true; - - private long waitTimeout = -1; + private final boolean gracefulShutdown; + private final long gracefulShutdownWaitTimeout; + private volatile boolean isPublisherReady = true; // Assuming we won't ever exceed the limit of an unsigned long... private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier(); @@ -56,7 +56,8 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity()); break; } - this.waitTimeout = oc.getClientShutdownWaitTimeout(); + this.gracefulShutdown = oc.getClientGracefulShutdown(); + this.gracefulShutdownWaitTimeout = oc.getClientGracefulShutdownWaitTimeout(); oc.getProducerDeliveryAckTimeout().ifPresent(builder::withDeliveryAckTimeout); oc.getProducerDeliveryAckWindowSize().ifPresent(builder::withDeliveryAckWindowSize); this.publisher = builder.build(); @@ -179,7 +180,8 @@ public Flow.Subscriber> getSubscriber() { public void waitForPublishedMessages() { try { - if (!publishedMessagesTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) { + SolaceLogging.log.info("Waiting for outgoing messages to be published"); + if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { SolaceLogging.log.info(String.format("Timed out while waiting for the" + " remaining messages to get publish acknowledgment.")); } @@ -190,6 +192,9 @@ public void waitForPublishedMessages() { } public void close() { + if (this.gracefulShutdown) { + waitForPublishedMessages(); + } if (processor != null) { processor.cancel(); } diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java index 5f949a3..69f9090 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java @@ -6,12 +6,13 @@ import java.util.List; import java.util.Properties; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.*; import jakarta.enterprise.context.ApplicationScoped; import org.awaitility.Durations; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -26,8 +27,11 @@ import io.quarkiverse.solace.base.SolaceContainer; import io.quarkiverse.solace.base.WeldTestBase; +import io.quarkiverse.solace.incoming.SolaceIncomingChannel; import io.quarkiverse.solace.logging.SolaceTestAppender; +import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SolaceConsumerTest extends WeldTestBase { @@ -220,6 +224,51 @@ void consumerPublishToErrorTopicPermissionException() { @Test @Order(7) + void consumerGracefulCloseTest() { + MapBasedConfig config = new MapBasedConfig() + .with("channel-name", "in") + .with("consumer.queue.name", queue) + .with("consumer.queue.add-additional-subscriptions", true) + .with("consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("consumer.queue.subscriptions", topic); + + // Initialize incoming channel to consumes messages + SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(), + new SolaceConnectorIncomingConfiguration(config), messagingService); + + CopyOnWriteArrayList list = new CopyOnWriteArrayList<>(); + CopyOnWriteArrayList ackedMessageList = new CopyOnWriteArrayList<>(); + + Flow.Publisher> stream = solaceIncomingChannel.getStream(); + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + Multi.createFrom().publisher(stream).subscribe().with(message -> { + list.add(message); + executorService.schedule(() -> { + ackedMessageList.add(message); + CompletableFuture.runAsync(message::ack); + }, 1, TimeUnit.SECONDS); + }); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + publisher.publish("1", tp); + publisher.publish("2", tp); + publisher.publish("3", tp); + publisher.publish("4", tp); + publisher.publish("5", tp); + + await().until(() -> list.size() == 5); + // Assert on acknowledged messages + solaceIncomingChannel.close(); + await().atMost(2, TimeUnit.MINUTES).until(() -> ackedMessageList.size() == 5); + executorService.shutdown(); + } + + @Test + @Order(8) void consumerCreateMissingResourceAddSubscriptionPermissionException() { MapBasedConfig config = new MapBasedConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java index 669d51a..b07eca2 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; import jakarta.enterprise.context.ApplicationScoped; @@ -20,8 +21,10 @@ import io.quarkiverse.solace.base.WeldTestBase; import io.quarkiverse.solace.outgoing.SolaceOutboundMetadata; +import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel; import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; public class SolacePublisherTest extends WeldTestBase { @@ -99,6 +102,34 @@ void publisherWithBackPressureReject() { await().untilAsserted(() -> assertThat(app.getAcked().size()).isLessThan(5)); } + @Test + void publisherGracefulCloseTest() { + MapBasedConfig config = new MapBasedConfig() + .with("channel-name", "out") + .with("producer.topic", topic); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of(topic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + SolaceOutgoingChannel solaceOutgoingChannel = new SolaceOutgoingChannel(Vertx.vertx(), + new SolaceConnectorOutgoingConfiguration(config), messagingService); + // Publish messages + Multi.createFrom().range(0, 10) + .map(Message::of) + .subscribe((Flow.Subscriber>) solaceOutgoingChannel.getSubscriber()); + + solaceOutgoingChannel.close(); + // Assert on received messages + await().untilAsserted(() -> assertThat(expected.size()).isEqualTo(10)); + + } + // @Test // void publisherWithBackPressureRejectWaitForPublisherReadiness() { // MapBasedConfig config = new MapBasedConfig() diff --git a/runtime/src/main/java/io/quarkiverse/solace/runtime/SolaceRecorder.java b/runtime/src/main/java/io/quarkiverse/solace/runtime/SolaceRecorder.java index b3b20ae..43ec4dd 100644 --- a/runtime/src/main/java/io/quarkiverse/solace/runtime/SolaceRecorder.java +++ b/runtime/src/main/java/io/quarkiverse/solace/runtime/SolaceRecorder.java @@ -55,7 +55,7 @@ public MessagingService apply(SyntheticCreationalContext conte } var tmp = service; - shutdown.addShutdownTask(() -> { + shutdown.addLastShutdownTask(() -> { if (tmp.isConnected()) { tmp.disconnect(); }