From d4712f2304c23b0fd7c2aaee4c5ea51b3eeaaefe Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Mon, 24 Jun 2024 18:38:10 -0400 Subject: [PATCH] Ingester: call store_latest on proper storage object. --- client/datalake/tests/conftest.py | 2 -- ingester/datalake_ingester/ingester.py | 10 ++++++++-- ingester/datalake_ingester/storage.py | 13 ++++++------- ingester/tests/test_ingester.py | 21 +++++++++++++++++++++ 4 files changed, 35 insertions(+), 11 deletions(-) diff --git a/client/datalake/tests/conftest.py b/client/datalake/tests/conftest.py index 99aac43..797cc35 100644 --- a/client/datalake/tests/conftest.py +++ b/client/datalake/tests/conftest.py @@ -67,8 +67,6 @@ def random_interval(): def random_work_id(): - if random.randint(0, 1): - return None return '{}-{}'.format(random_word(5), random.randint(0, 2**15)) diff --git a/ingester/datalake_ingester/ingester.py b/ingester/datalake_ingester/ingester.py index e79e790..1524729 100644 --- a/ingester/datalake_ingester/ingester.py +++ b/ingester/datalake_ingester/ingester.py @@ -3,6 +3,7 @@ UnsupportedTimeRange, NoSuchDatalakeFile, UnsupportedS3Event from .s3_notification import S3Notification import time +import os import logging from .storage import DynamoDBStorage from .queue import SQSQueue @@ -68,23 +69,28 @@ def _make_record(self, r): class Ingester(object): - def __init__(self, storage, queue=None, reporter=None): + def __init__(self, storage, latest_storage=None, queue=None, reporter=None): self.storage = storage + self.latest_storage = latest_storage self.queue = queue self.reporter = reporter @classmethod def from_config(cls): storage = DynamoDBStorage.from_config() + if os.environ.get("DATALAKE_USE_LATEST_TABLE", False): + latest_storage = DynamoDBStorage.from_config(use_latest=True) queue = SQSQueue.from_config() reporter = SNSReporter.from_config() - return cls(storage, queue=queue, reporter=reporter) + return cls(storage, latest_storage=latest_storage, queue=queue, reporter=reporter) def ingest(self, url): '''ingest the metadata associated with the given url''' records = DatalakeRecord.list_from_url(url) for r in records: self.storage.store(r) + if self.latest_storage: + self.latest_storage.store_latest(r) def handler(self, msg): ir = IngesterReport().start() diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index 5ac236f..7f013b3 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -20,6 +20,7 @@ import os from datalake.common.errors import InsufficientConfiguration import logging +import decimal class DynamoDBStorage(object): @@ -61,12 +62,11 @@ def _latest_table(self): def store(self, record): if self.use_latest: self.store_latest(record) - else: - try: - self._table.put_item(data=record) - except ConditionalCheckFailedException: - # Tolerate duplicate stores - pass + try: + self._table.put_item(data=record) + except ConditionalCheckFailedException: + # Tolerate duplicate stores + pass def update(self, record): self._table.put_item(data=record, overwrite=True) @@ -76,7 +76,6 @@ def store_latest(self, record): note: Record must utilize AttributeValue syntax for the conditional put. """ - condition_expression = " attribute_not_exists(what_where_key) OR metadata.start < :new_start" expression_attribute_values = { ':new_start': {'N': str(record['metadata']['start'])} diff --git a/ingester/tests/test_ingester.py b/ingester/tests/test_ingester.py index cae7800..67f3332 100644 --- a/ingester/tests/test_ingester.py +++ b/ingester/tests/test_ingester.py @@ -13,6 +13,10 @@ def storage(dynamodb_records_table, dynamodb_connection): return DynamoDBStorage(table_name='records', connection=dynamodb_connection) +@pytest.fixture +def latest_storage(dynamodb_latest_table, dynamodb_connection): + return DynamoDBStorage(connection=dynamodb_connection) + @pytest.fixture def random_s3_file_maker(s3_file_from_metadata, random_metadata): @@ -32,6 +36,23 @@ def test_ingest_random(storage, dynamodb_records_table, random_s3_file_maker): for r in records: assert r['metadata'] == metadata +def test_ingest_random_latest(storage, latest_storage, dynamodb_latest_table, random_s3_file_maker): + latest_storage.latest_table_name = 'latest' + latest_storage.use_latest = True + url, metadata = random_s3_file_maker() + ingester = Ingester(storage, latest_storage=latest_storage) + ingester.ingest(url) + records = [dict(r) for r in dynamodb_latest_table.scan()] + def convert_metadata(metadata): + import decimal + return {k: (decimal.Decimal(str(v)) if isinstance(v, (int, float)) else v) for k, v in metadata.items()} + + converted_metadata = convert_metadata(metadata) + + assert len(records) >= 1 + for r in records: + assert r['metadata'] == converted_metadata + def test_ingest_no_end(storage, dynamodb_records_table, s3_file_from_metadata, random_metadata):