diff --git a/src/CallInvokerInterface.php b/src/CallInvokerInterface.php deleted file mode 100644 index 7327ad40..00000000 --- a/src/CallInvokerInterface.php +++ /dev/null @@ -1,32 +0,0 @@ -loop = $adapter->getLoop(); - $this->adapter = $adapter; - } - - /** - * @param string $function - * @param array $args - * @param int $errorResultCode - * @return \React\Promise\ExtendedPromiseInterface - */ - public function invokeCall($function, $args, $errorResultCode = -1) - { - return $this-> - adapter-> - callFilesystem($function, $args, $errorResultCode); - } - - /** - * @return bool - */ - public function isEmpty() - { - return true; - } -} diff --git a/src/PooledInvoker.php b/src/PooledInvoker.php deleted file mode 100644 index 715139ab..00000000 --- a/src/PooledInvoker.php +++ /dev/null @@ -1,119 +0,0 @@ -loop = $adapter->getLoop(); - $this->adapter = $adapter; - $this->callQueue = new \SplQueue(); - $this->maxSimultaneousOperations = $maxSimultaneousOperations; - } - - /** - * @param string $function - * @param array $args - * @param int $errorResultCode - * @return \React\Promise\ExtendedPromiseInterface - */ - public function invokeCall($function, $args, $errorResultCode = -1) - { - $this->callQueueActive = true; - $deferred = new Deferred(); - - $this->callQueue->enqueue(new QueuedCall($deferred, $function, $args, $errorResultCode)); - - if (!$this->callQueue->isEmpty() && $this->runningOperations < $this->maxSimultaneousOperations) { - $this->processQueue(); - } - - return $deferred->promise()->then(function ($data) { - return $this-> - adapter-> - callFilesystem($data['function'], $data['args'], $data['errorResultCode'])-> - then($this->filesystemResultHandler('React\Promise\resolve'), $this->filesystemResultHandler('React\Promise\reject')); - }); - } - - /** - * @return bool - */ - public function isEmpty() - { - return $this->callQueue->isEmpty(); - } - - protected function processQueue() - { - $this->loop->futureTick(function () { - if ($this->callQueue->isEmpty()) { - return; - } - - $this->runningOperations++; - - $message = $this->callQueue->dequeue(); - $data = [ - 'function' => $message->getFunction(), - 'args' => $message->getArgs(), - 'errorResultCode' => $message->getErrorResultCode(), - ]; - - $message->getDeferred()->resolve($data); - }); - } - - protected function filesystemResultHandler($func) - { - return function ($mixed) use ($func) { - if ($this->callQueue->count() == 0) { - $this->callQueueActive = false; - } else { - $this->processQueue(); - } - - $this->runningOperations--; - - return $func($mixed); - }; - } -} diff --git a/src/QueuedInvoker.php b/src/QueuedInvoker.php deleted file mode 100644 index d6b18b43..00000000 --- a/src/QueuedInvoker.php +++ /dev/null @@ -1,105 +0,0 @@ -loop = $adapter->getLoop(); - $this->adapter = $adapter; - $this->callQueue = new \SplQueue(); - } - - /** - * @param string $function - * @param array $args - * @param int $errorResultCode - * @return \React\Promise\ExtendedPromiseInterface - */ - public function invokeCall($function, $args, $errorResultCode = -1) - { - $this->callQueueActive = true; - $deferred = new Deferred(); - - $this->callQueue->enqueue(new QueuedCall($deferred, $function, $args, $errorResultCode)); - - if (!$this->callQueue->isEmpty()) { - $this->processQueue(); - } - - return $deferred->promise()->then(function ($data) { - return $this-> - adapter-> - callFilesystem($data['function'], $data['args'], $data['errorResultCode'])-> - then($this->filesystemResultHandler('React\Promise\resolve'), $this->filesystemResultHandler('React\Promise\reject')); - }); - } - - /** - * @return bool - */ - public function isEmpty() - { - return $this->callQueue->isEmpty(); - } - - protected function processQueue() - { - $this->loop->futureTick(function () { - if ($this->callQueue->isEmpty()) { - return; - } - - $message = $this->callQueue->dequeue(); - $data = [ - 'function' => $message->getFunction(), - 'args' => $message->getArgs(), - 'errorResultCode' => $message->getErrorResultCode(), - ]; - $message->getDeferred()->resolve($data); - }); - } - - /** - * @param callable $func - * @return callable - */ - protected function filesystemResultHandler(callable $func) - { - return function ($mixed) use ($func) { - if ($this->callQueue->count() == 0) { - $this->callQueueActive = false; - } else { - $this->processQueue(); - } - return $func($mixed); - }; - } -} diff --git a/src/ThrottledQueuedInvoker.php b/src/ThrottledQueuedInvoker.php deleted file mode 100644 index f12f48a9..00000000 --- a/src/ThrottledQueuedInvoker.php +++ /dev/null @@ -1,133 +0,0 @@ -loop = $adapter->getLoop(); - $this->adapter = $adapter; - $this->callQueue = new \SplQueue(); - $this->interval = $interval; - } - /** - * @param float $interval - */ - public function setInterval($interval) - { - $this->interval = $interval; - } - - /** - * @return float - */ - public function getInterval() - { - return $this->interval; - } - - /** - * @param string $function - * @param array $args - * @param int $errorResultCode - * @return \React\Promise\ExtendedPromiseInterface - */ - public function invokeCall($function, $args, $errorResultCode = -1) - { - $this->callQueueActive = true; - $deferred = new Deferred(); - - $this->callQueue->enqueue(new QueuedCall($deferred, $function, $args, $errorResultCode)); - - if (!$this->callQueue->isEmpty()) { - $this->processQueue(); - } - - return $deferred->promise()->then(function ($data) { - return $this-> - adapter-> - callFilesystem($data['function'], $data['args'], $data['errorResultCode'])-> - then($this->filesystemResultHandler('React\Promise\resolve'), $this->filesystemResultHandler('React\Promise\reject')); - }); - } - - /** - * @return bool - */ - public function isEmpty() - { - return $this->callQueue->isEmpty(); - } - - protected function processQueue() - { - $this->loop->addTimer($this->interval, function () { - if ($this->callQueue->isEmpty()) { - return; - } - - $message = $this->callQueue->dequeue(); - $data = [ - 'function' => $message->getFunction(), - 'args' => $message->getArgs(), - 'errorResultCode' => $message->getErrorResultCode(), - ]; - $message->getDeferred()->resolve($data); - }); - } - - /** - * @param $func - * - * @return callable - */ - protected function filesystemResultHandler($func) - { - return function ($mixed) use ($func) { - if ($this->callQueue->count() == 0) { - $this->callQueueActive = false; - } else { - $this->processQueue(); - } - return $func($mixed); - }; - } -}