Skip to content

Commit

Permalink
Merge pull request #308 from smart-on-fhir/mikix/completion3
Browse files Browse the repository at this point in the history
feat: write out a bulk export log file when we do the export
  • Loading branch information
mikix authored May 7, 2024
2 parents 09073cb + 6d4fb88 commit aa5f6cb
Show file tree
Hide file tree
Showing 14 changed files with 953 additions and 315 deletions.
2 changes: 1 addition & 1 deletion cumulus_etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Cumulus public entry point"""

__version__ = "1.0.0"
__version__ = "1.1.0"
21 changes: 17 additions & 4 deletions cumulus_etl/common.py
Original file line number Diff line number Diff line change
def read_local_line_count(path) -> int:
    """
    Reads a local file and provides the count of lines in it.

    Every newline character counts as one line, and a final line that lacks a
    trailing newline is counted as well.

    :param path: path of a local file (anything `open()` accepts)
    :returns: the number of lines; 0 for an empty file
    """
    # Chunked-read approach from https://stackoverflow.com/a/27517681/239668
    # Copyright Michael Bacon, licensed CC-BY-SA 3.0
    count = 0
    last_chunk = b""
    with open(path, "rb") as f:
        # Read through f.raw in 1 MiB chunks so the whole file is never held in memory at once
        while chunk := f.raw.read(1024 * 1024):
            count += chunk.count(b"\n")
            last_chunk = chunk

    # Catch a final line without a trailing newline.
    # Compare bytes to bytes: indexing a bytes object yields an int, which never equals a str.
    if last_chunk and not last_chunk.endswith(b"\n"):
        count += 1
    return count


def sparse_dict(dictionary: dict) -> dict:
Expand Down Expand Up @@ -326,11 +332,18 @@ def print_header(name: str | None = None) -> None:
###############################################################################


def datetime_now(local: bool = False) -> datetime.datetime:
    """
    Current date and time, suitable for use as a FHIR 'instant' data type

    The returned datetime is always 'aware' (not 'naive').

    :param local: whether to use local timezone or (if False) UTC
    :returns: the current moment, expressed in UTC or in the system's local timezone
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    if local:
        # astimezone() with no argument converts to the system local timezone (same instant)
        now = now.astimezone()
    return now


def timestamp_datetime(time: datetime.datetime = None) -> str:
Expand Down
25 changes: 17 additions & 8 deletions cumulus_etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from typing import NoReturn

import httpx
import rich.console


Expand Down Expand Up @@ -35,28 +36,36 @@
COMPLETION_ARG_MISSING = 34


class FatalError(Exception):
    """An unrecoverable error"""


class NetworkError(FatalError):
    """An unrecoverable network error"""

    def __init__(self, msg: str, response: httpx.Response):
        """
        :param msg: human-readable description of the failure
        :param response: the HTTP response that triggered the error, kept for callers to inspect
        """
        super().__init__(msg)
        self.response = response


class FhirConnectionConfigError(FatalError):
    """We needed to connect to a FHIR server but are not configured correctly"""


class FhirUrlMissing(FhirConnectionConfigError):
    """We needed to connect to a FHIR server but no URL was provided"""

    def __init__(self):
        super().__init__("Could not download some files without a FHIR server URL (use --fhir-url)")


class FhirAuthMissing(FhirConnectionConfigError):
    """We needed to connect to a FHIR server but no authentication config was provided"""

    def __init__(self):
        super().__init__("Could not download some files without authentication parameters (see --help)")


def fatal(message: str, status: int) -> NoReturn:
"""Convenience method to exit the program with a user-friendly error message a test-friendly status code"""
rich.console.Console(stderr=True).print(message, style="bold red", highlight=False)
Expand Down
2 changes: 1 addition & 1 deletion cumulus_etl/etl/tasks/nlp_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def read_notes(

try:
clinical_note = await fhir.get_docref_note(self.task_config.client, docref)
except errors.FhirConnectionError as exc:
except errors.FhirConnectionConfigError as exc:
if not warned_connection_error:
# Only warn user about a misconfiguration once per task.
# It's not fatal because it might be intentional (partially inlined DocRefs
Expand Down
23 changes: 21 additions & 2 deletions cumulus_etl/fhir/fhir_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
self._server_root, resources, basic_user, basic_password, bearer_token, smart_client_id, smart_jwks
)
self._session: httpx.AsyncClient | None = None
self._capabilities: dict = {}

async def __aenter__(self):
# Limit the number of connections open at once, because EHRs tend to be very busy.
Expand Down Expand Up @@ -120,13 +121,16 @@ async def request(self, method: str, path: str, headers: dict = None, stream: bo
return exc.response

if stream:
await response.aread()
await response.aclose()

# All other 4xx or 5xx codes are treated as fatal errors
message = None
try:
json_response = exc.response.json()
if json_response.get("resourceType") == "OperationOutcome":
if not isinstance(json_response, dict):
message = exc.response.text
elif json_response.get("resourceType") == "OperationOutcome":
issue = json_response["issue"][0] # just grab first issue
message = issue.get("details", {}).get("text")
message = message or issue.get("diagnostics")
Expand All @@ -135,10 +139,23 @@ async def request(self, method: str, path: str, headers: dict = None, stream: bo
if not message:
message = str(exc)

raise errors.FatalError(f'An error occurred when connecting to "{url}": {message}') from exc
raise errors.NetworkError(
f'An error occurred when connecting to "{url}": {message}',
response,
) from exc

return response

def get_capabilities(self) -> dict:
    """
    Returns the server's CapabilityStatement, if available.

    See https://www.hl7.org/fhir/R4/capabilitystatement.html

    If the statement could not be retrieved, this returns an empty dict.

    The value is the `_capabilities` attribute, which starts out as an empty dict and is
    assigned by `_read_capabilities()`. The stored dict itself is returned, not a copy.
    """
    return self._capabilities

###################################################################################################################
#
# Helpers
Expand Down Expand Up @@ -180,6 +197,8 @@ async def _read_capabilities(self) -> None:
# Example: https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4/metadata?_format=json
self._server_type = ServerType.EPIC

self._capabilities = capabilities

async def _request_with_signed_headers(self, method: str, url: str, headers: dict, **kwargs) -> httpx.Response:
"""
Issues a GET request and sign the headers with the current access token.
Expand Down
Loading

0 comments on commit aa5f6cb

Please sign in to comment.