diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 6f73fb8..6e7438c 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -93,16 +93,19 @@ public function __construct($callback = null) public function pause() { - $this->paused = true; + // only allow pause if still readable, false otherwise + $this->paused = $this->readable; } public function resume() { + $this->paused = false; + + // emit drain even if previous write was paused (throttled) if ($this->drain) { $this->drain = false; $this->emit('drain'); } - $this->paused = false; } public function pipe(WritableStreamInterface $dest, array $options = array()) @@ -139,12 +142,13 @@ public function write($data) $this->emit('data', array($data)); + // emit drain event on next resume if currently paused (throttled) if ($this->paused) { $this->drain = true; - return false; } - return true; + // continue writing if still writable and not paused (throttled), false otherwise + return $this->writable && !$this->paused; } public function end($data = null) @@ -164,7 +168,7 @@ public function end($data = null) $this->readable = false; $this->writable = false; - $this->paused = true; + $this->paused = false; $this->drain = false; $this->emit('end'); @@ -179,9 +183,10 @@ public function close() $this->readable = false; $this->writable = false; - $this->closed = true; - $this->paused = true; + $this->paused = false; $this->drain = false; + + $this->closed = true; $this->callback = null; $this->emit('close'); diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index 444f3b1..42c251a 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -95,6 +95,30 @@ public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused() $this->assertFalse($ret); } + /** @test */ + public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventEndsStream() + { + $through = new ThroughStream(); + $through->on('data', function () use ($through) { + $through->end(); + }); + $ret = $through->write('foo'); + + $this->assertFalse($ret); + } + + /** @test */ + public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventClosesStream() + { + $through = new ThroughStream(); + $through->on('data', function () use ($through) { + $through->close(); + }); + $ret = $through->write('foo'); + + $this->assertFalse($ret); + } + /** @test */ public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWhenPaused() { @@ -106,6 +130,40 @@ public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWh $through->resume(); } + /** @test */ + public function itShouldNotEmitDrainOnResumeAfterClose() + { + $through = new ThroughStream(); + $through->close(); + + $through->on('drain', $this->expectCallableNever()); + $through->resume(); + } + + /** @test */ + public function itShouldNotEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenThatCausesStreamToClose() + { + $through = new ThroughStream(); + $through->on('data', function () use ($through) { $through->close(); }); + $through->write('foo'); + + $through->on('drain', $this->expectCallableNever()); + $through->resume(); + } + + /** @test */ + public function itShouldReturnFalseForAnyDataWrittenToItAfterPausingFromDrainEvent() + { + $through = new ThroughStream(); + $through->pause(); + $through->write('foo'); + + $through->on('drain', function () use ($through) { $through->pause(); }); + $through->resume(); + + $this->assertFalse($through->write('bar')); + } + /** @test */ public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause() {