diff --git a/dpytools/http/upload.py b/dpytools/http/upload.py index f48c205..6b1572b 100644 --- a/dpytools/http/upload.py +++ b/dpytools/http/upload.py @@ -26,8 +26,7 @@ class UploadServiceClient(BaseHttpClient): def __init__(self, upload_url: str, backoff_max=30): super().__init__(backoff_max=backoff_max) - self.upload_url = upload_url\ - + self.upload_url = upload_url # RE auth, there'd two modes # 1. Service account mode # 2. User account mode @@ -53,28 +52,28 @@ def set_user_tokens(self): self.florence_password = os.environ.get("FLORENCE_PASSWORD", None) self.identity_api_url = os.environ.get("IDENTITY_API_URL", None) - assert self.florence_user is not None, ( - "Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var FLORENCE_USER must be provided" - ) - assert self.florence_user is not None, ( - "Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var FLORENCE_PASSOWRD must be provided" - ) - assert self.florence_user is not None, ( - "Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var IDENTITY_API_URL must be provided" - ) + assert ( + self.florence_user is not None + ), "Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var FLORENCE_USER must be provided" + assert ( + self.florence_user is not None + ), "Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var FLORENCE_PASSOWRD must be provided" + assert ( + self.florence_user is not None + ), "Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var IDENTITY_API_URL must be provided" # https://github.com/ONSdigital/dp-identity-api/blob/develop/swagger.yaml token_url = f"{self.identity_api_url}/tokens" response = self.post( - token_url, - json={"login": self.florence_user, "password": self.florence_password} - ) + token_url, + json={"login": self.florence_user, "password": self.florence_password}, + ) if response.status_code == 201: response_headers = response.headers - - self.refresh_token = response_headers["Refresh"] + + self.refresh_token = response_headers["Refresh"] self.token_creation_time = 
datetime.now() - + self.auth_token = response_headers["Authorization"] self.id_token = response_headers["ID"] else: @@ -82,15 +81,15 @@ def set_user_tokens(self): logger.error( "Failed to create user tokens", err, - data = { + data={ "identity_api_url": self.identity_api_url, "token_url": token_url, "response_staus_code": response.status_code, - "response_content": response.content - } + "response_content": response.content, + }, ) raise err - + def get_auth_header(self) -> Dict[str, str]: """ Given a dictionary of params, set the auth header based on the @@ -100,7 +99,7 @@ def get_auth_header(self) -> Dict[str, str]: # Using service account if self.service_token: return {"Authorization": f"Bearer {self.service_token}"} - + # Using user account # If the token is more than 10 minutes old refresh it # https://github.com/ONSdigital/dp-identity-api/blob/develop/swagger.yaml @@ -108,30 +107,29 @@ def get_auth_header(self) -> Dict[str, str]: token_refresh_url = f"{self.identity_api_url}/tokens/self" response = self.put( token_refresh_url, - json={ - "Refresh": self.refresh_token, - "ID": self.id_token - } + json={"Refresh": self.refresh_token, "ID": self.id_token}, ) if response.status_code == 201: self.auth_token = response.headers["Authorization"] self.id_token = response.headers["ID"] else: - err = Exception(f"Refreshing token failed, returned a {response.status_code} error") + err = Exception( + f"Refreshing token failed, returned a {response.status_code} error" + ) logger.error( "Could not refresh user auth token", err, data={ "token_refresh_url": token_refresh_url, "response_status_code": response.status_code, - "response_content": response.content - }) + "response_content": response.content, + }, + ) raise err - + return {"X-Florence-Token": self.auth_token, "ID": self.id_token} - - + def upload_csv( self, csv_path: Union[Path, str], @@ -152,9 +150,7 @@ def upload_sdmx( The `s3_bucket` argument should be set as an environment variable and accessed via os.getenv() 
or similar. `florence_access_token` should be generated via the DP Identity API and passed as a string argument. """ - self._upload( - sdmx_path, "application/xml", chunk_size - ) + self._upload(sdmx_path, "application/xml", chunk_size) def upload_new_csv( self, @@ -316,7 +312,7 @@ def _generate_upload_params(file_path: Path, mimetype: str, chunk_size: int) -> filename = str(file_path).split("/")[-1] # Get timestamp to create `resumableIdentifier` value in `POST` params - timestamp = datetime.datetime.now().strftime("%d%m%y%H%M%S") + timestamp = datetime.now().strftime("%d%m%y%H%M%S") # Generate upload request params upload_params = { @@ -352,7 +348,7 @@ def _generate_upload_new_params( filename = str(file_path).split("/")[-1] # Get timestamp to create `resumableIdentifier` value in `upload_params` - timestamp = datetime.datetime.now().strftime("%d%m%y%H%M%S") + timestamp = datetime.now().strftime("%d%m%y%H%M%S") # Create identifier from timestamp and filename identifier = f"{timestamp}-{filename.replace('.', '-')}" diff --git a/dpytools/logging/README.md b/dpytools/logging/README.md index 8ed604f..6d6dcef 100644 --- a/dpytools/logging/README.md +++ b/dpytools/logging/README.md @@ -63,3 +63,7 @@ except Exception as err: logger.critical("Something went boom", err, data={"some": "variable"}) raise err ``` + +## Flushing stream logging + +When logging to stdout some services (such as aws glue) will concatenate log entries by failing to flush stdout between logs. Set the environment variable `FLUSH_STOUT_AFTER_LOG_ENTRY` to `True` or `true` to flush stdout after each log entry. 
\ No newline at end of file diff --git a/dpytools/logging/logger.py b/dpytools/logging/logger.py index 1901b12..a196dae 100644 --- a/dpytools/logging/logger.py +++ b/dpytools/logging/logger.py @@ -1,3 +1,4 @@ +import os import logging import sys from datetime import datetime, timezone @@ -9,13 +10,12 @@ class DpLogger: - def __init__(self, namespace: str, flush_stdout_after_log_entry: bool = False): + def __init__(self, namespace: str): """ Simple python logger to create structured logs in keeping with https://github.com/ONSdigital/dp-standards/blob/main/LOGGING_STANDARDS.md namespace: (required) the namespace for the app in question - flush_stdout_after_log_entry: (optional) whether to flush the stdout buffer after each log entry """ logging.getLogger().addHandler(logging.StreamHandler()) @@ -30,7 +30,14 @@ def __init__(self, namespace: str, flush_stdout_after_log_entry: bool = False): self._logger = structlog.get_logger() self.namespace = namespace - self.flush_stdout_after_log_entry = flush_stdout_after_log_entry + self.flush_stdout_after_log_entry = os.environ.get("FLUSH_STOUT_AFTER_LOG_ENTRY", None) + + # Police the env var being passed in for flush_stdout_after_log_entry + if self.flush_stdout_after_log_entry is not None: + assert self.flush_stdout_after_log_entry in ["True", "true", "False", "false"], ( + "When using env var FLUSH_STOUT_AFTER_LOG_ENTRY it must be set to one" + f" of True, true, False false. 
Got '{self.flush_stdout_after_log_entry}'" + ) def _log( self, @@ -43,6 +50,8 @@ def _log( data_dict = data if data is not None else {} data_dict["level"] = logging.getLevelName(level) + # match dp logging structure + # https://github.com/ONSdigital/dp-standards/blob/main/LOGGING_STANDARDS.md log_event = { "severity": level_to_severity(level), "event": event, @@ -52,7 +61,7 @@ def _log( "span_id": "not-implemented", "data": data_dict, "raw": raw, - "error": create_error_dict(error) if error is not None else None, + "errors": create_error_dict(error) if error is not None else None, } self._logger.log(**log_event)