Skip to content

Commit

Permalink
Merge pull request #344 from thekid/feature/streamtransfer-yield
Browse files Browse the repository at this point in the history
Add io.streams.StreamTransfer::transmit() which yields control after each chunk
  • Loading branch information
thekid authored Jun 23, 2024
2 parents 25a4366 + a42c261 commit 2e1f262
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 74 deletions.
29 changes: 21 additions & 8 deletions src/main/php/io/streams/StreamTransfer.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
* @test net.xp_framework.unittest.io.streams.StreamTransferTest
*/
class StreamTransfer implements Closeable {
protected $in= null;
protected $out= null;
protected $in, $out;

/**
* Creates a new stream transfer
*
* @param io.streams.InputStream in
* @param io.streams.OutputStream out
* @param io.streams.InputStream $in
* @param io.streams.OutputStream $out
*/
public function __construct(InputStream $in, OutputStream $out) {
$this->in= $in;
Expand All @@ -36,22 +35,36 @@ public function __construct(InputStream $in, OutputStream $out) {
/**
* Copy all available input from in
*
* @return int number of bytes copied
* @throws io.IOException
* @return int number of bytes copied
* @throws io.IOException
*/
public function transferAll() {
$r= 0;
while ($this->in->available() > 0) {
while ($this->in->available()) {
$r+= $this->out->write($this->in->read());
}
return $r;
}

/**
* Transmit all available input from in, yielding control after each chunk.
*
* @return iterable
* @throws io.IOException
*/
public function transmit() {
while ($this->in->available()) {
$this->out->write($this->in->read());
yield;
}
}

/**
* Close input and output streams. Guarantees to try to close both
* streams even if one of the close() calls yields an exception.
*
* @throws io.IOException
* @return void
* @throws io.IOException
*/
public function close() {
$errors= '';
Expand Down
100 changes: 34 additions & 66 deletions src/test/php/io/unittest/StreamTransferTest.class.php
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
<?php namespace io\unittest;

use io\IOException;
use io\streams\{InputStream, MemoryInputStream, MemoryOutputStream, OutputStream, StreamTransfer};
use test\{Assert, Test};

class StreamTransferTest {

/**
* Returns an uncloseable input stream
*
* @return io.streams.InputStream
*/
/** Returns an uncloseable input stream */
protected function uncloseableInputStream() {
return new class() implements InputStream {
public function read($length= 8192) { }
Expand All @@ -18,53 +15,37 @@ public function close() { throw new \io\IOException("Close error"); }
};
}

/**
* Returns a closeable input stream
*
* @return io.streams.InputStream
*/
/** Returns a closeable input stream */
protected function closeableInputStream() {
return new class() implements InputStream {
public $closed= FALSE;
public $closed= false;
public function read($length= 8192) { }
public function available() { }
public function close() { $this->closed= TRUE; }
public function close() { $this->closed= true; }
};
}

/**
* Returns an uncloseable output stream
*
* @return io.streams.OutputStream
*/
/** Returns an uncloseable output stream */
protected function uncloseableOutputStream() {
return new class() implements OutputStream {
public function write($data) { }
public function flush() { }
public function close() { throw new \io\IOException("Close error"); }
public function close() { throw new IOException('Close error'); }
};
}

/**
* Returns a closeable output stream
*
* @return io.streams.OutputStream
*/
/** Returns a closeable output stream */
protected function closeableOutputStream() {
return new class() implements OutputStream {
public $closed= FALSE;
public $closed= false;
public function write($data) { }
public function flush() { }
public function close() { $this->closed= TRUE; }
public function close() { $this->closed= true; }
};
}

/**
* Test
*
*/
#[Test]
public function dataTransferred() {
public function transfer_all() {
$out= new MemoryOutputStream();

$s= new StreamTransfer(new MemoryInputStream('Hello'), $out);
Expand All @@ -73,12 +54,18 @@ public function dataTransferred() {
Assert::equals('Hello', $out->bytes());
}

/**
* Test
*
*/
#[Test]
public function nothingAvailableAfterTransfer() {
public function transmit() {
$out= new MemoryOutputStream();

$s= new StreamTransfer(new MemoryInputStream('Hello'), $out);
foreach ($s->transmit() as $yield) { }

Assert::equals('Hello', $out->bytes());
}

#[Test]
public function nothing_available_after_transfer() {
$in= new MemoryInputStream('Hello');

$s= new StreamTransfer($in, new MemoryOutputStream());
Expand All @@ -87,82 +74,63 @@ public function nothingAvailableAfterTransfer() {
Assert::equals(0, $in->available());
}

/**
* Test closing a stream twice has no effect.
*
* @see xp://lang.Closeable#close
*/
#[Test]
public function closingTwice() {
public function closing_twice() {
$s= new StreamTransfer(new MemoryInputStream('Hello'), new MemoryOutputStream());
$s->close();
$s->close();
}

/**
* Test close() method
*
*/
#[Test]
public function close() {
$in= $this->closeableInputStream();
$out= $this->closeableOutputStream();

(new StreamTransfer($in, $out))->close();

Assert::true($in->closed, 'input closed');
Assert::true($out->closed, 'output closed');
}

/**
* Test close() and exceptions
*
*/
#[Test]
public function closingOutputFails() {
public function closing_output_fails() {
$in= $this->closeableInputStream();
$out= $this->uncloseableOutputStream();

try {
(new StreamTransfer($in, $out))->close();
$this->fail('Expected exception not caught', null, 'io.IOException');
} catch (\io\IOException $expected) {
} catch (IOException $expected) {
Assert::equals('Could not close output stream: Close error', $expected->getMessage());
}

Assert::true($in->closed, 'input closed');
}

/**
* Test close() and exceptions
*
*/
#[Test]
public function closingInputFails() {
public function closing_input_fails() {
$in= $this->uncloseableInputStream();
$out= $this->closeableOutputStream();

try {
(new StreamTransfer($in, $out))->close();
$this->fail('Expected exception not caught', null, 'io.IOException');
} catch (\io\IOException $expected) {
} catch (IOException $expected) {
Assert::equals('Could not close input stream: Close error', $expected->getMessage());
}

Assert::true($out->closed, 'output closed');
}

/**
* Test close() and exceptions
*
*/
#[Test]
public function closingInputAndOutputFails() {
public function closing_input_and_output_fails() {
$in= $this->uncloseableInputStream();
$out= $this->uncloseableOutputStream();

try {
(new StreamTransfer($in, $out))->close();
$this->fail('Expected exception not caught', null, 'io.IOException');
} catch (\io\IOException $expected) {
} catch (IOException $expected) {
Assert::equals('Could not close input stream: Close error, Could not close output stream: Close error', $expected->getMessage());
}
}
Expand Down

0 comments on commit 2e1f262

Please sign in to comment.