
Improper thread synchronization in the ThreadBarrier leading to snapshot failures #1842

Open
ydidukh opened this issue Sep 6, 2024 · 0 comments


We have an application that uses the Siddhi engine. It processes a large volume of events, which are passed to Siddhi runtimes. We periodically capture snapshots of the Siddhi runtimes so that we can restore them when our application scales up or down. We have observed that taking a full snapshot sometimes fails with the error below, leading to crashes and restarts of our application.

Caused by: io.siddhi.core.exception.SiddhiAppRuntimeException: Siddhi App Bucket_#1_Query_#0 not stabilized for snapshot/restore, Active thread count is 1
at io.siddhi.core.util.snapshot.SnapshotService.waitForSystemStabilization(SnapshotService.java:769)
at io.siddhi.core.util.snapshot.SnapshotService.fullSnapshot(SnapshotService.java:101)
at io.siddhi.core.SiddhiAppRuntimeImpl.snapshot(SiddhiAppRuntimeImpl.java:710)
...

Analysis of the code has shown that, when the full snapshot fails, the ThreadBarrier used by the SnapshotService class has an activeThreads count of 1 that never drops to 0, which causes the failure. We can catch the error and continue, but whenever this happens, event processing in the affected thread is paused for 100 seconds, which is unacceptable for us.

Further analysis has shown that this may happen in two cases:

  1. In a multi-query application where the output of the first query is passed into a second query. The original event (processed by the first query) and the derived events passed to the second query are handled in the same thread, so that thread increments the activeThreads count twice, creating a deadlock-like situation.

Stack trace:

Class: sun.misc.Unsafe, Method: park, File: Unsafe.java, Line: -2
Class: java.util.concurrent.locks.LockSupport, Method: park, File: LockSupport.java, Line: 175
Class: java.util.concurrent.locks.AbstractQueuedSynchronizer, Method: parkAndCheckInterrupt, File: AbstractQueuedSynchronizer.java, Line: 836
Class: java.util.concurrent.locks.AbstractQueuedSynchronizer, Method: acquireQueued, File: AbstractQueuedSynchronizer.java, Line: 870
Class: java.util.concurrent.locks.AbstractQueuedSynchronizer, Method: acquire, File: AbstractQueuedSynchronizer.java, Line: 1199
Class: java.util.concurrent.locks.ReentrantLock$NonfairSync, Method: lock, File: ReentrantLock.java, Line: 209
Class: java.util.concurrent.locks.ReentrantLock, Method: lock, File: ReentrantLock.java, Line: 285
Class: io.siddhi.core.util.ThreadBarrier, Method: enter, File: ThreadBarrier.java, Line: 36
Class: io.siddhi.core.query.input.stream.single.EntryValveProcessor, Method: process, File: EntryValveProcessor.java, Line: 49
Class: io.siddhi.core.query.processor.filter.FilterProcessor, Method: process, File: FilterProcessor.java, Line: 58
Class: io.siddhi.core.query.input.ProcessStreamReceiver, Method: processAndClear, File: ProcessStreamReceiver.java, Line: 182
Class: io.siddhi.core.query.input.ProcessStreamReceiver, Method: process, File: ProcessStreamReceiver.java, Line: 89
Class: io.siddhi.core.query.input.ProcessStreamReceiver, Method: receive, File: ProcessStreamReceiver.java, Line: 115
Class: io.siddhi.core.stream.StreamJunction, Method: sendEvent, File: StreamJunction.java, Line: 180
Class: io.siddhi.core.stream.StreamJunction$Publisher, Method: send, File: StreamJunction.java, Line: 497
Class: io.siddhi.core.query.output.callback.InsertIntoStreamCallback, Method: send, File: InsertIntoStreamCallback.java, Line: 56
Class: io.siddhi.core.query.output.ratelimit.OutputRateLimiter, Method: sendToCallBacks, File: OutputRateLimiter.java, Line: 104
Class: io.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter, Method: process, File: PassThroughOutputRateLimiter.java, Line: 45
Class: io.siddhi.core.query.selector.QuerySelector, Method: process, File: QuerySelector.java, Line: 98
Class: io.siddhi.core.query.processor.filter.FilterProcessor, Method: process, File: FilterProcessor.java, Line: 58
Class: io.siddhi.core.query.input.ProcessStreamReceiver, Method: processAndClear, File: ProcessStreamReceiver.java, Line: 182
Class: io.siddhi.core.query.input.ProcessStreamReceiver, Method: process, File: ProcessStreamReceiver.java, Line: 89
Class: io.siddhi.core.query.input.ProcessStreamReceiver, Method: receive, File: ProcessStreamReceiver.java, Line: 127
Class: io.siddhi.core.stream.StreamJunction, Method: sendEvent, File: StreamJunction.java, Line: 203
Class: io.siddhi.core.stream.StreamJunction$Publisher, Method: send, File: StreamJunction.java, Line: 506
Class: io.siddhi.core.stream.input.InputDistributor, Method: send, File: InputDistributor.java, Line: 34
Class: io.siddhi.core.stream.input.InputEntryValve, Method: send, File: InputEntryValve.java, Line: 45
Class: io.siddhi.core.stream.input.InputHandler, Method: send, File: InputHandler.java, Line: 79
...

  2. When the sendTimerEvents method of the Scheduler class is called and a timer event is sent to the entryValve for further processing, which also leads to a deadlock-like situation.
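A minimal, single-threaded model makes the double counting in the first case concrete. This is a sketch under assumptions, not the actual io.siddhi.core.util.ThreadBarrier source: `BarrierModel`, `processWithNestedQuery` and `activeThreads()` are hypothetical names, and enter() is assumed to wait on the lock only while it is held and then increment a shared counter, as the timeline below describes. The nested call stands in for the chain from InputEntryValve through Query 1 to EntryValveProcessor and Query 2 seen in the stack trace.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class NestedEntryDemo {

    // Hypothetical model of the barrier described in this issue (not Siddhi's source).
    static class BarrierModel {
        private final ReentrantLock lock = new ReentrantLock();
        private final AtomicInteger active = new AtomicInteger();

        // If a snapshot holds the lock, wait for it to be released; then count this entry.
        void enter() {
            if (lock.isLocked()) {
                lock.lock();
                lock.unlock();
            }
            active.incrementAndGet();
        }

        void exit() {
            active.decrementAndGet();
        }

        int activeThreads() {
            return active.get();
        }
    }

    // Hypothetical stand-in for InputEntryValve -> Query 1 -> EntryValveProcessor -> Query 2.
    // Returns the counter value observed inside the nested (Query 2) entry.
    static int processWithNestedQuery(BarrierModel barrier) {
        barrier.enter();                 // outer entry (InputEntryValve)
        try {
            barrier.enter();             // nested entry on the SAME thread (EntryValveProcessor)
            try {
                return barrier.activeThreads();
            } finally {
                barrier.exit();
            }
        } finally {
            barrier.exit();
        }
    }

    public static void main(String[] args) {
        BarrierModel barrier = new BarrierModel();
        int observed = processWithNestedQuery(barrier);
        System.out.println("active inside nested query: " + observed);            // 2, with one thread
        System.out.println("active after processing: " + barrier.activeThreads()); // 0
    }
}
```

With a single worker thread, the counter already reads 2 inside the second query, so it no longer matches the number of threads actually inside the barrier.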

The following timeline illustrates the problem for the first case:

Time point 1 [Thread-1]:
InputHandler is called to process the original event, and InputEntryValve.send(...) is then called, which increments the activeThreads counter by invoking threadBarrier.enter(). The event is then passed on to the query stream receivers for further processing.

Time point 2 [Thread-2]:
SnapshotService.fullSnapshot() is invoked in a separate thread and acquires the barrier lock via threadBarrier.lock().

Time point 3 [Thread-1]:
Query 1 finishes processing the event and emits a derived event into its output stream. The stream callback (InsertIntoStreamCallback in the stack trace above) pushes that event into the input stream of Query 2, where EntryValveProcessor.process(...) is called with it.

EntryValveProcessor.process(...) calls threadBarrier.enter(), which tries to increment the activeThreads count again before passing the event on to the query runtime, even though InputEntryValve has already incremented the counter for this thread.

threadBarrier.enter() checks whether another thread holds the lock and, if so, acquires and immediately releases it. Because the lock is held by the SnapshotService thread (Thread-2), Thread-1 blocks inside EntryValveProcessor until SnapshotService releases the lock. Meanwhile, the activeThreads count stays at 1.
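The interleaving of Time points 1 to 3 can be reproduced with the same hypothetical barrier model and two plain threads. In this sketch the main thread plays the role of SnapshotService by taking the lock directly, and two CountDownLatch instances (an assumption made only to force the ordering) ensure that the lock is taken after the outer enter() and before the nested one. `SnapshotStallDemo` and `runTimeline` are illustrative names, not Siddhi APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class SnapshotStallDemo {

    // Hypothetical barrier model matching the behavior described in this issue (not Siddhi's source).
    static class BarrierModel {
        final ReentrantLock lock = new ReentrantLock();
        final AtomicInteger active = new AtomicInteger();

        void enter() {
            if (lock.isLocked()) {   // a snapshot is in progress
                lock.lock();         // wait until it finishes...
                lock.unlock();       // ...then release immediately
            }
            active.incrementAndGet();
        }

        void exit() { active.decrementAndGet(); }
    }

    // Runs the timeline; returns the active count seen by the "snapshot" thread while it holds the lock.
    static int runTimeline() throws InterruptedException {
        BarrierModel barrier = new BarrierModel();
        CountDownLatch outerEntered = new CountDownLatch(1);
        CountDownLatch snapshotLocked = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            barrier.enter();                       // Time point 1: InputEntryValve
            try {
                outerEntered.countDown();
                snapshotLocked.await();            // wait until Time point 2 has happened
                barrier.enter();                   // Time point 3: EntryValveProcessor parks here
                barrier.exit();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                barrier.exit();
            }
        }, "Thread-1");
        worker.start();

        outerEntered.await();
        barrier.lock.lock();                       // Time point 2: snapshot acquires the lock
        try {
            snapshotLocked.countDown();
            // Wait until the worker is queued on the lock inside its nested enter().
            while (!barrier.lock.hasQueuedThread(worker)) {
                Thread.sleep(1);
            }
            return barrier.active.get();           // stays 1 for as long as the lock is held
        } finally {
            barrier.lock.unlock();                 // releasing the lock lets the worker finish
            worker.join(TimeUnit.SECONDS.toMillis(5));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("active threads while snapshot holds lock: " + runTimeline());
    }
}
```

runTimeline() returns 1: while the lock is held, the worker is parked inside its nested enter() and its outer entry is still counted, which is the state the stabilization check keeps observing.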

Time point 4 [Thread-2]:
SnapshotService.fullSnapshot() calls waitForSystemStabilization(), which checks whether the activeThreads count is 0. Because Thread-1 is blocked, the count remains 1, so the SnapshotService thread sleeps for 1000 ms and checks again. After 100 retries (about 100 seconds), it throws the "not stabilized for snapshot/restore" SiddhiAppRuntimeException, which propagates up the call stack and causes the application to crash.
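The retry loop can be sketched as a bounded poll of the counter. This is a hypothetical reconstruction rather than SnapshotService's code: `waitForStabilization` takes the retry count and sleep interval as parameters so the usage example can run with small values, while `worstCaseMillis` shows where the 100-second pause comes from (100 retries of 1000 ms each).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class StabilizationDemo {

    // Hypothetical polling loop: true once activeThreads reaches 0, false after maxRetries checks.
    static boolean waitForStabilization(IntSupplier activeThreads, int maxRetries, long sleepMillis)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            if (activeThreads.getAsInt() == 0) {
                return true;
            }
            Thread.sleep(sleepMillis);
        }
        return activeThreads.getAsInt() == 0; // one last check after the final sleep
    }

    // Upper bound on how long a stuck counter stalls the snapshot (and the blocked thread).
    static long worstCaseMillis(int maxRetries, long sleepMillis) {
        return maxRetries * sleepMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger stuckCounter = new AtomicInteger(1); // pinned at 1 by the blocked thread
        boolean ok = waitForStabilization(stuckCounter::get, 5, 10);
        System.out.println("stabilized: " + ok);
        System.out.println("worst case with 100 x 1000 ms: " + worstCaseMillis(100, 1000) + " ms");
    }
}
```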

Fix proposal:

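To resolve this issue, threadBarrier.enter() should not increment the activeThreads counter when it is called from a thread for which the counter has already been incremented. Such a nested call should also skip waiting on the lock; otherwise the thread would still block while its outer entry keeps the count at 1.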
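One way to implement this proposal is to track, per thread, how deeply that thread has entered the barrier, and to wait on the lock and update the shared counter only on the outermost entry. The `ReentrantBarrier` class below is a hypothetical sketch built on a ThreadLocal depth counter, not a patch against Siddhi's ThreadBarrier, whose actual fields and call sites may differ.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantBarrierDemo {

    // Hypothetical barrier that counts each thread at most once, however deeply it nests.
    static class ReentrantBarrier {
        private final ReentrantLock lock = new ReentrantLock();
        private final AtomicInteger active = new AtomicInteger();
        // Per-thread nesting depth (int[1] avoids boxing); 0 means outside the barrier.
        private final ThreadLocal<int[]> depth = ThreadLocal.withInitial(() -> new int[1]);

        void enter() {
            int[] d = depth.get();
            if (d[0]++ > 0) {
                return; // nested entry: already counted, and must not wait on a snapshot lock
            }
            if (lock.isLocked()) {   // outermost entry waits for an in-progress snapshot
                lock.lock();
                lock.unlock();
            }
            active.incrementAndGet();
        }

        void exit() {
            int[] d = depth.get();
            if (--d[0] == 0) {
                active.decrementAndGet(); // leaving the outermost entry
            }
        }

        int activeThreads() { return active.get(); }

        void lock() { lock.lock(); }

        void unlock() { lock.unlock(); }
    }

    public static void main(String[] args) {
        ReentrantBarrier barrier = new ReentrantBarrier();
        barrier.enter();               // e.g. InputEntryValve
        barrier.enter();               // e.g. EntryValveProcessor, same thread
        System.out.println("active while nested: " + barrier.activeThreads()); // 1
        barrier.exit();
        barrier.exit();
        System.out.println("active after exit: " + barrier.activeThreads());   // 0
    }
}
```

Keeping the depth in a ThreadLocal means the nesting check needs no extra synchronization, since only the owning thread reads or writes its own depth. The sketch assumes every enter() is paired with an exit() on the same thread, for example in a try/finally block.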
