Skip to content

Commit

Permalink
Ingester storage conditional put updates with tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
ABPLMC committed Apr 30, 2024
1 parent f806d07 commit db1ce4e
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 84 deletions.
68 changes: 56 additions & 12 deletions ingester/datalake_ingester/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@
# License for the specific language governing permissions and limitations under
# the License.

import boto3
from boto3.dynamodb.conditions import Attr

from memoized_property import memoized_property
import boto.dynamodb2
from boto.dynamodb2.table import Table
from boto.dynamodb2.exceptions import ConditionalCheckFailedException
import os
from datalake.common.errors import InsufficientConfiguration
import logging


class DynamoDBStorage(object):
'''store datalake records in a dynamoDB table'''

def __init__(self, table_name, connection=None):
def __init__(self, table_name=None, latest_table=None, connection=None):
self.table_name = table_name
self.latest_table_name = os.environ.get("DATALAKE_DNAMODB_LATEST_TABLE",
f"{self.table_name}-latest")
f"{latest_table}")
self.use_latest = os.environ.get("DATALAKE_LATEST_FLAG", False)
self._prepare_connection(connection)
self.logger = logging.getLogger('storage')

@classmethod
def from_config(cls):
Expand All @@ -56,8 +56,7 @@ def _table(self):

@memoized_property
def _latest_table(self):
dynamodb = boto3.resource('dynamodb')
return dynamodb.Table(self.latest_table_name, connection=self._connection)
return Table(self.latest_table_name, connection=self._connection)

def store(self, record):
try:
Expand All @@ -70,12 +69,57 @@ def update(self, record):
self._table.put_item(data=record, overwrite=True)

def store_latest(self, record):
item_attrs = {'time_index_key': record['time_index_key'],
'range_key': record['range_key']}
condition = (Attr('metadata.start').lt(record['metadata']['start']))
"""
note: Record must utilize AttributeValue syntax
for the conditional put.
"""
record = {
'time_index_key': {"S": record['time_index_key']},
'range_key': {"S": record['range_key']},
'metadata': {
'M': {
'start': {
'N': str(record['metadata']['start'])
},
'end': {
'N': str(record['metadata']['end'])
},
'id': {
'S': str(record['metadata']['id'])
},
'path': {
'S': str(record['metadata']['path'])
},
'hash': {
'S': str(record['metadata']['hash'])
},
'version': {
'N': str(record['metadata']['version'])
},
'what': {
'S': str(record['metadata']['what'])
},
'where': {
'S': str(record['metadata']['where'])
},
'work_id': {
'S': str(record['metadata']['work_id'])
}
}
},
'url': {"S": record['url']},
'create_time': {'N': str(record['create_time'])}
}
try:
self._latest_table.put_item(item_attrs,
condition)
self._connection.put_item(
table_name=self.latest_table_name,
item=record,
condition_expression=\
f"attribute_not_exists(metadata.M.start.N) OR metadata.M.start.N < {record['metadata']['M']['start']['N']}",
)
self.logger.info("Record stored successfully.")
except ConditionalCheckFailedException:
pass
self.logger.error("Condition not met, no operation was performed.")
except Exception as e:
self.logger.error(f"Error occurred: {str(e)}")

62 changes: 3 additions & 59 deletions ingester/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,17 @@
import boto.sns
import boto.sqs


import boto3
from boto3 import client
from botocore.exceptions import ClientError

from datalake.tests import * # noqa

from datalake_ingester import SQSQueue

import logging
logging.basicConfig(level=logging.DEBUG)

@pytest.fixture
def dynamodb_connection(aws_connector):
return aws_connector(mock_dynamodb2,
lambda: boto.dynamodb2.connect_to_region('us-west-1'))


@pytest.fixture
def dynamodb_latest_connection(aws_connector):
return aws_connector(mock_dynamodb2,
lambda: boto3.resource('dynamodb',
region_name='us-west-1'))


def _delete_table_if_exists(conn, name):
try:
table = Table(name, connection=conn)
Expand All @@ -47,16 +33,6 @@ def _delete_table_if_exists(conn, name):
if e.status == 400 and e.error_code == 'ResourceNotFoundException':
return
raise e

def _delete_latest_if_exists(dynamodb, name):
try:
table = dynamodb.Table(name)
table.delete()
table.wait_until_not_exists()
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
return
raise e


@pytest.fixture
Expand All @@ -79,31 +55,6 @@ def tear_down():
return table_maker


@pytest.fixture
def dynamodb_latest_table_maker(request, dynamodb_latest_connection):

def table_maker(name, key_schema, attributes):
_delete_latest_if_exists(dynamodb_latest_connection, name)
table = dynamodb_latest_connection.create_table(
TableName=name,
KeySchema=key_schema,
AttributeDefinitions=attributes,
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
)
table.wait_until_exists()

def tear_down():
table.delete()
table.wait_until_not_exists()

request.addfinalizer(tear_down)
return table

return table_maker


@pytest.fixture
def dynamodb_users_table(dynamodb_table_maker):
Expand All @@ -118,16 +69,9 @@ def dynamodb_records_table(dynamodb_table_maker):


@pytest.fixture
def dynamodb_latest_table(dynamodb_latest_table_maker):
schema = [
{'AttributeName': 'time_index_key', 'KeyType': 'HASH'},
{'AttributeName': 'range_key', 'KeyType': 'RANGE'}
]
attributes = [
{'AttributeName': 'time_index_key', 'AttributeType': 'S'},
{'AttributeName': 'range_key', 'AttributeType': 'S'}
]
return dynamodb_latest_table_maker('latest', schema, attributes)
def dynamodb_latest_table(dynamodb_table_maker):
schema = [HashKey('time_index_key'), RangeKey('range_key')]
return dynamodb_table_maker('latest', schema)


@pytest.fixture
Expand Down
77 changes: 64 additions & 13 deletions ingester/tests/test_storage.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,47 @@
from datalake_ingester import DynamoDBStorage
from decimal import Decimal
import boto3


def test_dynamodb_store(dynamodb_users_table, dynamodb_connection):
storage = DynamoDBStorage('users', connection=dynamodb_connection)
expected_user = {'name': 'John', 'last_name': 'Muir'}
storage.store(expected_user)
user = dict(dynamodb_users_table.get_item(name='John', last_name='Muir'))
assert dict(user) == expected_user

def test_store_duplicate(dynamodb_users_table, dynamodb_connection):
storage = DynamoDBStorage('users', connection=dynamodb_connection)
expected_user = {'name': 'Vanilla', 'last_name': 'Ice'}
storage.store(expected_user)
storage.store(expected_user)
user = dict(dynamodb_users_table.get_item(name='Vanilla', last_name='Ice'))
assert dict(user) == expected_user

def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
boto3.setup_default_session(fake_credentials=True)
storage = DynamoDBStorage('latest', connection=dynamodb_connection)
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)

new_record = {
'time_index_key': '15225:newlog',
'range_key': 'new_server:12345abcde',
'metadata': {
'start': 1500000000000
'version': 1,
'start': 1500000000000,
'end': 1500000000010,
'path': '/var/log/syslog.2',
'work_id': None,
'where': 'ground_server2',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://newfile/url',
'create_time': 1500000000000
}

storage.store_latest(new_record)
try:
storage.store_latest(new_record)
except Exception as e:
print(f"Failed to store record: {str(e)}")

stored_record = dynamodb_latest_table.get_item(
time_index_key='15225:newlog',
Expand All @@ -25,13 +51,21 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):


def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection):
storage = DynamoDBStorage('latest', connection=dynamodb_connection)
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)

file1 = {
'time_index_key': '15219:zlcdzvawsp',
'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9',
'metadata': {
'start': 1314877177402
'version': 1,
'start': 1314877177402,
'end': 1314877177412, # ends ten milliseconds later
'path': '/var/log/syslog.2',
'work_id': 'abc-123',
'where': 'ground_server2',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://existingfile/url',
'create_time': 1314877177402
Expand All @@ -41,7 +75,16 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
'time_index_key': '15219:zlcdzvawsp',
'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9',
'metadata': {
'start': 1314877177403 # One millisecond later
'version': 1,
'start': 1314877177413, # Eleven milliseconds after file1's start
'end': 1314877177423, # ends ten milliseconds later
'path': '/var/log/syslog.2',
'work_id': 'abc-123',
'where': 'ground_server2',
'what': 'syslog',
'id': '45gb2d1ec54245c7a57e29ed5a6ea9b2',
'hash': 'c5g3d8de24af342643d5b78a8f2b9b88'

},
'url': 's3://existingfile/url',
'create_time': 1314877177403
Expand All @@ -55,7 +98,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
'start': 1414877177402,
'end': 1415128740728,
'path': '/var/log/syslog.2',
'work_id': None,
'work_id': 'foo-bizz',
'where': 'ground_server2',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
Expand All @@ -74,24 +117,32 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna

res = dict(dynamodb_latest_table.get_item(time_index_key='15219:zlcdzvawsp',
range_key='lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9'))
assert res['metadata']['start'] == Decimal('1314877177403')
assert res['metadata']['start'] == Decimal('1314877177413')
assert len(records) == 2
assert file2 == res


def test_concurrent_updates(dynamodb_latest_table, dynamodb_connection):
storage = DynamoDBStorage('latest', connection=dynamodb_connection)
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)

base_record = {
'time_index_key': '15219:zlcdzvawsp',
'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9',
'metadata': {
'start': 1314877177402
'version': 1,
'start': 1314877177402,
'end': 1314877177412, # ends ten milliseconds later
'path': '/var/log/syslog.2',
'work_id': 'abc-123',
'where': 'ground_server2',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://existingfile/url',
'create_time': 1314877177402
}
storage.store(base_record)
storage.store_latest(base_record)


updated_record1 = base_record.copy()
Expand Down

0 comments on commit db1ce4e

Please sign in to comment.