Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix testing queue/queue-rabbitmq - RabbitMQMailQueueTest.deQueueMails - RabbitFluxException #1075

Closed
wants to merge 3 commits into from

Conversation

vttranlina
Copy link
Contributor

CI sometimes pass, sometimes fail at org.apache.james.queue.rabbitmq.RabbitMQMailQueueTest

Error logs

reactor.rabbitmq.RabbitFluxException: Not retryable exception, cannot retry

	at reactor.rabbitmq.ExceptionHandlers$SimpleRetryTemplate.retry(ExceptionHandlers.java:125)
	at reactor.rabbitmq.ExceptionHandlers$RetryAcknowledgmentExceptionHandler.accept(ExceptionHandlers.java:143)
	at reactor.rabbitmq.ExceptionHandlers$RetryAcknowledgmentExceptionHandler.accept(ExceptionHandlers.java:130)
	at reactor.rabbitmq.AcknowledgableDelivery.retry(AcknowledgableDelivery.java:130)
	at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:64)
	at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:76)
	at org.apache.james.queue.rabbitmq.Dequeuer.lambda$ack$4(Dequeuer.java:130)
	at com.github.fge.lambdas.consumers.ThrowingConsumer.accept(ThrowingConsumer.java:22)
	at org.apache.james.queue.rabbitmq.Dequeuer$RabbitMQMailQueueItem.done(Dequeuer.java:75)
	at org.apache.james.queue.api.RawMailQueueItem.done(RawMailQueueItem.java:39)
	at org.apache.james.queue.rabbitmq.RabbitMQMailQueueTest$MailQueueSizeMetricsEnabled.lambda$dequeueMails$15(RabbitMQMailQueueTest.java:655)
	at reactor.core.publisher.MonoCallable.call(MonoCallable.java:92)
	at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:227)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Flux.blockLast(Flux.java:2645)
		at org.apache.james.queue.rabbitmq.RabbitMQMailQueueTest$MailQueueSizeMetricsEnabled.dequeueMails(RabbitMQMailQueueTest.java:658)
		at org.apache.james.queue.rabbitmq.RabbitMQMailQueueTest$MailQueueSizeMetricsEnabled.browseAndDequeueShouldCombineWellWhenDifferentSlices(RabbitMQMailQueueTest.java:291)
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
		at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
		at java.base/java.lang.reflect.Method.invoke(Method.java:566)
		at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
		at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
		at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
		at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
		at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
		at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
		at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
		at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
		at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
		at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
		at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
		at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
		at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
		at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
		at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
		at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
		at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
		at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
		at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
		at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
		at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
		at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
		at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
		at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
		at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
		at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
		at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
		at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
		at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
		at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
		at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
		at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:30)
		at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
		at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
		at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:439)
	at reactor.rabbitmq.AcknowledgableDelivery.basicAck(AcknowledgableDelivery.java:110)
	at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:62)
	... 15 more

…meoutException

- Regression: org.apache.james.queue.rabbitmq.RabbitMQMailQueueTest$MailQueueSizeMetricsEnabled.dequeueShouldBeConcurrent (from org.apache.james.queue.rabbitmq.RabbitMQMailQueueTest)
Condition with lambda expression in org.apache.james.queue.api.MailQueueContract that uses java.util.concurrent.ConcurrentLinkedDeque, java.util.concurrent.ConcurrentLinkedDequeint was not fulfilled within 1 minutes.
}).subscribeOn(Schedulers.elastic()))
}).subscribeOn(Schedulers.newBoundedElastic(2, 20, "DeQueueSchedulerTest")))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failing test percent (Mean of 3)
Before: 7 %
After: 2.333 %

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if we can understand the root cause IMO

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried single scheduler?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried single scheduler?

  • Schedulers.single() -> .block is not allowed on single scheduler.
  • reduce newBoundedElastic thread size to 1 -> the failing probability seems reduce a bit. about <= 2%

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 will be easier to detect the parallel vulnerable. Furthermore here is the test code, IMO, no need to be too strict 1 or 2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some tests for easier debugging issues with reactor-rabbitmq (maybe?)
See d40cdea

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you see in these tests if using boundedElastic/single helps?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open a ticket upstream too maybe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open a ticket upstream too maybe.

Where I can? https://issues.apache.org/jira/projects/JAMES ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you see in these tests if using boundedElastic/single helps?

  • single: No, it fails as immediately
  • boundedElastic: it makes the % fail decreasingly

Comment on lines +577 to +578
Scheduler newBoundedElastic = Schedulers.newBoundedElastic(10 * Runtime.getRuntime().availableProcessors(), 10, "MailQueueTest");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@chibenwa chibenwa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see written answers to quan questions.

I do not understand the rationnals behind this.

Maybe the test should just be disabled?

@vttranlina
Copy link
Contributor Author

vttranlina commented Jul 7, 2022

I do not see written answers to quan questions.

I do not understand the rationnals behind this.

Maybe the test should just be disabled?

I'm not sure for threads deeper here, I guess the creation threads as "elastic" make test cases sometimes fail (rarely).
So When I tried to use boundedElastic threads, I saw the test result is better. (meaning of test case still does not change).

@vttranlina
Copy link
Contributor Author

I added some tests for easier debugging issues with reactor-rabbitmq (maybe?)
See d40cdea

@chibenwa
Copy link
Contributor

chibenwa commented Jul 9, 2022

My feeling is that this could explain crashing consumers cc @Arsnael

@Arsnael
Copy link
Contributor

Arsnael commented Jul 11, 2022

The crashing consumers we have sometimes in prod? Could be yes...

@chibenwa
Copy link
Contributor

Does usenio in rabbitmq client setting affects this?

@chibenwa
Copy link
Contributor

chibenwa commented Aug 1, 2022

CF reactor/reactor-rabbitmq#176

@chibenwa
Copy link
Contributor

chibenwa commented Aug 1, 2022

See #1099

@chibenwa chibenwa closed this Aug 1, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants