Skip to content

Commit

Permalink
API: Feature flag enables query from latest_table.
Browse files Browse the repository at this point in the history
  • Loading branch information
ABPLMC committed Jul 26, 2024
1 parent a0ed1db commit c13a6cd
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 32 deletions.
8 changes: 4 additions & 4 deletions api/datalake_api/querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,10 @@ class ArchiveQuerier(object):
def __init__(self, table_name,
latest_table_name=None,
use_latest_table=None,
latest_max_lookback=30,
dynamodb=None):
self.table_name = table_name
self.latest_table_name = latest_table_name
self.use_latest_table = use_latest_table
self.latest_max_lookback = latest_max_lookback
self.dynamodb = dynamodb


Expand Down Expand Up @@ -349,14 +347,15 @@ def _latest_table(self):
return self.dynamodb.Table(self.latest_table_name)

def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
log.info('Inside query_latest method')
if self.use_latest_table:
log.info('inside use_latest_table=TRUE')
response = self._latest_table.query(
KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}')
)
items = response.get('Items', [])
if not items and self.latest_max_lookback > 0:

if not items:
log.info('Falling back to default latest query')
return self._default_latest(what, where, lookback_days)

latest_item = items[0]
Expand Down Expand Up @@ -389,6 +388,7 @@ def _get_all_records_in_bucket(self, bucket, **kwargs):
return records

def _default_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
log.info("Using default latest behavior")
current = int(time.time() * 1000)
end = current - lookback_days * _ONE_DAY_MS
while current >= end:
Expand Down
4 changes: 1 addition & 3 deletions api/datalake_api/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

DYNAMODB_TABLE = 'test'
DYNAMODB_LATEST_TABLE = 'test_latest'
DATALAKE_USE_LATEST_TABLE = \
os.environ.get("DATALAKE_USE_LATEST_TABLE", "false").lower() == "true"
LATEST_MAX_LOOKBACK = int(os.environ.get("LATEST_MAX_LOOKBACK", "30"))
DATALAKE_USE_LATEST_TABLE = False

AWS_REGION = 'us-west-2'
AWS_ACCESS_KEY_ID = None
Expand Down
17 changes: 12 additions & 5 deletions api/datalake_api/v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

v0 = flask.Blueprint('v0', __name__, url_prefix='/v0')

_archive_querier = None

def _get_aws_kwargs():
kwargs = dict(
Expand All @@ -48,17 +49,23 @@ def get_dynamodb():


def get_archive_querier():
if not hasattr(app, 'archive_querier'):
global _archive_querier

if not _archive_querier:
table_name = app.config.get('DYNAMODB_TABLE')
latest_table_name = app.config.get('DYNAMODB_LATEST_TABLE')
use_latest_table = app.config.get('DATALAKE_USE_LATEST_TABLE')
latest_max_lookback = app.config.get("LATEST_MAX_LOOKBACK")
app.archive_querier = ArchiveQuerier(table_name,
_archive_querier = ArchiveQuerier(table_name,
latest_table_name,
use_latest_table,
latest_max_lookback,
dynamodb=get_dynamodb())
return app.archive_querier
return _archive_querier


def reset_archive_querier():
"""FOR TESTING PURPOSES ONLY"""
global _archive_querier
_archive_querier = None


@v0.route('/archive/')
Expand Down
6 changes: 4 additions & 2 deletions api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@


def get_client():
from datalake_api import settings
datalake_api.app.config.from_object(settings)

datalake_api.app.config['TESTING'] = True
datalake_api.app.config['AWS_ACCESS_KEY_ID'] = 'abc'
datalake_api.app.config['AWS_SECRET_ACCESS_KEY'] = '123'
Expand Down Expand Up @@ -181,8 +184,7 @@ def _populate_table(table, records):
for r in records:
batch.put_item(Item=r)

# Adding latest table logic so latest table will be created and records will populate it
# Once that's possible, we will simply query the latest_table for what:where, no bucket logic

@pytest.fixture
def table_maker(request, dynamodb):

Expand Down
54 changes: 36 additions & 18 deletions api/tests/test_archive_querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import os
import pytest
from datalake_api.v0 import reset_archive_querier
from datalake_api import settings
# from flask import current_app as app
from datalake.common import DatalakeRecord
from datalake.tests import generate_random_metadata
import simplejson as json
Expand Down Expand Up @@ -124,27 +127,35 @@ def query_latest(self, what, where):



"""
Incorporate LATEST_MAX_LOOKBACK HERE
"""
@pytest.fixture(params=[
('archive', 'use_latest'),
('archive', 'use_default'),
('http', 'use_latest'),
('http', 'use_default')
], ids=['archive_latest', 'archive-default', 'http-latest', 'http-default'])
], ids=['archive-latest',
'archive-default',
'http-latest',
'http-default'
])
def querier(monkeypatch, request, dynamodb):

reset_archive_querier()
querier_type, table_usage = request.param

if table_usage == 'use_latest':
monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'true')
settings.DATALAKE_USE_LATEST_TABLE = True
else:
monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'false')
settings.DATALAKE_USE_LATEST_TABLE= False

if querier_type == 'http':
return HttpQuerier('test', 'test_latest', dynamodb=dynamodb)
return HttpQuerier('test',
'test_latest',
dynamodb=dynamodb)
else:
return ArchiveQuerier('test', 'test_latest', dynamodb=dynamodb)
return ArchiveQuerier('test',
'test_latest',
use_latest_table=True if table_usage == 'use_latest' else False,
dynamodb=dynamodb)

def in_url(result, part):
url = result['url']
Expand Down Expand Up @@ -567,17 +578,24 @@ def test_latest_table_query(table_maker, querier, record_maker):
what='boo',
where='hoo{}'.format(i))
table_maker(records)
querier.use_latest_table = True
result = querier.query_latest('boo', 'hoo0')
_validate_latest_result(result, what='boo', where='hoo0')

"""
Write tests:
With setup of latest table records,
with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=true, with LATEST_MAX_LOOKBACK=0, record is found

With setup of latest table records,
with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=false, with LATEST_MAX_LOOKBACK=0, record is not found
def test_query_latest_just_latest_table(table_maker, querier, record_maker):
use_latest_from_env = settings.DATALAKE_USE_LATEST_TABLE
table = table_maker([])[1]
for i in range(3):
record = record_maker(what='meow',
where=f'tree',
path='/{}'.format(i))

# only inserting into latest table
table.put_item(Item=record[0])
time.sleep(1.01)

2-4
"""
result = querier.query_latest('meow', 'tree')
if use_latest_from_env:
_validate_latest_result(result, what='meow', where='tree')
else:
assert result is None

0 comments on commit c13a6cd

Please sign in to comment.