Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/improvement/UTAPI-97/reindex_onl…
Browse files Browse the repository at this point in the history
…y_latest_for_olock_buckets_option' into w/7.70/improvement/UTAPI-97/reindex_only_latest_for_olock_buckets_option
  • Loading branch information
tmacro committed Dec 11, 2023
2 parents 9341366 + b939981 commit e8882a2
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 42 deletions.
7 changes: 7 additions & 0 deletions lib/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ class Config {
+ 'expireMetrics must be a boolean');
this.expireMetrics = config.expireMetrics;
}

if (config.onlyCountLatestWhenObjectLocked !== undefined) {
assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean',
'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked;
}

return config;
}
}
Expand Down
7 changes: 7 additions & 0 deletions lib/UtapiReindex.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class UtapiReindex {
this._log = new werelogs.Logger('UtapiReindex', { level, dump });
}

this._onlyCountLatestWhenObjectLocked = (config && config.onlyCountLatestWhenObjectLocked === true);

this._requestLogger = this._log.newRequestLogger();
}

Expand Down Expand Up @@ -97,6 +99,7 @@ class UtapiReindex {
if (this._sentinel.sentinelPassword) {
flags.redis_password = this._sentinel.sentinelPassword;
}

/* eslint-enable camelcase */
const opts = [];
Object.keys(flags)
Expand All @@ -105,6 +108,10 @@ class UtapiReindex {
opts.push(name);
opts.push(flags[flag]);
});

if (this._onlyCountLatestWhenObjectLocked) {
opts.push('--only-latest-when-locked');
}
return opts;
}

Expand Down
125 changes: 84 additions & 41 deletions lib/reindex/s3_bucketd.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import urllib
from collections import defaultdict, namedtuple
from concurrent.futures import ThreadPoolExecutor
from pprint import pprint

import redis
import requests
Expand All @@ -34,6 +33,8 @@ def get_options():
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
parser.add_argument("--debug", action='store_true', help="Enable debug logging")
return parser.parse_args()

def chunks(iterable, size):
Expand All @@ -49,7 +50,7 @@ def inner(*args, **kwargs):
return urllib.parse.quote(val.encode('utf-8'))
return inner

Bucket = namedtuple('Bucket', ['userid', 'name'])
Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled'], defaults=[False])
MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id'])
BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size'])

Expand All @@ -64,12 +65,14 @@ def __init__(self, bucket):
class BucketDClient:

'''Performs Listing calls against bucketd'''
__url_format = '{addr}/default/bucket/{bucket}'
__url_attribute_format = '{addr}/default/attributes/{bucket}'
__url_bucket_format = '{addr}/default/bucket/{bucket}'
__headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"}

def __init__(self, bucketd_addr=None, max_retries=2):
def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False):
self._bucketd_addr = bucketd_addr
self._max_retries = max_retries
self._only_latest_when_locked = only_latest_when_locked
self._session = requests.Session()

def _do_req(self, url, check_500=True, **kwargs):
Expand Down Expand Up @@ -101,7 +104,7 @@ def _list_bucket(self, bucket, **kwargs):
parameters value. On the first request the function will be called with
`None` and should return its initial value. Return `None` for the param to be excluded.
'''
url = self.__url_format.format(addr=self._bucketd_addr, bucket=bucket)
url = self.__url_bucket_format.format(addr=self._bucketd_addr, bucket=bucket)
static_params = {k: v for k, v in kwargs.items() if not callable(v)}
dynamic_params = {k: v for k, v in kwargs.items() if callable(v)}
is_truncated = True # Set to True for first loop
Expand All @@ -114,6 +117,9 @@ def _list_bucket(self, bucket, **kwargs):
_log.debug('listing bucket bucket: %s params: %s'%(
bucket, ', '.join('%s=%s'%p for p in params.items())))
resp = self._do_req(url, params=params)
if resp.status_code == 404:
_log.debug('Bucket not found bucket: %s'%bucket)
return
if resp.status_code == 200:
payload = resp.json()
except ValueError as e:
Expand All @@ -135,6 +141,27 @@ def _list_bucket(self, bucket, **kwargs):
else:
is_truncated = len(payload) > 0

def _get_bucket_attributes(self, name):
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
try:
resp = self._do_req(url)
if resp.status_code == 200:
return resp.json()
else:
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
raise InvalidListing(name)
except ValueError as e:
_log.exception(e)
_log.error('Invalid attributes response body! bucket:%s'%name)
raise
except MaxRetriesReached:
_log.error('Max retries reached getting bucket attributes bucket:%s'%name)
raise
except Exception as e:
_log.exception(e)
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
raise

def list_buckets(self, name = None):

def get_next_marker(p):
Expand All @@ -149,11 +176,17 @@ def get_next_marker(p):
}
for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = []
for result in payload['Contents']:
for result in payload.get('Contents', []):
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups())
if name is None or bucket.name == name:
# We need to get the attributes for each bucket to determine if it is locked
if self._only_latest_when_locked:
bucket_attrs = self._get_bucket_attributes(bucket.name)
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
buckets.append(bucket)

if buckets:
yield buckets
if name is not None:
Expand Down Expand Up @@ -196,18 +229,12 @@ def get_next_upload_id(p):
upload_id=key['value']['UploadId']))
return keys

def _sum_objects(self, bucket, listing):
def _sum_objects(self, bucket, listing, only_latest_when_locked = False):
count = 0
total_size = 0
last_master = None
last_size = None
for status_code, payload in listing:
contents = payload['Contents'] if isinstance(payload, dict) else payload
if contents is None:
_log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket, status_code))
raise InvalidListing(bucket)
for obj in contents:
count += 1
last_key = None
try:
for obj in listing:
if isinstance(obj['value'], dict):
# bucketd v6 returns a dict:
data = obj.get('value', {})
Expand All @@ -216,47 +243,60 @@ def _sum_objects(self, bucket, listing):
# bucketd v7 returns an encoded string
data = json.loads(obj['value'])
size = data.get('content-length', 0)
total_size += size

# If versioned, subtract the size of the master to avoid double counting
if last_master is not None and obj['key'].startswith(last_master + '\x00'):
_log.debug('Detected versioned key: %s - subtracting master size: %i'% (
obj['key'],
last_size,
))
total_size -= last_size
count -= 1
last_master = None

# Only save master versions
elif '\x00' not in obj['key']:
last_master = obj['key']
last_size = size
is_latest = obj['key'] != last_key
last_key = obj['key']

if only_latest_when_locked and bucket.object_lock_enabled and not is_latest:
_log.debug('Skipping versioned key: %s'%obj['key'])
continue

count += 1
total_size += size

except InvalidListing:
_log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket.name, status_code))
raise InvalidListing(bucket.name)
return count, total_size

def _extract_listing(self, key, listing):
for status_code, payload in listing:
contents = payload[key] if isinstance(payload, dict) else payload
if contents is None:
raise InvalidListing('')
for obj in contents:
yield obj

def count_bucket_contents(self, bucket):

def get_next_marker(p):
if p is None or len(p) == 0:
def get_key_marker(p):
if p is None:
return ''
return p.get('NextKeyMarker', '')

def get_vid_marker(p):
if p is None:
return ''
return p[-1].get('key', '')
return p.get('NextVersionIdMarker', '')

params = {
'listingType': 'Basic',
'listingType': 'DelimiterVersions',
'maxKeys': 1000,
'gt': get_next_marker,
'keyMarker': get_key_marker,
'versionIdMarker': get_vid_marker,
}

count, total_size = self._sum_objects(bucket.name, self._list_bucket(bucket.name, **params))
listing = self._list_bucket(bucket.name, **params)
count, total_size = self._sum_objects(bucket, self._extract_listing('Versions', listing), self._only_latest_when_locked)
return BucketContents(
bucket=bucket,
obj_count=count,
total_size=total_size
)

def count_mpu_parts(self, mpu):
_bucket = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name)

def get_prefix(p):
if p is None:
Expand All @@ -276,9 +316,10 @@ def get_next_marker(p):
'listingType': 'Delimiter',
}

count, total_size = self._sum_objects(_bucket, self._list_bucket(_bucket, **params))
listing = self._list_bucket(shadow_bucket_name, **params)
count, total_size = self._sum_objects(shadow_bucket, self._extract_listing('Contents', listing))
return BucketContents(
bucket=mpu.bucket._replace(name=_bucket),
bucket=shadow_bucket,
obj_count=0, # MPU parts are not counted towards numberOfObjects
total_size=total_size
)
Expand Down Expand Up @@ -361,7 +402,9 @@ def log_report(resource, name, obj_count, total_size):
if options.bucket is not None and not options.bucket.strip():
print('You must provide a bucket name with the --bucket flag')
sys.exit(1)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries)
if options.debug:
_log.setLevel(logging.DEBUG)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
redis_client = get_redis_client(options)
account_reports = {}
observed_buckets = set()
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"engines": {
"node": ">=16"
},
"version": "7.70.2",
"version": "7.70.3",
"description": "API for tracking resource utilization and reporting metrics",
"main": "index.js",
"repository": {
Expand Down
13 changes: 13 additions & 0 deletions tests/utils/mock/BucketD.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ class BucketD {
return body;
}

_getBucketVersionResponse(bucketName) {
const body = {
CommonPrefixes: [],
IsTruncated: false,
Versions: (this._bucketContent[bucketName] || [])
// patch in a versionId to more closely match the real response
.map(entry => ({ ...entry, versionId: 'null' })),
};
return body;
}

_getShadowBucketOverviewResponse(bucketName) {
const mpus = (this._bucketContent[bucketName] || []).map(o => ({
key: o.key,
Expand All @@ -137,6 +148,8 @@ class BucketD {
|| req.query.listingType === 'Delimiter'
) {
req.body = this._getBucketResponse(bucketName);
} else if (req.query.listingType === 'DelimiterVersions') {
req.body = this._getBucketVersionResponse(bucketName);
}

// v2 reindex uses `Basic` listing type for everything
Expand Down

0 comments on commit e8882a2

Please sign in to comment.