Skip to content

Commit

Permalink
Merge branch 'release/1.3.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
guss77 committed Jan 29, 2024
2 parents b7e81f7 + 5e3e636 commit 4df6ce1
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.cloudonix</groupId>
<artifactId>vertx-java.io</artifactId>
<version>1.2.1</version>
<version>1.3.0</version>
<name>Vert.x Java IO</name>
<description>Utilities for integrating Vert.x IO with Java IO</description>

Expand Down
15 changes: 15 additions & 0 deletions src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public int read(byte[] b, int off, int len) {
private volatile int maxBufferSize = Integer.MAX_VALUE;
private ConcurrentLinkedQueue<PendingWrite> buffer = new ConcurrentLinkedQueue<>();
private AtomicBoolean everFull = new AtomicBoolean();
private volatile boolean closed = false;
private ConcurrentLinkedQueue<CountDownLatch> readsWaiting = new ConcurrentLinkedQueue<>();
private Context context;

Expand Down Expand Up @@ -127,6 +128,9 @@ public WriteToInputStream exceptionHandler(Handler<Throwable> handler) {

@Override
public Future<Void> write(Buffer data) {
if (closed)
// accept all data and discard it, as unlike JDK9 Flow, we have no way to tell upstream to stop sending data
return Future.succeededFuture();
var promise = Promise.<Void>promise();
if (data == null) // end of stream
buffer.add(new PendingWrite(null, promise));
Expand Down Expand Up @@ -207,4 +211,15 @@ public int available() throws IOException {
return buffer.stream().map(PendingWrite::available).reduce(0, (i,a) -> i += a);
}

@Override
public void close() throws IOException {
super.close();
closed = true; // mark us closed, so that additional writes are NOPed
// if we have any buffered data, flush it and trigger the write completions
while (!buffer.isEmpty())
buffer.poll().completion.tryComplete();
// see if we need to call the drain handler to drain upstream
if (everFull.compareAndSet(true, false))
context.runOnContext(drainHandler::handle);
}
}

0 comments on commit 4df6ce1

Please sign in to comment.