Skip to content

Commit

Permalink
Fix drain event of ThroughStream to handle potential race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Sep 24, 2023
1 parent 6fbc967 commit d6accb8
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 7 deletions.
19 changes: 12 additions & 7 deletions src/ThroughStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,19 @@ public function __construct($callback = null)

public function pause()
{
$this->paused = true;
// only allow pause if still readable, false otherwise
$this->paused = $this->readable;
}

public function resume()
{
$this->paused = false;

// emit drain even if previous write was paused (throttled)
if ($this->drain) {
$this->drain = false;
$this->emit('drain');
}
$this->paused = false;
}

public function pipe(WritableStreamInterface $dest, array $options = array())
Expand Down Expand Up @@ -139,12 +142,13 @@ public function write($data)

$this->emit('data', array($data));

// emit drain event on next resume if currently paused (throttled)
if ($this->paused) {
$this->drain = true;
return false;
}

return true;
// continue writing if still writable and not paused (throttled), false otherwise
return $this->writable && !$this->paused;
}

public function end($data = null)
Expand All @@ -164,7 +168,7 @@ public function end($data = null)

$this->readable = false;
$this->writable = false;
$this->paused = true;
$this->paused = false;
$this->drain = false;

$this->emit('end');
Expand All @@ -179,9 +183,10 @@ public function close()

$this->readable = false;
$this->writable = false;
$this->closed = true;
$this->paused = true;
$this->paused = false;
$this->drain = false;

$this->closed = true;
$this->callback = null;

$this->emit('close');
Expand Down
58 changes: 58 additions & 0 deletions tests/ThroughStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,30 @@ public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused()
$this->assertFalse($ret);
}

/** @test */
public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventEndsStream()
{
$through = new ThroughStream();
$through->on('data', function () use ($through) {
$through->end();
});
$ret = $through->write('foo');

$this->assertFalse($ret);
}

/** @test */
public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventClosesStream()
{
$through = new ThroughStream();
$through->on('data', function () use ($through) {
$through->close();
});
$ret = $through->write('foo');

$this->assertFalse($ret);
}

/** @test */
public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWhenPaused()
{
Expand All @@ -106,6 +130,40 @@ public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWh
$through->resume();
}

/** @test */
public function itShouldNotEmitDrainOnResumeAfterClose()
{
$through = new ThroughStream();
$through->close();

$through->on('drain', $this->expectCallableNever());
$through->resume();
}

/** @test */
public function itShouldNotEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenThatCausesStreamToClose()
{
$through = new ThroughStream();
$through->on('data', function () use ($through) { $through->close(); });
$through->write('foo');

$through->on('drain', $this->expectCallableNever());
$through->resume();
}

/** @test */
public function itShouldReturnFalseForAnyDataWrittenToItAfterPausingFromDrainEvent()
{
$through = new ThroughStream();
$through->pause();
$through->write('foo');

$through->on('drain', function () use ($through) { $through->pause(); });
$through->resume();

$this->assertFalse($through->write('bar'));
}

/** @test */
public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause()
{
Expand Down

0 comments on commit d6accb8

Please sign in to comment.