From f1177e978b6f78cbeccd65ffda2f1b7e7c91e15c Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Thu, 9 May 2024 13:58:44 -0400 Subject: [PATCH 01/12] WIP: adding query_latest_table method to ArchiveQuerier. --- api/datalake_api/querier.py | 11 ++++++ api/tests/conftest.py | 61 +++++++++++++++++++++++++------ api/tests/test_archive_querier.py | 21 ++++++++++- 3 files changed, 80 insertions(+), 13 deletions(-) diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index 4e80885..b0b2bbd 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -14,6 +14,7 @@ from memoized_property import memoized_property from datalake.common import DatalakeRecord +from boto3.dynamodb.conditions import Key import base64 import json import time @@ -365,3 +366,13 @@ def _get_all_records_in_bucket(self, bucket, **kwargs): break kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey'] return records + + def query_latest_table(self, what, where): + response = self._table.query( + KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') + ) + items = response.get('Items', []) + if not items: + return None + latest_item = items[0] + return dict(url=latest_item['url'], metadata=latest_item['metadata']) \ No newline at end of file diff --git a/api/tests/conftest.py b/api/tests/conftest.py index 3b090cf..5be377c 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -107,6 +107,20 @@ def tear_down(): } ] +latest_attribute_definitions = [ + { + 'AttributeName': 'what_where_key', + 'AttributeType': 'S' + } +] + +latest_key_schema = [ + { + 'AttributeName': 'what_where_key', + 'KeyType': 'HASH' + } +] + global_secondary = [{ 'IndexName': 'work-id-index', 'KeySchema': [ @@ -140,42 +154,59 @@ def _delete_table(table): raise e -def _create_table(dynamodb, table_name): +def _create_table(dynamodb, + table_name, + attribute_definitions, + key_schema, + global_secondary=None): table = dynamodb.Table(table_name) _delete_table(table) kwargs = dict( TableName=table_name, AttributeDefinitions=attribute_definitions, KeySchema=key_schema, - GlobalSecondaryIndexes=global_secondary, ProvisionedThroughput={ 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 } ) + if global_secondary: + kwargs['GlobalSecondaryIndexes'] = global_secondary dynamodb.create_table(**kwargs) return dynamodb.Table(table_name) def _populate_table(table, records): + print(f'attempting to populate {table}') with table.batch_writer() as batch: for r in records: batch.put_item(Item=r) - +# Adding latest table logic so latest table will be created and records will populate it +# Once that's possible, we will simply query the latest_table for what:where, no bucket logic @pytest.fixture def table_maker(request, dynamodb): - def maker(records): - table_name = 'test' - table = _create_table(dynamodb, table_name) - _populate_table(table, records) + def maker(records, include_latest_key=False): + old_table_name = 'test' + latest_table_name = 'test_latest' + latest_table = None + + old_table = _create_table(dynamodb, old_table_name, attribute_definitions, key_schema, global_secondary) + _populate_table(old_table, records) + + if include_latest_key: + latest_table = _create_table(dynamodb, latest_table_name, latest_attribute_definitions, latest_key_schema) + _populate_table(latest_table, records) def tear_down(): - _delete_table(table) + _delete_table(old_table) + if include_latest_key: + _delete_table(latest_table) + request.addfinalizer(tear_down) - return table + return old_table, latest_table return maker @@ -183,12 +214,20 @@ def tear_down(): @pytest.fixture def record_maker(s3_file_from_metadata): - def maker(**kwargs): + def maker(include_latest_key=False, **kwargs): m = generate_random_metadata() m.update(**kwargs) key = '/'.join([str(v) for v in kwargs.values()]) url = 's3://datalake-test/' + key s3_file_from_metadata(url, m) - return DatalakeRecord.list_from_metadata(url, m) + records = DatalakeRecord.list_from_metadata(url, m) + + if include_latest_key: + what = kwargs.get('what') + where = kwargs.get('where') + for record in records: + record['what_where_key'] = f"{what}:{where}" + + return records return maker diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 4f7c71c..32f12d6 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -125,8 +125,10 @@ def query_latest(self, what, where): @pytest.fixture(params=[ArchiveQuerier, HttpQuerier], ids=['archive_querier', 'http']) -def querier(request, dynamodb): - return request.param('test', dynamodb=dynamodb) +def querier(request): + def create_querier(dynamodb, table_name): + return request.param(table_name, dynamodb=dynamodb) + return create_querier def in_url(result, part): @@ -528,3 +530,18 @@ def test_2x_max_results_in_one_bucket(table_maker, querier, record_maker): pages = get_all_pages(querier.query_by_time, [start, end, 'boo']) results = consolidate_pages(pages) assert len(results) == MAX_RESULTS * 2 + +""" +Will have to go through all of the tests associated with +latest and correctly query from +the latest table that was created. +""" + +def test_latest_table_query(table_maker, querier, record_maker): + now = int(time.time() * 1000) + records = record_maker(include_latest_key=True, what='foo', where='boo') + _, latest_table = table_maker(records) + + querier_instance = querier(dynamodb=latest_table.dynamodb, table_name=latest_table.table_name) + result = querier_instance.query_latest_table('foo', 'boo') + _validate_latest_result(result, what='foo', where='boo') \ No newline at end of file From 7c66eaa50c1c227630ff6caf6f45d097f7bd6e6c Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Thu, 16 May 2024 14:44:19 -0400 Subject: [PATCH 02/12] ArchiveQuerier utilizing use_latest flag. --- api/datalake_api/querier.py | 50 ++++++++++++++++++------------- api/tests/conftest.py | 33 ++++++++++---------- api/tests/test_archive_querier.py | 34 ++++++++++----------- 3 files changed, 61 insertions(+), 56 deletions(-) diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index b0b2bbd..54e78f6 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -173,9 +173,10 @@ def _unpack(self, result): class ArchiveQuerier(object): - def __init__(self, table_name, dynamodb=None): + def __init__(self, table_name, dynamodb=None, use_latest=None): self.table_name = table_name self.dynamodb = dynamodb + self.use_latest = use_latest def query_by_work_id(self, work_id, what, where=None, cursor=None): kwargs = self._prepare_work_id_kwargs(work_id, what) @@ -331,18 +332,25 @@ def _cursor_for_time_query(self, response, results, current_bucket): @memoized_property def _table(self): return self.dynamodb.Table(self.table_name) + + @memoized_property + def _latest_table(self): + return self.dynamodb.Table('test_latest') - def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): - current = int(time.time() * 1000) - end = current - lookback_days * _ONE_DAY_MS - while current >= end: - bucket = current/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS - r = self._get_latest_record_in_bucket(bucket, what, where) - if r is not None: - return r - current -= _ONE_DAY_MS + def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS, use_latest=False): + if self.use_latest: + response = self._latest_table.query( + KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') + ) + items = response.get('Items', []) + if not items: + return self._default_latest(what, where, lookback_days) + + latest_item = items[0] + return dict(url=latest_item['url'], metadata=latest_item['metadata']) - return None + else: + return self._default_latest(what, where, lookback_days) def _get_latest_record_in_bucket(self, bucket, what, where): kwargs = self._prepare_time_bucket_kwargs(bucket, what) @@ -367,12 +375,14 @@ def _get_all_records_in_bucket(self, bucket, **kwargs): kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey'] return records - def query_latest_table(self, what, where): - response = self._table.query( - KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') - ) - items = response.get('Items', []) - if not items: - return None - latest_item = items[0] - return dict(url=latest_item['url'], metadata=latest_item['metadata']) \ No newline at end of file + def _default_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): + current = int(time.time() * 1000) + end = current - lookback_days * _ONE_DAY_MS + while current >= end: + bucket = current/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS + r = self._get_latest_record_in_bucket(bucket, what, where) + if r is not None: + return r + current -= _ONE_DAY_MS + + return None \ No newline at end of file diff --git a/api/tests/conftest.py b/api/tests/conftest.py index 5be377c..da0c9ec 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -177,7 +177,6 @@ def _create_table(dynamodb, def _populate_table(table, records): - print(f'attempting to populate {table}') with table.batch_writer() as batch: for r in records: batch.put_item(Item=r) @@ -187,26 +186,25 @@ def _populate_table(table, records): @pytest.fixture def table_maker(request, dynamodb): - def maker(records, include_latest_key=False): - old_table_name = 'test' + def maker(records, use_latest=False): + table_name = 'test' latest_table_name = 'test_latest' latest_table = None - old_table = _create_table(dynamodb, old_table_name, attribute_definitions, key_schema, global_secondary) - _populate_table(old_table, records) + table = _create_table(dynamodb, table_name, attribute_definitions, key_schema, global_secondary) - if include_latest_key: - latest_table = _create_table(dynamodb, latest_table_name, latest_attribute_definitions, latest_key_schema) - _populate_table(latest_table, records) + latest_table = _create_table(dynamodb, latest_table_name, latest_attribute_definitions, latest_key_schema) + _populate_table(latest_table, records) + + _populate_table(table, records) def tear_down(): - _delete_table(old_table) - if include_latest_key: - _delete_table(latest_table) + _delete_table(table) + _delete_table(latest_table) request.addfinalizer(tear_down) - return old_table, latest_table + return (table, latest_table) return maker @@ -214,7 +212,7 @@ def tear_down(): @pytest.fixture def record_maker(s3_file_from_metadata): - def maker(include_latest_key=False, **kwargs): + def maker(**kwargs): m = generate_random_metadata() m.update(**kwargs) key = '/'.join([str(v) for v in kwargs.values()]) @@ -222,11 +220,10 @@ def maker(include_latest_key=False, **kwargs): s3_file_from_metadata(url, m) records = DatalakeRecord.list_from_metadata(url, m) - if include_latest_key: - what = kwargs.get('what') - where = kwargs.get('where') - for record in records: - record['what_where_key'] = f"{what}:{where}" + what = kwargs.get('what') + where = kwargs.get('where') + for record in records: + record['what_where_key'] = f"{what}:{where}" return records diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 32f12d6..da89cb2 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -125,11 +125,8 @@ def query_latest(self, what, where): @pytest.fixture(params=[ArchiveQuerier, HttpQuerier], ids=['archive_querier', 'http']) -def querier(request): - def create_querier(dynamodb, table_name): - return request.param(table_name, dynamodb=dynamodb) - return create_querier - +def querier(request, dynamodb): + return request.param('test', dynamodb=dynamodb, use_latest=True) def in_url(result, part): url = result['url'] @@ -409,6 +406,10 @@ def test_no_end(table_maker, querier, s3_file_from_metadata): url = 's3://datalake-test/' + m['id'] s3_file_from_metadata(url, m) records = DatalakeRecord.list_from_metadata(url, m) + for record in records: + what = record.get('what') + where = record.get('where') + record['what_where_key'] = f'{what}:{where}' table_maker(records) results = querier.query_by_time(m['start'], m['start'] + 1, m['what']) assert len(results) == 1 @@ -421,7 +422,12 @@ def test_no_end_exclusion(table_maker, querier, s3_file_from_metadata): url = 's3://datalake-test/' + m['id'] s3_file_from_metadata(url, m) records = DatalakeRecord.list_from_metadata(url, m) + for record in records: + what = record.get('what') + where = record.get('where') + record['what_where_key'] = f'{what}:{where}' table_maker(records) + results = querier.query_by_time(m['start'] + 1, m['start'] + 2, m['what']) assert len(results) == 0 @@ -480,8 +486,7 @@ def test_latest_creation_time_breaks_tie(table_maker, querier, start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS interval = DatalakeRecord.TIME_BUCKET_SIZE_IN_MS/150 end = start + interval - table = table_maker([]) - + table = table_maker([])[0] for i in range(3): record = record_maker(start=start, end=end, @@ -531,17 +536,10 @@ def test_2x_max_results_in_one_bucket(table_maker, querier, record_maker): results = consolidate_pages(pages) assert len(results) == MAX_RESULTS * 2 -""" -Will have to go through all of the tests associated with -latest and correctly query from -the latest table that was created. -""" def test_latest_table_query(table_maker, querier, record_maker): - now = int(time.time() * 1000) - records = record_maker(include_latest_key=True, what='foo', where='boo') - _, latest_table = table_maker(records) - - querier_instance = querier(dynamodb=latest_table.dynamodb, table_name=latest_table.table_name) - result = querier_instance.query_latest_table('foo', 'boo') + records = record_maker(what='foo', where='boo') + table_maker(records) + querier.use_latest = True + result = querier.query_latest('foo', 'boo') _validate_latest_result(result, what='foo', where='boo') \ No newline at end of file From 3c80c243761c0a131c06fc43ec727a707a0160dc Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Thu, 16 May 2024 16:37:37 -0400 Subject: [PATCH 03/12] Cleanup tests. --- api/datalake_api/querier.py | 6 +++--- api/tests/conftest.py | 2 +- api/tests/test_archive_querier.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index 54e78f6..c1ef474 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -173,10 +173,10 @@ def _unpack(self, result): class ArchiveQuerier(object): - def __init__(self, table_name, dynamodb=None, use_latest=None): + def __init__(self, table_name, dynamodb=None): self.table_name = table_name self.dynamodb = dynamodb - self.use_latest = use_latest + self.use_latest = False def query_by_work_id(self, work_id, what, where=None, cursor=None): kwargs = self._prepare_work_id_kwargs(work_id, what) @@ -337,7 +337,7 @@ def _table(self): def _latest_table(self): return self.dynamodb.Table('test_latest') - def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS, use_latest=False): + def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): if self.use_latest: response = self._latest_table.query( KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') diff --git a/api/tests/conftest.py b/api/tests/conftest.py index da0c9ec..44da97b 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -186,7 +186,7 @@ def _populate_table(table, records): @pytest.fixture def table_maker(request, dynamodb): - def maker(records, use_latest=False): + def maker(records): table_name = 'test' latest_table_name = 'test_latest' latest_table = None diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index da89cb2..159aa5c 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -126,7 +126,7 @@ def query_latest(self, what, where): @pytest.fixture(params=[ArchiveQuerier, HttpQuerier], ids=['archive_querier', 'http']) def querier(request, dynamodb): - return request.param('test', dynamodb=dynamodb, use_latest=True) + return request.param('test', dynamodb=dynamodb) def in_url(result, part): url = result['url'] From c746d0cf6382795e08006f740cfad35c67b942aa Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Thu, 16 May 2024 16:45:07 -0400 Subject: [PATCH 04/12] Cleanup tests. --- api/tests/conftest.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/api/tests/conftest.py b/api/tests/conftest.py index 44da97b..648e8ce 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -189,13 +189,11 @@ def table_maker(request, dynamodb): def maker(records): table_name = 'test' latest_table_name = 'test_latest' - latest_table = None table = _create_table(dynamodb, table_name, attribute_definitions, key_schema, global_secondary) - latest_table = _create_table(dynamodb, latest_table_name, latest_attribute_definitions, latest_key_schema) - _populate_table(latest_table, records) + _populate_table(latest_table, records) _populate_table(table, records) def tear_down(): @@ -203,7 +201,6 @@ def tear_down(): _delete_table(latest_table) request.addfinalizer(tear_down) - return (table, latest_table) return maker From 6ce5b667b007114eca2a5b4c558923c170aa69b8 Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Tue, 21 May 2024 17:28:42 -0400 Subject: [PATCH 05/12] Querier: Update tests and querier to utilize what_where_key in search. --- api/datalake_api/querier.py | 8 +++++--- api/tests/test_archive_querier.py | 18 ++++++++++++++---- api/tests/test_file.py | 4 ++++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index c1ef474..a9c1a80 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -18,6 +18,7 @@ import base64 import json import time +import os '''the maximum number of results to return to the user @@ -173,10 +174,11 @@ def _unpack(self, result): class ArchiveQuerier(object): - def __init__(self, table_name, dynamodb=None): + def __init__(self, table_name, latest_table_name=None, dynamodb=None): self.table_name = table_name + self.latest_table_name = latest_table_name self.dynamodb = dynamodb - self.use_latest = False + self.use_latest = os.environ.get("DATALAKE_USE_LATEST_TABLE", False) def query_by_work_id(self, work_id, what, where=None, cursor=None): kwargs = self._prepare_work_id_kwargs(work_id, what) @@ -335,7 +337,7 @@ def _table(self): @memoized_property def _latest_table(self): - return self.dynamodb.Table('test_latest') + return self.dynamodb.Table(self.latest_table_name) def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): if self.use_latest: diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 159aa5c..a574879 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -126,7 +126,7 @@ def query_latest(self, what, where): @pytest.fixture(params=[ArchiveQuerier, HttpQuerier], ids=['archive_querier', 'http']) def querier(request, dynamodb): - return request.param('test', dynamodb=dynamodb) + return request.param('test', 'test_latest', dynamodb=dynamodb) def in_url(result, part): url = result['url'] @@ -513,6 +513,7 @@ def test_max_results_in_one_bucket(table_maker, querier, record_maker): end=end, what='boo', where='hoo{}'.format(i)) + print(f'records are {records}') table_maker(records) pages = get_all_pages(querier.query_by_time, [start, end, 'boo']) results = consolidate_pages(pages) @@ -538,8 +539,17 @@ def test_2x_max_results_in_one_bucket(table_maker, querier, record_maker): def test_latest_table_query(table_maker, querier, record_maker): - records = record_maker(what='foo', where='boo') + now = int(time.time() * 1000) + records = [] + bucket = int(now/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS) + start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS + end = start + for i in range(MAX_RESULTS): + records += record_maker(start=start, + end=end, + what='boo', + where='hoo{}'.format(i)) table_maker(records) querier.use_latest = True - result = querier.query_latest('foo', 'boo') - _validate_latest_result(result, what='foo', where='boo') \ No newline at end of file + result = querier.query_latest('boo', 'hoo0') + _validate_latest_result(result, what='boo', where='hoo0') \ No newline at end of file diff --git a/api/tests/test_file.py b/api/tests/test_file.py index 309af13..b919c89 100644 --- a/api/tests/test_file.py +++ b/api/tests/test_file.py @@ -190,6 +190,10 @@ def maker(content, metadata): s3_file_maker('datalake-test', path, content, metadata) url = 's3://datalake-test/' + path records = DatalakeRecord.list_from_metadata(url, metadata) + for record in records: + what = record.get('what') + where = record.get('where') + record['what_where_key'] = f"{what}:{where}" table_maker(records) return maker From 0818b5ef22c1f796d2da4765cad7b2c06f6d123f Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Tue, 21 May 2024 17:40:15 -0400 Subject: [PATCH 06/12] Querier: Update tests and querier to utilize what_where_key in search. --- api/tests/test_archive_querier.py | 1 - 1 file changed, 1 deletion(-) diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index a574879..8f689fa 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -513,7 +513,6 @@ def test_max_results_in_one_bucket(table_maker, querier, record_maker): end=end, what='boo', where='hoo{}'.format(i)) - print(f'records are {records}') table_maker(records) pages = get_all_pages(querier.query_by_time, [start, end, 'boo']) results = consolidate_pages(pages) From b06d770a80e26429f12677ccce14d358885b732e Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Tue, 21 May 2024 17:43:37 -0400 Subject: [PATCH 07/12] Querier: Update tests and querier to utilize what_where_key in search. --- api/datalake_api/querier.py | 5 +++-- api/tests/test_archive_querier.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index a9c1a80..7091ba9 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -178,7 +178,8 @@ def __init__(self, table_name, latest_table_name=None, dynamodb=None): self.table_name = table_name self.latest_table_name = latest_table_name self.dynamodb = dynamodb - self.use_latest = os.environ.get("DATALAKE_USE_LATEST_TABLE", False) + self.use_latest_table = os.environ.get("DATALAKE_USE_LATEST_TABLE", + False) def query_by_work_id(self, work_id, what, where=None, cursor=None): kwargs = self._prepare_work_id_kwargs(work_id, what) @@ -340,7 +341,7 @@ def _latest_table(self): return self.dynamodb.Table(self.latest_table_name) def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): - if self.use_latest: + if self.use_latest_table: response = self._latest_table.query( KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') ) diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 8f689fa..5e00114 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -549,6 +549,6 @@ def test_latest_table_query(table_maker, querier, record_maker): what='boo', where='hoo{}'.format(i)) table_maker(records) - querier.use_latest = True + querier.use_latest_table = True result = querier.query_latest('boo', 'hoo0') _validate_latest_result(result, what='boo', where='hoo0') \ No newline at end of file From 327514d4b18dc741a060460f38b94bff20a941b1 Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Tue, 28 May 2024 12:19:02 -0400 Subject: [PATCH 08/12] QUERIER: Configuring env variables to utilize latest table query in tests + logging. --- api/datalake_api/querier.py | 9 ++++++++- api/datalake_api/settings.py | 1 + api/datalake_api/v0.py | 2 ++ api/tests/test_archive_querier.py | 22 ++++++++++++++++++---- 4 files changed, 29 insertions(+), 5 deletions(-) diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index 7091ba9..141c735 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -20,6 +20,11 @@ import time import os +import logging +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) +log.setLevel(logging.INFO) + '''the maximum number of results to return to the user @@ -179,7 +184,7 @@ def __init__(self, table_name, latest_table_name=None, dynamodb=None): self.latest_table_name = latest_table_name self.dynamodb = dynamodb self.use_latest_table = os.environ.get("DATALAKE_USE_LATEST_TABLE", - False) + "false").lower() == "true" def query_by_work_id(self, work_id, what, where=None, cursor=None): kwargs = self._prepare_work_id_kwargs(work_id, what) @@ -341,7 +346,9 @@ def _latest_table(self): return self.dynamodb.Table(self.latest_table_name) def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): + log.info('Inside query_latest method') if self.use_latest_table: + log.info('inside use_latest_table=TRUE') response = self._latest_table.query( KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') ) diff --git a/api/datalake_api/settings.py b/api/datalake_api/settings.py index d38bfd6..e55c6d1 100644 --- a/api/datalake_api/settings.py +++ b/api/datalake_api/settings.py @@ -14,6 +14,7 @@ # default settings DYNAMODB_TABLE = 'test' +DYNAMODB_LATEST_TABLE = 'test_latest' AWS_REGION = 'us-west-2' AWS_ACCESS_KEY_ID = None AWS_SECRET_ACCESS_KEY = None diff --git a/api/datalake_api/v0.py b/api/datalake_api/v0.py index dde7c3a..927997d 100644 --- a/api/datalake_api/v0.py +++ b/api/datalake_api/v0.py @@ -50,7 +50,9 @@ def get_dynamodb(): def get_archive_querier(): if not hasattr(app, 'archive_querier'): table_name = app.config.get('DYNAMODB_TABLE') + latest_table_name = app.config.get('DYNAMODB_LATEST_TABLE') app.archive_querier = ArchiveQuerier(table_name, + latest_table_name, dynamodb=get_dynamodb()) return app.archive_querier diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 5e00114..2c6fe37 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -123,10 +123,24 @@ def query_latest(self, what, where): return HttpRecord(**record) -@pytest.fixture(params=[ArchiveQuerier, HttpQuerier], - ids=['archive_querier', 'http']) -def querier(request, dynamodb): - return request.param('test', 'test_latest', dynamodb=dynamodb) +@pytest.fixture(params=[ + ('archive', 'use_latest'), + ('archive', 'use_default'), + ('http', 'use_latest'), + ('http', 'use_default') +], ids=['archive_latest', 'archive-default', 'http-latest', 'http-default']) +def querier(monkeypatch, request, dynamodb): + querier_type, table_usage = request.param + + if table_usage == 'use_latest': + monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'true') + else: + monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'false') + + if querier_type == 'http': + return HttpQuerier('test', 'test_latest', dynamodb=dynamodb) + else: + return ArchiveQuerier('test', 'test_latest', dynamodb=dynamodb) def in_url(result, part): url = result['url'] From a0ed1dbab5beacb034efafb442a853c9e61f0ea6 Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Wed, 29 May 2024 15:56:30 -0400 Subject: [PATCH 09/12] WIP: Addressing LATEST_MAX_LOOKBACK with the new latest functionality. --- api/datalake_api/app.py | 1 + api/datalake_api/querier.py | 15 +++++++++------ api/datalake_api/settings.py | 6 ++++++ api/datalake_api/v0.py | 4 ++++ api/tests/test_archive_querier.py | 17 ++++++++++++++++- 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/api/datalake_api/app.py b/api/datalake_api/app.py index 0d61e2b..af94d74 100644 --- a/api/datalake_api/app.py +++ b/api/datalake_api/app.py @@ -23,6 +23,7 @@ from datalake_api import settings +logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index 141c735..18bdd94 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -21,9 +21,7 @@ import os import logging -logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) -log.setLevel(logging.INFO) '''the maximum number of results to return to the user @@ -179,12 +177,17 @@ def _unpack(self, result): class ArchiveQuerier(object): - def __init__(self, table_name, latest_table_name=None, dynamodb=None): + def __init__(self, table_name, + latest_table_name=None, + use_latest_table=None, + latest_max_lookback=30, + dynamodb=None): self.table_name = table_name self.latest_table_name = latest_table_name + self.use_latest_table = use_latest_table + self.latest_max_lookback = latest_max_lookback self.dynamodb = dynamodb - self.use_latest_table = os.environ.get("DATALAKE_USE_LATEST_TABLE", - "false").lower() == "true" + def query_by_work_id(self, work_id, what, where=None, cursor=None): kwargs = self._prepare_work_id_kwargs(work_id, what) @@ -353,7 +356,7 @@ def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') ) items = response.get('Items', []) - if not items: + if not items and self.latest_max_lookback > 0: return self._default_latest(what, where, lookback_days) latest_item = items[0] diff --git a/api/datalake_api/settings.py b/api/datalake_api/settings.py index e55c6d1..3d79650 100644 --- a/api/datalake_api/settings.py +++ b/api/datalake_api/settings.py @@ -11,10 +11,16 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. +import os # default settings + DYNAMODB_TABLE = 'test' DYNAMODB_LATEST_TABLE = 'test_latest' +DATALAKE_USE_LATEST_TABLE = \ + os.environ.get("DATALAKE_USE_LATEST_TABLE", "false").lower() == "true" +LATEST_MAX_LOOKBACK = int(os.environ.get("LATEST_MAX_LOOKBACK", "30")) + AWS_REGION = 'us-west-2' AWS_ACCESS_KEY_ID = None AWS_SECRET_ACCESS_KEY = None diff --git a/api/datalake_api/v0.py b/api/datalake_api/v0.py index 927997d..aba95f1 100644 --- a/api/datalake_api/v0.py +++ b/api/datalake_api/v0.py @@ -51,8 +51,12 @@ def get_archive_querier(): if not hasattr(app, 'archive_querier'): table_name = app.config.get('DYNAMODB_TABLE') latest_table_name = app.config.get('DYNAMODB_LATEST_TABLE') + use_latest_table = app.config.get('DATALAKE_USE_LATEST_TABLE') + latest_max_lookback = app.config.get("LATEST_MAX_LOOKBACK") app.archive_querier = ArchiveQuerier(table_name, latest_table_name, + use_latest_table, + latest_max_lookback, dynamodb=get_dynamodb()) return app.archive_querier diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 2c6fe37..00daa7c 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -123,6 +123,10 @@ def query_latest(self, what, where): return HttpRecord(**record) + +""" +Incorporate LATEST_MAX_LOOKBACK HERE +""" @pytest.fixture(params=[ ('archive', 'use_latest'), ('archive', 'use_default'), @@ -565,4 +569,15 @@ def test_latest_table_query(table_maker, querier, record_maker): table_maker(records) querier.use_latest_table = True result = querier.query_latest('boo', 'hoo0') - _validate_latest_result(result, what='boo', where='hoo0') \ No newline at end of file + _validate_latest_result(result, what='boo', where='hoo0') + +""" +Write tests: +With setup of latest table records, +with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=true, with LATEST_MAX_LOOKBACK=0, record is found + +With setup of latest table records, +with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=false, with LATEST_MAX_LOOKBACK=0, record is not found + +2-4 +""" \ No newline at end of file From c13a6cdd437b9ba324fc43fb6439fd3f52d553ed Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Mon, 3 Jun 2024 16:02:08 -0400 Subject: [PATCH 10/12] API: Feature flag enables query from latest_table. --- api/datalake_api/querier.py | 8 ++--- api/datalake_api/settings.py | 4 +-- api/datalake_api/v0.py | 17 +++++++--- api/tests/conftest.py | 6 ++-- api/tests/test_archive_querier.py | 54 ++++++++++++++++++++----------- 5 files changed, 57 insertions(+), 32 deletions(-) diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index 18bdd94..4b00c05 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -180,12 +180,10 @@ class ArchiveQuerier(object): def __init__(self, table_name, latest_table_name=None, use_latest_table=None, - latest_max_lookback=30, dynamodb=None): self.table_name = table_name self.latest_table_name = latest_table_name self.use_latest_table = use_latest_table - self.latest_max_lookback = latest_max_lookback self.dynamodb = dynamodb @@ -349,14 +347,15 @@ def _latest_table(self): return self.dynamodb.Table(self.latest_table_name) def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): - log.info('Inside query_latest method') if self.use_latest_table: log.info('inside use_latest_table=TRUE') response = self._latest_table.query( KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}') ) items = response.get('Items', []) - if not items and self.latest_max_lookback > 0: + + if not items: + log.info('Falling back to default latest query') return self._default_latest(what, where, lookback_days) latest_item = items[0] @@ -389,6 +388,7 @@ def _get_all_records_in_bucket(self, bucket, **kwargs): return records def _default_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS): + log.info("Using default latest behavior") current = int(time.time() * 1000) end = current - lookback_days * _ONE_DAY_MS while current >= end: diff --git a/api/datalake_api/settings.py b/api/datalake_api/settings.py index 3d79650..800f062 100644 --- a/api/datalake_api/settings.py +++ b/api/datalake_api/settings.py @@ -17,9 +17,7 @@ DYNAMODB_TABLE = 'test' DYNAMODB_LATEST_TABLE = 'test_latest' -DATALAKE_USE_LATEST_TABLE = \ - os.environ.get("DATALAKE_USE_LATEST_TABLE", "false").lower() == "true" -LATEST_MAX_LOOKBACK = int(os.environ.get("LATEST_MAX_LOOKBACK", "30")) +DATALAKE_USE_LATEST_TABLE = False AWS_REGION = 'us-west-2' AWS_ACCESS_KEY_ID = None diff --git a/api/datalake_api/v0.py b/api/datalake_api/v0.py index aba95f1..3520512 100644 --- a/api/datalake_api/v0.py +++ b/api/datalake_api/v0.py @@ -27,6 +27,7 @@ v0 = flask.Blueprint('v0', __name__, url_prefix='/v0') +_archive_querier = None def _get_aws_kwargs(): kwargs = dict( @@ -48,17 +49,23 @@ def get_dynamodb(): def get_archive_querier(): - if not hasattr(app, 'archive_querier'): + global _archive_querier + + if not _archive_querier: table_name = app.config.get('DYNAMODB_TABLE') latest_table_name = app.config.get('DYNAMODB_LATEST_TABLE') use_latest_table = app.config.get('DATALAKE_USE_LATEST_TABLE') - latest_max_lookback = app.config.get("LATEST_MAX_LOOKBACK") - app.archive_querier = ArchiveQuerier(table_name, + _archive_querier = ArchiveQuerier(table_name, latest_table_name, use_latest_table, - latest_max_lookback, dynamodb=get_dynamodb()) - return app.archive_querier + return _archive_querier + + +def reset_archive_querier(): + """FOR TESTING PURPOSES ONLY""" + global _archive_querier + _archive_querier = None @v0.route('/archive/') diff --git a/api/tests/conftest.py b/api/tests/conftest.py index 648e8ce..bf166fa 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -42,6 +42,9 @@ def get_client(): + from datalake_api import settings + datalake_api.app.config.from_object(settings) + datalake_api.app.config['TESTING'] = True datalake_api.app.config['AWS_ACCESS_KEY_ID'] = 'abc' datalake_api.app.config['AWS_SECRET_ACCESS_KEY'] = '123' @@ -181,8 +184,7 @@ def _populate_table(table, records): for r in records: batch.put_item(Item=r) -# Adding latest table logic so latest table will be created and records will populate it -# Once that's possible, we will simply query the latest_table for what:where, no bucket logic + @pytest.fixture def table_maker(request, dynamodb): diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 00daa7c..e5aca16 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -11,8 +11,11 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. - +import os import pytest +from datalake_api.v0 import reset_archive_querier +from datalake_api import settings +# from flask import current_app as app from datalake.common import DatalakeRecord from datalake.tests import generate_random_metadata import simplejson as json @@ -124,27 +127,35 @@ def query_latest(self, what, where): -""" -Incorporate LATEST_MAX_LOOKBACK HERE -""" @pytest.fixture(params=[ ('archive', 'use_latest'), ('archive', 'use_default'), ('http', 'use_latest'), ('http', 'use_default') -], ids=['archive_latest', 'archive-default', 'http-latest', 'http-default']) +], ids=['archive-latest', + 'archive-default', + 'http-latest', + 'http-default' + ]) def querier(monkeypatch, request, dynamodb): + + reset_archive_querier() querier_type, table_usage = request.param if table_usage == 'use_latest': - monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'true') + settings.DATALAKE_USE_LATEST_TABLE = True else: - monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'false') + settings.DATALAKE_USE_LATEST_TABLE= False if querier_type == 'http': - return HttpQuerier('test', 'test_latest', dynamodb=dynamodb) + return HttpQuerier('test', + 'test_latest', + dynamodb=dynamodb) else: - return ArchiveQuerier('test', 'test_latest', dynamodb=dynamodb) + return ArchiveQuerier('test', + 'test_latest', + use_latest_table=True if table_usage == 'use_latest' else False, + dynamodb=dynamodb) def in_url(result, part): url = result['url'] @@ -567,17 +578,24 @@ def test_latest_table_query(table_maker, querier, record_maker): what='boo', where='hoo{}'.format(i)) table_maker(records) - querier.use_latest_table = True result = querier.query_latest('boo', 'hoo0') _validate_latest_result(result, what='boo', where='hoo0') -""" -Write tests: -With setup of latest table records, -with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=true, with LATEST_MAX_LOOKBACK=0, record is found -With setup of latest table records, -with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=false, with LATEST_MAX_LOOKBACK=0, record is not found +def test_query_latest_just_latest_table(table_maker, querier, record_maker): + use_latest_from_env = settings.DATALAKE_USE_LATEST_TABLE + table = table_maker([])[1] + for i in range(3): + record = record_maker(what='meow', + where=f'tree', + path='/{}'.format(i)) + + # only inserting into latest table + table.put_item(Item=record[0]) + time.sleep(1.01) -2-4 -""" \ No newline at end of file + result = querier.query_latest('meow', 'tree') + if use_latest_from_env: + _validate_latest_result(result, what='meow', where='tree') + else: + assert result is None From 3ded3061d20ff35e68f84d855ac56438b283e91f Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Mon, 29 Jul 2024 14:20:22 -0700 Subject: [PATCH 11/12] API: minor comments/cleanup. --- api/datalake_api/v0.py | 5 +++++ api/tests/test_archive_querier.py | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/api/datalake_api/v0.py b/api/datalake_api/v0.py index 3520512..c1aad72 100644 --- a/api/datalake_api/v0.py +++ b/api/datalake_api/v0.py @@ -49,6 +49,11 @@ def get_dynamodb(): def get_archive_querier(): + """ + we use global var here along with reset_archive_querier() + to allow test fixture to differentiate between + ArchiveQuerier vs HttpQuerier fixtures. + """ global _archive_querier if not _archive_querier: diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index e5aca16..d9cf7da 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -15,7 +15,6 @@ import pytest from datalake_api.v0 import reset_archive_querier from datalake_api import settings -# from flask import current_app as app from datalake.common import DatalakeRecord from datalake.tests import generate_random_metadata import simplejson as json From fb7b8ec4c23896575073509fdbaab03506cc7c95 Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Mon, 29 Jul 2024 17:17:24 -0700 Subject: [PATCH 12/12] API: minor comments/cleanup. --- api/tests/test_archive_querier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index d9cf7da..399d1d7 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -136,7 +136,7 @@ def query_latest(self, what, where): 'http-latest', 'http-default' ]) -def querier(monkeypatch, request, dynamodb): +def querier(request, dynamodb): reset_archive_querier() querier_type, table_usage = request.param