Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown #18

Merged
merged 4 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@

import com.solace.messaging.MessagingService;

import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceIncomingChannel;
import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel;
import io.quarkus.runtime.ShutdownEvent;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
Expand All @@ -40,7 +38,8 @@
// TODO only persisted is implemented
//@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted")
@ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false")
@ConnectorAttribute(name = "client.shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000")
@ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true")
@ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000")
@ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver")
@ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver", defaultValue = "durable-non-exclusive")
@ConnectorAttribute(name = "consumer.queue.missing-resource-creation-strategy", type = "string", direction = INCOMING, description = "Missing resource creation strategy", defaultValue = "do-not-create")
Expand All @@ -50,8 +49,6 @@
@ConnectorAttribute(name = "consumer.queue.replay.strategy", type = "string", direction = INCOMING, description = "The receiver replay strategy")
@ConnectorAttribute(name = "consumer.queue.replay.timebased-start-time", type = "string", direction = INCOMING, description = "The receiver replay timebased start time")
@ConnectorAttribute(name = "consumer.queue.replay.replication-group-message-id", type = "string", direction = INCOMING, description = "The receiver replay replication group message id")
// TODO implement consumer concurrency
//@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1")
@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore")
@ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error")
@ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false")
Expand Down Expand Up @@ -81,18 +78,6 @@ public class SolaceConnector implements InboundConnector, OutboundConnector, Hea
List<SolaceIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
List<SolaceOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();

public void onStop(@Observes ShutdownEvent shutdownEvent) {
if (solace.isConnected()) {
SolaceLogging.log.info("Waiting incoming channel messages to be acknowledged");
incomingChannels.forEach(SolaceIncomingChannel::waitForUnAcknowledgedMessages);
SolaceLogging.log.info("All incoming channel messages are acknowledged");

SolaceLogging.log.info("Waiting for outgoing messages to be published");
outgoingChannels.forEach(SolaceOutgoingChannel::waitForPublishedMessages);
SolaceLogging.log.info("All outgoing messages are published");
}
}

public void terminate(
@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object event) {
incomingChannels.forEach(SolaceIncomingChannel::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return false;
}
isZero.await(realTimeout, TimeUnit.MILLISECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All incoming channel messages are acknowledged"));
}
}
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All incoming channel messages are acknowledged"));
}
}
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private final PersistentMessageReceiver receiver;
private final Flow.Publisher<? extends Message<?>> stream;
private final ExecutorService pollerThread;
private long waitTimeout = -1;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;

// Assuming we won't ever exceed the limit of an unsigned long...
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier();

public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
this.channel = ic.getChannel();
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.waitTimeout = ic.getClientShutdownWaitTimeout();
this.gracefulShutdown = ic.getClientGracefulShutdown();
this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout();
DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build();
Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED };
if (ic.getConsumerQueueSupportsNacks()) {
Expand Down Expand Up @@ -173,7 +175,8 @@ public Flow.Publisher<? extends Message<?>> getStream() {
public void waitForUnAcknowledgedMessages() {
try {
receiver.pause();
if (!unacknowledgedMessageTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged");
if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to be acknowledged."));
}
Expand All @@ -184,6 +187,9 @@ public void waitForUnAcknowledgedMessages() {
}

public void close() {
if (this.gracefulShutdown) {
waitForUnAcknowledgedMessages();
}
closed.compareAndSet(false, true);
if (this.pollerThread != null) {
this.pollerThread.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return false;
}
isZero.await(realTimeout, TimeUnit.MILLISECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All outgoing channel messages are published"));
}
}
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All outgoing channel messages are published"));
}
}
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public class SolaceOutgoingChannel
private final Flow.Subscriber<? extends Message<?>> subscriber;
private final Topic topic;
private final SenderProcessor processor;
private boolean isPublisherReady = true;

private long waitTimeout = -1;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private volatile boolean isPublisherReady = true;

// Assuming we won't ever exceed the limit of an unsigned long...
private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier();
Expand All @@ -56,7 +56,8 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity());
break;
}
this.waitTimeout = oc.getClientShutdownWaitTimeout();
this.gracefulShutdown = oc.getClientGracefulShutdown();
this.gracefulShutdownWaitTimeout = oc.getClientGracefulShutdownWaitTimeout();
oc.getProducerDeliveryAckTimeout().ifPresent(builder::withDeliveryAckTimeout);
oc.getProducerDeliveryAckWindowSize().ifPresent(builder::withDeliveryAckWindowSize);
this.publisher = builder.build();
Expand Down Expand Up @@ -179,7 +180,8 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber() {

public void waitForPublishedMessages() {
try {
if (!publishedMessagesTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info("Waiting for outgoing messages to be published");
if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to get publish acknowledgment."));
}
Expand All @@ -190,6 +192,9 @@ public void waitForPublishedMessages() {
}

public void close() {
if (this.gracefulShutdown) {
waitForPublishedMessages();
}
if (processor != null) {
processor.cancel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.*;

import jakarta.enterprise.context.ApplicationScoped;

import org.awaitility.Durations;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
Expand All @@ -26,8 +27,11 @@

import io.quarkiverse.solace.base.SolaceContainer;
import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceIncomingChannel;
import io.quarkiverse.solace.logging.SolaceTestAppender;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SolaceConsumerTest extends WeldTestBase {
Expand Down Expand Up @@ -220,6 +224,51 @@ void consumerPublishToErrorTopicPermissionException() {

@Test
@Order(7)
void consumerGracefulCloseTest() {
MapBasedConfig config = new MapBasedConfig()
.with("channel-name", "in")
.with("consumer.queue.name", queue)
.with("consumer.queue.add-additional-subscriptions", true)
.with("consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("consumer.queue.subscriptions", topic);

// Initialize incoming channel to consumes messages
SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(),
new SolaceConnectorIncomingConfiguration(config), messagingService);

CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList<Object> ackedMessageList = new CopyOnWriteArrayList<>();

Flow.Publisher<? extends Message<?>> stream = solaceIncomingChannel.getStream();
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Multi.createFrom().publisher(stream).subscribe().with(message -> {
list.add(message);
executorService.schedule(() -> {
ackedMessageList.add(message);
CompletableFuture.runAsync(message::ack);
}, 1, TimeUnit.SECONDS);
});

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
publisher.publish("1", tp);
publisher.publish("2", tp);
publisher.publish("3", tp);
publisher.publish("4", tp);
publisher.publish("5", tp);

await().until(() -> list.size() == 5);
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
// Assert on acknowledged messages
solaceIncomingChannel.close();
await().atMost(2, TimeUnit.MINUTES).until(() -> ackedMessageList.size() == 5);
executorService.shutdown();
}

@Test
@Order(8)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

import jakarta.enterprise.context.ApplicationScoped;

Expand All @@ -20,8 +21,10 @@

import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.outgoing.SolaceOutboundMetadata;
import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

public class SolacePublisherTest extends WeldTestBase {

Expand Down Expand Up @@ -99,6 +102,34 @@ void publisherWithBackPressureReject() {
await().untilAsserted(() -> assertThat(app.getAcked().size()).isLessThan(5));
}

@Test
void publisherGracefulCloseTest() {
MapBasedConfig config = new MapBasedConfig()
.with("channel-name", "out")
.with("producer.topic", topic);

List<String> expected = new CopyOnWriteArrayList<>();

// Start listening first
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withSubscriptions(TopicSubscription.of(topic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString()));
receiver.start();

SolaceOutgoingChannel solaceOutgoingChannel = new SolaceOutgoingChannel(Vertx.vertx(),
new SolaceConnectorOutgoingConfiguration(config), messagingService);
// Publish messages
Multi.createFrom().range(0, 10)
.map(Message::of)
.subscribe((Flow.Subscriber<? super Message<Integer>>) solaceOutgoingChannel.getSubscriber());

solaceOutgoingChannel.close();
// Assert on received messages
await().untilAsserted(() -> assertThat(expected.size()).isEqualTo(10));

}

// @Test
// void publisherWithBackPressureRejectWaitForPublisherReadiness() {
// MapBasedConfig config = new MapBasedConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public MessagingService apply(SyntheticCreationalContext<MessagingService> conte
}

var tmp = service;
shutdown.addShutdownTask(() -> {
shutdown.addLastShutdownTask(() -> {
if (tmp.isConnected()) {
tmp.disconnect();
}
Expand Down