Skip to content

Commit

Permalink
Pipe.SourceChannel::skip(n)
Browse files Browse the repository at this point in the history
  • Loading branch information
mkarg committed Mar 3, 2024
1 parent 37e01ef commit a1c52ca
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/java.base/share/classes/java/nio/channels/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.io.IOException;
import java.nio.channels.spi.*;

import sun.nio.ch.Util;


/**
* A pair of channels that implements a unidirectional pipe.
Expand Down Expand Up @@ -60,6 +62,10 @@ public abstract static class SourceChannel
extends AbstractSelectableChannel
implements ReadableByteChannel, ScatteringByteChannel
{
// MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to
// use when skipping.
private static final int MAX_SKIP_BUFFER_SIZE = 8192;

/**
* Constructs a new instance of this class.
*
Expand All @@ -83,6 +89,42 @@ public final int validOps() {
return SelectionKey.OP_READ;
}

/**
* Skips up to n bytes.
*
* @param n The number of bytes to skip.
*
* @return The number of skipped bytes.
*
* @throws IOException
* If an I/O error occurs
*/
public long skip(final long n) throws IOException {
if (n == 0)
return 0;

final int bs = Math.toIntExact(Math.min(n, MAX_SKIP_BUFFER_SIZE));
long tn = 0;
final var bb = Util.getTemporaryDirectBuffer(bs);
try {
for (;;) {
final long remaining = n - tn;
final int count = Math.toIntExact(Math.min(remaining, bs));
bb.limit(count);
final int nr = this.read(bb);
tn += nr;
if (nr < 0)
return tn;
if (nr == bs) {
bb.rewind();
continue;
}
return tn;
}
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.SelectableChannel;
Expand Down Expand Up @@ -217,6 +218,12 @@ public synchronized long skip(long n) throws IOException {
sbc.position(newPos);
return newPos - pos;
}

// special case where the channel is to a pipe
if (ch instanceof Pipe.SourceChannel psc) {
return psc.skip(n);
}

return super.skip(n);
}

Expand Down
6 changes: 6 additions & 0 deletions src/java.base/share/classes/sun/nio/ch/IOUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,12 @@ public static FileDescriptor newFD(int i) {
*/
static native int drain1(int fd) throws IOException;

/**
* Read and discard at most n bytes
* @return the number of bytes read or IOS_INTERRUPTED
*/
static native long drainN(int fd, long n) throws IOException;

public static native void configureBlocking(FileDescriptor fd,
boolean blocking)
throws IOException;
Expand Down
29 changes: 29 additions & 0 deletions src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,4 +360,33 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
public long read(ByteBuffer[] dsts) throws IOException {
return read(dsts, 0, dsts.length);
}

@Override
public long skip(long n) throws IOException {
if (n == 0)
return 0;

readLock.lock();
try {
boolean blocking = isBlocking();
long ns = 0;
try {
beginRead(blocking);
configureSocketNonBlockingIfVirtualThread();
ns = IOUtil.drainN(fdVal, n);
if (blocking) {
while (IOStatus.okayToRetry(ns) && isOpen()) {
park(Net.POLLIN);
ns = IOUtil.drainN(fdVal, n);
}
}
} finally {
endRead(blocking, ns > 0);
assert IOStatus.check(n);
}
return IOStatus.normalize(n);
} finally {
readLock.unlock();
}
}
}
26 changes: 26 additions & 0 deletions src/java.base/unix/native/libnio/ch/IOUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@

static jfieldID fd_fdID; /* for jint 'fd' in java.io.FileDescriptor */

// MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to
// use when skipping.
static const ssize_t MAX_SKIP_BUFFER_SIZE = 4096;

JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_initIDs(JNIEnv *env, jclass clazz)
Expand Down Expand Up @@ -149,6 +152,29 @@ Java_sun_nio_ch_IOUtil_drain1(JNIEnv *env, jclass cl, jint fd)
return res;
}

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_drainN(JNIEnv *env, jclass cl, jint fd, jlong n)
{
if (n == 0)
return 0;

const long bs = n < MAX_SKIP_BUFFER_SIZE ? n : MAX_SKIP_BUFFER_SIZE;
char buf[bs];
jlong tn = 0;

for (;;) {
const jlong remaining = n - tn;
const ssize_t count = remaining < bs ? remaining : bs;
const ssize_t nr = read(fd, buf, count);
tn += nr;
if ((nr < 0) && (errno != EAGAIN && errno != EWOULDBLOCK))
JNU_ThrowIOExceptionWithLastError(env, "DrainN");
if (nr == bs)
continue;
return tn;
}
}

JNIEXPORT jint JNICALL
Java_sun_nio_ch_IOUtil_fdLimit(JNIEnv *env, jclass this)
{
Expand Down

0 comments on commit a1c52ca

Please sign in to comment.