This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

[BAHIR-295] Added backpressure & ratelimit support #101

Open
wants to merge 5 commits into master

Conversation

iammehrabalam

No description provided.

@eskabetxe
Member

@iammehrabalam thanks for your contribution
@lresende could you check?

@lresende
Member

@iammehrabalam What is the backward compatibility story for this change? Also, should the sample be using the new capabilities to demonstrate the new functionality?

@iammehrabalam
Author

The behaviour will be exactly the same as before if the following Spark Streaming configs are not set:

  • spark.streaming.backpressure.initialRate
  • spark.streaming.receiver.maxRate
  • spark.streaming.backpressure.pid.minRate

So it is backward compatible.

Added a test case which demonstrates rate and batch size.
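
For reference, a minimal configuration sketch (the keys match the list above, plus spark.streaming.backpressure.enabled; the values are illustrative, not defaults or recommendations):

```scala
// Illustrative only: the keys are standard Spark Streaming settings, the values are made up.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("pubsub-backpressure-demo")
  // Let the PID estimator drive the receiving rate
  .set("spark.streaming.backpressure.enabled", "true")
  // Initial rate (records/sec per receiver) before the estimator has feedback
  .set("spark.streaming.backpressure.initialRate", "1000")
  // Hard cap on records/sec per receiver
  .set("spark.streaming.receiver.maxRate", "1500")
  // Lower bound the estimator will not go below
  .set("spark.streaming.backpressure.pid.minRate", "100")
```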

@lresende

@iammehrabalam
Author

@lresende @eskabetxe reminder

@datasherlock

datasherlock commented Feb 4, 2023

The backpressure implementation isn't working as expected. My understanding is that the backpressure mechanism will control the input rate but never exceed spark.streaming.receiver.maxRate. But this doesn't seem to be honoured: we're noticing that the receiver input rate breaches spark.streaming.receiver.maxRate every now and then, which puts a lot of pressure on the pipeline.

Context: I created a Spark Scala app with 900 receivers, spark.streaming.receiver.maxRate=1500 and batchInterval=60s. My understanding is that the total number of records per batch should not exceed 900 * 1500 * 60 = 81,000,000 records. But I am noticing that some batches go as high as 776,732,455 records, where the processing time is >>> batchInterval.
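
A quick sanity check of that expectation, using the numbers reported above:

```scala
// Back-of-the-envelope check of the expected per-batch cap described above.
val receivers      = 900
val maxRatePerRecv = 1500L  // spark.streaming.receiver.maxRate, records/sec per receiver
val batchIntervalS = 60     // seconds

val expectedCap = receivers * maxRatePerRecv * batchIntervalS  // 81,000,000 records per batch
val observed    = 776732455L                                   // largest batch reported above
println(f"observed / cap = ${observed.toDouble / expectedCap}%.1fx")  // ~9.6x over the cap
```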

@datasherlock

Based on https://spark.apache.org/docs/latest/streaming-custom-receivers.html#receiver-reliability, the rate control mechanism will have to be implemented by the receiver (if reliable). I do not see any logic that caps the input rates to the maxRate in the code. Could that be the reason why the backpressure limits are not honoured?

@LeonardMeyer

LeonardMeyer commented Mar 3, 2023

Just stumbled upon this PR. For anyone interested, my guess is that the correct implementation should use Spark Streaming's BlockGenerator class. It would give the whole process spark.streaming.backpressure.enabled support for free since its RateLimiter implementation can be notified by Spark.
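
If I read that right, the "for free" path looks roughly like this (a sketch, not this connector's actual receiver; `fetch` is a stand-in for whatever pulls messages from Pub/Sub):

```scala
// Sketch only: a receiver that stores records one at a time, so the supervisor's
// BlockGenerator (and its RateLimiter, which Spark updates when
// spark.streaming.backpressure.enabled=true) throttles ingestion for us.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SingleRecordReceiver(fetch: () => Iterator[String])
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("single-record-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          // store(record) goes through the rate-limited BlockGenerator path;
          // store(iterator) would write a whole block and bypass the limiter.
          fetch().foreach(record => store(record))
        }
      }
    }.start()
  }

  override def onStop(): Unit = ()
}
```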

@iammehrabalam
Author

iammehrabalam commented Jun 4, 2023

@LeonardMeyer you are right, but the rate limit is only applied when a single record is written into the store (https://github.com/apache/spark/blob/595ad30e6259f7e4e4252dfee7704b73fd4760f7/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala#L118). When an iterator (i.e. a block) is written directly, the rate limit is not applied by default.

In the Pub/Sub receiver, the iterator store method is called, which is where we added the rate limit (the same rate limit that is generated based on backpressure).

@iammehrabalam
Author

@datasherlock Ideally it should work. If possible, share your Spark configuration so I can help.
For the rate-limit logic, you can check the updateRateLimit and pushToStoreAndAck methods in this PR.
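
For anyone skimming, the general shape of that pattern is roughly the following (an illustrative sketch only, not the PR's actual code; Guava's RateLimiter stands in for whatever limiter the receiver maintains, and the method names mirror the ones mentioned above):

```scala
// Illustrative sketch: throttle a bulk store() manually, since Spark only
// rate-limits the single-record store path.
import com.google.common.util.concurrent.RateLimiter

class ThrottledPush(initialRatePerSec: Double) {
  private val limiter = RateLimiter.create(initialRatePerSec)

  // Called whenever backpressure publishes a new rate (records/sec).
  def updateRateLimit(newRatePerSec: Double): Unit =
    limiter.setRate(math.max(newRatePerSec, 1.0))

  // Acquire one permit per record before pushing the block, then ack.
  def pushToStoreAndAck(records: Seq[String],
                        storeBlock: Iterator[String] => Unit,
                        ack: () => Unit): Unit = {
    limiter.acquire(records.size) // blocks until the batch fits the current rate
    storeBlock(records.iterator)  // bulk store() itself is not throttled by Spark
    ack()
  }
}
```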

@irajhedayati

This change was suggested two years ago. Is there any plan to push it through?
