Skip to content

Commit

Permalink
Add integration test and make it pass
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianaromagnoli committed Nov 7, 2024
1 parent 326b9a3 commit cf0dd8a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 34 deletions.
43 changes: 24 additions & 19 deletions src/Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -232,29 +232,29 @@ private function processBatch(): \Generator
{
$this->logger->debug('Processing batch');
$result = yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube());
$successfullyIndexedJobs = $this->batch;
$notIndexedJobs = [];

if ($result['errors'] === true) {
$successfullyIndexedJobs = array_filter($this->batch, function ($job) use ($result) {
return $result['items'][$job->getUuid()]['index']['status'] === 201;
});

$notIndexedJobs = array_filter($this->batch, function ($job) use ($result) {
return $result['items'][$job->getUuid()]['index']['status'] !== 201;
});
}

foreach ($notIndexedJobs as $singleJob) {
$this->logger->error(
sprintf(
'Job with UUID "%s" could not be indexed in ElasticSearch',
$singleJob->getUuid()
)
);
foreach ($result['items'] as $item) {
if (!array_key_exists('index', $item)) {
$this->logger->error(
'Unexpected response item in bulk index response',
['bulk_index_response_item' => $item]
);
continue;
}
$itemStatusCode = $item['index']['status'] ?? null;
if (!$this->isSuccessfulStatusCode($itemStatusCode)) {
$uuid = $item['index']['_id'];
unset($this->batch[$uuid]);
$this->logger->error(
'Job could not be indexed in ElasticSearch',
['bulk_index_response_item' => $item]
);
}
}
}

foreach ($successfullyIndexedJobs as $singleJob) {
foreach ($this->batch as $singleJob) {
yield $this->beanstalkClient->put(
$singleJob->getUuid(),
$singleJob->getTimeout(),
Expand Down Expand Up @@ -288,4 +288,9 @@ private function getJobBeanstalkId(JobInterface $job): int

throw new \RuntimeException("Unknown Beanstalk id for job {$uuid}");
}

public function isSuccessfulStatusCode(?int $statusCode): bool
{
return $statusCode !== null && $statusCode >= 200 && $statusCode < 300;
}
}
7 changes: 6 additions & 1 deletion tests/DummyFilesystemRepeatProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ public function produce($data = null): Iterator
continue;
}
yield $this->longRunningOperation();
yield $emit(new Job(['file' => $file, 'data' => (yield File\read($file))]));
$fileContent = yield File\read($file);
$fileContentAsArray = json_decode($fileContent, true);
$payloadData = is_array($fileContentAsArray) ?
$fileContentAsArray :
['file' => $file, 'data' => $fileContent];
yield $emit(new Job($payloadData));
yield \Amp\File\deleteFile($file);
}
});
Expand Down
18 changes: 4 additions & 14 deletions tests/Integration/ElasticSearchIndexingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@
use Webgriffe\Esb\KernelTestCase;
use Webgriffe\Esb\Model\ErroredJobEvent;
use Webgriffe\Esb\Model\Job;
use Webgriffe\Esb\Model\JobEventInterface;
use Webgriffe\Esb\Model\ProducedJobEvent;
use Webgriffe\Esb\Model\ReservedJobEvent;
use Webgriffe\Esb\Model\WorkedJobEvent;
use Webgriffe\Esb\TestUtils;
use Webgriffe\Esb\Unit\Model\DummyJobEvent;
use function Amp\Http\formatDateHeader;

class ElasticSearchIndexingTest extends KernelTestCase
{
Expand Down Expand Up @@ -199,16 +196,9 @@ public function itLogsAndSkipsJobsThatCouldNotBeIndexedOntoElasticSearchWithAllE
Loop::delay(
200,
function () use ($producerDir) {
touch($producerDir . DIRECTORY_SEPARATOR . 'job1');
// TODO: It needs to become a document with more than 1000 fields
$veryLargeDocument = 'TODO';
$veryLargeDocument = json_encode(array_fill_keys(range(1, 1001), 'value'));
file_put_contents($producerDir . DIRECTORY_SEPARATOR . 'job1', $veryLargeDocument);
Loop::delay(
200,
function () use ($producerDir) {
touch($producerDir . DIRECTORY_SEPARATOR . 'job2');
}
);
touch($producerDir . DIRECTORY_SEPARATOR . 'job2');
}
);
$this->stopWhen(function () {
Expand All @@ -224,8 +214,8 @@ function ($log) {

Promise\wait($this->esClient->refresh());
$search = Promise\wait($this->esClient->uriSearchOneIndex(self::FLOW_CODE, ''));
$this->assertCount(1, $search['hits']['hits']); // TODO: Make it green
// TODO: Add assertions on logs
$this->assertCount(1, $search['hits']['hits']);
$this->assertTrue($this->logHandler()->hasErrorThatContains('Job could not be indexed in ElasticSearch'));
}

private function assertForEachJob(callable $callable, array $jobsData)
Expand Down

0 comments on commit cf0dd8a

Please sign in to comment.