Skip to content

Commit

Permalink
Merge pull request #99 from planetlabs/latest-ingestion-bugfix
Browse files — browse the repository at this point in the history
Ingester: Conditional put utilizes less than or equal to for records.…
  • Loading branch information
ABPLMC authored Oct 15, 2024
2 parents 392760c + f592436 commit f61854d
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 22 deletions.
1 change: 0 additions & 1 deletion api/datalake_api/querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import base64
import json
import time
import os

from datetime import datetime, timedelta
import decimal
Expand Down
1 change: 0 additions & 1 deletion api/tests/test_archive_querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,6 @@ def test_query_latest_future_record_exceeds_lookforward(table_maker, querier, re
record = record_maker(what='meow', where='tree', start=future_start, end=future_end)

default_table, latest_table = table_maker([])
print(default_table.__dict__, type(default_table))

default_table.put_item(Item=record[0])
latest_table.put_item(Item=record[0])
Expand Down
2 changes: 1 addition & 1 deletion ingester/datalake_ingester/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def store_latest(self, record):
Record must utilize AttributeValue syntax
for the conditional put.
"""
condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start < :new_start"
condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start <= :new_start"
expression_attribute_values = {
':new_start': {'N': str(record['metadata']['start'])}
}
Expand Down
91 changes: 72 additions & 19 deletions ingester/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,15 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
'create_time': 1500000000000
}

try:
storage.store_latest(new_record)
except Exception as e:
print(f"Failed to store record: {str(e)}")
storage.store_latest(new_record)

stored_record = dynamodb_latest_table.get_item(
what_where_key=new_record['what_where_key']
)
assert stored_record['metadata']['start'] == new_record['metadata']['start']


def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection):
storage = DynamoDBStorage(connection=dynamodb_connection)
storage.latest_table_name = 'latest'

def provide_test_records():
file1 = {
'what_where_key': 'syslog:ground_server2',
'time_index_key': '15219:zlcdzvawsp',
Expand All @@ -67,7 +61,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
'work_id': 'abc-123',
'where': 'ground_server2',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
'id': 'file1',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://existingfile/url',
Expand All @@ -86,7 +80,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
'work_id': 'abc-123',
'where': 'ground_server2',
'what': 'syslog',
'id': '45gb2d1ec54245c7a57e29ed5a6ea9b2',
'id': 'file2',
'hash': 'c5g3d8de24af342643d5b78a8f2b9b88'

},
Expand All @@ -106,14 +100,22 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
'work_id': 'foo-bizz',
'where': 's114',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
'id': 'file3',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://datalake/path_to_file1',
'create_time': 1414877177402,
'size': 1048576
}

return (file1, file2, file3)


def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection):
storage = DynamoDBStorage(connection=dynamodb_connection)
storage.latest_table_name = 'latest'
file1, file2, file3 = provide_test_records()

storage.store_latest(file3)
storage.store_latest(file1)
storage.store_latest(file2) # same what:where, but should replace file1 b/c newer
Expand All @@ -127,19 +129,70 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
assert len(records) == 2
assert file2 == res


def test_store_conditional_put_newest_first(dynamodb_latest_table, dynamodb_connection):
    """Check that storing older records after a newer one keeps the newer one.

    The records are stored in the order file3, file2, file1, and the entry
    under 'syslog:ground_server2' is expected to carry file2's id rather
    than file1's. A second round of stores (file1 twice, then file2 and
    file3) must still leave an entry whose start matches file2's.
    """
    storage = DynamoDBStorage(connection=dynamodb_connection)
    storage.latest_table_name = 'latest'
    file1, file2, file3 = provide_test_records()

    # First pass: file2 is stored before file1.
    for candidate in (file3, file2, file1):
        storage.store_latest(candidate)

    # Full scan of the latest table; its result is not asserted on here.
    records = list(map(dict, dynamodb_latest_table.scan()))
    query_what_where = 'syslog:ground_server2'

    latest = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where))
    latest_id = latest['metadata']['id']
    assert latest_id != file1['metadata']['id']
    assert latest_id == file2['metadata']['id']

    # Second pass: repeat the stores, including a duplicate put of file1.
    for candidate in (file1, file1, file2, file3):
        storage.store_latest(candidate)

    latest = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where))
    assert latest['metadata']['start'] == file2['metadata']['start']


def test_verify_replace_record_same_start(dynamodb_latest_table, dynamodb_connection):
    """Check that a record with an equal start replaces the stored record.

    Two records are stored back to back. They share the same what_where_key
    and metadata start and differ only in metadata id. The entry read back
    afterwards is expected to carry the second record's id ("abc123").
    """
    storage = DynamoDBStorage(connection=dynamodb_connection)
    storage.latest_table_name = 'latest'

    def make_record(record_id):
        # Every field except metadata['id'] is the same for both records.
        return {
            'what_where_key': 'syslog:ground_server2',
            'time_index_key': '15225:newlog',
            'range_key': 'new_server:12345abcde',
            'metadata': {
                'version': 1,
                'start': 1500000000000,
                'end': 1500000000010,
                'path': '/var/log/syslog.2',
                'work_id': None,
                'where': 'ground_server2',
                'what': 'syslog',
                'id': record_id,
                'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
            },
            'url': 's3://newfile/url',
            'create_time': 1500000000000
        }

    record1 = make_record('34fb2d1ec54245c7a57e29ed5a6ea9b2')
    record2 = make_record('abc123')

    # Store order matters: record2 goes in second with an identical start.
    for candidate in (record1, record2):
        storage.store_latest(candidate)

    stored_record = dynamodb_latest_table.get_item(
        what_where_key="syslog:ground_server2"
    )
    stored_metadata = stored_record['metadata']
    assert stored_metadata['start'] == record1['metadata']['start']
    assert stored_metadata['id'] == "abc123"

0 comments on commit f61854d

Please sign in to comment.