diff --git a/ingester/datalake_ingester/cli.py b/ingester/datalake_ingester/cli.py index fd2cf42..8c8a324 100644 --- a/ingester/datalake_ingester/cli.py +++ b/ingester/datalake_ingester/cli.py @@ -1,6 +1,7 @@ import click from datalake.common.conf import load_config from .ingester import Ingester +from .log import configure_logging DEFAULT_CONFIG = '/etc/datalake-ingester.env' @@ -23,6 +24,7 @@ @click.pass_context def cli(ctx, **kwargs): conf = kwargs.pop('config') + configure_logging() load_config(conf, DEFAULT_CONFIG, **kwargs) @@ -35,3 +37,4 @@ def _subcommand_or_fail(ctx): def listen(): i = Ingester.from_config() i.listen() + diff --git a/ingester/datalake_ingester/ingester.py b/ingester/datalake_ingester/ingester.py index e79e790..b9f541d 100644 --- a/ingester/datalake_ingester/ingester.py +++ b/ingester/datalake_ingester/ingester.py @@ -24,7 +24,6 @@ UnsupportedS3Event ] - class IngesterReport(dict): def start(self): diff --git a/ingester/datalake_ingester/log.py b/ingester/datalake_ingester/log.py index 31838c0..b6925b1 100644 --- a/ingester/datalake_ingester/log.py +++ b/ingester/datalake_ingester/log.py @@ -1,6 +1,7 @@ import logging import sentry_sdk +_log_configured = False conf = { 'version': 1, @@ -24,8 +25,15 @@ } } +def configure_logging(): + global _log_configured + if not _log_configured: + sentry_sdk.init() + logging.config.dictConfig(conf) + log = logging.getLogger() + level = logging.INFO + log.setLevel(level) + log.info(f"Logging initialized with provided conf {conf}.") + _log_configured = True -sentry_sdk.init() - -logging.config.dictConfig(conf) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index 2b61aae..4c669fd 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -28,8 +28,9 @@ class DynamoDBStorage(object): def __init__(self, table_name=None, latest_table_name=None, connection=None): self.table_name = table_name self.latest_table_name = latest_table_name - self._prepare_connection(connection) self.logger = logging.getLogger('storage') + self._prepare_connection(connection) + @classmethod def from_config(cls): @@ -40,6 +41,7 @@ def from_config(cls): return cls(table_name, latest_table_name) def _prepare_connection(self, connection): + self.logger.info("Preparing connection...") region = os.environ.get('AWS_REGION') if connection: self._connection = connection @@ -89,6 +91,11 @@ def store_latest(self, record): else: work_id_value = {'S': str(record['metadata']['work_id'])} + if record['metadata']['end'] is None: + end_time_value = {'NULL': True} + else: + end_time_value = {'N': str(record['metadata']['end'])} + record = { 'what_where_key': {"S": record['metadata']['what']+':'+record['metadata']['where']}, 'time_index_key': {"S": record['time_index_key']}, @@ -98,9 +105,7 @@ def store_latest(self, record): 'start': { 'N': str(record['metadata']['start']) }, - 'end': { - 'N': str(record['metadata']['end']) - }, + 'end': end_time_value, 'id': { 'S': str(record['metadata']['id']) },