Skip to content

Commit

Permalink
Requested updates.
Browse files Browse the repository at this point in the history
- removed unncessary global channel prop
- add doc blocks to driver methods
- added baseline for phpstan
  • Loading branch information
rocksfrow committed Nov 9, 2023
1 parent 4a71799 commit add24f8
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 9 deletions.
6 changes: 2 additions & 4 deletions bin/phpstan.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#!/bin/bash

cd $(dirname $0)

../vendor/bin/phpstan analyze --level=5 ../src # -c ../phpstan.neon

cd $(dirname $0)/../
./vendor/bin/phpstan analyze --level=5 src
6 changes: 6 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
parameters:
ignoreErrors:
-
message: "#^Dead catch \\- PhpAmqpLib\\\\Exception\\\\AMQPProtocolChannelException is never thrown in the try block\\.$#"
count: 3
path: src/Drivers/PhpAmqpLibAmqpDriver.php
2 changes: 2 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
includes:
- phpstan-baseline.neon
82 changes: 77 additions & 5 deletions src/Drivers/PhpAmqpLibAmqpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Smuuf\CeleryForPhp\Drivers;

use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AbstractConnection as PhpAmqpLibAbstractConnection;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
Expand All @@ -19,12 +18,35 @@ class PhpAmqpLibAmqpDriver implements IAmqpDriver {

use StrictObject;

private ?AMQPChannel $channel = null;

public function __construct(
private PhpAmqpLibAbstractConnection $amqp,
) {}

/**
* Publishes a message to an AMQP queue and/or exchange.
*
* This method declares an exchange and a queue (if a routing key is provided),
* and then publishes a message to the specified exchange. If a routing key is
* provided, a 'direct' exchange is declared, and the queue is bound to this
* exchange using the routing key. If no routing key is provided, a 'fanout'
* exchange is declared.
*
* The message, along with additional properties and headers, is then published
* to the exchange. This method handles both scenarios of having and not having
* a routing key.
*
* @param string $queue The name of the queue to declare and to which the message
* will be published if a routing key is provided.
* @param string $exchange The name of the exchange to declare and to which the
* message will be published.
* @param string $routingKey The routing key for binding the queue to the exchange
* and for publishing the message. If empty, a 'fanout'
* exchange is used.
* @param string $message The message to be published.
* @param array $properties Additional properties for the message, such as content type,
* delivery mode, etc.
* @param array $headers Headers to be included in the message.
*/
public function publish(
string $queue,
string $exchange,
Expand Down Expand Up @@ -72,6 +94,19 @@ public function publish(

}

/**
* Deletes a specified AMQP exchange.
*
* Attempts to delete the exchange with the given name. If the exchange does not exist,
* which is identified by an AMQPProtocolChannelException with a 404 error code,
* the exception is caught and ignored. Any other types of
* AMQPProtocolChannelException are re-thrown.
*
* @param string $exchange The name of the exchange to be deleted.
*
* @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException If an AMQP protocol error occurs,
* other than a non-existent exchange (404 error).
*/
public function deleteExchange(string $exchange): void {
$ch = $this->amqp->channel();

Expand All @@ -87,6 +122,19 @@ public function deleteExchange(string $exchange): void {

}

/**
* Deletes a specified AMQP queue.
*
* Attempts to delete the queue with the given name. If the queue does not exist,
* which is identified by an AMQPProtocolChannelException with a 404 error code,
* the exception is caught and ignored. Any other types of
* AMQPProtocolChannelException are re-thrown.
*
* @param string $queueName The name of the queue to be deleted.
*
* @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException If an AMQP protocol error occurs,
* other than a non-existent queue (404 error).
*/
public function deleteQueue(string $queueName): void {
$ch = $this->amqp->channel();

Expand All @@ -102,13 +150,26 @@ public function deleteQueue(string $queueName): void {

}

/**
* Purges all messages from the specified queue.
*
* This method attempts to purge a queue with the given name. If the queue
* does not exist (identified by the 404 code), the exception is caught and
* ignored. For other types of AMQPProtocolChannelException, the exception
* is re-thrown.
*
* @param string $queueName The name of the queue to be purged.
*
* @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException If an AMQP protocol error occurs,
* other than a non-existent queue (404 error).
*/
public function purgeQueue(string $queueName): void {
$ch = $this->amqp->channel();

try {
$ch->queue_purge($queueName);
} catch (AMQPProtocolChannelException $e) {
if ($e->getCode() !== 404) { // ignore queue 404
} catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
if ($e->getCode() !== 404) {
throw $e;
}
}
Expand All @@ -117,6 +178,17 @@ public function purgeQueue(string $queueName): void {

}

/**
* Retrieves the number of messages in a specified AMQP queue.
*
* This method declares a passive, durable queue with the given name to check its existence
* and obtain the current message count. It returns the number of messages currently in the queue.
* The method uses a passive declaration to ensure that it does not modify or create the queue.
*
* @param string $queueName The name of the queue for which the message count is required.
*
* @return int The number of messages currently in the specified queue.
*/
public function queueLength(string $queueName): int {

$ch = $this->amqp->channel();
Expand Down

0 comments on commit add24f8

Please sign in to comment.