Skip to content

Commit

Permalink
tidy up
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeAdamss committed May 20, 2024
1 parent 6697c6d commit 6730167
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 41 deletions.
70 changes: 33 additions & 37 deletions dpytools/http/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
class UploadServiceClient(BaseHttpClient):
def __init__(self, upload_url: str, backoff_max=30):
super().__init__(backoff_max=backoff_max)
self.upload_url = upload_url\

self.upload_url = upload_url
# Re auth, there are two modes
# 1. Service account mode
# 2. User account mode
def set_user_tokens(self):
    """
    Populate user-account auth state from environment variables.

    Reads FLORENCE_USER, FLORENCE_PASSWORD and IDENTITY_API_URL, then
    exchanges the credentials for tokens via the identity api
    (POST {identity_api_url}/tokens).
    https://github.com/ONSdigital/dp-identity-api/blob/develop/swagger.yaml

    On a 201 response stores:
      - self.refresh_token        (from the "Refresh" response header)
      - self.token_creation_time  (used by get_auth_header() to decide
                                   when a refresh is due)
      - self.auth_token           (from the "Authorization" response header)
      - self.id_token             (from the "ID" response header)

    Raises:
        AssertionError: if any required environment variable is missing.
        Exception: if the identity api does not return a 201.
    """
    self.florence_user = os.environ.get("FLORENCE_USER", None)
    self.florence_password = os.environ.get("FLORENCE_PASSWORD", None)
    self.identity_api_url = os.environ.get("IDENTITY_API_URL", None)

    # Each assert checks the variable its message names. (Previously all
    # three re-checked florence_user by copy-paste mistake, so a missing
    # password or identity url was never caught here.)
    assert (
        self.florence_user is not None
    ), "Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var FLORENCE_USER must be provided"
    assert (
        self.florence_password is not None
    ), "Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var FLORENCE_PASSWORD must be provided"
    assert (
        self.identity_api_url is not None
    ), "Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var IDENTITY_API_URL must be provided"

    # https://github.com/ONSdigital/dp-identity-api/blob/develop/swagger.yaml
    token_url = f"{self.identity_api_url}/tokens"
    response = self.post(
        token_url,
        json={"login": self.florence_user, "password": self.florence_password},
    )
    if response.status_code == 201:
        response_headers = response.headers
        self.refresh_token = response_headers["Refresh"]
        # Record issue time so get_auth_header() can refresh the token
        # once it is more than 10 minutes old.
        self.token_creation_time = datetime.now()
        self.auth_token = response_headers["Authorization"]
        self.id_token = response_headers["ID"]
    else:
        err = Exception("Failed to create user tokens")
        logger.error(
            "Failed to create user tokens",
            err,
            data={
                "identity_api_url": self.identity_api_url,
                "token_url": token_url,
                # key fixed: was misspelt "response_staus_code"
                "response_status_code": response.status_code,
                "response_content": response.content,
            },
        )
        raise err

def get_auth_header(self) -> Dict[str, str]:
    """
    Return the auth header(s) to attach to upload requests.

    Service-account mode: returns a Bearer "Authorization" header built
    from self.service_token.

    User-account mode: returns "X-Florence-Token" and "ID" headers. If
    the current auth token is more than 10 minutes old it is first
    refreshed via PUT {identity_api_url}/tokens/self.
    https://github.com/ONSdigital/dp-identity-api/blob/develop/swagger.yaml

    Raises:
        Exception: if the token refresh does not return a 201.
    """
    # Using service account
    if self.service_token:
        return {"Authorization": f"Bearer {self.service_token}"}

    # Using user account.
    # If the token is more than 10 minutes old refresh it.
    if (datetime.now() - self.token_creation_time) > timedelta(minutes=10):
        token_refresh_url = f"{self.identity_api_url}/tokens/self"
        response = self.put(
            token_refresh_url,
            json={"Refresh": self.refresh_token, "ID": self.id_token},
        )

        if response.status_code == 201:
            self.auth_token = response.headers["Authorization"]
            self.id_token = response.headers["ID"]
            # Reset the clock on the refreshed token; without this every
            # call after the first 10 minutes re-refreshed the token.
            self.token_creation_time = datetime.now()
        else:
            err = Exception(
                f"Refreshing token failed, returned a {response.status_code} error"
            )
            logger.error(
                "Could not refresh user auth token",
                err,
                data={
                    "token_refresh_url": token_refresh_url,
                    "response_status_code": response.status_code,
                    "response_content": response.content,
                },
            )
            raise err

    return {"X-Florence-Token": self.auth_token, "ID": self.id_token}



def upload_csv(
self,
csv_path: Union[Path, str],
Expand All @@ -152,9 +150,7 @@ def upload_sdmx(
The `s3_bucket` argument should be set as an environment variable and accessed via os.getenv() or similar. `florence_access_token` should be generated via the DP Identity API and passed as a string argument.
"""
self._upload(
sdmx_path, "application/xml", chunk_size
)
self._upload(sdmx_path, "application/xml", chunk_size)

def upload_new_csv(
self,
Expand Down Expand Up @@ -316,7 +312,7 @@ def _generate_upload_params(file_path: Path, mimetype: str, chunk_size: int) ->
filename = str(file_path).split("/")[-1]

# Get timestamp to create `resumableIdentifier` value in `POST` params
timestamp = datetime.datetime.now().strftime("%d%m%y%H%M%S")
timestamp = datetime.now().strftime("%d%m%y%H%M%S")

# Generate upload request params
upload_params = {
Expand Down Expand Up @@ -352,7 +348,7 @@ def _generate_upload_new_params(
filename = str(file_path).split("/")[-1]

# Get timestamp to create `resumableIdentifier` value in `upload_params`
timestamp = datetime.datetime.now().strftime("%d%m%y%H%M%S")
timestamp = datetime.now().strftime("%d%m%y%H%M%S")

# Create identifier from timestamp and filename
identifier = f"{timestamp}-{filename.replace('.', '-')}"
Expand Down
4 changes: 4 additions & 0 deletions dpytools/logging/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ except Exception as err:
logger.critical("Something went boom", err, data={"some": "variable"})
raise err
```

## Flushing stream logging

When logging to stdout, some services (such as AWS Glue) will concatenate log entries because they fail to flush stdout between log calls. Set the environment variable `FLUSH_STOUT_AFTER_LOG_ENTRY` to `True` or `true` to flush stdout after each log entry.
17 changes: 13 additions & 4 deletions dpytools/logging/logger.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import logging
import sys
from datetime import datetime, timezone
Expand All @@ -9,13 +10,12 @@


class DpLogger:
def __init__(self, namespace: str, flush_stdout_after_log_entry: bool = False):
def __init__(self, namespace: str):
"""
Simple python logger to create structured logs in keeping
with https://github.com/ONSdigital/dp-standards/blob/main/LOGGING_STANDARDS.md
namespace: (required) the namespace for the app in question
flush_stdout_after_log_entry: (optional) whether to flush the stdout buffer after each log entry
"""

logging.getLogger().addHandler(logging.StreamHandler())
Expand All @@ -30,7 +30,14 @@ def __init__(self, namespace: str, flush_stdout_after_log_entry: bool = False):

self._logger = structlog.get_logger()
self.namespace = namespace
self.flush_stdout_after_log_entry = flush_stdout_after_log_entry
self.flush_stdout_after_log_entry = os.environ.get("FLUSH_STOUT_AFTER_LOG_ENTRY", None)

# Polics the env var being passed in for flush_stdout_after_log_entry
if self.flush_stdout_after_log_entry is not None:
assert self.flush_stdout_after_log_entry in ["True", "true", "False", "false"], (
"When using env var FLUSH_STOUT_AFTER_LOG_ENTRY it must be set to one"
f" of True, true, False false. Got '{self.flush_stdout_after_log_entry}'"
)

def _log(
self,
Expand All @@ -43,6 +50,8 @@ def _log(
data_dict = data if data is not None else {}
data_dict["level"] = logging.getLevelName(level)

# match dp logging structue
# https://github.com/ONSdigital/dp-standards/blob/main/LOGGING_STANDARDS.md
log_event = {
"severity": level_to_severity(level),
"event": event,
Expand All @@ -52,7 +61,7 @@ def _log(
"span_id": "not-implemented",
"data": data_dict,
"raw": raw,
"error": create_error_dict(error) if error is not None else None,
"errors": create_error_dict(error) if error is not None else None,
}

self._logger.log(**log_event)
Expand Down

0 comments on commit 6730167

Please sign in to comment.