Groupwithin improvements #3186
base: main
Conversation
…-compr to avoid CI js failure
Sorry, this is my trigger 😆 This issue is not related to Scala.js. The core Scala language (i.e. syntax) actually works 100% identically for all three platforms (if you ever find a case where it doesn't, that's a legitimate bug and we should fix it :). Sometimes, a JS CI job will happen to fail first. That doesn't mean it's a JS-specific issue. /rant 😇
@armanbilge I had a look; this time it sounds like a Scala.js problem on Scala 3 (setting the version to 2 and running the CI command). I tried to set the compiler flag. Also I've found a similar issue here: it sounds like a compiler issue from the discussion. Do you think it's worth spending more time on this?
You can't reproduce it with Scala JVM?
Nope: the command is not available on jvm
yeah
I've run the JVM tests on Scala 2; let me try with Scala 3. @armanbilge yeah, you're right, I'm getting the same error. As mentioned earlier, though, using the
I'm leaning towards keeping it as it is, unless you know how to fix it, because I have no idea: the issue linked earlier labels it as a won't-fix. Hopefully destructuring vs non-destructuring the tuple is not a deal breaker. Either way, should we move this to Discord? The conversation is moving away from the PR 😢
It's not a problem at all, just a symptom of cross-compiling across Scala 2 and Scala 3. Sorry that I derailed things 😓
No worries: it's just because I've opened a bunch of PRs doing the same thing and it might get confusing. Thanks for explaining though, learned something again 🥇
* handling edge cases
* refactoring
* adding tests
@armanbilge I was tweaking the benchmark parameters and I've discovered a flaw in this implementation. Currently the buffer window is set to This is what I've found out:
And these are the results:

visual vm & benchmarks (only one test case) (no longer relevant)

- suggested implementation (cats bounded `Queue`) - replaced
- existing implementation - main branch
- suggested implementation (`Semaphore` + `Vector`) - this branch

In light of this, I've decided to restore the

So to summarize, compared to the existing implementation I'm seeing better performance overall and increased accuracy:
On top of that, a few improvements:
So if the above makes sense and you guys are happy with these changes, I think that the bulk of the work is done and what's left is:
Notes
Thanks for that update!
I have a question about this: I understand that
@armanbilge sorry for the late reply. I see, thanks for explaining. When I ran the benchmark last time using
I'm wondering if the difference between
It doesn't just apply to
Summary
Apologies in advance for the wall of text: hopefully this time it will be worth it (if not, this is probably my last attempt at this for a while 😅)
This PR is a followup from #3162 (thanks all for the feedback)
Goals:

- `gc` (benchmark stats)
- interruption propagation misbehaviour

Timeout behaviour
The current implementation has a small flaw in the timeout logic: when entering the timeout state, the stream waits for the first element, then tries to calculate how many elements are available using the supply semaphore, and finally uses that figure to decide where to split the buffer.
The problem is that even if the buffer has collected enough elements to make up a chunk, these won't always be part of the emitted chunk, because of the above logic. I suspect this is what's happening in the "accumulation and splitting" test, which is flakier than it probably should be (some level of flakiness is acceptable since it's fundamentally a time-based test, but with the current logic it's very easy to get a 50%, or even higher, failure rate).
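For background (this is not the test referenced below), the dual size/timeout trigger of `groupWithin` can be illustrated with a minimal sketch, assuming fs2 3.x and cats-effect 3.x on the classpath:

```scala
import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import fs2.Stream

object GroupWithinDemo extends IOApp.Simple {
  // Emit one element roughly every 10ms, then group: a chunk is emitted
  // when either 3 elements have accumulated or 100ms have elapsed,
  // whichever comes first.
  val run: IO[Unit] =
    Stream
      .awakeEvery[IO](10.millis)
      .take(10)
      .groupWithin(3, 100.millis)
      .map(_.size)
      .compile
      .toList
      .flatMap(sizes => IO.println(sizes))
  // Chunk sizes depend on real wall-clock timing, which is exactly why the
  // "accumulation and splitting" test has some inherent flakiness.
}
```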
When running the test below (a slightly modified version of the original test that includes an additional `sleep`) the situation is even worse: it's nearly impossible to get the test to pass.

This PR introduces a new mechanism to improve the accuracy of the timeout logic. Instead of calculating the number of elements to be flushed, we flush them and then lower the supply accordingly. We also use `acquireN` instead of `tryAcquireN`, since the number of elements flushed is known.

I'm also using a `SignallingRef` for state management and to provide an upper bound to the fiber that waits for the first chunk. I need this to comply with one of the test cases.

The result is a more accurate behaviour (see below).
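The "flush first, then lower the supply" idea might look roughly like this. This is a hypothetical sketch with made-up names (`supply`, `buffer`, `flush`); it is not the PR's actual code, and it assumes cats-effect 3.x:

```scala
import cats.effect.{IO, Ref}
import cats.effect.std.Semaphore

// Flush whatever is currently buffered, then lower the supply semaphore
// by exactly the number of elements flushed.
def flush(supply: Semaphore[IO], buffer: Ref[IO, Vector[Int]]): IO[Vector[Int]] =
  for {
    flushed <- buffer.getAndSet(Vector.empty)       // take the buffered elements...
    _       <- supply.acquireN(flushed.size.toLong) // ...then account for them exactly
  } yield flushed
// Because flushed.size is known after the flush, acquireN (rather than
// tryAcquireN) is safe: the permits for those elements must already have
// been released by the producer side.
```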
I've run both "accumulation and splitting" tests a number of times and had zero failures so far
Performance

Benchmark figures are better compared to the existing implementation: consistently faster, over 100% faster (`ops/sec`), especially on large streams, and with better `gc` stats (see screenshots below).

Example: `stress test (short execution): all elements are processed` (with range parameter set to `10 mill`)

suggested implementation
(screenshot)

existing implementation
(screenshot)
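The integrity check behind that stress test can be illustrated like this. This is my guess at its shape (chunk size and window are made up), not the PR's actual test code; it assumes fs2 3.x:

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

object StressSketch {
  val n = 10000000L // range parameter, "10 mill" as in the benchmark

  val allProcessed: IO[Boolean] =
    Stream
      .range(0L, n)                  // emit every element exactly once
      .covary[IO]
      .groupWithin(4096, 100.millis) // hypothetical chunk size / window
      .unchunks                      // flatten the emitted chunks back to elements
      .compile
      .count                         // count what actually came out
      .map(_ == n)                   // nothing lost (by count)
}
```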
Simplified logic

This implementation is hopefully simpler to follow:

- `concurrently`
- `Semaphore` to what's needed (`demand` is gone in favour of a bounded queue)
- `Option[Either[T, A]]` state flags to a single `boolean`
- `Vector`
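Replacing a demand-counting `Semaphore` with a bounded queue can be sketched as follows. Names here are illustrative, not the PR's code; it assumes cats-effect 3.x:

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all._

// A bounded queue both carries the elements and enforces backpressure:
// offer semantically blocks while the queue is full, so a separate
// `demand` semaphore tracking outstanding capacity becomes unnecessary.
def producerConsumer(capacity: Int): IO[Unit] =
  Queue.bounded[IO, Int](capacity).flatMap { q =>
    val produce = (1 to 10).toList.traverse_(q.offer)
    val consume = q.take.flatMap(IO.println).replicateA(10).void
    produce.both(consume).void
  }
```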
Tests

This PR includes new tests:

- `accumulation and splitting 2` (I'm happy to remove this, or include the `longSleep` in the original test; it was just for illustration purposes)
- `stress test (short execution): all elements are processed` (a copy of the benchmark tests with an integrity check at the end). I found this useful as it allowed me to verify the behaviour over a long stream of elements, uncovering a couple of bugs in the permits management logic. As mentioned elsewhere, it is possible to write an implementation that passes all other tests consistently but fails this one.
- `stress test (long execution): all elements are processed`: basically the same as the one above, but it pauses after each element. In theory this exercises the timeout logic more frequently than the previous test. Marked as `ignored` since it takes ~15 mins to complete with TestControl.
- `if the buffer fills up at the same time when the timeout expires there won't be a deadlock` (thanks @armanbilge for explaining the problem and for suggesting a nice way to reproduce the bug). This test allowed me to confirm the race condition bug and to make sure the fix was working as expected.

Notes
EDIT: I've made a small change and I've seen the first failure (on CI) after several successful runs, so it is still a flaky test. The failure rate, however, has decreased significantly: I'm still seeing several successful test runs locally, as shown in the screenshot above.
- `chunkSize == 0` (I believe this is a bug, and I assume the behaviour of this implementation is the intended behaviour. I should be able to change it easily if that's not the case. Either way, I'm happy to add a test to document the behaviour.)

Thank you