diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index b0b2bbd..54e78f6 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -173,9 +173,10 @@ def _unpack(self, result): class ArchiveQuerier(object): - def __init__(self, table_name, dynamodb=None): + def __init__(self, table_name, dynamodb=None, use_latest=None): self.table_name = table_name self.dynamodb = dynamodb + self.use_latest = use_latest def query_by_work_id(self, work_id, what, where=None, cursor=None): kwargs = self._prepare_work_id_kwargs(work_id, what) @@ -331,18 +332,25 @@ def _cursor_for_time_query(self, response, results, current_bucket): @memoized_property def _table(self): return self.dynamodb.Table(self.table_name) + + @memoized_property + def _latest_table(self): + return self.dynamodb.Table('test_latest') - def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): - current = int(time.time() * 1000) - end = current - lookback_days * _ONE_DAY_MS - while current >= end: - bucket = current/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS - r = self._get_latest_record_in_bucket(bucket, what, where) - if r is not None: - return r - current -= _ONE_DAY_MS + def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS, use_latest=False): + if self.use_latest: + response = self._latest_table.query( + KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') + ) + items = response.get('Items', []) + if not items: + return self._default_latest(what, where, lookback_days) + + latest_item = items[0] + return dict(url=latest_item['url'], metadata=latest_item['metadata']) - return None + else: + return self._default_latest(what, where, lookback_days) def _get_latest_record_in_bucket(self, bucket, what, where): kwargs = self._prepare_time_bucket_kwargs(bucket, what) @@ -367,12 +375,14 @@ def _get_all_records_in_bucket(self, bucket, **kwargs): kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey'] return records - def query_latest_table(self, what, where): - response = self._table.query( - KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') - ) - items = response.get('Items', []) - if not items: - return None - latest_item = items[0] - return dict(url=latest_item['url'], metadata=latest_item['metadata']) \ No newline at end of file + def _default_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): + current = int(time.time() * 1000) + end = current - lookback_days * _ONE_DAY_MS + while current >= end: + bucket = current/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS + r = self._get_latest_record_in_bucket(bucket, what, where) + if r is not None: + return r + current -= _ONE_DAY_MS + + return None \ No newline at end of file diff --git a/api/tests/conftest.py b/api/tests/conftest.py index 5be377c..da0c9ec 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -177,7 +177,6 @@ def _create_table(dynamodb, def _populate_table(table, records): - print(f'attempting to populate {table}') with table.batch_writer() as batch: for r in records: batch.put_item(Item=r) @@ -187,26 +186,25 @@ def _populate_table(table, records): @pytest.fixture def table_maker(request, dynamodb): - def maker(records, include_latest_key=False): - old_table_name = 'test' + def maker(records, use_latest=False): + table_name = 'test' latest_table_name = 'test_latest' latest_table = None - old_table = _create_table(dynamodb, old_table_name, attribute_definitions, key_schema, global_secondary) - _populate_table(old_table, records) + table = _create_table(dynamodb, table_name, attribute_definitions, key_schema, global_secondary) - if include_latest_key: - latest_table = _create_table(dynamodb, latest_table_name, latest_attribute_definitions, latest_key_schema) - _populate_table(latest_table, records) + latest_table = _create_table(dynamodb, latest_table_name, latest_attribute_definitions, latest_key_schema) + _populate_table(latest_table, records) + + _populate_table(table, records) def tear_down(): - _delete_table(old_table) - if include_latest_key: - _delete_table(latest_table) + _delete_table(table) + _delete_table(latest_table) request.addfinalizer(tear_down) - return old_table, latest_table + return (table, latest_table) return maker @@ -214,7 +212,7 @@ def tear_down(): @pytest.fixture def record_maker(s3_file_from_metadata): - def maker(include_latest_key=False, **kwargs): + def maker(**kwargs): m = generate_random_metadata() m.update(**kwargs) key = '/'.join([str(v) for v in kwargs.values()]) @@ -222,11 +220,10 @@ def maker(include_latest_key=False, **kwargs): s3_file_from_metadata(url, m) records = DatalakeRecord.list_from_metadata(url, m) - if include_latest_key: - what = kwargs.get('what') - where = kwargs.get('where') - for record in records: - record['what_where_key'] = f"{what}:{where}" + what = kwargs.get('what') + where = kwargs.get('where') + for record in records: + record['what_where_key'] = f"{what}:{where}" return records diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 32f12d6..da89cb2 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -125,11 +125,8 @@ def query_latest(self, what, where): @pytest.fixture(params=[ArchiveQuerier, HttpQuerier], ids=['archive_querier', 'http']) -def querier(request): - def create_querier(dynamodb, table_name): - return request.param(table_name, dynamodb=dynamodb) - return create_querier - +def querier(request, dynamodb): + return request.param('test', dynamodb=dynamodb, use_latest=True) def in_url(result, part): url = result['url'] @@ -409,6 +406,10 @@ def test_no_end(table_maker, querier, s3_file_from_metadata): url = 's3://datalake-test/' + m['id'] s3_file_from_metadata(url, m) records = DatalakeRecord.list_from_metadata(url, m) + for record in records: + what = record.get('what') + where = record.get('where') + record['what_where_key'] = f'{what}:{where}' table_maker(records) results = querier.query_by_time(m['start'], m['start'] + 1, m['what']) assert len(results) == 1 @@ -421,7 +422,12 @@ def test_no_end_exclusion(table_maker, querier, s3_file_from_metadata): url = 's3://datalake-test/' + m['id'] s3_file_from_metadata(url, m) records = DatalakeRecord.list_from_metadata(url, m) + for record in records: + what = record.get('what') + where = record.get('where') + record['what_where_key'] = f'{what}:{where}' table_maker(records) + results = querier.query_by_time(m['start'] + 1, m['start'] + 2, m['what']) assert len(results) == 0 @@ -480,8 +486,7 @@ def test_latest_creation_time_breaks_tie(table_maker, querier, start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS interval = DatalakeRecord.TIME_BUCKET_SIZE_IN_MS/150 end = start + interval - table = table_maker([]) - + table = table_maker([])[0] for i in range(3): record = record_maker(start=start, end=end, @@ -531,17 +536,10 @@ def test_2x_max_results_in_one_bucket(table_maker, querier, record_maker): results = consolidate_pages(pages) assert len(results) == MAX_RESULTS * 2 -""" -Will have to go through all of the tests associated with -latest and correctly query from -the latest table that was created. -""" def test_latest_table_query(table_maker, querier, record_maker): - now = int(time.time() * 1000) - records = record_maker(include_latest_key=True, what='foo', where='boo') - _, latest_table = table_maker(records) - - querier_instance = querier(dynamodb=latest_table.dynamodb, table_name=latest_table.table_name) - result = querier_instance.query_latest_table('foo', 'boo') + records = record_maker(what='foo', where='boo') + table_maker(records) + querier.use_latest = True + result = querier.query_latest('foo', 'boo') _validate_latest_result(result, what='foo', where='boo') \ No newline at end of file