-
Notifications
You must be signed in to change notification settings - Fork 644
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MQTT streaming: Document PubAck handling #1908
base: main
Are you sure you want to change the base?
Conversation
Yes, this helps. That scanning is not what I'd want to have every user to add to their code. That's definitely something I'd like to wrap into the convenience API, I've been experimenting with a while ago: #1565 |
In the case of publishing from a client, I think that we can provide the equivalent to the ask pattern where a Future[Done] is returned. This is in addition to the tell we already have. I do think that specific cases like client publishing can be made more convenient. WDYT? |
Here's what the ask pattern may look like: https://github.com/akka/alpakka/pull/1908/files#diff-57b7cf48120d900f9398000d526e9854R70 Thoughts @ennru ? |
Yes, that would be great and feel consistent with the |
897f522
to
8801394
Compare
@ennru I've just updated the PR with the |
I'm getting some MiMa issues given the new APIs... these changes should be permitted though, right? After all, origin is a 2.0 branch and there won't be any intention of binary compatibility, right? |
This improves the doc around how to back-pressure the publication of Publish commands to avoid buffer overflow in QoS 1+ cases. Implements the ask pattern for this purpose.
8801394
to
a520565
Compare
Would the
The ambition is to stay as source- and binary-compatible as feasible. These changes are definitely OK to allow. |
Yeah, that’s what I was thinking... |
@ennru This bit of code is essentially what we're doing today in production. The code completes the carry (which is a val (commands: SourceQueueWithComplete[Command[Promise[Done]]], events: Future[ControlPacket]) =
Source
.queue(2, OverflowStrategy.fail)
.via(mqttFlow)
.collect {
case Right(Event(p: Publish, _)) =>
p
case Right(Event(pa: PubAck, Some(carry))) =>
carry.success(Done)
pa
} My thinking now is have the docs explain the above requirement. Perhaps we can visit the higher level API PR you started and keep things the way they are here. Thoughts? |
Sorry, I guess this was on me to get back to. |
This attempts to improve the API around how to back-pressure the publication of Publish commands to avoid buffer overflow in QoS 1+ cases.
I'm looking for feedback as to whether this helps explain things, particularly given problems like this: https://discuss.lightbend.com/t/alpakka-mqtt-streaming-performance-problem/4913.
TODO: