Skip to content

Commit

Permalink
Merge pull request #18 from SolaceCoEExt/fix-issues
Browse files Browse the repository at this point in the history
Graceful shutdown
  • Loading branch information
ozangunalp authored Jan 9, 2024
2 parents a9b4978 + 33a4acf commit 2366810
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@

import com.solace.messaging.MessagingService;

import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceIncomingChannel;
import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel;
import io.quarkus.runtime.ShutdownEvent;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
Expand All @@ -40,7 +38,8 @@
// TODO only persisted is implemented
//@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted")
@ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false")
@ConnectorAttribute(name = "client.shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000")
@ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true")
@ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000")
@ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver")
@ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver", defaultValue = "durable-non-exclusive")
@ConnectorAttribute(name = "consumer.queue.missing-resource-creation-strategy", type = "string", direction = INCOMING, description = "Missing resource creation strategy", defaultValue = "do-not-create")
Expand All @@ -50,8 +49,6 @@
@ConnectorAttribute(name = "consumer.queue.replay.strategy", type = "string", direction = INCOMING, description = "The receiver replay strategy")
@ConnectorAttribute(name = "consumer.queue.replay.timebased-start-time", type = "string", direction = INCOMING, description = "The receiver replay timebased start time")
@ConnectorAttribute(name = "consumer.queue.replay.replication-group-message-id", type = "string", direction = INCOMING, description = "The receiver replay replication group message id")
// TODO implement consumer concurrency
//@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1")
@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore")
@ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error")
@ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false")
Expand Down Expand Up @@ -81,18 +78,6 @@ public class SolaceConnector implements InboundConnector, OutboundConnector, Hea
List<SolaceIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
List<SolaceOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();

public void onStop(@Observes ShutdownEvent shutdownEvent) {
if (solace.isConnected()) {
SolaceLogging.log.info("Waiting incoming channel messages to be acknowledged");
incomingChannels.forEach(SolaceIncomingChannel::waitForUnAcknowledgedMessages);
SolaceLogging.log.info("All incoming channel messages are acknowledged");

SolaceLogging.log.info("Waiting for outgoing messages to be published");
outgoingChannels.forEach(SolaceOutgoingChannel::waitForPublishedMessages);
SolaceLogging.log.info("All outgoing messages are published");
}
}

public void terminate(
@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object event) {
incomingChannels.forEach(SolaceIncomingChannel::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return false;
}
isZero.await(realTimeout, TimeUnit.MILLISECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All incoming channel messages are acknowledged"));
}
}
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All incoming channel messages are acknowledged"));
}
}
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private final PersistentMessageReceiver receiver;
private final Flow.Publisher<? extends Message<?>> stream;
private final ExecutorService pollerThread;
private long waitTimeout = -1;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;

// Assuming we won't ever exceed the limit of an unsigned long...
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier();

public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
this.channel = ic.getChannel();
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.waitTimeout = ic.getClientShutdownWaitTimeout();
this.gracefulShutdown = ic.getClientGracefulShutdown();
this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout();
DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build();
Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED };
if (ic.getConsumerQueueSupportsNacks()) {
Expand Down Expand Up @@ -173,7 +175,8 @@ public Flow.Publisher<? extends Message<?>> getStream() {
public void waitForUnAcknowledgedMessages() {
try {
receiver.pause();
if (!unacknowledgedMessageTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged");
if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to be acknowledged."));
}
Expand All @@ -184,6 +187,9 @@ public void waitForUnAcknowledgedMessages() {
}

public void close() {
if (this.gracefulShutdown) {
waitForUnAcknowledgedMessages();
}
closed.compareAndSet(false, true);
if (this.pollerThread != null) {
this.pollerThread.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return false;
}
isZero.await(realTimeout, TimeUnit.MILLISECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All outgoing channel messages are published"));
}
}
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All outgoing channel messages are published"));
}
}
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public class SolaceOutgoingChannel
private final Flow.Subscriber<? extends Message<?>> subscriber;
private final Topic topic;
private final SenderProcessor processor;
private boolean isPublisherReady = true;

private long waitTimeout = -1;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private volatile boolean isPublisherReady = true;

// Assuming we won't ever exceed the limit of an unsigned long...
private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier();
Expand All @@ -56,7 +56,8 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity());
break;
}
this.waitTimeout = oc.getClientShutdownWaitTimeout();
this.gracefulShutdown = oc.getClientGracefulShutdown();
this.gracefulShutdownWaitTimeout = oc.getClientGracefulShutdownWaitTimeout();
oc.getProducerDeliveryAckTimeout().ifPresent(builder::withDeliveryAckTimeout);
oc.getProducerDeliveryAckWindowSize().ifPresent(builder::withDeliveryAckWindowSize);
this.publisher = builder.build();
Expand Down Expand Up @@ -179,7 +180,8 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber() {

public void waitForPublishedMessages() {
try {
if (!publishedMessagesTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info("Waiting for outgoing messages to be published");
if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to get publish acknowledgment."));
}
Expand All @@ -190,6 +192,9 @@ public void waitForPublishedMessages() {
}

public void close() {
if (this.gracefulShutdown) {
waitForPublishedMessages();
}
if (processor != null) {
processor.cancel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.*;

import jakarta.enterprise.context.ApplicationScoped;

import org.awaitility.Durations;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
Expand All @@ -26,8 +27,11 @@

import io.quarkiverse.solace.base.SolaceContainer;
import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceIncomingChannel;
import io.quarkiverse.solace.logging.SolaceTestAppender;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SolaceConsumerTest extends WeldTestBase {
Expand Down Expand Up @@ -220,6 +224,51 @@ void consumerPublishToErrorTopicPermissionException() {

@Test
@Order(7)
void consumerGracefulCloseTest() {
MapBasedConfig config = new MapBasedConfig()
.with("channel-name", "in")
.with("consumer.queue.name", queue)
.with("consumer.queue.add-additional-subscriptions", true)
.with("consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("consumer.queue.subscriptions", topic);

// Initialize incoming channel to consumes messages
SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(),
new SolaceConnectorIncomingConfiguration(config), messagingService);

CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList<Object> ackedMessageList = new CopyOnWriteArrayList<>();

Flow.Publisher<? extends Message<?>> stream = solaceIncomingChannel.getStream();
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Multi.createFrom().publisher(stream).subscribe().with(message -> {
list.add(message);
executorService.schedule(() -> {
ackedMessageList.add(message);
CompletableFuture.runAsync(message::ack);
}, 1, TimeUnit.SECONDS);
});

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
publisher.publish("1", tp);
publisher.publish("2", tp);
publisher.publish("3", tp);
publisher.publish("4", tp);
publisher.publish("5", tp);

await().until(() -> list.size() == 5);
// Assert on acknowledged messages
solaceIncomingChannel.close();
await().atMost(2, TimeUnit.MINUTES).until(() -> ackedMessageList.size() == 5);
executorService.shutdown();
}

@Test
@Order(8)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

import jakarta.enterprise.context.ApplicationScoped;

Expand All @@ -20,8 +21,10 @@

import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.outgoing.SolaceOutboundMetadata;
import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

public class SolacePublisherTest extends WeldTestBase {

Expand Down Expand Up @@ -99,6 +102,34 @@ void publisherWithBackPressureReject() {
await().untilAsserted(() -> assertThat(app.getAcked().size()).isLessThan(5));
}

@Test
void publisherGracefulCloseTest() {
MapBasedConfig config = new MapBasedConfig()
.with("channel-name", "out")
.with("producer.topic", topic);

List<String> expected = new CopyOnWriteArrayList<>();

// Start listening first
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withSubscriptions(TopicSubscription.of(topic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString()));
receiver.start();

SolaceOutgoingChannel solaceOutgoingChannel = new SolaceOutgoingChannel(Vertx.vertx(),
new SolaceConnectorOutgoingConfiguration(config), messagingService);
// Publish messages
Multi.createFrom().range(0, 10)
.map(Message::of)
.subscribe((Flow.Subscriber<? super Message<Integer>>) solaceOutgoingChannel.getSubscriber());

solaceOutgoingChannel.close();
// Assert on received messages
await().untilAsserted(() -> assertThat(expected.size()).isEqualTo(10));

}

// @Test
// void publisherWithBackPressureRejectWaitForPublisherReadiness() {
// MapBasedConfig config = new MapBasedConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public MessagingService apply(SyntheticCreationalContext<MessagingService> conte
}

var tmp = service;
shutdown.addShutdownTask(() -> {
shutdown.addLastShutdownTask(() -> {
if (tmp.isConnected()) {
tmp.disconnect();
}
Expand Down

0 comments on commit 2366810

Please sign in to comment.