diff --git a/ingester/datalake_ingester/ingester.py b/ingester/datalake_ingester/ingester.py index 1524729..7058b53 100644 --- a/ingester/datalake_ingester/ingester.py +++ b/ingester/datalake_ingester/ingester.py @@ -69,28 +69,23 @@ def _make_record(self, r): class Ingester(object): - def __init__(self, storage, latest_storage=None, queue=None, reporter=None): + def __init__(self, storage, queue=None, reporter=None): self.storage = storage - self.latest_storage = latest_storage self.queue = queue self.reporter = reporter @classmethod def from_config(cls): storage = DynamoDBStorage.from_config() - if os.environ.get("DATALAKE_USE_LATEST_TABLE", False): - latest_storage = DynamoDBStorage.from_config(use_latest=True) queue = SQSQueue.from_config() reporter = SNSReporter.from_config() - return cls(storage, latest_storage=latest_storage, queue=queue, reporter=reporter) + return cls(storage, queue=queue, reporter=reporter) def ingest(self, url): '''ingest the metadata associated with the given url''' records = DatalakeRecord.list_from_url(url) for r in records: self.storage.store(r) - if self.latest_storage: - self.latest_storage.store_latest(r) def handler(self, msg): ir = IngesterReport().start() diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index 7f013b3..22cc072 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -26,11 +26,10 @@ class DynamoDBStorage(object): '''store datalake records in a dynamoDB table''' - def __init__(self, table_name=None, latest_table=None, connection=None): + def __init__(self, table_name=None, latest_table_name=None, connection=None): self.table_name = table_name self.latest_table_name = os.environ.get("DATALAKE_LATEST_TABLE", - f"{latest_table}") - self.use_latest = os.environ.get("DATALAKE_USE_LATEST_TABLE", False) + False) self._prepare_connection(connection) self.logger = logging.getLogger('storage') @@ -60,7 +59,7 @@ def _latest_table(self): return Table(self.latest_table_name, connection=self._connection) def store(self, record): - if self.use_latest: + if self.latest_table_name: self.store_latest(record) try: self._table.put_item(data=record) diff --git a/ingester/tests/test_ingester.py b/ingester/tests/test_ingester.py index 67f3332..a4c5445 100644 --- a/ingester/tests/test_ingester.py +++ b/ingester/tests/test_ingester.py @@ -13,10 +13,6 @@ def storage(dynamodb_records_table, dynamodb_connection): return DynamoDBStorage(table_name='records', connection=dynamodb_connection) -@pytest.fixture -def latest_storage(dynamodb_latest_table, dynamodb_connection): - return DynamoDBStorage(connection=dynamodb_connection) - @pytest.fixture def random_s3_file_maker(s3_file_from_metadata, random_metadata): @@ -36,11 +32,10 @@ def test_ingest_random(storage, dynamodb_records_table, random_s3_file_maker): for r in records: assert r['metadata'] == metadata -def test_ingest_random_latest(storage, latest_storage, dynamodb_latest_table, random_s3_file_maker): - latest_storage.latest_table_name = 'latest' - latest_storage.use_latest = True +def test_ingest_random_latest(storage, dynamodb_latest_table, random_s3_file_maker): + storage.latest_table_name = 'latest' url, metadata = random_s3_file_maker() - ingester = Ingester(storage, latest_storage=latest_storage) + ingester = Ingester(storage) ingester.ingest(url) records = [dict(r) for r in dynamodb_latest_table.scan()] def convert_metadata(metadata): diff --git a/ingester/tests/test_storage.py b/ingester/tests/test_storage.py index 0cf4470..a697b79 100644 --- a/ingester/tests/test_storage.py +++ b/ingester/tests/test_storage.py @@ -18,7 +18,8 @@ def test_store_duplicate(dynamodb_users_table, dynamodb_connection): assert dict(user) == expected_user def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): - storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection) + storage = DynamoDBStorage(connection=dynamodb_connection) + storage.latest_table_name = 'latest' new_record = { 'what_where_key': 'syslog:ground_server2', @@ -51,7 +52,8 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection): - storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection) + storage = DynamoDBStorage(connection=dynamodb_connection) + storage.latest_table_name = 'latest' file1 = { 'what_where_key': 'syslog:ground_server2',