Skip to content

Commit

Permalink
ArchiveQuerier: add a use_latest flag that routes query_latest() through the dedicated latest-records table.
Browse files Browse the repository at this point in the history
  • Loading branch information
ABPLMC committed May 16, 2024
1 parent 5368ebf commit 483febf
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 56 deletions.
50 changes: 30 additions & 20 deletions api/datalake_api/querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,10 @@ def _unpack(self, result):

class ArchiveQuerier(object):

def __init__(self, table_name, dynamodb=None, use_latest=None):
    """Create a querier over the named DynamoDB archive table.

    table_name: name of the primary archive table.
    dynamodb: boto3 DynamoDB service resource used to resolve tables.
    use_latest: when truthy, query_latest() reads the dedicated
        latest-records table instead of walking time buckets.
    """
    self.table_name = table_name
    self.dynamodb = dynamodb
    self.use_latest = use_latest

def query_by_work_id(self, work_id, what, where=None, cursor=None):
kwargs = self._prepare_work_id_kwargs(work_id, what)
Expand Down Expand Up @@ -331,18 +332,25 @@ def _cursor_for_time_query(self, response, results, current_bucket):
@memoized_property
def _table(self):
    """Lazily resolve and cache the primary DynamoDB table resource."""
    handle = self.dynamodb.Table(self.table_name)
    return handle

@memoized_property
def _latest_table(self):
    """DynamoDB table holding only the newest record per what:where pair.

    The name is derived from the primary table name instead of being
    hard-coded to 'test_latest', so the querier also works outside the
    test environment; for table_name='test' it still resolves to
    'test_latest', keeping existing tests working.
    """
    return self.dynamodb.Table(self.table_name + '_latest')

def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
current = int(time.time() * 1000)
end = current - lookback_days * _ONE_DAY_MS
while current >= end:
bucket = current/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
r = self._get_latest_record_in_bucket(bucket, what, where)
if r is not None:
return r
current -= _ONE_DAY_MS
def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS,
                 use_latest=False):
    """Return the most recent file record for (what, where), or None.

    When the querier was constructed with use_latest (or the caller
    passes use_latest=True — previously this argument was silently
    ignored), look the record up directly in the latest-records table,
    falling back to the time-bucket walk when no entry is found there.
    Otherwise walk the time buckets over the lookback window.
    """
    if self.use_latest or use_latest:
        response = self._latest_table.query(
            KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}')
        )
        items = response.get('Items', [])
        if not items:
            # Nothing in the latest table; fall back to scanning the
            # time buckets so older archives still resolve.
            return self._default_latest(what, where, lookback_days)
        latest_item = items[0]
        return dict(url=latest_item['url'], metadata=latest_item['metadata'])
    # NOTE: the original had an unreachable `return None` after the
    # unconditional return above; it has been removed.
    return self._default_latest(what, where, lookback_days)

def _get_latest_record_in_bucket(self, bucket, what, where):
kwargs = self._prepare_time_bucket_kwargs(bucket, what)
Expand All @@ -367,12 +375,14 @@ def _get_all_records_in_bucket(self, bucket, **kwargs):
kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
return records

def query_latest_table(self, what, where):
response = self._table.query(
KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}')
)
items = response.get('Items', [])
if not items:
return None
latest_item = items[0]
return dict(url=latest_item['url'], metadata=latest_item['metadata'])
def _default_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
    """Walk time buckets newest-first and return the first record found.

    Scans at most lookback_days days back from now (milliseconds since
    epoch); returns None when no record for (what, where) exists in
    that window.
    """
    current = int(time.time() * 1000)
    end = current - lookback_days * _ONE_DAY_MS
    while current >= end:
        # Must be integer division: under Python 3 (this file uses
        # f-strings) `/` yields a float bucket index, which would
        # produce a malformed time-bucket key.
        bucket = current // DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
        r = self._get_latest_record_in_bucket(bucket, what, where)
        if r is not None:
            return r
        current -= _ONE_DAY_MS

    return None
33 changes: 15 additions & 18 deletions api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ def _create_table(dynamodb,


def _populate_table(table, records):
print(f'attempting to populate {table}')
with table.batch_writer() as batch:
for r in records:
batch.put_item(Item=r)
Expand All @@ -187,46 +186,44 @@ def _populate_table(table, records):
@pytest.fixture
def table_maker(request, dynamodb):
    """Fixture: build the archive table plus its companion latest table.

    The returned maker populates both tables with the given records,
    registers teardown of both, and returns (table, latest_table).
    """

    def maker(records, use_latest=False):
        # use_latest is accepted for signature compatibility with older
        # callers; both tables are always created and populated now.
        table = _create_table(dynamodb, 'test', attribute_definitions,
                              key_schema, global_secondary)
        latest_table = _create_table(dynamodb, 'test_latest',
                                     latest_attribute_definitions,
                                     latest_key_schema)
        _populate_table(latest_table, records)
        _populate_table(table, records)

        def tear_down():
            _delete_table(table)
            _delete_table(latest_table)

        request.addfinalizer(tear_down)

        return (table, latest_table)

    return maker


@pytest.fixture
def record_maker(s3_file_from_metadata):

def maker(include_latest_key=False, **kwargs):
def maker(**kwargs):
m = generate_random_metadata()
m.update(**kwargs)
key = '/'.join([str(v) for v in kwargs.values()])
url = 's3://datalake-test/' + key
s3_file_from_metadata(url, m)
records = DatalakeRecord.list_from_metadata(url, m)

if include_latest_key:
what = kwargs.get('what')
where = kwargs.get('where')
for record in records:
record['what_where_key'] = f"{what}:{where}"
what = kwargs.get('what')
where = kwargs.get('where')
for record in records:
record['what_where_key'] = f"{what}:{where}"

return records

Expand Down
34 changes: 16 additions & 18 deletions api/tests/test_archive_querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,8 @@ def query_latest(self, what, where):

@pytest.fixture(params=[ArchiveQuerier, HttpQuerier],
                ids=['archive_querier', 'http'])
def querier(request, dynamodb):
    """Parameterized querier fixture over both querier implementations.

    NOTE(review): use_latest=True forces every test through the
    latest-table code path; confirm HttpQuerier actually accepts this
    keyword argument, and whether tests of the bucket-walk path should
    instead get use_latest=False.
    """
    return request.param('test', dynamodb=dynamodb, use_latest=True)

def in_url(result, part):
url = result['url']
Expand Down Expand Up @@ -409,6 +406,10 @@ def test_no_end(table_maker, querier, s3_file_from_metadata):
url = 's3://datalake-test/' + m['id']
s3_file_from_metadata(url, m)
records = DatalakeRecord.list_from_metadata(url, m)
for record in records:
what = record.get('what')
where = record.get('where')
record['what_where_key'] = f'{what}:{where}'
table_maker(records)
results = querier.query_by_time(m['start'], m['start'] + 1, m['what'])
assert len(results) == 1
Expand All @@ -421,7 +422,12 @@ def test_no_end_exclusion(table_maker, querier, s3_file_from_metadata):
url = 's3://datalake-test/' + m['id']
s3_file_from_metadata(url, m)
records = DatalakeRecord.list_from_metadata(url, m)
for record in records:
what = record.get('what')
where = record.get('where')
record['what_where_key'] = f'{what}:{where}'
table_maker(records)

results = querier.query_by_time(m['start'] + 1, m['start'] + 2, m['what'])
assert len(results) == 0

Expand Down Expand Up @@ -480,8 +486,7 @@ def test_latest_creation_time_breaks_tie(table_maker, querier,
start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
interval = DatalakeRecord.TIME_BUCKET_SIZE_IN_MS/150
end = start + interval
table = table_maker([])

table = table_maker([])[0]
for i in range(3):
record = record_maker(start=start,
end=end,
Expand Down Expand Up @@ -531,17 +536,10 @@ def test_2x_max_results_in_one_bucket(table_maker, querier, record_maker):
results = consolidate_pages(pages)
assert len(results) == MAX_RESULTS * 2

"""
Will have to go through all of the tests associated with
latest and correctly query from
the latest table that was created.
"""

def test_latest_table_query(table_maker, querier, record_maker):
    """Latest lookup via the dedicated latest table returns the record.

    The unused `now` timestamp left over from the previous version of
    this test has been removed.
    """
    records = record_maker(what='foo', where='boo')
    table_maker(records)
    # Force the latest-table code path regardless of fixture defaults.
    querier.use_latest = True
    result = querier.query_latest('foo', 'boo')
    _validate_latest_result(result, what='foo', where='boo')

0 comments on commit 483febf

Please sign in to comment.