Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Implement peek(int nth) #174

Open
aacic opened this issue Oct 17, 2017 · 9 comments
Open

Implement peek(int nth) #174

aacic opened this issue Oct 17, 2017 · 9 comments

Comments

@aacic
Copy link

aacic commented Oct 17, 2017

The method should read the nth element from the queue without changing the queue.

A potential use case of this method could be to read a portion from the queue in a loop and process it in a streaming API manner. If and only if all of the portion elements are processed successfully then the whole portion is removed from the queue.

@JakeWharton
Copy link
Collaborator

JakeWharton commented Oct 17, 2017 via email

@NightlyNexus
Copy link
Contributor

The hypothetical use case stated sounds like the existing APIs:

List<Data> section = queue.peek(num);
if (processSuccessfully(section)) {
  queue.pop(num);
}

Is this instead a request for Data nthElement = queue.peekBackInQueue(n)? I'm not sure what the use case would be, but could you do it in your application code by peeking n elements in the list, and getting the last element?

@f2prateek
Copy link
Collaborator

If the request is for getting the first n elements (and not specifically the nth element only); then the iterators can be used pretty efficiently as they're lazy.

    int i = 0;
    for (byte[] bytes : queueFile) {
      if (i++ > n) {
        break;
      }
      process(bytes);
    }

@jhass
Copy link

jhass commented Oct 18, 2017

peek(n) doesn't allow for minimal memory usage and the iterator doesn't allow for concurrent modification.

@aacic
Copy link
Author

aacic commented Oct 18, 2017

A potential use case would look like this:

int n = 10 // number of elements for procesing

for (int i = 0; i < n; i++) {
Element element = queue.peek(i); // reads the ith element
StreamingAPIProcessor.process(element);
}

if (StreamingAPIProcessor.isSuccessful()) {
queue.remove(n);
}

Given that there is a queue of 10 000 elements. And we want to process 1000 element as a portion (e.g. upload it to the server.). If we process 1000 element in the streaming manner we'll consume less memory. We can't use the iterator because it doesn't allow concurrent modifications.

@f2prateek
Copy link
Collaborator

f2prateek commented Oct 18, 2017

I don't understand the example you shared fully (StreamingAPIProcessor processes one element at a time but reports status about n elements?). But anyway, that example could be written simply as:

    int n = 10;
    int i = 0;
    for (byte[] bytes : queueFile) {
      if (i++ > n) {
        break;
      }
       StreamingAPIProcessor.process(bytes);
    }

   if (StreamingAPIProcessor.isSuccessful()) {
     queue.remove(n);
   }

I'm not sure where the question of concurrent modification comes in here - your example is synchronous, you're processing n elements one at a time, and removing n elements when all are done.

@aacic
Copy link
Author

aacic commented Oct 18, 2017

StreamingAPIProcessor is e.g. uploading n elements to the server using e.g. JsonGenerator (Jackson Streaming API). So, if the network request is successful we remove n elements. The idea is not to load n elements into the memory but to read only one element at a time.

In general, the system generates new elements and puts them into the queue (on some other threads). So that's why we can't use the iterator.

@jhass
Copy link

jhass commented Oct 18, 2017

So one has to provide some pseudo code with a concurrent access adding stuff to the queue just so you accept it as a valid usecase? Oh well

queue = new QueueFile()
new Thread() {
  run() {
    synchronized(queue) { queue.add(rand()) }
    sleep(rand())
  }
}

while (!queue.isEmpty()) {
  batchSize = min(queue.size(), MAX_BATCH_SIZE)
  for (int i = 0; i < batchSize; i++) {
    synchronized(queue) { item = queue.get(i) }
    stream(item)
  }
  if (isBatchSuccessfull()) {
    synchronized(queue) { queue.remove(batchSize) }
  }
}

@hadrienk
Copy link

Took me a while to realize that the iterator() is not available in the version of the documentation.

<dependency>
    <groupId>com.squareup</groupId>
    <artifactId>tape</artifactId>
    <version>1.2.3</version>
</dependency>

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants