diff --git a/lib/reindex/s3_bucketd.py b/lib/reindex/s3_bucketd.py
index 9f26b47d..21dfb848 100644
--- a/lib/reindex/s3_bucketd.py
+++ b/lib/reindex/s3_bucketd.py
@@ -39,9 +39,9 @@ def get_options():
     parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
     group = parser.add_mutually_exclusive_group()
     group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
-    group.add_argument("--account-file", default=None, help="file containing account canonical IDs", type=existing_file)
+    group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
     group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
-    group.add_argument("--bucket-file", default=None, help="file containing bucket names", type=existing_file)
+    group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)
 
     options = parser.parse_args()
     if options.bucket_file:
@@ -63,7 +63,7 @@ def inner(value):
 def existing_file(path):
     path = Path(path).resolve()
     if not path.exists():
-        raise argparse.ArgumentTypeError("File does not exist")
+        raise argparse.ArgumentTypeError("File does not exist: %s"%path)
     return path
 
 def chunks(iterable, size):
@@ -204,7 +204,7 @@ def get_bucket_md(self, name):
             raise InvalidListing(name)
         return Bucket(canonId, name, md.get('objectLockEnabled', False))
 
-    def list_buckets(self, names = None, account = None):
+    def list_buckets(self, account=None):
 
         def get_next_marker(p):
             if p is None:
@@ -220,29 +220,20 @@ def get_next_marker(p):
         if account is not None:
             params['prefix'] = '%s..|..' % account
 
-        seen_buckets = set()
-
         for _, payload in self._list_bucket(USERS_BUCKET, **params):
             buckets = []
             for result in payload.get('Contents', []):
                 match = re.match("(\w+)..\|..(\w+.*)", result['key'])
                 bucket = Bucket(*match.groups(), False)
-                if names is None or bucket.name in names:
-                    # We need to get the attributes for each bucket to determine if it is locked
-                    if self._only_latest_when_locked:
-                        bucket_attrs = self._get_bucket_attributes(bucket.name)
-                        object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
-                        bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
-                    buckets.append(bucket)
+                # We need to get the attributes for each bucket to determine if it is locked
+                if self._only_latest_when_locked:
+                    bucket_attrs = self._get_bucket_attributes(bucket.name)
+                    object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
+                    bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
+                buckets.append(bucket)
 
             if buckets:
                 yield buckets
-                if names:
-                    seen_buckets.update(b.name for b in buckets)
-                    # Break if we have seen all the buckets we are looking for
-                    if all(b in seen_buckets for b in names):
-                        break
 
     def list_mpus(self, bucket):
         _bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
@@ -305,7 +296,7 @@ def _sum_objects(self, bucket, listing, only_latest_when_locked = False):
                 total_size += size
 
         except InvalidListing:
-            _log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket.name, status_code))
+            _log.error('Invalid contents in listing. bucket:%s'%bucket.name)
            raise InvalidListing(bucket.name)
         return count, total_size
 
@@ -504,8 +495,8 @@ def log_report(resource, name, obj_count, total_size):
     # Bucket reports can be updated as we get them
     if options.dry_run:
         for bucket, report in bucket_reports.items():
-            _log.warning(
-                "DryRun: resource buckets [%s] will be not updated with obj_count %i and total_size %i" % (
+            _log.info(
+                "DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % (
                     bucket, report['obj_count'], report['total_size']
                 )
             )
@@ -527,7 +518,7 @@ def log_report(resource, name, obj_count, total_size):
     _log.info('Found %s stale buckets' % len(stale_buckets))
 
     if options.dry_run:
-        _log.warning("DryRun: not updating stale buckets")
+        _log.info("DryRun: not updating stale buckets")
     else:
         for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
             pipeline = redis_client.pipeline(transaction=False)  # No transaction to reduce redis load
@@ -544,8 +535,8 @@ def log_report(resource, name, obj_count, total_size):
     without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
     if options.dry_run:
         for userid, report in account_reports.items():
-            _log.warning(
-                "DryRun: resource account [%s] will be not updated with obj_count %i and total_size %i" % (
+            _log.info(
+                "DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % (
                     userid, report['obj_count'], report['total_size']
                 )
             )
@@ -561,7 +552,7 @@ def log_report(resource, name, obj_count, total_size):
     if options.account:
         for account in options.account:
             if account in failed_accounts:
-                _log.error("No metrics updated for %s, one or more buckets failed" % account)
+                _log.error("No metrics updated for account %s, one or more buckets failed" % account)
 
     # Include failed_accounts in observed_accounts to avoid clearing metrics
     observed_accounts = failed_accounts.union(set(account_reports.keys()))
@@ -575,7 +566,7 @@ def log_report(resource, name, obj_count, total_size):
     _log.info('Found %s stale accounts' % len(stale_accounts))
 
     if options.dry_run:
-        _log.warning("DryRun: not updating stale accounts")
+        _log.info("DryRun: not updating stale accounts")
     else:
         for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
             pipeline = redis_client.pipeline(transaction=False)  # No transaction to reduce redis load