Skip to content

Commit

Permalink
Merge pull request #37 from M6Web/feature/sf-http-client
Browse files Browse the repository at this point in the history
Adding Symfony HttpClient support
  • Loading branch information
b-viguier authored Nov 22, 2019
2 parents 5cfefb5 + 16609e5 commit 6bb5396
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 32 deletions.
16 changes: 11 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@
"ext-curl": "^7.1",
"react/event-loop": "^1.0",
"react/promise": "^2.7",
"phpstan/phpstan": "^0.10.5"
"phpstan/phpstan": "^0.10.5",
"symfony/http-client": "^4.3",
"psr/http-factory": "^1.0",
"http-interop/http-factory-guzzle": "^1.0"
},
"suggest": {
"amphp/amp": "Needed to use Tornado\\Adapter\\Amp\\EventLoop",
"react/event-loop": "Needed to use Tornado\\Adapter\\ReactPhp\\EventLoop",
"react/promise": "Needed to use Tornado\\Adapter\\ReactPhp\\EventLoop",
"guzzlehttp/guzzle": "Needed to use Tornado\\Adapter\\Guzzle\\HttpClient"
"ext-curl": "Required to use Curl and HTTP2 features",
"amphp/amp": "Required to use Tornado\\Adapter\\Amp\\EventLoop",
"react/event-loop": "Required to use Tornado\\Adapter\\ReactPhp\\EventLoop",
"react/promise": "Required to use Tornado\\Adapter\\ReactPhp\\EventLoop",
"guzzlehttp/guzzle": "Required to use Tornado\\Adapter\\Guzzle\\HttpClient",
"symfony/http-client": "Required to use Tornado\\Adapter\\Symfony\\HttpClient",
"psr/http-factory": "Required to use Tornado\\Adapter\\Symfony\\HttpClient"
},
"config": {
"bin-dir": "bin/"
Expand Down
28 changes: 17 additions & 11 deletions examples/03-http-client.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
function monitorRequest(EventLoop $eventLoop, HttpClient $httpClient, string $uri): \Generator
{
// Let's use Guzzle Psr7 implementation
$request = new \GuzzleHttp\Psr7\Request('GET', $uri);
$request = new \GuzzleHttp\Psr7\Request('GET', $uri, [], null, '2.0');

$start = microtime(true);
/** @var \Psr\Http\Message\ResponseInterface */
Expand All @@ -26,22 +26,28 @@ function monitorRequest(EventLoop $eventLoop, HttpClient $httpClient, string $ur
//$eventLoop = new Adapter\Amp\EventLoop();
//$eventLoop = new Adapter\ReactPhp\EventLoop(new React\EventLoop\StreamSelectLoop());

// Tornado provides only one HttpClient implementation, using Guzzle
$httpClient = new Adapter\Guzzle\HttpClient($eventLoop, new Adapter\Guzzle\CurlMultiClientWrapper());
// Choose your adapter
$httpClient = new Adapter\Symfony\HttpClient(new \Symfony\Component\HttpClient\CurlHttpClient(), $eventLoop, new \Http\Factory\Guzzle\ResponseFactory(), new \Http\Factory\Guzzle\StreamFactory());
//$httpClient = new Adapter\Guzzle\HttpClient($eventLoop, new Adapter\Guzzle\CurlMultiClientWrapper());

// Let's call several endpoints… concurrently!
echo "Let's start!\n";
echo "Requests in progress…\n";
$start = microtime(true);
$results = $eventLoop->wait(
$eventLoop->promiseAll(
$eventLoop->async(monitorRequest($eventLoop, $httpClient, 'http://httpbin.org/status/404')),
$eventLoop->async(monitorRequest($eventLoop, $httpClient, 'http://www.google.com')),
$eventLoop->async(monitorRequest($eventLoop, $httpClient, 'http://www.example.com'))
)
);
$promises = [];
// You can download up to 379 parts.
// Check https://http2.akamai.com/demo for the full HTTP2 demonstration.
for ($i = 0; $i < 10; $i++) {
$promises[] = $eventLoop->async(monitorRequest(
$eventLoop,
$httpClient,
"https://http2.akamai.com/demo/tile-$i.png"
));
}

$results = $eventLoop->wait($eventLoop->promiseAll(...$promises));
$duration = microtime(true) - $start;

echo "Global duration: $duration\n";
echo implode(PHP_EOL, $results).PHP_EOL;
echo "Global duration: $duration\n";
echo "Finished!\n";
47 changes: 43 additions & 4 deletions examples/tests/ExamplesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,29 @@ public function testExampleShouldRun(string $exampleFile, string $eventloopName,
}
}

private function extractExampleCode(string $exampleFile): iterable
{
$originalContent = file($exampleFile);

foreach ($this->selectEventLoop($originalContent) as $nameEL => $contentEL) {
$exampleUseHttpClient = false;

foreach ($this->selectHttpClient($contentEL) as $nameHC => $contentELHC) {
$exampleUseHttpClient = true;
yield "$nameEL - $nameHC" => implode('', $contentELHC);
}

if (!$exampleUseHttpClient) {
yield $nameEL => implode('', $contentEL);
}
}
}

/**
* Very naive approach to iterate over various eventLoop implementations.
*/
private function extractExampleCode(string $exampleFiles): iterable
private function selectEventLoop(array $originalContent): iterable
{
$originalContent = file($exampleFiles);

foreach ($originalContent as &$line) {
if (false === strpos($line, '$eventLoop = new ')) {
continue;
Expand All @@ -68,7 +84,30 @@ private function extractExampleCode(string $exampleFiles): iterable
// Enable current eventLoop
$line = ltrim($line, '/');

yield $name => implode('', $originalContent);
yield $name => $originalContent;

// Disable this eventLoop
$line = "//$line";
}
}

/**
* Very naive approach to iterate over various httpClient implementations.
*/
private function selectHttpClient(array $originalContent): iterable
{
foreach ($originalContent as &$line) {
if (false === strpos($line, '$httpClient = new ')) {
continue;
}

// Extract relevant name
$name = strstr(strstr($line, '(', true), 'Adapter\\');

// Enable current eventLoop
$line = ltrim($line, '/');

yield $name => $originalContent;

// Disable this eventLoop
$line = "//$line";
Expand Down
20 changes: 20 additions & 0 deletions src/Adapter/Guzzle/HttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public function __construct(EventLoop $eventLoop, GuzzleClientWrapper $clientWra
*/
public function sendRequest(RequestInterface $request): Promise
{
$request = $this->http2fallback($request);
$deferred = $this->eventLoop->deferred();

$this->clientWrapper->getClient()->sendAsync($request)->then(
Expand All @@ -59,6 +60,25 @@ function (\Exception $exception) use ($deferred) {
return $deferred->getPromise();
}

private function http2fallback(RequestInterface $request): RequestInterface
{
if ($request->getProtocolVersion() !== '2.0') {
return $request;
}

// Check that HTTP/2 is effectively supported by the system, and fallback to HTTP/1.1 if needed.
// Inspired from https://github.com/symfony/http-client/blob/master/CurlHttpClient.php
if (
'https' !== $request->getUri()->getScheme()
|| !\defined('CURL_VERSION_HTTP2')
|| !(CURL_VERSION_HTTP2 & curl_version()['features'])
) {
return $request->withProtocolVersion('1.1');
}

return $request;
}

private function guzzleEventLoop(): \Generator
{
do {
Expand Down
155 changes: 155 additions & 0 deletions src/Adapter/Symfony/HttpClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
<?php

namespace M6Web\Tornado\Adapter\Symfony;

use M6Web\Tornado\Deferred;
use M6Web\Tornado\EventLoop;
use M6Web\Tornado\Promise;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Symfony\Contracts\HttpClient\ChunkInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface as SfHttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface as SfResponseInterface;

class HttpClient implements \M6Web\Tornado\HttpClient
{
/** @var SfHttpClientInterface */
private $symfonyClient;

/** @var EventLoop */
private $eventLoop;

/** @var SfResponseInterface[] */
private $jobs = [];

/** @var ResponseFactoryInterface */
private $responseFactory;

/** @var StreamFactoryInterface */
private $streamFactory;

/** @var int */
private $lastRequestId = 0;

public function __construct(SfHttpClientInterface $symfonyClient, EventLoop $eventLoop, ResponseFactoryInterface $responseFactory, StreamFactoryInterface $streamFactory)
{
$this->symfonyClient = $symfonyClient;
$this->eventLoop = $eventLoop;
$this->responseFactory = $responseFactory;
$this->streamFactory = $streamFactory;
}

public function sendRequest(RequestInterface $request): Promise
{
$body = $request->getBody();
if ($body->isSeekable()) {
$body->seek(0);
}

$requestId = $this->lastRequestId = ++$this->lastRequestId % PHP_INT_MAX;
try {
$this->jobs[$requestId] = $this->symfonyClient->request($request->getMethod(), (string) $request->getUri(), [
'headers' => $request->getHeaders(),
'body' => $body->getContents(),
'http_version' => $request->getProtocolVersion(),
'user_data' => [
$deferred = $this->eventLoop->deferred(),
$requestId,
],
]);
} catch (\Exception $exception) {
return $this->eventLoop->promiseRejected($exception);
}

// Register the internal event loop only for the first request
if (count($this->jobs) === 1) {
$this->eventLoop->async($this->symfonyEventLoop());
}

return $deferred->getPromise();
}

private function symfonyEventLoop(): \Generator
{
do {
yield $this->eventLoop->idle();

$currentJobs = $this->jobs;
$this->jobs = [];
/**
* @var SfResponseInterface
* @var ChunkInterface $chunk
*/
foreach ($this->symfonyClient->stream($currentJobs, 0) as $response => $chunk) {
/** @var Deferred $deferred */
[$deferred, $requestId] = $response->getInfo('user_data');

try {
if ($chunk->isTimeout() || !$chunk->isLast()) {
// To prevent the client to throw an exception
// https://github.com/symfony/symfony/issues/32673#issuecomment-548327270
$response->getStatusCode();
$this->jobs[$requestId] = $response;
continue;
}

// the full content of $response just completed
// $response->getContent() is now a non-blocking call
$deferred->resolve($this->toPsrResponse($response));

// Stream loop may yield the same response several times,
// then the response may already by in the list of responses to process.
// To prevent to resolve it twice, remove it.
unset($this->jobs[$requestId]);
} catch (\Throwable $exception) {
$deferred->reject($exception);
}
}
} while ($this->jobs);
}

/**
* Inspired from https://github.com/symfony/http-client/blob/master/Psr18Client.php
*
* @throws \Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface
* @throws \Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface
* @throws \Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface
* @throws \Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface
*/
private function toPsrResponse(SfResponseInterface $response): ResponseInterface
{
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());
foreach ($response->getHeaders(false) as $name => $values) {
foreach ($values as $value) {
$psrResponse = $psrResponse->withAddedHeader($name, $value);
}
}

$body = $this->streamFactory->createStream($response->getContent(false));

if ($body->isSeekable()) {
$body->seek(0);
}

return $psrResponse->withBody($body);
}
}

/**
* @internal
*/
class InternalSymfonyJob
{
/** @var Deferred */
public $deferred;
/** @var SfResponseInterface */
public $response;

public function __construct(Deferred $deferred, SfResponseInterface $response)
{
$this->deferred = $deferred;
$this->response = $response;
}
}
6 changes: 0 additions & 6 deletions src/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public function promiseAll(Promise ...$promises): Promise;
*
* @param \Traversable|array $traversable Input elements
* @param callable $function must return a generator from an input value, and an optional key
*
* @return Promise
*/
public function promiseForeach($traversable, callable $function): Promise;

Expand Down Expand Up @@ -71,17 +69,13 @@ public function deferred(): Deferred;
* Returns a promise that will be resolved with the input stream when it becomes readable.
*
* @param resource $stream
*
* @return Promise
*/
public function readable($stream): Promise;

/**
* Returns a promise that will be resolved with the input stream when it becomes writable.
*
* @param resource $stream
*
* @return Promise
*/
public function writable($stream): Promise;
}
43 changes: 43 additions & 0 deletions tests/Adapter/Symfony/HttpClientTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace M6WebTest\Tornado\Adapter\Symfony;

use M6Web\Tornado\EventLoop;
use M6Web\Tornado\HttpClient;
use Psr\Http\Message\ResponseInterface;

class HttpClientTest extends \M6WebTest\Tornado\HttpClientTest
{
protected function createHttpClient(EventLoop $eventLoop, array $responsesOrExceptions): HttpClient
{
$callback = function ($method, $url, $options) use (&$responsesOrExceptions) {
$response = array_shift($responsesOrExceptions);
if ($response instanceof \Exception) {
throw $response;
}
/* @var ResponseInterface $response */

return new \Symfony\Component\HttpClient\Response\MockResponse(
(string) $response->getBody(),
[
'response_headers' => $response->getHeaders(),
'redirect_count' => 0,
'redirect_url' => null,
'start_time' => microtime(true),
'http_method' => $method,
'http_code' => $response->getStatusCode(),
'error' => null,
'user_data' => $options['user_data'],
'url' => $url,
]
);
};

return new \M6Web\Tornado\Adapter\Symfony\HttpClient(
new \Symfony\Component\HttpClient\MockHttpClient($callback),
$eventLoop,
new \Http\Factory\Guzzle\ResponseFactory(),
new \Http\Factory\Guzzle\StreamFactory()
);
}
}
Loading

0 comments on commit 6bb5396

Please sign in to comment.