Skip to content

Commit

Permalink
Fix errored job removal from batch
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianaromagnoli committed Nov 7, 2024
1 parent b0b7956 commit 1f1abc1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
8 changes: 0 additions & 8 deletions src/ProducerInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,6 @@ public function produceAndQueueJobs($data = null): Promise
$job = $jobs->getCurrent();
$job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer)));
$jobsCount += yield $this->queueManager->enqueue($job);
$this->logger->info(
'Successfully produced a new Job',
[
'producer' => \get_class($this->producer),
'job_uuid' => $job->getUuid(),
'payload_data' => NonUtf8Cleaner::clean($job->getPayloadData())
]
);
}

$jobsCount += yield $this->queueManager->flush();
Expand Down
11 changes: 10 additions & 1 deletion src/Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Webgriffe\Esb\Service;

use Amp\Beanstalk\BeanstalkClient;
use Webgriffe\Esb\NonUtf8Cleaner;
use function Amp\call;
use Amp\Promise;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -96,7 +97,7 @@ public function enqueue(JobInterface $job): Promise
)
);
}
$this->batch[] = $job;
$this->batch[$job->getUuid()] = $job;

$count = count($this->batch);
if ($count < $this->batchSize) {
Expand Down Expand Up @@ -248,6 +249,14 @@ private function processBatch(): \Generator
$singleJob->getDelay(),
$singleJob->getPriority()
);
$this->logger->info(
'Successfully enqueued a new Job',
[
'flow_name' => $this->flowConfig->getName(),
'job_uuid' => $singleJob->getUuid(),
'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData())
]
);
}

$this->batch = [];
Expand Down
10 changes: 9 additions & 1 deletion tests/Integration/ElasticSearchIndexingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ function (Job $job) {
/**
* @test
*/
public function itLogsAndSkipsJobsThatCouldNotBeIndexedOntoElasticSearchWithAllEvents()
public function itLogsAndSkipsJobsThatCouldNotBeIndexedOntoElasticSearch()
{
$producerDir = vfsStream::url('root/producer_dir');
$workerFile = vfsStream::url('root/worker.data');
Expand Down Expand Up @@ -216,6 +216,14 @@ function ($log) {
$search = Promise\wait($this->esClient->uriSearchOneIndex(self::FLOW_CODE, ''));
$this->assertCount(1, $search['hits']['hits']);
$this->assertTrue($this->logHandler()->hasErrorThatContains('Job could not be indexed in ElasticSearch'));
$logRecords = $this->logHandler()->getRecords();
$successfullyIndexedLog = array_filter(
$logRecords,
function ($log) {
return strpos($log['message'], 'Successfully enqueued a new Job') !== false;
}
);
$this->assertCount(1, $successfullyIndexedLog);
}

private function assertForEachJob(callable $callable, array $jobsData)
Expand Down

0 comments on commit 1f1abc1

Please sign in to comment.