This is a middleware for Tactician to integrate it with pmg/queue.
Install with Composer:
composer require pmg/queue-tactician
To use it, add the middleware to your middleware chain somewhere before the default command handler middleware.
use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\Producer;
use PMG\Queue\Tactician\QueueingMiddleware;
/** @var Producer */
$producer = createAQueueProducerSomehow();
$bus = new CommandBus([
    new QueueingMiddleware($producer),
    new CommandHandlerMiddleware(/*...*/),
]);
Any command that implements PMG\Queue\Message will be put into the queue via the producer, and no further middlewares will be called.
use PMG\Queue\Message;
final class DoLongRunningStuff implements Message
{
    /**
     * {@inheritdoc}
     */
    public function getName()
    {
        return 'LongRunningStuff';
    }
}
// goes right into the queue
$bus->handle(new DoLongRunningStuff());
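To make the short-circuit behaviour concrete, here is a minimal, dependency-free sketch of the idea. It is not the library's source: `SketchMessage`, `SketchProducer`, and `SketchQueueingMiddleware` are hypothetical stand-ins for `PMG\Queue\Message`, `PMG\Queue\Producer`, and `QueueingMiddleware`. Only the `execute($command, callable $next)` shape is borrowed from Tactician's middleware interface.

```php
<?php
// Hypothetical stand-ins; these are NOT the real pmg/queue or Tactician classes.
interface SketchMessage {}

final class SketchProducer
{
    /** @var SketchMessage[] messages "sent" to the queue */
    public $sent = [];

    public function send(SketchMessage $message)
    {
        $this->sent[] = $message;
    }
}

final class SketchQueueingMiddleware
{
    private $producer;

    public function __construct(SketchProducer $producer)
    {
        $this->producer = $producer;
    }

    // Same shape as Tactician's Middleware::execute($command, callable $next).
    public function execute($command, callable $next)
    {
        if ($command instanceof SketchMessage) {
            // Queue the message and stop here: $next is never called.
            $this->producer->send($command);
            return null;
        }

        // Anything that is not a message continues down the chain.
        return $next($command);
    }
}

final class QueuedThing implements SketchMessage {}
final class ImmediateThing {}

$producer = new SketchProducer();
$middleware = new SketchQueueingMiddleware($producer);

// Stands in for the rest of the chain (e.g. the command handler middleware).
$next = function ($command) {
    return 'handled ' . get_class($command);
};

$queuedResult = $middleware->execute(new QueuedThing(), $next);
$immediateResult = $middleware->execute(new ImmediateThing(), $next);
```

In this sketch only `ImmediateThing` reaches `$next`; `QueuedThing` ends up in `$producer->sent` instead. That is why the queueing middleware has to sit before the command handler middleware in the chain.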
To use Tactician to process the messages via the consumer, use PMG\Queue\Handler\TacticianHandler.
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\TacticianHandler;
/** @var League\Tactician\CommandBus $bus */
$handler = new TacticianHandler($bus);
/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
$consumer->run();
The above assumes that the CommandBus instance still has the QueueingMiddleware installed. If not, you'll need to use your own handler that invokes the command bus, perhaps via CallableHandler.
use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Message;
use PMG\Queue\Handler\CallableHandler;
// no QueueingMiddleware!
$differentBus = new CommandBus([
    new CommandHandlerMiddleware(/*...*/),
]);
// pass the bus created above, not the original $bus
$handler = new CallableHandler([$differentBus, 'handle']);
/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
$consumer->run();
Sharing one command bus instance means that resources it holds, such as open database connections, are very likely to cause issues if/when a child process is forked to handle messages.
A better bet is to create a new command bus for each message. CreatingTacticianHandler can do that for you.
use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\CreatingTacticianHandler;
use PMG\Queue\Tactician\QueueingMiddleware;
$handler = new CreatingTacticianHandler(function () {
    // this is invoked for every message
    return new CommandBus([
        new QueueingMiddleware(createAQueueProducerSomehow()),
        new CommandHandlerMiddleware(/* ... */),
    ]);
});
/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
$consumer->run();
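The "new bus for each message" idea can be illustrated without any dependencies. The following is only a sketch under stated assumptions: `SketchBus` and `SketchCreatingHandler` are hypothetical stand-ins, not the real `CommandBus` or `CreatingTacticianHandler`, and `handle()` returns the bus purely so the example can show that a new instance is built each time.

```php
<?php
// Hypothetical stand-ins; NOT the real CreatingTacticianHandler or CommandBus.
final class SketchBus
{
    /** @var array commands this particular bus instance has seen */
    public $handled = [];

    public function handle($command)
    {
        $this->handled[] = $command;
    }
}

final class SketchCreatingHandler
{
    private $factory;

    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    public function handle($message)
    {
        // Build a fresh bus for this message only; nothing is shared
        // with the buses created for earlier messages.
        $bus = call_user_func($this->factory);
        $bus->handle($message);

        // Returned only so the demo below can inspect it.
        return $bus;
    }
}

$created = 0;
$handler = new SketchCreatingHandler(function () use (&$created) {
    $created++;
    return new SketchBus();
});

$first = $handler->handle('message one');
$second = $handler->handle('message two');
```

Here the factory runs once per message, so `$first` and `$second` are distinct bus objects. Anything the factory opens, such as a database connection, would likewise be created per message rather than shared across forked children.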