Storage: Adding None workaround val in metadata.end. Configure logging at runtime.
ABPLMC committed Jul 18, 2024
1 parent 60460d4 commit 8d6c0e1
Showing 4 changed files with 16 additions and 37 deletions.
7 changes: 2 additions & 5 deletions ingester/datalake_ingester/cli.py
@@ -1,7 +1,7 @@
 import click
 from datalake.common.conf import load_config
 from .ingester import Ingester
-from .log import log_debugger
+from .log import configure_logging


 DEFAULT_CONFIG = '/etc/datalake-ingester.env'
@@ -24,9 +24,8 @@
 @click.pass_context
 def cli(ctx, **kwargs):
     conf = kwargs.pop('config')
-    log_debugger(None, "datalake_ingester:cli.py before load_config", loc='datalake_ingester:cli.py:cli')
+    configure_logging()
     load_config(conf, DEFAULT_CONFIG, **kwargs)
-    log_debugger(None, "datalake_ingester:cli.py AFTER load_config", loc='datalake_ingester:cli.py:cli')


 def _subcommand_or_fail(ctx):
@@ -36,8 +35,6 @@ def _subcommand_or_fail(ctx):

 @cli.command()
 def listen():
-    log_debugger(None, "datalake_ingester:listen.py before instantiating Ingester", loc='datalake_ingester:cli.py:listen')
     i = Ingester.from_config()
     i.listen()
-    log_debugger(None, f"datalake_ingester:listen.py AFTER instantiating Ingester: {i}", loc='datalake_ingester:cli.py:listen')

4 changes: 0 additions & 4 deletions ingester/datalake_ingester/ingester.py
@@ -23,7 +23,6 @@
     InvalidDatalakeMetadata,
     UnsupportedS3Event
 ]
-from .log import log_debugger

 class IngesterReport(dict):

@@ -83,7 +82,6 @@ def from_config(cls):
     def ingest(self, url):
         '''ingest the metadata associated with the given url'''
         records = DatalakeRecord.list_from_url(url)
-        log_debugger(logger, "Ingester:ingest", loc=self.__class__.__name__)
         for r in records:
             self.storage.store(r)

@@ -118,7 +116,6 @@ def _handler(self, s3_notification, ir):
     def _add_records(self, datalake_records, ir):
         for r in datalake_records:
             ir.add_record(r)
-            log_debugger(logger, "Ingester:_add_records", loc=self.__class__.__name__, )
             self.storage.store(r)

     def _update_records(self, datalake_records, ir):
@@ -137,5 +134,4 @@ def listen(self, timeout=None):
             raise InsufficientConfiguration('No queue configured.')

         self.queue.set_handler(self.handler)
-        log_debugger(logger, "Ingester:listen", loc=self.__class__.__name__)
         self.queue.drain(timeout=timeout)
36 changes: 12 additions & 24 deletions ingester/datalake_ingester/log.py
@@ -2,28 +2,7 @@
 import sentry_sdk


-def log_debugger(logger=None, message='', loc='', conf=None):
-    """
-    Initializes logging configuration and logs messages at different levels.
-    Parameters:
-    logger (logging.Logger): The logger instance to use for logging.
-    message (str): The message to log.
-    loc (str, optional): The location information (e.g., file or class). Defaults to ''.
-    conf (dict, optional): The logging configuration dictionary.
-    """
-    sentry_sdk.init()
-    if conf:
-        logging.config.dictConfig(conf)
-        logging.debug("Logging has been initialized with the provided configuration.")
-
-    print(f'\n======= Inside {loc} log_debugger print: at {message} =======')
-
-    if logger:
-        # Log at different levels
-        logger.info(f"======= Inside {loc} log_debugger logger.info: {message} =======\n")
-        logger.warning(f"======= Inside {loc} log_debugger logger.warning: {message} =======\n")
-
+_log_configured = False

 conf = {
     'version': 1,
@@ -36,7 +15,7 @@
     'handlers': {
         'console': {
             'class': 'logging.StreamHandler',
-            'level': 'DEBUG',
+            'level': 'INFO',
             'formatter': 'simple',
             'stream': 'ext://sys.stdout'
         },
@@ -47,4 +26,13 @@
     }
 }

-log_debugger(conf=conf)
+def configure_logging():
+    global _log_configured
+    if not _log_configured:
+        sentry_sdk.init()
+        if conf:
+            logging.config.dictConfig(conf)
+            logging.debug(f"Logging initialized with provided conf {conf}.")
+        _log_configured = True


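For orientation, here is a minimal usage sketch of the new configure_logging() helper as the CLI now calls it. It assumes the loggers/root section of conf (not visible in the hunks above) routes records to the 'console' handler; the logger name below is purely illustrative.

# Sketch only: how an entry point would use the runtime logging configuration.
# The module-level _log_configured flag makes repeated calls no-ops, so any
# subcommand or library code can call configure_logging() defensively.
import logging

from datalake_ingester.log import configure_logging

configure_logging()   # applies logging.config.dictConfig(conf) and sentry_sdk.init() once
configure_logging()   # second call returns without reconfiguring

logger = logging.getLogger('datalake_ingester.ingester')  # illustrative logger name
logger.info('ingester starting')  # emitted by the 'console' handler at INFO and above,
                                  # assuming the root logger is wired to it
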
6 changes: 2 additions & 4 deletions ingester/datalake_ingester/storage.py
@@ -21,7 +21,6 @@
 from datalake.common.errors import InsufficientConfiguration
 import logging

-from .log import log_debugger

 class DynamoDBStorage(object):
     '''store datalake records in a dynamoDB table'''
@@ -42,7 +41,7 @@ def from_config(cls):
         return cls(table_name, latest_table_name)

     def _prepare_connection(self, connection):
-        log_debugger(self.logger, "Storage:_prepare_connection before setup connection", loc=self.__class__.__name__)
+        self.logger.info("Preparing connection...")
         region = os.environ.get('AWS_REGION')
         if connection:
             self._connection = connection
@@ -61,7 +60,7 @@ def _latest_table(self):
         return Table(self.latest_table_name, connection=self._connection)

     def store(self, record):
-        log_debugger(self.logger, "Storage:store before PUT", loc=self.__class__.__name__)
+        self.logger.info(f"Storage:store before PUT {self.__class__.__name__}")
         try:
             self._table.put_item(data=record)
         except ConditionalCheckFailedException:
@@ -78,7 +77,6 @@ def store_latest(self, record):
         Record must utilize AttributeValue syntax
         for the conditional put.
         """
-        log_debugger(self.logger, "Storage:store_latest before conditional_put", loc=self.__class__.__name__)
         condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start < :new_start"
         expression_attribute_values = {
             ':new_start': {'N': str(record['metadata']['start'])}

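The final hunk shows the condition expression and AttributeValue-style values that store_latest builds for its conditional put: write only when no record exists for the key, or when the new record's metadata.start is later than the stored one. A hedged boto3 sketch of that semantics follows; the table name, key, and item values are invented for illustration, and the project itself appears to use the older boto Table API rather than boto3.

# Illustration only: a low-level DynamoDB conditional put matching the
# expression visible in store_latest. Names and values below are assumptions.
import boto3

dynamodb = boto3.client('dynamodb', region_name='us-east-1')

item = {
    'what_where_key': {'S': 'syslog:server-1234'},  # hypothetical key value
    'metadata': {'M': {'start': {'N': '1721260800000'},
                       'end': {'N': '1721264400000'}}},
}

try:
    dynamodb.put_item(
        TableName='datalake-latest',  # hypothetical table name
        Item=item,                    # AttributeValue syntax, as the docstring requires
        ConditionExpression=(
            'attribute_not_exists(what_where_key) '
            'OR metadata.#metadata_start < :new_start'
        ),
        ExpressionAttributeNames={'#metadata_start': 'start'},
        ExpressionAttributeValues={':new_start': item['metadata']['M']['start']},
    )
except dynamodb.exceptions.ConditionalCheckFailedException:
    pass  # an equal-or-newer record is already stored; nothing to overwrite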