Skip to content

Commit

Permalink
feat: write out a bulk export log file when we do the export
Browse files Browse the repository at this point in the history
When Cumulus ETL is running the bulk export ourselves, we now write out
a bulk export log, in the standard[1] format.

This is only useful when passing an --export-to folder of course, but
that ought to be the normal use case.

Also:
- Don't early exit an export if there are server-provided error
  messages. We'll still bail, because our exporter is very
  conservative. But we'll do that at the end, after we download
  everything. This should help with debugging issues and archiving.

[1] https://github.com/smart-on-fhir/bulk-data-client/wiki/Bulk-Data-Export-Log-Items
  • Loading branch information
mikix committed May 6, 2024
1 parent 09073cb commit 6d4fb88
Show file tree
Hide file tree
Showing 14 changed files with 953 additions and 315 deletions.
2 changes: 1 addition & 1 deletion cumulus_etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Cumulus public entry point"""

__version__ = "1.0.0"
__version__ = "1.1.0"
21 changes: 17 additions & 4 deletions cumulus_etl/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,15 @@ def read_local_line_count(path) -> int:
"""Reads a local file and provides the count of new line characters."""
# From https://stackoverflow.com/a/27517681/239668
# Copyright Michael Bacon, licensed CC-BY-SA 3.0
count = 0
buf = None
with open(path, "rb") as f:
bufgen = itertools.takewhile(lambda x: x, (f.raw.read(1024 * 1024) for _ in itertools.repeat(None)))
return sum(buf.count(b"\n") for buf in bufgen if buf)
for buf in bufgen:
count += buf.count(b"\n")
if buf and buf[-1] != "\n": # catch a final line without a trailing newline
count += 1
return count


def sparse_dict(dictionary: dict) -> dict:
Expand Down Expand Up @@ -326,11 +332,18 @@ def print_header(name: str | None = None) -> None:
###############################################################################


def datetime_now() -> datetime.datetime:
def datetime_now(local: bool = False) -> datetime.datetime:
"""
UTC date and time, suitable for use as a FHIR 'instant' data type
Current date and time, suitable for use as a FHIR 'instant' data type
The returned datetime is always 'aware' (not 'naive').
:param local: whether to use local timezone or (if False) UTC
"""
return datetime.datetime.now(datetime.timezone.utc)
now = datetime.datetime.now(datetime.timezone.utc)
if local:
now = now.astimezone()
return now


def timestamp_datetime(time: datetime.datetime = None) -> str:
Expand Down
25 changes: 17 additions & 8 deletions cumulus_etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from typing import NoReturn

import httpx
import rich.console


Expand Down Expand Up @@ -35,28 +36,36 @@
COMPLETION_ARG_MISSING = 34


class FhirConnectionError(Exception):
"""We needed to connect to a FHIR server but failed"""
class FatalError(Exception):
"""An unrecoverable error"""


class NetworkError(FatalError):
"""An unrecoverable network error"""

def __init__(self, msg: str, response: httpx.Response):
super().__init__(msg)
self.response = response


class FhirConnectionConfigError(FatalError):
"""We needed to connect to a FHIR server but are not configured correctly"""

class FhirUrlMissing(FhirConnectionError):

class FhirUrlMissing(FhirConnectionConfigError):
"""We needed to connect to a FHIR server but no URL was provided"""

def __init__(self):
super().__init__("Could not download some files without a FHIR server URL (use --fhir-url)")


class FhirAuthMissing(FhirConnectionError):
class FhirAuthMissing(FhirConnectionConfigError):
"""We needed to connect to a FHIR server but no authentication config was provided"""

def __init__(self):
super().__init__("Could not download some files without authentication parameters (see --help)")


class FatalError(Exception):
"""An unrecoverable error"""


def fatal(message: str, status: int) -> NoReturn:
"""Convenience method to exit the program with a user-friendly error message a test-friendly status code"""
rich.console.Console(stderr=True).print(message, style="bold red", highlight=False)
Expand Down
2 changes: 1 addition & 1 deletion cumulus_etl/etl/tasks/nlp_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def read_notes(

try:
clinical_note = await fhir.get_docref_note(self.task_config.client, docref)
except errors.FhirConnectionError as exc:
except errors.FhirConnectionConfigError as exc:
if not warned_connection_error:
# Only warn user about a misconfiguration once per task.
# It's not fatal because it might be intentional (partially inlined DocRefs
Expand Down
23 changes: 21 additions & 2 deletions cumulus_etl/fhir/fhir_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
self._server_root, resources, basic_user, basic_password, bearer_token, smart_client_id, smart_jwks
)
self._session: httpx.AsyncClient | None = None
self._capabilities: dict = {}

async def __aenter__(self):
# Limit the number of connections open at once, because EHRs tend to be very busy.
Expand Down Expand Up @@ -120,13 +121,16 @@ async def request(self, method: str, path: str, headers: dict = None, stream: bo
return exc.response

if stream:
await response.aread()
await response.aclose()

# All other 4xx or 5xx codes are treated as fatal errors
message = None
try:
json_response = exc.response.json()
if json_response.get("resourceType") == "OperationOutcome":
if not isinstance(json_response, dict):
message = exc.response.text
elif json_response.get("resourceType") == "OperationOutcome":
issue = json_response["issue"][0] # just grab first issue
message = issue.get("details", {}).get("text")
message = message or issue.get("diagnostics")
Expand All @@ -135,10 +139,23 @@ async def request(self, method: str, path: str, headers: dict = None, stream: bo
if not message:
message = str(exc)

raise errors.FatalError(f'An error occurred when connecting to "{url}": {message}') from exc
raise errors.NetworkError(
f'An error occurred when connecting to "{url}": {message}',
response,
) from exc

return response

def get_capabilities(self) -> dict:
"""
Returns the server's CapabilityStatement, if available.
See https://www.hl7.org/fhir/R4/capabilitystatement.html
If the statement could not be retrieved, this returns an empty dict.
"""
return self._capabilities

###################################################################################################################
#
# Helpers
Expand Down Expand Up @@ -180,6 +197,8 @@ async def _read_capabilities(self) -> None:
# Example: https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4/metadata?_format=json
self._server_type = ServerType.EPIC

self._capabilities = capabilities

async def _request_with_signed_headers(self, method: str, url: str, headers: dict, **kwargs) -> httpx.Response:
"""
Issues a GET request and sign the headers with the current access token.
Expand Down
Loading

0 comments on commit 6d4fb88

Please sign in to comment.