Skip to content

Commit

Permalink
allow service and user account upload
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeAdamss committed May 17, 2024
1 parent 2c1dbaa commit 68489c3
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 8 deletions.
64 changes: 64 additions & 0 deletions dpytools/http/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,67 @@ def _handle_request(self, method, url, *args, **kwargs):
},
)
raise err

# PUT request method with exponential backoff
@backoff.on_exception(
backoff.expo,
HTTPError,
max_time=30,
on_backoff=log_retry,
)
def put(self, url, *args, **kwargs):
"""
Sends a PUT request to the specified URL with optional extra arguments.
This method is a thin wrapper around `requests.post()`. Any additional arguments
are passed directly to `requests.post()`. For more information on the available
arguments, refer to the `requests.post()` documentation:
https://docs.python-requests.org/en/latest/api/#requests.post
Args:
url (str): The URL to send the POST request to.
*args: Optional positional arguments passed to `requests.post()`.
**kwargs: Optional keyword arguments passed to `requests.post()`.
Returns:
Response: The Response object from `requests.put()`.
Raises:
HTTPError: If the request fails for a network-related reason.
"""
return self._handle_request("PUT", url, *args, **kwargs)

# Method to handle requests for GET and POST
def _handle_request(self, method, url, *args, **kwargs):
logger.info(
f"Sending {method} request to {url}", data={"method": method, "url": url}
)
try:
response = requests.request(method, url, *args, **kwargs)
response.raise_for_status()
return response

except HTTPError as http_err:
logger.error(
f"HTTP error occurred: {http_err} when sending a {method} request to {url} with headers {kwargs.get('headers')}",
http_err,
data={
"http_error": http_err,
"method": method,
"url": url,
"headers": kwargs.get("headers"),
},
)
raise http_err
except Exception as err:
logger.error(
f"Other error occurred: {err} when sending a {method} to {url} with headers {kwargs.get('headers')}",
err,
data={
"error": err,
"method": method,
"url": url,
"headers": kwargs.get("headers"),
},
)
raise err
113 changes: 105 additions & 8 deletions dpytools/http/upload.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,120 @@
import datetime
from datetime import datetime, timedelta
import os
from math import ceil
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Tuple, Union
from typing import Dict, Optional, Union

from dpytools.http.base import BaseHttpClient
from dpytools.logging.logger import DpLogger

logger = DpLogger("dpytools")

base_http_client = BaseHttpClient()

class UploadServiceClient(BaseHttpClient):
def __init__(self, upload_url: str):
def __init__(self, upload_url: str, backoff_max=30):
# Inherit backoff_max value from BaseHTTPClient.__init__
super().__init__()
self.upload_url = upload_url
super().__init__(backoff_max=backoff_max)
self.upload_url = upload_url\

# RE auth, there'd two modes
# 1. Service account mode
# 2. User account mode
# The headers used are slightly different and
# user account mode required refreshing the auth
# token before a 15 minute timeout happens
# (service account auth doesn't time out)

self.service_token = os.environ.get("SERVICE_TOKEN_FOR UPLOAD")
if self.service_token is None:
self.set_user_tokens()

def set_user_tokens(self):
"""
When using user auth we need to use florence username and password
to create a florence token.
We also need to get teh refresh token so we can extend the token
lifespan beyond the 15 minute timeout where necessary.
"""

self.florence_user = os.environ.get("FLORENCE_USER", None)
self.florence_password = os.environ.get("FLORENCE_PASSWORD", None)
self.identity_api_url = os.environ.get("IDENTITY_API_URL", None)

assert self.florence_user is not None, (
"Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var FLORENCE_USER must be provided"
)
assert self.florence_user is not None, (
"Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var FLORENCE_PASSOWRD must be provided"
)
assert self.florence_user is not None, (
"Where env var SERVICE_TOKEN_FOR_UPLOAD is None, env var IDENTITY_API_URL must be provided"
)

response = base_http_client.post(
self.identity_api_url,
json={"login": self.florence_user, "password": self.florence_password}
)
if response.status_code == 201:
response_headers = response.headers

self.refresh_token = response_headers["Refresh"]
self.token_creation_time = datetime.now()

self.auth_token = response_headers["Authorization"]
self.id_token = response_headers["ID"]
else:
err = Exception("Failed to create user tokens")
logger.error(
"Failed to create user tokens",
err,
data = {
"identity_api_url": self.identity_api_url,
"response_staus_code": response.status_code,
"response_content": response.content
}
)

def get_auth_header(self) -> Dict[str, str]:
"""
Given a dictionary of params, set the auth header based on the
auth mode in use.
"""

# Using service account
if self.service_token:
return {"Authorization": f"Bearer {self.service_token}"}

# Using user account

# If the token is more than 10 minutes old refresh it
if (datetime.now() - self.token_creation_time) > timedelta(minutes=10):
response = base_http_client.post(
f"{self.identity_api_url}/self",
refresh_token=True
)

if response.status_code == 201:
refresh_url = "self.token_url}/self"
response = base_http_client.put(refresh_url, refresh_token=True)
self.auth_token = response.headers["Authorization"]
self.id_token = response.headers["ID"]
else:
err = Exception(f"Refreshing token failed, returned a {response.status_code} error")
logger.error(
"Could not refresh user auth token",
err,
data={
"refresh_url": refresh_url,
"response_status_code": response.status_code,
"response_content": response.content
})
raise err

return {"X-Florence-Token": self.auth_token, "ID": self.id_token}


def upload_csv(
self,
csv_path: Union[Path, str],
Expand Down Expand Up @@ -174,7 +272,6 @@ def _upload_file_chunks(
self,
file_chunks: list[str],
upload_params: dict,
florence_access_token: str,
) -> None:
"""
Upload file chunks to DP Upload Service with the specified upload parameters.
Expand All @@ -193,7 +290,7 @@ def _upload_file_chunks(
# Submit `POST` request to `self.upload_url`
self.post(
self.upload_url,
headers={"Authorization": f"Bearer {florence_access_token}"},
headers=self.get_auth_header(),
params=upload_params,
files=file,
verify=True,
Expand Down

0 comments on commit 68489c3

Please sign in to comment.