Skip to content

Commit

Permalink
feat: Add AmphpExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
il-masaru-yamagishi committed Oct 15, 2023
1 parent 96d8640 commit c6a6a36
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 101 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
],
"require": {
"php": ">=8.1",
"amphp/http-client": "v5.0.0-beta.17",
"amphp/amp": "^3.0",
"amphp/http-client": "^5.0.0",
"laminas/laminas-diactoros": "^3.0",
"psr/http-client": "^1.0",
"psr/http-factory": "^1.0",
Expand Down
62 changes: 49 additions & 13 deletions src/Console/Commands/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

namespace Heavyrain\Console\Commands;

use Amp\DeferredCancellation;
use Closure;
use Heavyrain\Executor\ExecutorConfig;
use Heavyrain\Executor\ExecutorFactory;
use Heavyrain\Executor\SyncExecutor;
use Heavyrain\HttpClient\ClientFactory;
use Heavyrain\HttpClient\HttpProfiler;
use Heavyrain\Reporters\TableReporter;
Expand All @@ -25,13 +23,20 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Heavyrain\Executor\AmphpExecutor;
use Heavyrain\Executor\OnceExecutor;

#[AsCommand(
name: 'run:single',
description: 'Run scenario once',
name: 'run',
description: 'Run target scenario',
)]
final class RunCommand extends Command implements SignalableCommandInterface
{
/**
* OS Signal cancellation token
*
* @var CancellationToken|null
*/
private ?CancellationToken $cancelToken = null;

protected function configure(): void
Expand All @@ -45,12 +50,23 @@ protected function configure(): void
'base-uri',
InputArgument::REQUIRED,
'Request base URI',
)->addOption(
'runner',
'r',
InputOption::VALUE_REQUIRED,
'Runner type(available: once, async, aggregator, worker)',
'once',
)->addOption(
'output',
'o',
InputOption::VALUE_REQUIRED,
'Output format',
'table',
)->addOption(
'users',
'u',
InputOption::VALUE_REQUIRED,
'Concurrent users',
);
}

Expand Down Expand Up @@ -86,19 +102,43 @@ protected function execute(InputInterface $input, OutputInterface $output)
if (\method_exists($scenarioFunction, 'isStatic') && !$scenarioFunction->isStatic()) {
$io->warning('Scenario Closure should be static: `return static function(...`');
}
$this->cancelToken = new CancellationToken();
$this->cancelToken = new CancellationToken(new DeferredCancellation);
$users = $input->getOption('users');
\assert(\is_int($users));

$io->definitionList(
['Base URI' => $baseUri],
['Scenario' => $scenarioFilePath],
['Users' => $users],
);

$profiler = new HttpProfiler();

$reporter = match ($input->getOption('output')) {
'table' => new TableReporter($io),
default => null,
};
if (\is_null($reporter)) {
$io->error('Unknown output format provided=' . (string)$input->getOption('output'));
return Command::INVALID;
}

$executor = match ($input->getOption('runner')) {
'once' => new OnceExecutor($scenarioFunction->getClosure(), new ClientFactory($profiler, $baseUri), $profiler),
'async' => new AmphpExecutor($scenarioFunction->getClosure(), new ClientFactory($profiler, $baseUri), $profiler, $users),
'aggregator' => throw new \RuntimeException('Runner:aggregator not implemented yet'),
'worker' => throw new \RuntimeException('Runner:worker not implemented yet'),
default => null,
};
if (\is_null($executor)) {
$io->error('Unknown runner type provided=' . (string)$input->getOption('runner'));
return Command::INVALID;
}

$startMicrosec = \microtime(true);
$io->writeln(\sprintf('Start execution at %s', \date('Y-m-d H:i:s')));
$profiler = new HttpProfiler();

(new SyncExecutor($scenarioFunction->getClosure(), new ClientFactory($profiler, $baseUri)))
->execute($this->cancelToken);
$executor->execute($this->cancelToken);

$io->writeln(
\sprintf(
Expand All @@ -108,10 +148,6 @@ protected function execute(InputInterface $input, OutputInterface $output)
),
);

$reporter = match ($input->getOption('output')) {
'table' => new TableReporter($io),
default => new TableReporter($io),
};
$reporter->report($profiler->getResults());

return Command::SUCCESS;
Expand Down
56 changes: 56 additions & 0 deletions src/Executor/AmphpExecutor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

/**
* @license MIT
*/

declare(strict_types=1);

namespace Heavyrain\Executor;

use Closure;
use Heavyrain\Contracts\CancellationTokenInterface;
use Heavyrain\Contracts\ExecutorInterface;
use Heavyrain\HttpClient\ClientFactory;
use Heavyrain\HttpClient\HttpProfiler;
use Heavyrain\HttpClient\RequestException;
use Throwable;

use function Amp\async;

/**
* Executes asynchronized with amphp
*/
class AmphpExecutor implements ExecutorInterface
{
public function __construct(
private readonly Closure $scenarioFunction,
private readonly ClientFactory $factory,
private readonly HttpProfiler $profiler,
private readonly int $userCount,
) {
}

public function execute(CancellationTokenInterface $token): iterable
{
$scenarioFunc = $this->scenarioFunction;
$factory = $this->factory;
$profiler = $this->profiler;

for ($i = 0; $i < $this->userCount; $i++) {
// TODO: ramp-up users
yield async(static function () use ($token, $scenarioFunc, $factory, $profiler): void {
while (true) {
try {
$scenarioFunc($factory->create($token));
} catch (RequestException $e) {
// do nothing because Heavyrain\HttpClient\RequestException was handled in AmphpClient
} catch (Throwable $e) {
$profiler->profileUncaughtException($e);
}
\sleep(1); // sleep 1 second to prevent CPU overload
}
});
}
}
}
32 changes: 0 additions & 32 deletions src/Executor/ExecutorFactory.php

This file was deleted.

41 changes: 41 additions & 0 deletions src/Executor/OnceExecutor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

/**
* @license MIT
*/

declare(strict_types=1);

namespace Heavyrain\Executor;

use Closure;
use Heavyrain\Contracts\CancellationTokenInterface;
use Heavyrain\Contracts\ExecutorInterface;
use Heavyrain\HttpClient\ClientFactory;
use Heavyrain\HttpClient\HttpProfiler;
use Heavyrain\HttpClient\RequestException;
use Throwable;

/**
* Simply executes synchronized once
*/
class OnceExecutor implements ExecutorInterface
{
public function __construct(
private readonly Closure $scenarioFunction,
private readonly ClientFactory $factory,
private readonly HttpProfiler $profiler,
) {
}

public function execute(CancellationTokenInterface $token): iterable
{
try {
yield ($this->scenarioFunction)($this->factory->create($token));
} catch (RequestException $e) {
// do nothing because Heavyrain\HttpClient\RequestException was handled in AmphpClient
} catch (Throwable $e) {
$this->profiler->profileUncaughtException($e);
}
}
}
47 changes: 0 additions & 47 deletions src/Executor/SyncExecutor.php

This file was deleted.

8 changes: 5 additions & 3 deletions src/HttpClient/AmphpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

namespace Heavyrain\HttpClient;

use Amp\Cancellation;
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
Expand All @@ -31,6 +32,7 @@ public function __construct(
private readonly HttpProfiler $profiler,
private readonly HttpClient $client,
private readonly ResponseFactoryInterface $responseFactory,
private readonly Cancellation $cancellation,
) {
}

Expand Down Expand Up @@ -70,18 +72,18 @@ private function handle(RequestInterface $request): ResponseInterface
$ampRequest->setTransferTimeout(10);

try {
$response = $this->client->request($ampRequest);
$ampResponse = $this->client->request($ampRequest, $this->cancellation);

// Profiles with HTTP events.
$this->profiler->profile($ampRequest, $response);
$this->profiler->profile($ampRequest, $ampResponse);
} catch (\Throwable $exception) {
// Profile exception during request
$this->profiler->profileException($ampRequest, $exception);

throw new RequestException('failed to fetch response', previous: $exception);
}

return $this->toPsrResponse($response);
return $this->toPsrResponse($ampResponse);
}

/**
Expand Down
17 changes: 15 additions & 2 deletions src/HttpClient/ClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@

namespace Heavyrain\HttpClient;

use Amp\CompositeCancellation;
use Amp\Http\Client\HttpClientBuilder;
use Amp\TimeoutCancellation;
use Heavyrain\Contracts\CancellationTokenInterface;
use Heavyrain\Contracts\ClientInterface;
use Heavyrain\Scenario\CancellationToken;
use Heavyrain\Scenario\Client;
use Heavyrain\Scenario\RequestBuilder;
use Laminas\Diactoros\RequestFactory;
Expand All @@ -26,11 +30,13 @@ class ClientFactory
* @param HttpProfiler $profiler The HTTP profiler.
* @param string $baseUri The base URI for the HTTP client.
* @param int $followRedirects Follows redirects count
* @param float $timeoutSeconds Timeout in seconds
*/
public function __construct(
public readonly HttpProfiler $profiler,
private readonly HttpProfiler $profiler,
private readonly string $baseUri,
private readonly int $followRedirects = 0,
private readonly float $timeoutSeconds = 30.0,
) {
}

Expand All @@ -39,8 +45,14 @@ public function __construct(
*
* @return ClientInterface The newly created HTTP client instance.
*/
public function create(): ClientInterface
public function create(CancellationTokenInterface $token): ClientInterface
{
\assert($token instanceof CancellationToken);
$cancellation = new CompositeCancellation(
new TimeoutCancellation($this->timeoutSeconds),
$token->getCancellation(),
);

$ampHttpClient = (new HttpClientBuilder())
->allowDeprecatedUriUserInfo()
->followRedirects($this->followRedirects)
Expand All @@ -54,6 +66,7 @@ public function create(): ClientInterface
$this->profiler,
$ampHttpClient,
new ResponseFactory(),
$cancellation,
);

$requestBuilder = new RequestBuilder(
Expand Down
Loading

0 comments on commit c6a6a36

Please sign in to comment.