Skip to content

Commit

Permalink
Optimize importing
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-meyer committed Jan 6, 2024
1 parent 00d4dc8 commit b0f8c49
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 103 deletions.
8 changes: 8 additions & 0 deletions src/Console/AddRecordCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@
)]
class AddRecordCommand extends Command
{
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
Database::getInstance()->pruneOrphanSets();
Expand Down
215 changes: 162 additions & 53 deletions src/Console/CsvImportCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

use DateTime;
use OCC\OaiPmh2\Database;
use OCC\OaiPmh2\Database\Format;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\ProgressIndicator;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
Expand All @@ -43,6 +45,11 @@
)]
class CsvImportCommand extends Command
{
/**
* Configures the current command.
*
* @return void
*/
protected function configure(): void
{
$this->addArgument(
Expand All @@ -61,106 +68,95 @@ function (): array {
);
$this->addOption(
'idColumn',
null,
'i',
InputOption::VALUE_OPTIONAL,
'Name of the CSV column which holds the records\' identifier.',
'identifier'
);
$this->addOption(
'contentColumn',
null,
'c',
InputOption::VALUE_OPTIONAL,
'Name of the CSV column which holds the records\' content.',
'content'
);
$this->addOption(
'dateColumn',
null,
'd',
InputOption::VALUE_OPTIONAL,
'Name of the CSV column which holds the records\' datetime of last change.',
'lastChanged'
);
$this->addOption(
'setColumn',
null,
's',
InputOption::VALUE_OPTIONAL,
'Name of the CSV column which holds the records\' sets list.',
'sets'
);
parent::configure();
}

/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var array<string, string> */
$arguments = $input->getArguments();
/** @var array<string, string> */
$options = $input->getOptions();

$formats = Database::getInstance()->getMetadataFormats()->getQueryResult();
if (!in_array($arguments['format'], array_keys($formats), true)) {
// Error: Invalid metadata prefix
echo 1;
if (!$this->validateInput($input, $output)) {
return Command::INVALID;
}
$memoryLimit = $this->getMemoryLimit();

/** @var array<string, string> */
$arguments = $input->getArguments();
/** @var Format */
$format = Database::getInstance()->getEntityManager()->getReference(Format::class, $arguments['format']);
/** @var resource */
$file = fopen($arguments['file'], 'r');
if ($file === false) {
// Error: File not found or not readable
echo 2;
return Command::INVALID;
}

$headers = fgetcsv($file);
if (!is_array($headers)) {
// Error: No CSV
echo 3;
return Command::INVALID;
} else {
$headers = array_flip($headers);
}

$column = [];
foreach ($options as $option => $value) {
if (isset($headers[$value])) {
$column[$option] = $headers[$value];
}
}
if (!isset($column['idColumn']) || !isset($column['contentColumn'])) {
// Error: Required columns missing
echo 4;
$columns = $this->getColumnNames($input, $output, $file);
if (count($columns) === 0) {
return Command::INVALID;
}
$lastChanged = new DateTime();

$count = 0;
$progressIndicator = new ProgressIndicator($output, 'verbose', 200, ['', '', '', '', '', '', '', '']);
$progressIndicator->start('Importing...');

while ($record = fgetcsv($file)) {
$identifier = $record[$column['idColumn']];
$content = $record[$column['contentColumn']];
if ($content === '') {
$content = null;
}
if (isset($column['dateColumn'])) {
$lastChanged = new DateTime($record[$column['dateColumn']]);
}
// TODO: Complete support for sets.
$sets = null;
Database::getInstance()->addOrUpdateRecord(
$identifier,
$arguments['format'],
$content,
$lastChanged,
$sets,
$record[$columns['idColumn']],
$format,
trim($record[$columns['contentColumn']]),
new DateTime($record[$columns['dateColumn']] ?? 'now'),
// TODO: Complete support for sets.
/* $record[$columns['setColumn']] ?? */ null,
true
);

++$count;
if ($count % 500 === 0) {
$progressIndicator->advance();
$progressIndicator->setMessage((string) $count . ' done.');

// Flush to database if memory usage reaches 90% of available limit.
if (memory_get_usage() / $memoryLimit > 0.9) {
Database::getInstance()->flush(true);
/** @var Format */
$format = Database::getInstance()->getEntityManager()->getReference(Format::class, $arguments['format']);
}
}
Database::getInstance()->flush(true);
Database::getInstance()->pruneOrphanSets();

$progressIndicator->finish('All done!');

fclose($file);

$output->writeln([
'',
sprintf(
Expand All @@ -172,4 +168,117 @@ protected function execute(InputInterface $input, OutputInterface $output): int
]);
return Command::SUCCESS;
}

/**
* Get the column names of CSV.
*
* @param InputInterface $input The inputs
* @param OutputInterface $output The output interface
* @param resource $file The handle for the CSV file
*
* @return array<string, int|string> The mapped column names
*/
protected function getColumnNames(InputInterface $input, OutputInterface $output, $file): array
{
/** @var array<string, string> */
$options = $input->getOptions();

$columns = [];

$headers = fgetcsv($file);
if (!is_array($headers)) {
$output->writeln([
'',
sprintf(
' [ERROR] File "%s" does not contain valid CSV. ',
stream_get_meta_data($file)['uri']
),
''
]);
return [];
} else {
$headers = array_flip($headers);
}
foreach ($options as $option => $value) {
if (isset($headers[$value])) {
$columns[$option] = $headers[$value];
}
}

if (!isset($columns['idColumn']) || !isset($columns['contentColumn'])) {
$output->writeln([
'',
sprintf(
' [ERROR] File "%s" does not contain valid CSV. ',
stream_get_meta_data($file)['uri']
),
''
]);
return [];
}
return $columns;
}

/**
* Get the PHP memory limit in bytes.
*
* @return int The memory limit in bytes or -1 if unlimited
*/
protected function getMemoryLimit(): int
{
$ini = trim(ini_get('memory_limit'));
$limit = (int) $ini;
$unit = strtolower($ini[strlen($ini)-1]);
switch($unit) {
case 'g':
$limit *= 1024;
case 'm':
$limit *= 1024;
case 'k':
$limit *= 1024;
}
if ($limit < 0) {
return -1;
}
return $limit;
}

/**
* Validate input arguments.
*
* @param InputInterface $input The inputs
* @param OutputInterface $output The output interface
*
* @return bool Whether the inputs validate
*/
protected function validateInput(InputInterface $input, OutputInterface $output): bool
{
/** @var array<string, string> */
$arguments = $input->getArguments();

$formats = Database::getInstance()->getMetadataFormats()->getQueryResult();
if (!in_array($arguments['format'], array_keys($formats), true)) {
$output->writeln([
'',
sprintf(
' [ERROR] Metadata format "%s" is not supported. ',
$arguments['format']
),
''
]);
return false;
}
if (!is_readable($arguments['file'])) {
$output->writeln([
'',
sprintf(
' [ERROR] File "%s" not found or not readable. ',
$arguments['file']
),
''
]);
return false;
}
return true;
}
}
8 changes: 8 additions & 0 deletions src/Console/DeleteRecordCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@
)]
class DeleteRecordCommand extends Command
{
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$policy = Configuration::getInstance()->deletedRecords;
Expand Down
15 changes: 14 additions & 1 deletion src/Console/PruneRecordsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,30 @@
)]
class PruneRecordsCommand extends Command
{
/**
* Configures the current command.
*
* @return void
*/
protected function configure(): void
{
$this->addOption(
'force',
null,
'f',
InputOption::VALUE_NONE,
'Deletes records even under "transient" policy.'
);
parent::configure();
}

/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$policy = Configuration::getInstance()->deletedRecords;
Expand Down
8 changes: 8 additions & 0 deletions src/Console/PruneResumptionTokensCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@
)]
class PruneResumptionTokensCommand extends Command
{
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$expired = Database::getInstance()->pruneResumptionTokens();
Expand Down
8 changes: 8 additions & 0 deletions src/Console/UpdateFormatsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@
)]
class UpdateFormatsCommand extends Command
{
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$formats = Configuration::getInstance()->metadataPrefix;
Expand Down
Loading

0 comments on commit b0f8c49

Please sign in to comment.