Skip to content
This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

Commit

Permalink
reusing same array instead of creating new one
Browse files Browse the repository at this point in the history
  • Loading branch information
iammehrabalam committed Dec 26, 2021
1 parent 06dc83c commit 068233b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class PubsubReceiver(

var buffer: ArrayBuffer[ReceivedMessage] = createBufferArray()

var latestStorePushTime: Long = -1
var latestAttemptToPushInStoreTime: Long = -1

lazy val rateLimiter: RateLimiter = RateLimiter.create(getInitialRateLimit.toDouble)

Expand Down Expand Up @@ -325,7 +325,7 @@ class PubsubReceiver(
var backoff = INIT_BACKOFF

// To avoid the edge case when buffer is not full and no message pushed to store
latestStorePushTime = System.currentTimeMillis()
latestAttemptToPushInStoreTime = System.currentTimeMillis()

while (!isStopped()) {
try {
Expand Down Expand Up @@ -393,7 +393,7 @@ class PubsubReceiver(
*/
def push(): Unit = {

val diff = System.currentTimeMillis() - latestStorePushTime
val diff = System.currentTimeMillis() - latestAttemptToPushInStoreTime
if (buffer.length >= blockSize || (buffer.length < blockSize && diff >= blockIntervalMs)) {

// grouping messages into complete and partial blocks (if any)
Expand All @@ -404,18 +404,29 @@ class PubsubReceiver(
// messages in buffer is less than blockSize. So will push partial block
val iterator = if (completeBlocks.nonEmpty) completeBlocks else partialBlock

// Creating new buffer
buffer = createBufferArray()

// Pushing partial block messages back to buffer if complete blocks formed
if (completeBlocks.nonEmpty && partialBlock.hasNext) {
buffer.appendAll(partialBlock.next())
}
// Will push partial block messages back to buffer if complete blocks formed
val partial = if (completeBlocks.nonEmpty && partialBlock.nonEmpty) {
partialBlock.next()
} else null

while (iterator.hasNext) {
pushToStoreAndAck(iterator.next().toList)
latestStorePushTime = System.currentTimeMillis()
try {
pushToStoreAndAck(iterator.next().toList)
} catch {
case e: SparkException => reportError(
"Failed to write messages into reliable store", e)
case NonFatal(e) => reportError(
"Failed to write messages in reliable store", e)
} finally {
latestAttemptToPushInStoreTime = System.currentTimeMillis()
}
}

// clear existing buffer messages
buffer.clear()

// Pushing partial block messages back to buffer if complete blocks formed
if (partial != null) buffer.appendAll(partial)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with Be

val batchDuration = Seconds(1)

val blockSize = 10
val blockSize = 15

private val master: String = "local[2]"

Expand Down Expand Up @@ -79,6 +79,7 @@ class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with Be
conf.set("spark.streaming.receiver.maxRate", "100")
conf.set("spark.streaming.backpressure.pid.minRate", "10")
conf.set("spark.streaming.blockQueueSize", blockSize.toString)
conf.set("spark.streaming.blockInterval", "1000ms")
}


Expand Down

0 comments on commit 068233b

Please sign in to comment.