Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use of Guzzle Pool to improve efficiency #401

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ $notifications = [
'payload' => '{"message":"Hello World!"}',
], [
// current PushSubscription format (browsers might change this in the future)
'subscription' => Subscription::create([
'subscription' => Subscription::create([
"endpoint" => "https://example.com/other/endpoint/of/another/vendor/abcdef...",
"keys" => [
'p256dh' => '(stringOf88Chars)',
Expand Down Expand Up @@ -253,18 +253,18 @@ foreach ($webPush->flush() as $report) {
echo "[v] Message sent successfully for subscription {$endpoint}.";
} else {
echo "[x] Message failed to sent for subscription {$endpoint}: {$report->getReason()}";

// also available (to get more info)

/** @var \Psr\Http\Message\RequestInterface $requestToPushService */
$requestToPushService = $report->getRequest();

/** @var \Psr\Http\Message\ResponseInterface $responseOfPushService */
$responseOfPushService = $report->getResponse();

/** @var string $failReason */
$failReason = $report->getReason();

/** @var bool $isTheEndpointWrongOrExpired */
$isTheEndpointWrongOrExpired = $report->isSubscriptionExpired();
}
Expand Down Expand Up @@ -364,6 +364,7 @@ Here are some ideas:
1. Make sure MultiCurl is available on your server
2. Find the right balance for your needs between security and performance (see above)
3. Find the right batch size (set it in `defaultOptions` or as parameter to `flush()`)
4. Use `flushPooled()` instead of `flush()`. The former uses concurrent requests, accelerating the process and often doubling the speed of the requests.

### How to solve "SSL certificate problem: unable to get local issuer certificate"?

Expand Down
61 changes: 58 additions & 3 deletions src/WebPush.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace Minishlink\WebPush;

use GuzzleHttp\Client;
use GuzzleHttp\Pool;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Psr7\Request;
use ParagonIE\ConstantTime\Base64UrlSafe;
Expand All @@ -30,7 +31,7 @@ class WebPush
protected ?array $notifications = null;

/**
* @var array Default options: TTL, urgency, topic, batchSize
* @var array Default options: TTL, urgency, topic, batchSize, requestConcurrency
*/
protected array $defaultOptions;

Expand All @@ -53,7 +54,7 @@ class WebPush
* WebPush constructor.
*
* @param array $auth Some servers need authentication
* @param array $defaultOptions TTL, urgency, topic, batchSize
* @param array $defaultOptions TTL, urgency, topic, batchSize, requestConcurrency
* @param int|null $timeout Timeout of POST request
*
* @throws \ErrorException
Expand Down Expand Up @@ -175,6 +176,58 @@ public function flush(?int $batchSize = null): \Generator
}
}

/**
* Flush notifications. Triggers concurrent requests.
*
* @param callable(MessageSentReport): void $callback Callback for each notification
* @param null|int $batchSize Defaults the value defined in defaultOptions during instantiation (which defaults to 1000).
* @param null|int $requestConcurrency Defaults the value defined in defaultOptions during instantiation (which defaults to 100).
*/
public function flushPooled($callback, ?int $batchSize = null, ?int $requestConcurrency = null): void
{
if (empty($this->notifications)) {
return;
}

if (null === $batchSize) {
$batchSize = $this->defaultOptions['batchSize'];
}

if (null === $requestConcurrency) {
$requestConcurrency = $this->defaultOptions['requestConcurrency'];
}

$batches = array_chunk($this->notifications, $batchSize);
$this->notifications = [];

foreach ($batches as $batch) {
$batch = $this->prepare($batch);
$pool = new Pool($this->client, $batch, [
'requestConcurrency' => $requestConcurrency,
'fulfilled' => function (ResponseInterface $response, int $index) use ($callback, $batch) {
/** @var \Psr\Http\Message\RequestInterface $request **/
$request = $batch[$index];
$callback(new MessageSentReport($request, $response));
},
'rejected' => function (RequestException $reason) use ($callback) {
if (method_exists($reason, 'getResponse')) {
$response = $reason->getResponse();
} else {
$response = null;
}
$callback(new MessageSentReport($reason->getRequest(), $response, false, $reason->getMessage()));
},
]);

$promise = $pool->promise();
$promise->wait();
}

if ($this->reuseVAPIDHeaders) {
$this->vapidHeaders = [];
}
}

/**
* @throws \ErrorException|\Random\RandomException
*/
Expand Down Expand Up @@ -315,14 +368,16 @@ public function getDefaultOptions(): array
}

/**
* @param array $defaultOptions Keys 'TTL' (Time To Live, defaults 4 weeks), 'urgency', 'topic', 'batchSize'
* @param array $defaultOptions Keys 'TTL' (Time To Live, defaults 4 weeks), 'urgency', 'topic', 'batchSize', 'requestConcurrency'
*/
public function setDefaultOptions(array $defaultOptions): WebPush
{
$this->defaultOptions['TTL'] = $defaultOptions['TTL'] ?? 2419200;
$this->defaultOptions['urgency'] = $defaultOptions['urgency'] ?? null;
$this->defaultOptions['topic'] = $defaultOptions['topic'] ?? null;
$this->defaultOptions['batchSize'] = $defaultOptions['batchSize'] ?? 1000;
$this->defaultOptions['requestConcurrency'] = $defaultOptions['requestConcurrency'] ?? 100;


return $this;
}
Expand Down
40 changes: 40 additions & 0 deletions tests/WebPushTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,46 @@ public function testFlush(): void
}
}

/**
* @throws \ErrorException
* @throws \JsonException
*/
public function testFlushPooled(): void
{
$subscription = new Subscription(self::$endpoints['standard']);

$report = $this->webPush->sendOneNotification($subscription);
$this->assertFalse($report->isSuccess()); // it doesn't have VAPID

// queue has been reset
$this->assertEmpty(iterator_to_array($this->webPush->flush()));

$report = $this->webPush->sendOneNotification($subscription);
$this->assertFalse($report->isSuccess()); // it doesn't have VAPID

$nonExistentSubscription = Subscription::create([
'endpoint' => 'https://fcm.googleapis.com/fcm/send/fCd2-8nXJhU:APA91bGi2uaqFXGft4qdolwyRUcUPCL1XV_jWy1tpCRqnu4sk7ojUpC5gnq1PTncbCdMq9RCVQIIFIU9BjzScvjrDqpsI7J-K_3xYW8xo1xSNCfge1RvJ6Xs8RGL_Sw7JtbCyG1_EVgWDc22on1r_jozD8vsFbB0Fg',
'publicKey' => 'BME-1ZSAv2AyGjENQTzrXDj6vSnhAIdKso4n3NDY0lsd1DUgEzBw7ARMKjrYAm7JmJBPsilV5CWNH0mVPyJEt0Q',
'authToken' => 'hUIGbmiypj9_EQea8AnCKA',
'contentEncoding' => 'aes128gcm',
]);

// test multiple requests
$this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 1], JSON_THROW_ON_ERROR));
$this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 2], JSON_THROW_ON_ERROR));
$this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 3], JSON_THROW_ON_ERROR));

$callback = function ($report) {
$this->assertFalse($report->isSuccess());
$this->assertTrue($report->isSubscriptionExpired());
$this->assertEquals(410, $report->getResponse()->getStatusCode());
$this->assertNotEmpty($report->getReason());
$this->assertNotFalse(filter_var($report->getEndpoint(), FILTER_VALIDATE_URL));
};

$this->webPush->flushPooled($callback);
}

public function testFlushEmpty(): void
{
$this->assertEmpty(iterator_to_array($this->webPush->flush(300)));
Expand Down