diff --git a/src/java.base/share/classes/java/nio/channels/Pipe.java b/src/java.base/share/classes/java/nio/channels/Pipe.java index b9a9b2512ae30..7aadbbf28d33b 100644 --- a/src/java.base/share/classes/java/nio/channels/Pipe.java +++ b/src/java.base/share/classes/java/nio/channels/Pipe.java @@ -28,6 +28,8 @@ import java.io.IOException; import java.nio.channels.spi.*; +import sun.nio.ch.Util; + /** * A pair of channels that implements a unidirectional pipe. @@ -60,6 +62,10 @@ public abstract static class SourceChannel extends AbstractSelectableChannel implements ReadableByteChannel, ScatteringByteChannel { + // MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to + // use when skipping. + private static final int MAX_SKIP_BUFFER_SIZE = 8192; + /** * Constructs a new instance of this class. * @@ -83,6 +89,42 @@ public final int validOps() { return SelectionKey.OP_READ; } + /** + * Skips up to n bytes. + * + * @param n The number of bytes to skip. + * + * @return The number of skipped bytes. + * + * @throws IOException + * If an I/O error occurs + */ + public long skip(final long n) throws IOException { + if (n == 0) + return 0; + + final int bs = Math.toIntExact(Math.min(n, MAX_SKIP_BUFFER_SIZE)); + long tn = 0; + final var bb = Util.getTemporaryDirectBuffer(bs); + try { + for (;;) { + final long remaining = n - tn; + final int count = Math.toIntExact(Math.min(remaining, bs)); + bb.limit(count); + final int nr = this.read(bb); + tn += nr; + if (nr < 0) + return tn; + if (nr == bs) { + bb.rewind(); + continue; + } + return tn; + } + } finally { + Util.releaseTemporaryDirectBuffer(bb); + } + } } /** diff --git a/src/java.base/share/classes/sun/nio/ch/ChannelInputStream.java b/src/java.base/share/classes/sun/nio/ch/ChannelInputStream.java index 8cf5c720b8d6b..6fd8a38e48608 100644 --- a/src/java.base/share/classes/sun/nio/ch/ChannelInputStream.java +++ b/src/java.base/share/classes/sun/nio/ch/ChannelInputStream.java @@ -31,6 +31,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.IllegalBlockingModeException; +import java.nio.channels.Pipe; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.channels.SelectableChannel; @@ -217,6 +218,12 @@ public synchronized long skip(long n) throws IOException { sbc.position(newPos); return newPos - pos; } + + // special case where the channel is to a pipe + if (ch instanceof Pipe.SourceChannel psc) { + return psc.skip(n); + } + return super.skip(n); } diff --git a/src/java.base/share/classes/sun/nio/ch/IOUtil.java b/src/java.base/share/classes/sun/nio/ch/IOUtil.java index d86d6decb14e7..a662dd686939a 100644 --- a/src/java.base/share/classes/sun/nio/ch/IOUtil.java +++ b/src/java.base/share/classes/sun/nio/ch/IOUtil.java @@ -587,6 +587,12 @@ public static FileDescriptor newFD(int i) { */ static native int drain1(int fd) throws IOException; + /** + * Read and discard at most n bytes + * @return the number of bytes read or IOS_INTERRUPTED + */ + static native long drainN(int fd, long n) throws IOException; + public static native void configureBlocking(FileDescriptor fd, boolean blocking) throws IOException; diff --git a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java index 7be17040b4c91..d6515000425a7 100644 --- a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java +++ b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java @@ -360,4 +360,33 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { public long read(ByteBuffer[] dsts) throws IOException { return read(dsts, 0, dsts.length); } + + @Override + public long skip(long n) throws IOException { + if (n == 0) + return 0; + + readLock.lock(); + try { + boolean blocking = isBlocking(); + long ns = 0; + try { + beginRead(blocking); + configureSocketNonBlockingIfVirtualThread(); + ns = IOUtil.drainN(fdVal, n); + if (blocking) { + while (IOStatus.okayToRetry(ns) && isOpen()) { + park(Net.POLLIN); + ns = IOUtil.drainN(fdVal, n); + } + } + } finally { + endRead(blocking, ns > 0); + assert IOStatus.check(n); + } + return IOStatus.normalize(n); + } finally { + readLock.unlock(); + } + } } diff --git a/src/java.base/unix/native/libnio/ch/IOUtil.c b/src/java.base/unix/native/libnio/ch/IOUtil.c index dfa99658fa667..26baa6fcb1aa4 100644 --- a/src/java.base/unix/native/libnio/ch/IOUtil.c +++ b/src/java.base/unix/native/libnio/ch/IOUtil.c @@ -39,6 +39,9 @@ static jfieldID fd_fdID; /* for jint 'fd' in java.io.FileDescriptor */ +// MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to +// use when skipping. +static const ssize_t MAX_SKIP_BUFFER_SIZE = 4096; JNIEXPORT void JNICALL Java_sun_nio_ch_IOUtil_initIDs(JNIEnv *env, jclass clazz) @@ -149,6 +152,29 @@ Java_sun_nio_ch_IOUtil_drain1(JNIEnv *env, jclass cl, jint fd) return res; } +JNIEXPORT jlong JNICALL +Java_sun_nio_ch_IOUtil_drainN(JNIEnv *env, jclass cl, jint fd, jlong n) +{ + if (n == 0) + return 0; + + const long bs = n < MAX_SKIP_BUFFER_SIZE ? n : MAX_SKIP_BUFFER_SIZE; + char buf[bs]; + jlong tn = 0; + + for (;;) { + const jlong remaining = n - tn; + const ssize_t count = remaining < bs ? remaining : bs; + const ssize_t nr = read(fd, buf, count); + tn += nr; + if ((nr < 0) && (errno != EAGAIN && errno != EWOULDBLOCK)) + JNU_ThrowIOExceptionWithLastError(env, "DrainN"); + if (nr == bs) + continue; + return tn; + } +} + JNIEXPORT jint JNICALL Java_sun_nio_ch_IOUtil_fdLimit(JNIEnv *env, jclass this) {