You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Using such AtomicReference and mapMaterializedValue is rather painful. I think it's very common to use a RestartSource with the Alpakka Kafka consumers. Could we come up with something that's more convenient? Could we include it automatically if enabled in ConsumerSettings?
We discussed making a new experimental API for RestartSource to propagate materialized values better. I think that's worth experimenting within Alpakka Kafka.
I don't think just wrapping the consumer sources in RestartSource (when a feature flag is enabled) will work because part of the controlled shutdown process is to wait for any outstanding commits to complete, which is returned as a materialized value by a committer stage, or the combined producer committer sink (I think) later in the stream. The draining control makes use of the future emitted by the sink to determine that draining is complete.
Short description
Combining
RestartSource
with controlled shutdown a laDrainingControl
should be illustrated with an example.Details
The example in https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage should show controlled shutdown:
control.get().drainAndShutdown(result)
The text was updated successfully, but these errors were encountered: