Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve documentation and examples #30

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ operations, but keeping thousands of jobs in memory at once may easily take up
all resources on your side.
Instead, you can use this library to stream your arbitrarily large input list
as individual records to a non-blocking (async) transformation handler. It uses
[ReactPHP](https://reactphp.org) to enable you to concurrently process multiple
[ReactPHP](https://reactphp.org/) to enable you to concurrently process multiple
records at once. You can control the concurrency limit, so that by allowing
it to process 10 operations at the same time, you can thus process this large
input list around 10 times faster and at the same time you're no longer limited
Expand Down Expand Up @@ -72,21 +72,25 @@ Once [installed](#install), you can use the following code to process an example
user lists by sending a (RESTful) HTTP API request for each user record:

```php
<?php

require __DIR__ . '/vendor/autoload.php';

$browser = new React\Http\Browser();

$concurrency = isset($argv[1]) ? $argv[1] : 3;

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$transformer = new Transformer($concurrency, function ($user) use ($browser) {
$transformer = new Clue\React\Flux\Transformer($concurrency, function ($user) use ($browser) {
// skip users that do not have an IP address listed
if (!isset($user['ip'])) {
return React\Promise\resolve($user);
}

// look up country for this IP
return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
function (ResponseInterface $response) use ($user) {
function (Psr\Http\Message\ResponseInterface $response) use ($user) {
// response successfully received
// add country to user array and return updated user
$user['country'] = (string)$response->getBody();
Expand Down Expand Up @@ -114,7 +118,9 @@ $transformer->on('data', function ($user) {
$transformer->on('end', function () {
echo '[DONE]' . PHP_EOL;
});
$transformer->on('error', 'printf');
$transformer->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

```

Expand Down Expand Up @@ -240,8 +246,8 @@ $transformer = new Transformer(10, function ($url) use ($browser) {

return json_decode($response->getBody());
},
function (Exception $error) {
var_dump('There was an error', $error->getMessage());
function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
yadaiio marked this conversation as resolved.
Show resolved Hide resolved

throw $error;
}
Expand Down Expand Up @@ -411,6 +417,10 @@ $transformer = new Transformer(10, function ($data) use ($http) {
});

$source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);

$transformer->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

Keep in mind that the transformation handler may return a rejected promise.
Expand Down Expand Up @@ -456,6 +466,8 @@ $promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {

$promise->then(function ($count) {
echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

Expand Down Expand Up @@ -561,6 +573,8 @@ $promise = Transformer::any($input, 3, function ($data) use ($browser, $url) {

$promise->then(function (ResponseInterface $response) {
echo 'First successful job: ' . $response->getBody() . PHP_EOL;
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

Expand Down
12 changes: 5 additions & 7 deletions examples/01-transform.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$browser = new React\Http\Browser();
Expand All @@ -11,7 +8,7 @@

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$transformer = new Transformer($concurrency, function ($user) use ($browser) {
$transformer = new Clue\React\Flux\Transformer($concurrency, function ($user) use ($browser) {
// skip users that do not have an IP address listed
if (!isset($user['ip'])) {
$user['country'] = 'n/a';
Expand All @@ -21,7 +18,7 @@

// look up country for this user's IP
return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
function (ResponseInterface $response) use ($user) {
function (Psr\Http\Message\ResponseInterface $response) use ($user) {
// response successfully received
// add country to user array and return updated user
$user['country'] = (string)$response->getBody();
Expand Down Expand Up @@ -49,5 +46,6 @@ function (ResponseInterface $response) use ($user) {
$transformer->on('end', function () {
echo '[DONE]' . PHP_EOL;
});
$transformer->on('error', 'printf');

$transformer->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
10 changes: 3 additions & 7 deletions examples/02-transform-all.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$browser = new React\Http\Browser();
Expand All @@ -21,12 +18,12 @@
// each job should use the browser to POST each user object to a certain URL
// process all users by processing all users through transformer
// limit number of concurrent jobs here
$promise = Transformer::all($input, $concurrency, function ($user) use ($browser, $url) {
$promise = Clue\React\Flux\Transformer::all($input, $concurrency, function ($user) use ($browser, $url) {
return $browser->post(
$url,
array('Content-Type' => 'application/json'),
json_encode($user)
)->then(function (ResponseInterface $response) {
)->then(function (Psr\Http\Message\ResponseInterface $response) {
// demo HTTP response validation
$body = json_decode($response->getBody());
if (!isset($body->json)) {
Expand All @@ -40,10 +37,9 @@ function ($count) {
echo 'Successfully processed all ' . $count . ' user records' . PHP_EOL;
},
function (Exception $e) {
echo 'An error occurred: ' . $e->getMessage() . PHP_EOL;
echo 'Error: ' . $e->getMessage() . PHP_EOL;
if ($e->getPrevious()) {
echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL;
}
}
);

10 changes: 3 additions & 7 deletions examples/03-transform-any.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$browser = new React\Http\Browser();
Expand All @@ -21,12 +18,12 @@
// each job should use the browser to POST each user object to a certain URL
// process all users by processing all users through transformer
// limit number of concurrent jobs here
$promise = Transformer::any($input, $concurrency, function ($user) use ($browser, $url) {
$promise = Clue\React\Flux\Transformer::any($input, $concurrency, function ($user) use ($browser, $url) {
return $browser->post(
$url,
array('Content-Type' => 'application/json'),
json_encode($user)
)->then(function (ResponseInterface $response) use ($user) {
)->then(function (Psr\Http\Message\ResponseInterface $response) use ($user) {
// demo HTTP response validation
$body = json_decode($response->getBody());
if (!isset($body->json)) {
Expand All @@ -44,10 +41,9 @@ function ($user) {
echo 'Successfully processed user record:' . print_r($user, true) . PHP_EOL;
},
function (Exception $e) {
echo 'An error occurred: ' . $e->getMessage() . PHP_EOL;
echo 'Error: ' . $e->getMessage() . PHP_EOL;
if ($e->getPrevious()) {
echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL;
}
}
);

12 changes: 10 additions & 2 deletions src/Transformer.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@
*
* return json_decode($response->getBody());
* },
* function (Exception $error) {
* var_dump('There was an error', $error->getMessage());
* function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
yadaiio marked this conversation as resolved.
Show resolved Hide resolved
*
* throw $error;
* }
Expand Down Expand Up @@ -256,6 +256,10 @@
* });
*
* $source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);
*
* $transformer->on('error', function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
* Keep in mind that the transformation handler may return a rejected promise.
Expand Down Expand Up @@ -314,6 +318,8 @@ final class Transformer extends EventEmitter implements DuplexStreamInterface
*
* $promise->then(function ($count) {
* echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
* }, function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
Expand Down Expand Up @@ -466,6 +472,8 @@ public static function all(ReadableStreamInterface $input, $concurrency, $callba
*
* $promise->then(function (ResponseInterface $response) {
* echo 'First successful job: ' . $response->getBody() . PHP_EOL;
* }, function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
Expand Down