Skip to content

Commit

Permalink
Merge pull request #96 from planetlabs/none-val-end-bugfix
Browse files Browse the repository at this point in the history
None val in metadata.end bugfix and logging setup.
  • Loading branch information
ABPLMC authored Jul 26, 2024
2 parents 981af8a + 2d4303a commit d531092
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 8 deletions.
3 changes: 3 additions & 0 deletions ingester/datalake_ingester/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import click
from datalake.common.conf import load_config
from .ingester import Ingester
from .log import configure_logging


# Config file path handed to load_config() alongside the user-supplied one.
DEFAULT_CONFIG = '/etc/datalake-ingester.env'
Expand All @@ -23,6 +24,7 @@
@click.pass_context
def cli(ctx, **kwargs):
    """Set up logging, then load the ingester configuration.

    The ``config`` entry is removed from ``kwargs``; it and the remaining
    keyword options are forwarded to ``load_config`` together with
    ``DEFAULT_CONFIG``.
    """
    config_path = kwargs.pop('config')
    # Logging is set up before the configuration is loaded.
    configure_logging()
    load_config(config_path, DEFAULT_CONFIG, **kwargs)


Expand All @@ -35,3 +37,4 @@ def _subcommand_or_fail(ctx):
def listen():
    """Create an Ingester via ``Ingester.from_config()`` and call its ``listen()``."""
    Ingester.from_config().listen()

1 change: 0 additions & 1 deletion ingester/datalake_ingester/ingester.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
UnsupportedS3Event
]


class IngesterReport(dict):

def start(self):
Expand Down
14 changes: 11 additions & 3 deletions ingester/datalake_ingester/log.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import sentry_sdk

# Module-level guard: set to True by configure_logging() after its first run
# so that repeated calls do not re-initialize Sentry or the logging config.
_log_configured = False

conf = {
'version': 1,
Expand All @@ -24,8 +25,15 @@
}
}

def configure_logging():
    """Initialize Sentry and the logging system, at most once per process.

    The first call initializes ``sentry_sdk``, applies the module-level
    ``conf`` dictionary via ``logging.config.dictConfig``, sets the root
    logger to INFO and records that setup is done. Later calls return
    immediately without touching any configuration.
    """
    global _log_configured
    if _log_configured:
        return

    # ``import logging`` does not load the ``logging.config`` submodule;
    # import it explicitly so dictConfig is available even when no other
    # module happened to import it first.
    import logging.config

    sentry_sdk.init()
    logging.config.dictConfig(conf)

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    # Lazy %-style arguments: the conf dict is only rendered if the record
    # is actually emitted.
    root_logger.info("Logging initialized with provided conf %s.", conf)
    _log_configured = True

# NOTE(review): these two module-level calls look like the pre-change lines
# that this commit removed in favor of configure_logging() (the diff's +/-
# markers were lost in this view) -- confirm they are not in the final file,
# otherwise Sentry and dictConfig would also run at import time.
sentry_sdk.init()


logging.config.dictConfig(conf)
13 changes: 9 additions & 4 deletions ingester/datalake_ingester/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ class DynamoDBStorage(object):
def __init__(self, table_name=None, latest_table_name=None, connection=None):
    """Record the table names, create the logger and prepare the connection.

    Args:
        table_name: stored as ``self.table_name``.
        latest_table_name: stored as ``self.latest_table_name``.
        connection: optional existing connection, forwarded to
            ``_prepare_connection``.
    """
    self.table_name = table_name
    self.latest_table_name = latest_table_name
    # The logger must be assigned before _prepare_connection() runs, because
    # that method logs through self.logger. The connection is prepared
    # exactly once.
    self.logger = logging.getLogger('storage')
    self._prepare_connection(connection)


@classmethod
def from_config(cls):
Expand All @@ -40,6 +41,7 @@ def from_config(cls):
return cls(table_name, latest_table_name)

def _prepare_connection(self, connection):
self.logger.info("Preparing connection...")
region = os.environ.get('AWS_REGION')
if connection:
self._connection = connection
Expand Down Expand Up @@ -89,6 +91,11 @@ def store_latest(self, record):
else:
work_id_value = {'S': str(record['metadata']['work_id'])}

if record['metadata']['end'] is None:
end_time_value = {'NULL': True}
else:
end_time_value = {'N': str(record['metadata']['end'])}

record = {
'what_where_key': {"S": record['metadata']['what']+':'+record['metadata']['where']},
'time_index_key': {"S": record['time_index_key']},
Expand All @@ -98,9 +105,7 @@ def store_latest(self, record):
'start': {
'N': str(record['metadata']['start'])
},
'end': {
'N': str(record['metadata']['end'])
},
'end': end_time_value,
'id': {
'S': str(record['metadata']['id'])
},
Expand Down

0 comments on commit d531092

Please sign in to comment.