diff --git a/README.md b/README.md index 7d93e01..410b5fa 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ operations, but keeping thousands of jobs in memory at once may easily take up all resources on your side. Instead, you can use this library to stream your arbitrarily large input list as individual records to a non-blocking (async) transformation handler. It uses -[ReactPHP](https://reactphp.org) to enable you to concurrently process multiple +[ReactPHP](https://reactphp.org/) to enable you to concurrently process multiple records at once. You can control the concurrency limit, so that by allowing it to process 10 operations at the same time, you can thus process this large input list around 10 times faster and at the same time you're no longer limited @@ -72,13 +72,17 @@ Once [installed](#install), you can use the following code to process an example user lists by sending a (RESTful) HTTP API request for each user record: ```php +get("https://ipapi.co/$user[ip]/country_name/")->then( - function (ResponseInterface $response) use ($user) { + function (Psr\Http\Message\ResponseInterface $response) use ($user) { // response successfully received // add country to user array and return updated user $user['country'] = (string)$response->getBody(); @@ -114,7 +118,9 @@ $transformer->on('data', function ($user) { $transformer->on('end', function () { echo '[DONE]' . PHP_EOL; }); -$transformer->on('error', 'printf'); +$transformer->on('error', function (Exception $e) { + echo 'Error: ' . $e->getMessage() . PHP_EOL; +}); ``` @@ -240,8 +246,8 @@ $transformer = new Transformer(10, function ($url) use ($browser) { return json_decode($response->getBody()); }, - function (Exception $error) { - var_dump('There was an error', $error->getMessage()); + function (Exception $e) { + echo 'Error: ' . $e->getMessage() . PHP_EOL; throw $error; } @@ -411,6 +417,10 @@ $transformer = new Transformer(10, function ($data) use ($http) { }); $source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest); + +$transformer->on('error', function (Exception $e) { + echo 'Error: ' . $e->getMessage() . PHP_EOL; +}); ``` Keep in mind that the transformation handler may return a rejected promise. @@ -456,6 +466,8 @@ $promise = Transformer::all($input, 3, function ($data) use ($browser, $url) { $promise->then(function ($count) { echo 'All ' . $count . ' jobs successful!' . PHP_EOL; +}, function (Exception $e) { + echo 'Error: ' . $e->getMessage() . PHP_EOL; }); ``` @@ -561,6 +573,8 @@ $promise = Transformer::any($input, 3, function ($data) use ($browser, $url) { $promise->then(function (ResponseInterface $response) { echo 'First successful job: ' . $response->getBody() . PHP_EOL; +}, function (Exception $e) { + echo 'Error: ' . $e->getMessage() . PHP_EOL; }); ``` diff --git a/examples/01-transform.php b/examples/01-transform.php index 1d50b0b..8c1f068 100644 --- a/examples/01-transform.php +++ b/examples/01-transform.php @@ -1,8 +1,5 @@ get("https://ipapi.co/$user[ip]/country_name/")->then( - function (ResponseInterface $response) use ($user) { + function (Psr\Http\Message\ResponseInterface $response) use ($user) { // response successfully received // add country to user array and return updated user $user['country'] = (string)$response->getBody(); @@ -49,5 +46,6 @@ function (ResponseInterface $response) use ($user) { $transformer->on('end', function () { echo '[DONE]' . PHP_EOL; }); -$transformer->on('error', 'printf'); - +$transformer->on('error', function (Exception $e) { + echo 'Error: ' . $e->getMessage() . PHP_EOL; +}); diff --git a/examples/02-transform-all.php b/examples/02-transform-all.php index c62f21e..06001ae 100644 --- a/examples/02-transform-all.php +++ b/examples/02-transform-all.php @@ -1,8 +1,5 @@ post( $url, array('Content-Type' => 'application/json'), json_encode($user) - )->then(function (ResponseInterface $response) { + )->then(function (Psr\Http\Message\ResponseInterface $response) { // demo HTTP response validation $body = json_decode($response->getBody()); if (!isset($body->json)) { @@ -40,10 +37,9 @@ function ($count) { echo 'Successfully processed all ' . $count . ' user records' . PHP_EOL; }, function (Exception $e) { - echo 'An error occurred: ' . $e->getMessage() . PHP_EOL; + echo 'Error: ' . $e->getMessage() . PHP_EOL; if ($e->getPrevious()) { echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL; } } ); - diff --git a/examples/03-transform-any.php b/examples/03-transform-any.php index a1e5134..6f6367d 100644 --- a/examples/03-transform-any.php +++ b/examples/03-transform-any.php @@ -1,8 +1,5 @@ post( $url, array('Content-Type' => 'application/json'), json_encode($user) - )->then(function (ResponseInterface $response) use ($user) { + )->then(function (Psr\Http\Message\ResponseInterface $response) use ($user) { // demo HTTP response validation $body = json_decode($response->getBody()); if (!isset($body->json)) { @@ -44,10 +41,9 @@ function ($user) { echo 'Successfully processed user record:' . print_r($user, true) . PHP_EOL; }, function (Exception $e) { - echo 'An error occurred: ' . $e->getMessage() . PHP_EOL; + echo 'Error: ' . $e->getMessage() . PHP_EOL; if ($e->getPrevious()) { echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL; } } ); - diff --git a/src/Transformer.php b/src/Transformer.php index 9ed8aa2..e07b53d 100644 --- a/src/Transformer.php +++ b/src/Transformer.php @@ -85,8 +85,8 @@ * * return json_decode($response->getBody()); * }, - * function (Exception $error) { - * var_dump('There was an error', $error->getMessage()); + * function (Exception $e) { + * echo 'Error: ' . $e->getMessage() . PHP_EOL; * * throw $error; * } @@ -256,6 +256,10 @@ * }); * * $source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest); + * + * $transformer->on('error', function (Exception $e) { + * echo 'Error: ' . $e->getMessage() . PHP_EOL; + * }); * ``` * * Keep in mind that the transformation handler may return a rejected promise. @@ -314,6 +318,8 @@ final class Transformer extends EventEmitter implements DuplexStreamInterface * * $promise->then(function ($count) { * echo 'All ' . $count . ' jobs successful!' . PHP_EOL; + * }, function (Exception $e) { + * echo 'Error: ' . $e->getMessage() . PHP_EOL; * }); * ``` * @@ -466,6 +472,8 @@ public static function all(ReadableStreamInterface $input, $concurrency, $callba * * $promise->then(function (ResponseInterface $response) { * echo 'First successful job: ' . $response->getBody() . PHP_EOL; + * }, function (Exception $e) { + * echo 'Error: ' . $e->getMessage() . PHP_EOL; * }); * ``` *