Skip to content

Commit

Permalink
Merge pull request #89 from planetlabs/proto-query-latest-table
Browse files Browse the repository at this point in the history
Prototype querying latest table.
  • Loading branch information
ABPLMC authored Jul 30, 2024
2 parents d531092 + fb7b8ec commit 6932fb0
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 25 deletions.
1 change: 1 addition & 0 deletions api/datalake_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from datalake_api import settings


logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)


Expand Down
54 changes: 44 additions & 10 deletions api/datalake_api/querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@

from memoized_property import memoized_property
from datalake.common import DatalakeRecord
from boto3.dynamodb.conditions import Key
import base64
import json
import time
import os

import logging
log = logging.getLogger(__name__)


'''the maximum number of results to return to the user
Expand Down Expand Up @@ -172,9 +177,15 @@ def _unpack(self, result):

class ArchiveQuerier(object):

def __init__(self, table_name,
             latest_table_name=None,
             use_latest_table=None,
             dynamodb=None):
    """Create a querier over the datalake's DynamoDB tables.

    table_name: name of the main archive table (time-bucketed records).
    latest_table_name: optional name of the table keyed by
        'what_where_key' that holds only the newest record per
        what/where pair.
    use_latest_table: when truthy, query_latest consults the latest
        table first instead of scanning time buckets.
    dynamodb: a boto3 DynamoDB service resource.
    """
    self.table_name = table_name
    self.latest_table_name = latest_table_name
    self.use_latest_table = use_latest_table
    self.dynamodb = dynamodb

def query_by_work_id(self, work_id, what, where=None, cursor=None):
kwargs = self._prepare_work_id_kwargs(work_id, what)
Expand Down Expand Up @@ -330,18 +341,28 @@ def _cursor_for_time_query(self, response, results, current_bucket):
@memoized_property
def _table(self):
    """Cached handle to the main archive DynamoDB table."""
    table = self.dynamodb.Table(self.table_name)
    return table

@memoized_property
def _latest_table(self):
    """Cached handle to the latest-record DynamoDB table."""
    latest = self.dynamodb.Table(self.latest_table_name)
    return latest

def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
    """Return the newest record for (what, where), or None.

    When use_latest_table is set, look the record up directly in the
    latest table via its 'what_where_key' hash key; if no entry exists
    there yet, fall back to the bucket-scanning behavior. Otherwise the
    bucket scan (_default_latest) is used exclusively.

    lookback_days bounds how far back the fallback bucket scan looks.
    """
    if not self.use_latest_table:
        return self._default_latest(what, where, lookback_days)

    log.info('inside use_latest_table=TRUE')
    response = self._latest_table.query(
        KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}')
    )
    items = response.get('Items', [])

    if not items:
        # Nothing in the latest table for this pair; scan time buckets.
        log.info('Falling back to default latest query')
        return self._default_latest(what, where, lookback_days)

    latest_item = items[0]
    return dict(url=latest_item['url'], metadata=latest_item['metadata'])

def _get_latest_record_in_bucket(self, bucket, what, where):
kwargs = self._prepare_time_bucket_kwargs(bucket, what)
Expand All @@ -365,3 +386,16 @@ def _get_all_records_in_bucket(self, bucket, **kwargs):
break
kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
return records

def _default_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
    """Scan time buckets newest-first for the latest (what, where) record.

    Walks back one day at a time from now until lookback_days is
    exhausted, returning the first record found, else None.
    """
    log.info("Using default latest behavior")
    current = int(time.time() * 1000)
    end = current - lookback_days * _ONE_DAY_MS
    while current >= end:
        # Floor division: under Python 3 plain '/' yields a float bucket,
        # which would not match the integer time-bucket keys.
        bucket = current // DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
        r = self._get_latest_record_in_bucket(bucket, what, where)
        if r is not None:
            return r
        current -= _ONE_DAY_MS

    return None
5 changes: 5 additions & 0 deletions api/datalake_api/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import os

# default settings

DYNAMODB_TABLE = 'test'
DYNAMODB_LATEST_TABLE = 'test_latest'
DATALAKE_USE_LATEST_TABLE = False

AWS_REGION = 'us-west-2'
AWS_ACCESS_KEY_ID = None
AWS_SECRET_ACCESS_KEY = None
Expand Down
24 changes: 21 additions & 3 deletions api/datalake_api/v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

v0 = flask.Blueprint('v0', __name__, url_prefix='/v0')

_archive_querier = None

def _get_aws_kwargs():
kwargs = dict(
Expand All @@ -48,11 +49,28 @@ def get_dynamodb():


def get_archive_querier():
    """Return the process-wide ArchiveQuerier, creating it on first use.

    A module-level global is used (together with reset_archive_querier)
    so test fixtures can differentiate between ArchiveQuerier and
    HttpQuerier configurations.
    """
    global _archive_querier

    # 'is None' rather than truthiness: a valid querier object should
    # never be treated as absent.
    if _archive_querier is None:
        table_name = app.config.get('DYNAMODB_TABLE')
        latest_table_name = app.config.get('DYNAMODB_LATEST_TABLE')
        use_latest_table = app.config.get('DATALAKE_USE_LATEST_TABLE')
        _archive_querier = ArchiveQuerier(table_name,
                                          latest_table_name,
                                          use_latest_table,
                                          dynamodb=get_dynamodb())
    return _archive_querier


def reset_archive_querier():
    """Drop the cached querier so the next request rebuilds it.

    FOR TESTING PURPOSES ONLY.
    """
    global _archive_querier
    _archive_querier = None


@v0.route('/archive/')
Expand Down
47 changes: 41 additions & 6 deletions api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@


def get_client():
from datalake_api import settings
datalake_api.app.config.from_object(settings)

datalake_api.app.config['TESTING'] = True
datalake_api.app.config['AWS_ACCESS_KEY_ID'] = 'abc'
datalake_api.app.config['AWS_SECRET_ACCESS_KEY'] = '123'
Expand Down Expand Up @@ -107,6 +110,20 @@ def tear_down():
}
]

latest_attribute_definitions = [
{
'AttributeName': 'what_where_key',
'AttributeType': 'S'
}
]

latest_key_schema = [
{
'AttributeName': 'what_where_key',
'KeyType': 'HASH'
}
]

global_secondary = [{
'IndexName': 'work-id-index',
'KeySchema': [
Expand Down Expand Up @@ -140,19 +157,24 @@ def _delete_table(table):
raise e


def _create_table(dynamodb,
                  table_name,
                  attribute_definitions,
                  key_schema,
                  global_secondary=None):
    """(Re)create a DynamoDB table for tests and return a fresh handle.

    Any existing table with the same name is deleted first. The GSI list
    is only included in the create call when one is provided — DynamoDB
    rejects an explicit empty/None GlobalSecondaryIndexes argument.
    """
    table = dynamodb.Table(table_name)
    _delete_table(table)
    kwargs = dict(
        TableName=table_name,
        AttributeDefinitions=attribute_definitions,
        KeySchema=key_schema,
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )
    if global_secondary:
        kwargs['GlobalSecondaryIndexes'] = global_secondary
    dynamodb.create_table(**kwargs)
    return dynamodb.Table(table_name)

Expand All @@ -168,14 +190,20 @@ def table_maker(request, dynamodb):

def maker(records):
    """Create both test tables, load *records* into each, and register
    teardown; returns (table, latest_table)."""
    table_name = 'test'
    latest_table_name = 'test_latest'

    table = _create_table(dynamodb, table_name, attribute_definitions,
                          key_schema, global_secondary)
    latest_table = _create_table(dynamodb, latest_table_name,
                                 latest_attribute_definitions,
                                 latest_key_schema)

    _populate_table(latest_table, records)
    _populate_table(table, records)

    def tear_down():
        _delete_table(table)
        _delete_table(latest_table)

    request.addfinalizer(tear_down)
    return (table, latest_table)

return maker

Expand All @@ -189,6 +217,13 @@ def maker(**kwargs):
key = '/'.join([str(v) for v in kwargs.values()])
url = 's3://datalake-test/' + key
s3_file_from_metadata(url, m)
return DatalakeRecord.list_from_metadata(url, m)
records = DatalakeRecord.list_from_metadata(url, m)

what = kwargs.get('what')
where = kwargs.get('where')
for record in records:
record['what_where_key'] = f"{what}:{where}"

return records

return maker
82 changes: 76 additions & 6 deletions api/tests/test_archive_querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import os
import pytest
from datalake_api.v0 import reset_archive_querier
from datalake_api import settings
from datalake.common import DatalakeRecord
from datalake.tests import generate_random_metadata
import simplejson as json
Expand Down Expand Up @@ -123,11 +125,36 @@ def query_latest(self, what, where):
return HttpRecord(**record)


@pytest.fixture(params=[
    ('archive', 'use_latest'),
    ('archive', 'use_default'),
    ('http', 'use_latest'),
    ('http', 'use_default')
], ids=['archive-latest',
        'archive-default',
        'http-latest',
        'http-default'])
def querier(request, dynamodb):
    """Yield each querier flavor both with and without the latest table."""
    reset_archive_querier()
    querier_type, table_usage = request.param

    use_latest = (table_usage == 'use_latest')
    # The HTTP querier reads this flag via the app's settings, so it
    # must be toggled globally before either flavor is constructed.
    settings.DATALAKE_USE_LATEST_TABLE = use_latest

    if querier_type == 'http':
        return HttpQuerier('test',
                           'test_latest',
                           dynamodb=dynamodb)
    return ArchiveQuerier('test',
                          'test_latest',
                          use_latest_table=use_latest,
                          dynamodb=dynamodb)

def in_url(result, part):
url = result['url']
Expand Down Expand Up @@ -407,6 +434,10 @@ def test_no_end(table_maker, querier, s3_file_from_metadata):
url = 's3://datalake-test/' + m['id']
s3_file_from_metadata(url, m)
records = DatalakeRecord.list_from_metadata(url, m)
for record in records:
what = record.get('what')
where = record.get('where')
record['what_where_key'] = f'{what}:{where}'
table_maker(records)
results = querier.query_by_time(m['start'], m['start'] + 1, m['what'])
assert len(results) == 1
Expand All @@ -419,7 +450,12 @@ def test_no_end_exclusion(table_maker, querier, s3_file_from_metadata):
url = 's3://datalake-test/' + m['id']
s3_file_from_metadata(url, m)
records = DatalakeRecord.list_from_metadata(url, m)
for record in records:
what = record.get('what')
where = record.get('where')
record['what_where_key'] = f'{what}:{where}'
table_maker(records)

results = querier.query_by_time(m['start'] + 1, m['start'] + 2, m['what'])
assert len(results) == 0

Expand Down Expand Up @@ -478,8 +514,7 @@ def test_latest_creation_time_breaks_tie(table_maker, querier,
start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
interval = DatalakeRecord.TIME_BUCKET_SIZE_IN_MS/150
end = start + interval
table = table_maker([])

table = table_maker([])[0]
for i in range(3):
record = record_maker(start=start,
end=end,
Expand Down Expand Up @@ -528,3 +563,38 @@ def test_2x_max_results_in_one_bucket(table_maker, querier, record_maker):
pages = get_all_pages(querier.query_by_time, [start, end, 'boo'])
results = consolidate_pages(pages)
assert len(results) == MAX_RESULTS * 2


def test_latest_table_query(table_maker, querier, record_maker):
    """query_latest finds the single record written for boo/hoo0."""
    now = int(time.time() * 1000)
    bucket = int(now / DatalakeRecord.TIME_BUCKET_SIZE_IN_MS)
    start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
    end = start
    records = []
    for i in range(MAX_RESULTS):
        records.extend(record_maker(start=start,
                                    end=end,
                                    what='boo',
                                    where='hoo{}'.format(i)))
    table_maker(records)
    result = querier.query_latest('boo', 'hoo0')
    _validate_latest_result(result, what='boo', where='hoo0')


def test_query_latest_just_latest_table(table_maker, querier, record_maker):
    """Records present only in the latest table are found only when the
    latest-table path is enabled; the bucket scan must return None."""
    use_latest_from_env = settings.DATALAKE_USE_LATEST_TABLE
    table = table_maker([])[1]
    for i in range(3):
        record = record_maker(what='meow',
                              where='tree',
                              path='/{}'.format(i))

        # only inserting into latest table
        table.put_item(Item=record[0])
        # sleep so each write gets a distinct creation time and the
        # final write is unambiguously the latest
        time.sleep(1.01)

    result = querier.query_latest('meow', 'tree')
    if use_latest_from_env:
        _validate_latest_result(result, what='meow', where='tree')
    else:
        assert result is None
4 changes: 4 additions & 0 deletions api/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ def maker(content, metadata):
s3_file_maker('datalake-test', path, content, metadata)
url = 's3://datalake-test/' + path
records = DatalakeRecord.list_from_metadata(url, metadata)
for record in records:
what = record.get('what')
where = record.get('where')
record['what_where_key'] = f"{what}:{where}"
table_maker(records)

return maker
Expand Down

0 comments on commit 6932fb0

Please sign in to comment.