Skip to content
This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
iammehrabalam committed Dec 25, 2021
1 parent 615eec4 commit 06dc83c
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ class PubsubReceiver(
.setMaxMessages(maxNoOfMessageInRequest).setReturnImmediately(false)
var backoff = INIT_BACKOFF

// To avoid the edge case when buffer is not full and no message pushed to store
latestStorePushTime = System.currentTimeMillis()

while (!isStopped()) {
try {

Expand Down Expand Up @@ -391,8 +394,7 @@ class PubsubReceiver(
def push(): Unit = {

val diff = System.currentTimeMillis() - latestStorePushTime
if (buffer.length >= blockSize ||
(latestStorePushTime != -1 && buffer.length < blockSize && diff >= blockIntervalMs)) {
if (buffer.length >= blockSize || (buffer.length < blockSize && diff >= blockIntervalMs)) {

// grouping messages into complete and partial blocks (if any)
val (completeBlocks, partialBlock) = buffer.grouped(blockSize)
Expand All @@ -406,7 +408,7 @@ class PubsubReceiver(
buffer = createBufferArray()

// Pushing partial block messages back to buffer if complete blocks formed
if (completeBlocks.nonEmpty) {
if (completeBlocks.nonEmpty && partialBlock.hasNext) {
buffer.appendAll(partialBlock.next())
}

Expand Down

0 comments on commit 06dc83c

Please sign in to comment.