From 6d4fb88b836518c7806e815c128bc201bf992938 Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Mon, 22 Apr 2024 09:44:32 -0400 Subject: [PATCH] feat: write out a bulk export log file when we do the export When Cumulus ETL is running the bulk export ourselves, we now write out a bulk export log, in the standard[1] format. This is only useful when passing an --export-to folder of course, but that ought to be the normal use case. Also: - Don't early exit an export if there are server-provided error messages. We'll still bail, because our exporter is very conservative. But we'll do that at the end, after we download everything. This should help with debugging issues and archiving. [1] https://github.com/smart-on-fhir/bulk-data-client/wiki/Bulk-Data-Export-Log-Items --- cumulus_etl/__init__.py | 2 +- cumulus_etl/common.py | 21 +- cumulus_etl/errors.py | 25 +- cumulus_etl/etl/tasks/nlp_task.py | 2 +- cumulus_etl/fhir/fhir_client.py | 23 +- cumulus_etl/loaders/fhir/bulk_export.py | 172 ++++-- cumulus_etl/loaders/fhir/export_log.py | 188 +++++- cumulus_etl/loaders/fhir/ndjson_loader.py | 38 +- docs/setup/cumulus-aws-template.yaml | 2 - pyproject.toml | 2 +- tests/etl/test_etl_cli.py | 2 +- tests/loaders/ndjson/test_bulk_export.py | 669 ++++++++++++++------- tests/loaders/ndjson/test_ndjson_loader.py | 39 +- tests/utils.py | 83 ++- 14 files changed, 953 insertions(+), 315 deletions(-) diff --git a/cumulus_etl/__init__.py b/cumulus_etl/__init__.py index 5fb803f0..35fe00ac 100644 --- a/cumulus_etl/__init__.py +++ b/cumulus_etl/__init__.py @@ -1,3 +1,3 @@ """Cumulus public entry point""" -__version__ = "1.0.0" +__version__ = "1.1.0" diff --git a/cumulus_etl/common.py b/cumulus_etl/common.py index 924a0f2b..32950a9b 100644 --- a/cumulus_etl/common.py +++ b/cumulus_etl/common.py @@ -229,9 +229,15 @@ def read_local_line_count(path) -> int: """Reads a local file and provides the count of new line characters.""" # From https://stackoverflow.com/a/27517681/239668 # Copyright Michael Bacon, licensed CC-BY-SA 3.0 + count = 0 + buf = None with open(path, "rb") as f: bufgen = itertools.takewhile(lambda x: x, (f.raw.read(1024 * 1024) for _ in itertools.repeat(None))) - return sum(buf.count(b"\n") for buf in bufgen if buf) + for buf in bufgen: + count += buf.count(b"\n") + if buf and buf[-1] != "\n": # catch a final line without a trailing newline + count += 1 + return count def sparse_dict(dictionary: dict) -> dict: @@ -326,11 +332,18 @@ def print_header(name: str | None = None) -> None: ############################################################################### -def datetime_now() -> datetime.datetime: +def datetime_now(local: bool = False) -> datetime.datetime: """ - UTC date and time, suitable for use as a FHIR 'instant' data type + Current date and time, suitable for use as a FHIR 'instant' data type + + The returned datetime is always 'aware' (not 'naive'). 
+ + :param local: whether to use local timezone or (if False) UTC """ - return datetime.datetime.now(datetime.timezone.utc) + now = datetime.datetime.now(datetime.timezone.utc) + if local: + now = now.astimezone() + return now def timestamp_datetime(time: datetime.datetime = None) -> str: diff --git a/cumulus_etl/errors.py b/cumulus_etl/errors.py index 7d70a24d..652731d7 100644 --- a/cumulus_etl/errors.py +++ b/cumulus_etl/errors.py @@ -3,6 +3,7 @@ import sys from typing import NoReturn +import httpx import rich.console @@ -35,28 +36,36 @@ COMPLETION_ARG_MISSING = 34 -class FhirConnectionError(Exception): - """We needed to connect to a FHIR server but failed""" +class FatalError(Exception): + """An unrecoverable error""" + + +class NetworkError(FatalError): + """An unrecoverable network error""" + + def __init__(self, msg: str, response: httpx.Response): + super().__init__(msg) + self.response = response + +class FhirConnectionConfigError(FatalError): + """We needed to connect to a FHIR server but are not configured correctly""" -class FhirUrlMissing(FhirConnectionError): + +class FhirUrlMissing(FhirConnectionConfigError): """We needed to connect to a FHIR server but no URL was provided""" def __init__(self): super().__init__("Could not download some files without a FHIR server URL (use --fhir-url)") -class FhirAuthMissing(FhirConnectionError): +class FhirAuthMissing(FhirConnectionConfigError): """We needed to connect to a FHIR server but no authentication config was provided""" def __init__(self): super().__init__("Could not download some files without authentication parameters (see --help)") -class FatalError(Exception): - """An unrecoverable error""" - - def fatal(message: str, status: int) -> NoReturn: """Convenience method to exit the program with a user-friendly error message a test-friendly status code""" rich.console.Console(stderr=True).print(message, style="bold red", highlight=False) diff --git a/cumulus_etl/etl/tasks/nlp_task.py b/cumulus_etl/etl/tasks/nlp_task.py index b017367b..1ad5014f 100644 --- a/cumulus_etl/etl/tasks/nlp_task.py +++ b/cumulus_etl/etl/tasks/nlp_task.py @@ -78,7 +78,7 @@ async def read_notes( try: clinical_note = await fhir.get_docref_note(self.task_config.client, docref) - except errors.FhirConnectionError as exc: + except errors.FhirConnectionConfigError as exc: if not warned_connection_error: # Only warn user about a misconfiguration once per task. # It's not fatal because it might be intentional (partially inlined DocRefs diff --git a/cumulus_etl/fhir/fhir_client.py b/cumulus_etl/fhir/fhir_client.py index 11375601..925135a9 100644 --- a/cumulus_etl/fhir/fhir_client.py +++ b/cumulus_etl/fhir/fhir_client.py @@ -61,6 +61,7 @@ def __init__( self._server_root, resources, basic_user, basic_password, bearer_token, smart_client_id, smart_jwks ) self._session: httpx.AsyncClient | None = None + self._capabilities: dict = {} async def __aenter__(self): # Limit the number of connections open at once, because EHRs tend to be very busy. 
@@ -120,13 +121,16 @@ async def request(self, method: str, path: str, headers: dict = None, stream: bo return exc.response if stream: + await response.aread() await response.aclose() # All other 4xx or 5xx codes are treated as fatal errors message = None try: json_response = exc.response.json() - if json_response.get("resourceType") == "OperationOutcome": + if not isinstance(json_response, dict): + message = exc.response.text + elif json_response.get("resourceType") == "OperationOutcome": issue = json_response["issue"][0] # just grab first issue message = issue.get("details", {}).get("text") message = message or issue.get("diagnostics") @@ -135,10 +139,23 @@ async def request(self, method: str, path: str, headers: dict = None, stream: bo if not message: message = str(exc) - raise errors.FatalError(f'An error occurred when connecting to "{url}": {message}') from exc + raise errors.NetworkError( + f'An error occurred when connecting to "{url}": {message}', + response, + ) from exc return response + def get_capabilities(self) -> dict: + """ + Returns the server's CapabilityStatement, if available. + + See https://www.hl7.org/fhir/R4/capabilitystatement.html + + If the statement could not be retrieved, this returns an empty dict. + """ + return self._capabilities + ################################################################################################################### # # Helpers @@ -180,6 +197,8 @@ async def _read_capabilities(self) -> None: # Example: https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4/metadata?_format=json self._server_type = ServerType.EPIC + self._capabilities = capabilities + async def _request_with_signed_headers(self, method: str, url: str, headers: dict, **kwargs) -> httpx.Response: """ Issues a GET request and sign the headers with the current access token. diff --git a/cumulus_etl/loaders/fhir/bulk_export.py b/cumulus_etl/loaders/fhir/bulk_export.py index 21a25ac0..6e675675 100644 --- a/cumulus_etl/loaders/fhir/bulk_export.py +++ b/cumulus_etl/loaders/fhir/bulk_export.py @@ -5,12 +5,15 @@ import json import os import urllib.parse +from collections.abc import Callable +from functools import partial import httpx import rich.live import rich.text -from cumulus_etl import common, errors, fhir +from cumulus_etl import common, errors, fhir, store +from cumulus_etl.loaders.fhir import export_log class BulkExporter: @@ -21,6 +24,10 @@ class BulkExporter: - The bulk-data-server test server (https://github.com/smart-on-fhir/bulk-data-server) - Cerner (https://www.cerner.com/) - Epic (https://www.epic.com/) + + TODO: make it more robust against server flakiness (like random http errors during file + download or status checks). At least for intermittent issues (i.e. we should do some + retrying). Actual server errors we should continue to surface, if they persist after retries. """ _TIMEOUT_THRESHOLD = 60 * 60 * 24 # a day, which is probably an overly generous timeout @@ -54,6 +61,7 @@ def __init__( self._total_wait_time = 0 # in seconds, across all our requests self._since = since self._until = until + self._log: export_log.BulkExportLogWriter = None # Public properties, to be read after the export: self.export_datetime = None @@ -72,11 +80,13 @@ async def export(self) -> None: Encounter.000.ndjson Encounter.001.ndjson Patient.000.ndjson + log.ndjson See http://hl7.org/fhir/uv/bulkdata/export/index.html for details. 
""" # Initiate bulk export print("Starting bulk FHIR export…") + self._log = export_log.BulkExportLogWriter(store.Root(self._destination)) params = {"_type": ",".join(self._resources)} if self._since: @@ -86,41 +96,60 @@ async def export(self) -> None: # But some servers do support it, and it is a possible future addition to the spec. params["_until"] = self._until + full_url = urllib.parse.urljoin(self._url, f"$export?{urllib.parse.urlencode(params)}") try: response = await self._request_with_delay( - urllib.parse.urljoin(self._url, f"$export?{urllib.parse.urlencode(params)}"), + full_url, headers={"Prefer": "respond-async"}, target_status_code=202, ) - except errors.FhirConnectionError as exc: - errors.fatal(str(exc), errors.FHIR_AUTH_FAILED) + except Exception as exc: + self._log.kickoff(full_url, self._client.get_capabilities(), exc) + raise + else: + self._log.kickoff(full_url, self._client.get_capabilities(), response) # Grab the poll location URL for status updates poll_location = response.headers["Content-Location"] - try: - # Request status report, until export is done - response = await self._request_with_delay(poll_location, headers={"Accept": "application/json"}) - - # Finished! We're done waiting and can download all the files - response_json = response.json() - - self.export_datetime = datetime.datetime.fromisoformat(response_json["transactionTime"]) - - # Were there any server-side errors during the export? - # The spec acknowledges that "error" is perhaps misleading for an array that can contain info messages. - error_texts, warning_texts = await self._gather_all_messages(response_json.get("error", [])) - if error_texts: - raise errors.FatalError("\n - ".join(["Errors occurred during export:"] + error_texts)) - if warning_texts: - print("\n - ".join(["Messages from server:"] + warning_texts)) - - # Download all the files - print("Bulk FHIR export finished, now downloading resources…") - files = response_json.get("output", []) - await self._download_all_ndjson_files(files) - finally: - await self._delete_export(poll_location) + # Request status report, until export is done + response = await self._request_with_logging( + poll_location, + headers={"Accept": "application/json"}, + log_progress=self._log.status_progress, + log_error=self._log.status_error, + ) + self._log.status_complete(response) + + # Finished! We're done waiting and can download all the files + response_json = response.json() + + self.export_datetime = datetime.datetime.fromisoformat(response_json["transactionTime"]) + + # Were there any server-side errors during the export? + # The spec acknowledges that "error" is perhaps misleading for an array that can contain info messages. + error_texts, warning_texts = await self._gather_all_messages(response_json.get("error", [])) + if warning_texts: + print("\n - ".join(["Messages from server:"] + warning_texts)) + + # Download all the files + print("Bulk FHIR export finished, now downloading resources…") + files = response_json.get("output", []) + await self._download_all_ndjson_files(files) + + self._log.export_complete() + + # If we raised an error in the above code, we intentionally will not reach this DELETE + # call. If we had an issue talking to the server (like http errors), we want to leave the + # files up there, so the user could try to manually recover. + await self._delete_export(poll_location) + + # Make sure we're fully done before we bail because the server told us the export has issues. + # We still want to DELETE the export in this case. 
And we still want to download all the files + # the server DID give us. Servers may have lots of ignorable errors that need human review, + # before passing back to us as input ndjson. + if error_texts: + raise errors.FatalError("\n - ".join(["Errors occurred during export:"] + error_texts)) ################################################################################################################### # @@ -137,7 +166,12 @@ async def _delete_export(self, poll_url: str) -> None: pass async def _request_with_delay( - self, path: str, headers: dict = None, target_status_code: int = 200, method: str = "GET" + self, + path: str, + headers: dict = None, + target_status_code: int = 200, + method: str = "GET", + log_progress: Callable[[httpx.Response], None] = None, ) -> httpx.Response: """ Requests a file, while respecting any requests to wait longer. @@ -161,6 +195,9 @@ async def _request_with_delay( # 202 == server is still working on it, 429 == server is busy -- in both cases, we wait if response.status_code in [202, 429]: + if log_progress: + log_progress(response) + # Print a message to the user, so they don't see us do nothing for a while delay = int(response.headers.get("Retry-After", 60)) if response.status_code == 202: @@ -181,12 +218,30 @@ async def _request_with_delay( # It feels silly to abort on an unknown *success* code, but the spec has such clear guidance on # what the expected response codes are, that it's not clear if a code outside those parameters means # we should keep waiting or stop waiting. So let's be strict here for now. - raise errors.FatalError( - f"Unexpected status code {response.status_code} from the bulk FHIR export server." + raise errors.NetworkError( + f"Unexpected status code {response.status_code} from the bulk FHIR export server.", + response, ) raise errors.FatalError("Timed out waiting for the bulk FHIR export to finish.") + async def _request_with_logging( + self, + *args, + log_begin: Callable[[], None] = None, + log_error: Callable[[Exception], None] = None, + **kwargs, + ) -> httpx.Response: + if log_begin: + log_begin() + + try: + return await self._request_with_delay(*args, **kwargs) + except Exception as exc: + if log_error: + log_error(exc) + raise + async def _gather_all_messages(self, error_list: list[dict]) -> (list[str], list[str]): """ Downloads all outcome message ndjson files from the bulk export server. 
@@ -197,13 +252,26 @@ async def _gather_all_messages(self, error_list: list[dict]) -> (list[str], list coroutines = [] for error in error_list: if error.get("type") == "OperationOutcome": # per spec as of writing, the only allowed type - coroutines.append(self._request_with_delay(error["url"], headers={"Accept": "application/fhir+ndjson"})) + coroutines.append( + self._request_with_logging( + error["url"], + headers={"Accept": "application/fhir+ndjson"}, + log_begin=partial( + self._log.download_request, + error["url"], + "error", + error["type"], + ), + log_error=partial(self._log.download_error, error["url"]), + ), + ) responses = await asyncio.gather(*coroutines) fatal_messages = [] info_messages = [] for response in responses: - outcomes = (json.loads(x) for x in response.text.split("\n") if x) # a list of OperationOutcomes + outcomes = [json.loads(x) for x in response.text.split("\n") if x] # a list of OperationOutcomes + self._log.download_complete(response.url, len(outcomes), len(response.text)) for outcome in outcomes: for issue in outcome.get("issue", []): text = issue.get("diagnostics") @@ -228,23 +296,47 @@ async def _download_all_ndjson_files(self, files: list[dict]) -> None: count = resource_counts.get(file["type"], -1) + 1 resource_counts[file["type"]] = count filename = f'{file["type"]}.{count:03}.ndjson' - coroutines.append(self._download_ndjson_file(file["url"], os.path.join(self._destination, filename))) + coroutines.append( + self._download_ndjson_file( + file["url"], + file["type"], + os.path.join(self._destination, filename), + ), + ) await asyncio.gather(*coroutines) - async def _download_ndjson_file(self, url: str, filename: str) -> None: + async def _download_ndjson_file(self, url: str, resource_type: str, filename: str) -> None: """ Downloads a single ndjson file from the bulk export server. 
:param url: URL location of file to download + :param resource_type: the resource type of the file :param filename: local path to write data to """ - response = await self._client.request("GET", url, headers={"Accept": "application/fhir+ndjson"}, stream=True) + + self._log.download_request(url, "output", resource_type) + decompressed_size = 0 + try: - with open(filename, "w", encoding="utf8") as file: - async for block in response.aiter_text(): - file.write(block) - finally: - await response.aclose() + response = await self._client.request( + "GET", + url, + headers={"Accept": "application/fhir+ndjson"}, + stream=True, + ) + try: + with open(filename, "w", encoding="utf8") as file: + async for block in response.aiter_text(): + file.write(block) + decompressed_size += len(block) + finally: + await response.aclose() + except Exception as exc: + self._log.download_error(url, exc) + raise + + lines = common.read_local_line_count(filename) + self._log.download_complete(url, lines, decompressed_size) url_last_part = url.split("/")[-1] filename_last_part = filename.split("/")[-1] diff --git a/cumulus_etl/loaders/fhir/export_log.py b/cumulus_etl/loaders/fhir/export_log.py index 0e8fae8b..023acf20 100644 --- a/cumulus_etl/loaders/fhir/export_log.py +++ b/cumulus_etl/loaders/fhir/export_log.py @@ -8,8 +8,11 @@ import json import os import re +import uuid -from cumulus_etl import common, fhir, store +import httpx + +from cumulus_etl import common, errors, fhir, store class BulkExportLogParser: @@ -97,3 +100,186 @@ def _find(self, root: store.Root) -> str: return filenames[log_files[0]] case _: raise self.MultipleLogs("Multiple log.*.ndjson files found") + + +class BulkExportLogWriter: + """Writes a standard log bulk export file.""" + + def __init__(self, root: store.Root): + self.root = root + self._export_id = str(uuid.uuid4()) + self._filename = root.joinpath("log.ndjson") + self._num_files = 0 + self._num_resources = 0 + self._num_bytes = 0 + self._start_time = None + + def _event(self, event_id: str, detail: dict, *, timestamp: datetime.datetime = None) -> None: + timestamp = timestamp or common.datetime_now(local=True) + if self._start_time is None: + self._start_time = timestamp + + # We open the file anew for each event because: + # a) logging should be flushed often to disk + # b) it makes the API of the class easier by avoiding a context manager + with self.root.fs.open(self._filename, "a", encoding="utf8") as f: + row = { + "exportId": self._export_id, + "timestamp": timestamp.isoformat(), + "eventId": event_id, + "eventDetail": detail, + } + json.dump(row, f) + f.write("\n") + + @staticmethod + def _body(response: httpx.Response) -> dict | str: + try: + parsed = response.json() + if isinstance(parsed, dict): + return parsed + except json.JSONDecodeError: + pass # fall back to text + return response.text + + @staticmethod + def _response_info(response: httpx.Response) -> dict: + return { + "body": BulkExportLogWriter._body(response), + "code": response.status_code, + "responseHeaders": dict(response.headers), + } + + @staticmethod + def _error_info(exc: Exception) -> dict: + """Merge the returned dictionary into an event detail object""" + info = { + "body": None, + "code": None, + "message": str(exc), + "responseHeaders": None, + } + + if isinstance(exc, errors.NetworkError): + info.update(BulkExportLogWriter._response_info(exc.response)) + + return info + + def kickoff(self, url: str, capabilities: dict, response: httpx.Response | Exception): + # 
https://www.hl7.org/fhir/R4/capabilitystatement.html + software = capabilities.get("software", {}) + response_info = {} + + # Spec says we shouldn't log the `patient` parameter, so strip it here. + request_headers = dict(httpx.URL(url).params) + request_headers.pop("patient", None) + + if isinstance(response, Exception): + response_info = self._error_info(response) + if response_info["body"] is None: # for non-httpx error cases + response_info["body"] = response_info["message"] + else: + response_info = BulkExportLogWriter._response_info(response) + if response.status_code == 202: + response_info["body"] = None + response_info["code"] = None + + self._event( + "kickoff", + { + "exportUrl": url, + "softwareName": software.get("name"), + "softwareVersion": software.get("version"), + "softwareReleaseDate": software.get("releaseDate"), + "fhirVersion": capabilities.get("fhirVersion"), + "requestParameters": request_headers, + "errorCode": response_info["code"], + "errorBody": response_info["body"], + "responseHeaders": response_info["responseHeaders"], + }, + ) + + def status_progress(self, response: httpx.Response): + self._event( + "status_progress", + { + "body": self._body(response), + "xProgress": response.headers.get("X-Progress"), + "retryAfter": response.headers.get("Retry-After"), + }, + ) + + def status_complete(self, response: httpx.Response): + response_json = response.json() + self._event( + "status_complete", + { + "transactionTime": response_json.get("transactionTime"), + "outputFileCount": len(response_json.get("output", [])), + "deletedFileCount": len(response_json.get("deleted", [])), + "errorFileCount": len(response_json.get("error", [])), + }, + ) + + def status_error(self, exc: Exception): + self._event( + "status_error", + self._error_info(exc), + ) + + def download_request( + self, + file_url: str | httpx.URL, + item_type: str, + resource_type: str | None, + ): + self._event( + "download_request", + { + "fileUrl": str(file_url), + "itemType": item_type, + "resourceType": resource_type, + }, + ) + + def download_complete( + self, + file_url: str | httpx.URL, + resource_count: int | None, + file_size: int, + ): + self._num_files += 1 + self._num_resources += resource_count or 0 + self._num_bytes += file_size + self._event( + "download_complete", + { + "fileUrl": str(file_url), + "resourceCount": resource_count, + "fileSize": file_size, + }, + ) + + def download_error(self, file_url: str | httpx.URL, exc: Exception): + self._event( + "download_error", + { + "fileUrl": str(file_url), + **self._error_info(exc), + }, + ) + + def export_complete(self): + timestamp = common.datetime_now(local=True) + duration = (timestamp - self._start_time) if self._start_time else 0 + self._event( + "export_complete", + { + "files": self._num_files, + "resources": self._num_resources, + "bytes": self._num_bytes, + "attachments": None, + "duration": duration.microseconds // 1000, + }, + timestamp=timestamp, + ) diff --git a/cumulus_etl/loaders/fhir/ndjson_loader.py b/cumulus_etl/loaders/fhir/ndjson_loader.py index f5984870..cdb9eb17 100644 --- a/cumulus_etl/loaders/fhir/ndjson_loader.py +++ b/cumulus_etl/loaders/fhir/ndjson_loader.py @@ -41,38 +41,40 @@ def __init__( async def load_all(self, resources: list[str]) -> common.Directory: # Are we doing a bulk FHIR export from a server? 
if self.root.protocol in ["http", "https"]: - return await self._load_from_bulk_export(resources) + loaded_dir = await self._load_from_bulk_export(resources) + input_root = store.Root(loaded_dir.name) + else: + if self.export_to or self.since or self.until: + errors.fatal( + "You provided FHIR bulk export parameters but did not provide a FHIR server", errors.ARGS_CONFLICT + ) - if self.export_to or self.since or self.until: - errors.fatal( - "You provided FHIR bulk export parameters but did not provide a FHIR server", errors.ARGS_CONFLICT - ) - - # Parse logs for export information - try: - parser = BulkExportLogParser(self.root) - self.group_name = parser.group_name - self.export_datetime = parser.export_datetime - except BulkExportLogParser.LogParsingError: - # Once we require group name & export datetime, we should warn about this. - # For now, just ignore any errors. - pass + input_root = self.root + # Parse logs for export information + try: + parser = BulkExportLogParser(input_root) + self.group_name = parser.group_name + self.export_datetime = parser.export_datetime + except BulkExportLogParser.LogParsingError: + # Once we require group name & export datetime, we should warn about this. + # For now, just ignore any errors. + pass # Copy the resources we need from the remote directory (like S3 buckets) to a local one. # # We do this even if the files are local, because the next step in our pipeline is the MS deid tool, # and it will just process *everything* in a directory. So if there are other *.ndjson sitting next to our # target resources, they'll get processed by the MS tool and that slows down running a single task with - # "--task" a lot. + # "--task" a lot. (Or it'll be invalid FHIR ndjson like our log.ndjson and the MS tool will complain.) # # This uses more disk space temporarily (copied files will get deleted once the MS tool is done and this # TemporaryDirectory gets discarded), but that seems reasonable. 
print("Copying ndjson input files…") tmpdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with for resource in resources: - filenames = common.ls_resources(self.root, resource) + filenames = common.ls_resources(input_root, resource) for filename in filenames: - self.root.get(filename, f"{tmpdir.name}/") + input_root.get(filename, f"{tmpdir.name}/") if not filenames: logging.warning("No resources found for %s", resource) return tmpdir diff --git a/docs/setup/cumulus-aws-template.yaml b/docs/setup/cumulus-aws-template.yaml index 401cbfc0..aa0aba74 100644 --- a/docs/setup/cumulus-aws-template.yaml +++ b/docs/setup/cumulus-aws-template.yaml @@ -198,8 +198,6 @@ Resources: - !Sub "s3://${S3Bucket}/${EtlSubdir}/servicerequest" - !Sub "s3://${S3Bucket}/${EtlSubdir}/covid_symptom__nlp_results" - !Sub "s3://${S3Bucket}/${EtlSubdir}/covid_symptom__nlp_results_term_exists" - - !Sub "s3://${S3Bucket}/${EtlSubdir}/etl__completion" - - !Sub "s3://${S3Bucket}/${EtlSubdir}/etl__completion_encounters" CreateNativeDeltaTable: True WriteManifest: False diff --git a/pyproject.toml b/pyproject.toml index d256073a..3c3f4cfa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "label-studio-sdk < 1", "oracledb < 3", "philter-lite < 1", - "pyarrow < 16", + "pyarrow < 17", "rich < 14", "s3fs", ] diff --git a/tests/etl/test_etl_cli.py b/tests/etl/test_etl_cli.py index 43cb2438..e99fa64e 100644 --- a/tests/etl/test_etl_cli.py +++ b/tests/etl/test_etl_cli.py @@ -171,7 +171,7 @@ async def test_bulk_no_auth(self): # Now run the ETL on that new input dir without any server auth config provided with self.assertRaises(SystemExit) as cm: await self.run_etl(input_path="https://localhost:12345/", tasks=["patient"]) - self.assertEqual(errors.FHIR_AUTH_FAILED, cm.exception.code) + self.assertEqual(errors.BULK_EXPORT_FAILED, cm.exception.code) @ddt.data( # First line is CLI args diff --git a/tests/loaders/ndjson/test_bulk_export.py b/tests/loaders/ndjson/test_bulk_export.py index 56c574e4..659e107e 100644 --- a/tests/loaders/ndjson/test_bulk_export.py +++ b/tests/loaders/ndjson/test_bulk_export.py @@ -1,22 +1,22 @@ """Tests for bulk export support""" import contextlib +import datetime import io import tempfile -from json import dumps from unittest import mock import ddt import respx -from jwcrypto import jwk from cumulus_etl import cli, common, errors, store from cumulus_etl.loaders.fhir.bulk_export import BulkExporter -from tests.utils import AsyncTestCase, make_response +from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser +from tests import utils @ddt.ddt -class TestBulkExporter(AsyncTestCase): +class TestBulkExporter(utils.AsyncTestCase, utils.FhirClientMixin): """ Test case for bulk export logic. @@ -26,153 +26,296 @@ class TestBulkExporter(AsyncTestCase): def setUp(self): super().setUp() self.tmpdir = self.make_tempdir() - self.server = mock.AsyncMock() - - def make_exporter(self, **kwargs) -> BulkExporter: - return BulkExporter(self.server, ["Condition", "Patient"], "https://localhost/", self.tmpdir, **kwargs) + self.exporter = None + + async def export(self, **kwargs) -> None: + resources = ["Condition", "Patient"] + async with self.fhir_client(resources) as client: + self.exporter = BulkExporter(client, resources, self.fhir_url, self.tmpdir, **kwargs) + await self.exporter.export() + + def assert_log_equals(self, *rows) -> None: + found_rows = list(common.read_ndjson(f"{self.tmpdir}/log.ndjson")) + + # Do we use the same export ID throughout? 
+ all_export_ids = {x["exportId"] for x in found_rows} + self.assertEqual(1, len(all_export_ids)) + + # Are timestamps increasing? + all_timestamps = [x["timestamp"] for x in found_rows] + self.assertListEqual(all_timestamps, sorted(all_timestamps)) + + # Max one kickoff and one completion + all_event_ids = [x["eventId"] for x in found_rows] + self.assertLessEqual(all_event_ids.count("kickoff"), 1) + self.assertLessEqual(all_event_ids.count("export_complete"), 1) + + # Verify that duration is correct + if "export_complete" in all_event_ids: + kickoff_index = all_event_ids.index("kickoff") + complete_index = all_event_ids.index("export_complete") + kickoff_timestamp = found_rows[kickoff_index]["timestamp"] + complete_timestamp = found_rows[complete_index]["timestamp"] + kickoff_datetime = datetime.datetime.fromisoformat(kickoff_timestamp) + complete_datetime = datetime.datetime.fromisoformat(complete_timestamp) + expected_duration = (complete_datetime - kickoff_datetime).microseconds // 1000 + found_duration = found_rows[complete_index]["eventDetail"]["duration"] + self.assertEqual(found_duration, expected_duration) + + extracted_details = [(x["eventId"], x["eventDetail"]) for x in found_rows] + + # Match and reorder download requests/completes because those can be async/non-deterministic + reordered_details = [] + completion = [] + downloads = {} + for event_id, detail in extracted_details: + if event_id == "download_request": + self.assertNotIn(detail["fileUrl"], downloads) + downloads[detail["fileUrl"]] = [(event_id, detail)] + elif event_id in ("download_complete", "download_error"): + self.assertIn(detail["fileUrl"], downloads) + downloads[detail["fileUrl"]].append((event_id, detail)) + elif event_id == "export_complete": + completion.append((event_id, detail)) + else: + reordered_details.append((event_id, detail)) + for file_url in sorted(downloads): + reordered_details += downloads[file_url] + reordered_details += completion + + self.assertEqual(len(reordered_details), len(rows), reordered_details) + for index, row in enumerate(rows): + self.assertEqual(reordered_details[index][0], row[0]) + if row[1] is not None: + self.assertEqual(reordered_details[index][1], row[1]) + + def mock_kickoff(self, params: str = "?_type=Condition%2CPatient", side_effect: list = None, **kwargs) -> None: + kwargs.setdefault("status_code", 202) + route = self.respx_mock.get( + f"{self.fhir_url}/$export{params}", + headers={"Prefer": "respond-async"}, + ) + if side_effect: + route.side_effect = side_effect + else: + route.respond( + headers={ + "Content-Location": "https://example.com/poll", + "Vendor-Transaction-ID": "1234", + }, + **kwargs, + ) - async def export(self, **kwargs) -> BulkExporter: - exporter = self.make_exporter(**kwargs) - await exporter.export() - return exporter + def mock_delete(self, **kwargs) -> None: + kwargs.setdefault("status_code", 202) + self.respx_mock.delete("https://example.com/poll").respond(**kwargs) async def test_happy_path(self): """Verify an end-to-end bulk export with no problems and no waiting works as expected""" - self.server.request.side_effect = [ - make_response(status_code=202, headers={"Content-Location": "https://example.com/poll"}), # kickoff - make_response( - json_payload={ - "transactionTime": "2015-02-07T13:28:17.239+02:00", - "output": [ - {"type": "Condition", "url": "https://example.com/con1"}, - {"type": "Condition", "url": "https://example.com/con2"}, - {"type": "Patient", "url": "https://example.com/pat1"}, - ], - } - ), # status - 
make_response(json_payload={"type": "Condition1"}, stream=True), # download - make_response(json_payload={"type": "Condition2"}, stream=True), # download - make_response(json_payload={"type": "Patient1"}, stream=True), # download - make_response(status_code=202), # delete request - ] - - exporter = await self.export() - - self.assertListEqual( - [ - mock.call( - "GET", - "https://localhost/$export?_type=Condition%2CPatient", - headers={"Prefer": "respond-async"}, - ), - mock.call("GET", "https://example.com/poll", headers={"Accept": "application/json"}), - mock.call( - "GET", "https://example.com/con1", headers={"Accept": "application/fhir+ndjson"}, stream=True - ), - mock.call( - "GET", "https://example.com/con2", headers={"Accept": "application/fhir+ndjson"}, stream=True - ), - mock.call( - "GET", "https://example.com/pat1", headers={"Accept": "application/fhir+ndjson"}, stream=True - ), - mock.call("DELETE", "https://example.com/poll", headers=None), - ], - self.server.request.call_args_list, + self.mock_kickoff() + self.mock_delete() + self.respx_mock.get( + "https://example.com/poll", + headers={"Accept": "application/json"}, + ).respond( + json={ + "transactionTime": "2015-02-07T13:28:17.239+02:00", + "output": [ + {"type": "Condition", "url": "https://example.com/con1"}, + {"type": "Condition", "url": "https://example.com/con2"}, + {"type": "Patient", "url": "https://example.com/pat1"}, + ], + }, ) + self.respx_mock.get( + "https://example.com/con1", + headers={"Accept": "application/fhir+ndjson"}, + ).respond(json={"resourceType": "Condition", "id": "1"}) + self.respx_mock.get( + "https://example.com/con2", + headers={"Accept": "application/fhir+ndjson"}, + ).respond(json={"resourceType": "Condition", "id": "2"}) + self.respx_mock.get( + "https://example.com/pat1", + headers={"Accept": "application/fhir+ndjson"}, + ).respond(json={"resourceType": "Patient", "id": "P"}) + + await self.export() + + self.assertEqual("MyGroup", self.exporter.group_name) + self.assertEqual("2015-02-07T13:28:17.239000+02:00", self.exporter.export_datetime.isoformat()) + + # Ensure we can read back our own log and parse the above values too + parser = BulkExportLogParser(store.Root(self.tmpdir)) + self.assertEqual("MyGroup", parser.group_name) + self.assertEqual("2015-02-07T13:28:17.239000+02:00", parser.export_datetime.isoformat()) + + self.assertEqual( + {"resourceType": "Condition", "id": "1"}, common.read_json(f"{self.tmpdir}/Condition.000.ndjson") + ) + self.assertEqual( + {"resourceType": "Condition", "id": "2"}, common.read_json(f"{self.tmpdir}/Condition.001.ndjson") + ) + self.assertEqual({"resourceType": "Patient", "id": "P"}, common.read_json(f"{self.tmpdir}/Patient.000.ndjson")) - self.assertEqual("", exporter.group_name) # global group name is empty string - self.assertEqual("2015-02-07T13:28:17.239000+02:00", exporter.export_datetime.isoformat()) - - self.assertEqual({"type": "Condition1"}, common.read_json(f"{self.tmpdir}/Condition.000.ndjson")) - self.assertEqual({"type": "Condition2"}, common.read_json(f"{self.tmpdir}/Condition.001.ndjson")) - self.assertEqual({"type": "Patient1"}, common.read_json(f"{self.tmpdir}/Patient.000.ndjson")) + self.assert_log_equals( + ( + "kickoff", + { + "exportUrl": f"{self.fhir_url}/$export?_type=Condition%2CPatient", + "softwareName": "Test", + "softwareVersion": "0.git", + "softwareReleaseDate": "today", + "fhirVersion": "4.0.1", + "requestParameters": {"_type": "Condition,Patient"}, + "errorCode": None, + "errorBody": None, + "responseHeaders": { + 
"content-location": "https://example.com/poll", + "vendor-transaction-id": "1234", + }, + }, + ), + ( + "status_complete", + { + "deletedFileCount": 0, + "errorFileCount": 0, + "outputFileCount": 3, + "transactionTime": "2015-02-07T13:28:17.239+02:00", + }, + ), + ( + "download_request", + {"fileUrl": "https://example.com/con1", "itemType": "output", "resourceType": "Condition"}, + ), + ("download_complete", {"fileSize": 40, "fileUrl": "https://example.com/con1", "resourceCount": 1}), + ( + "download_request", + {"fileUrl": "https://example.com/con2", "itemType": "output", "resourceType": "Condition"}, + ), + ("download_complete", {"fileSize": 40, "fileUrl": "https://example.com/con2", "resourceCount": 1}), + ( + "download_request", + {"fileUrl": "https://example.com/pat1", "itemType": "output", "resourceType": "Patient"}, + ), + ("download_complete", {"fileSize": 38, "fileUrl": "https://example.com/pat1", "resourceCount": 1}), + ("export_complete", {"attachments": None, "bytes": 118, "duration": 0, "files": 3, "resources": 3}), + ) async def test_since_until(self): """Verify that we send since & until parameters correctly to the server""" - self.server.request.side_effect = (make_response(status_code=500),) # early exit + self.mock_kickoff( + params="?_type=Condition%2CPatient&_since=2000-01-01T00%3A00%3A00%2B00.00&_until=2010", + status_code=500, # early exit + ) with self.assertRaises(errors.FatalError): await self.export(since="2000-01-01T00:00:00+00.00", until="2010") - self.assertListEqual( - [ - mock.call( - "GET", - "https://localhost/$export?" - "_type=Condition%2CPatient&_since=2000-01-01T00%3A00%3A00%2B00.00&_until=2010", - headers={"Prefer": "respond-async"}, - ), - ], - self.server.request.call_args_list, - ) - async def test_export_error(self): """Verify that we download and present any server-reported errors during the bulk export""" - self.server.request.side_effect = [ - make_response(status_code=202, headers={"Content-Location": "https://example.com/poll"}), # kickoff - make_response( - json_payload={ - "transactionTime": "2015-02-07T13:28:17.239+02:00", - "error": [ - {"type": "OperationOutcome", "url": "https://example.com/err1"}, - {"type": "OperationOutcome", "url": "https://example.com/err2"}, - ], - "output": [ # include an output too, to confirm we don't bother trying to download it - {"type": "Condition", "url": "https://example.com/con1"}, - ], - } - ), # status - # errors - make_response( - json_payload={"type": "OperationOutcome", "issue": [{"severity": "error", "diagnostics": "err1"}]} - ), - make_response( - text='{"type": "OperationOutcome", "issue": [{"severity": "fatal", "details": {"text": "err2"}}]}\n' - '{"type": "OperationOutcome", "issue": [{"severity": "warning", "diagnostics": "warning1"}]}\n' - '{"type": "OperationOutcome", "issue": [' - '{"severity": "error", "code": "err3"}, {"severity": "fatal", "code": "err4"}]}\n' - ), - make_response(status_code=202), # delete request - ] + self.mock_kickoff() + self.mock_delete() + self.respx_mock.get( + "https://example.com/poll", + headers={"Accept": "application/json"}, + ).respond( + json={ + "transactionTime": "2015-02-07T13:28:17.239+02:00", + "error": [ + {"type": "OperationOutcome", "url": "https://example.com/err1"}, + {"type": "OperationOutcome", "url": "https://example.com/err2"}, + ], + "output": [ # include an output too, to confirm we do download it + {"type": "Condition", "url": "https://example.com/con1"}, + ], + }, + ) + self.respx_mock.get( + "https://example.com/err1", + headers={"Accept": 
"application/fhir+ndjson"}, + ).respond( + json={ + "resourceType": "OperationOutcome", + "issue": [{"severity": "error", "diagnostics": "err1"}], + }, + ) + self.respx_mock.get( + "https://example.com/err2", + headers={"Accept": "application/fhir+ndjson"}, + ).respond( + text=( + '{"resourceType": "OperationOutcome",' + '"issue": [{"severity": "fatal", "details": {"text": "err2"}}]}\n' + '{"resourceType": "OperationOutcome",' + '"issue": [{"severity": "warning", "diagnostics": "warning1"}]}\n' + '{"resourceType": "OperationOutcome",' + '"issue": [' + '{"severity": "error", "code": "err3"},' + '{"severity": "fatal", "code": "err4"}' + "]}\n" + ) + ) + self.respx_mock.get( + "https://example.com/con1", + headers={"Accept": "application/fhir+ndjson"}, + ).respond( + json={"resourceType": "Condition"}, + ) with self.assertRaisesRegex( errors.FatalError, "Errors occurred during export:\n - err1\n - err2\n - err3\n - err4" ): await self.export() - self.assertListEqual( - [ - mock.call( - "GET", - "https://localhost/$export?_type=Condition%2CPatient", - headers={"Prefer": "respond-async"}, - ), - mock.call("GET", "https://example.com/poll", headers={"Accept": "application/json"}), - mock.call("GET", "https://example.com/err1", headers={"Accept": "application/fhir+ndjson"}), - mock.call("GET", "https://example.com/err2", headers={"Accept": "application/fhir+ndjson"}), - mock.call("DELETE", "https://example.com/poll", headers=None), - ], - self.server.request.call_args_list, + self.assert_log_equals( + ("kickoff", None), + ( + "status_complete", + { + "deletedFileCount": 0, + "errorFileCount": 2, + "outputFileCount": 1, + "transactionTime": "2015-02-07T13:28:17.239+02:00", + }, + ), + ( + "download_request", + {"fileUrl": "https://example.com/con1", "itemType": "output", "resourceType": "Condition"}, + ), + ("download_complete", {"fileSize": 29, "fileUrl": "https://example.com/con1", "resourceCount": 1}), + ( + "download_request", + {"fileUrl": "https://example.com/err1", "itemType": "error", "resourceType": "OperationOutcome"}, + ), + ("download_complete", {"fileSize": 93, "fileUrl": "https://example.com/err1", "resourceCount": 1}), + ( + "download_request", + {"fileUrl": "https://example.com/err2", "itemType": "error", "resourceType": "OperationOutcome"}, + ), + ("download_complete", {"fileSize": 322, "fileUrl": "https://example.com/err2", "resourceCount": 3}), + ("export_complete", {"attachments": None, "bytes": 444, "duration": 0, "files": 3, "resources": 5}), ) async def test_export_warning(self): """Verify that we download and present any server-reported warnings during the bulk export""" - self.server.request.side_effect = [ - make_response(status_code=202, headers={"Content-Location": "https://example.com/poll"}), # kickoff - make_response( - json_payload={ - "transactionTime": "2015-02-07T13:28:17.239+02:00", - "error": [ - {"type": "OperationOutcome", "url": "https://example.com/warning1"}, - ], - } - ), # status - # warning - make_response( - json_payload={"type": "OperationOutcome", "issue": [{"severity": "warning", "diagnostics": "warning1"}]} - ), - make_response(status_code=202), # delete request - ] + self.mock_kickoff() + self.mock_delete() + self.respx_mock.get("https://example.com/poll").respond( + json={ + "transactionTime": "2015-02-07T13:28:17.239+02:00", + "error": [ + {"type": "OperationOutcome", "url": "https://example.com/warning1"}, + ], + }, + ) + self.respx_mock.get("https://example.com/warning1").respond( + json={ + "resourceType": "OperationOutcome", + "issue": 
[{"severity": "warning", "diagnostics": "warning1"}], + }, + ) stdout = io.StringIO() with contextlib.redirect_stdout(stdout): @@ -180,31 +323,92 @@ async def test_export_warning(self): self.assertIn("Messages from server:\n - warning1\n", stdout.getvalue()) + async def test_file_download_error(self): + """Verify that we correctly handle a resource download failure""" + self.mock_kickoff() + self.respx_mock.get("https://example.com/poll").respond( + json={ + "transactionTime": "2015-02-07T13:28:17.239+02:00", + "output": [ + {"type": "Condition", "url": "https://example.com/con1"}, + ], + }, + ) + self.respx_mock.get("https://example.com/con1").respond(status_code=501, content=b'["error"]') + + with self.assertRaisesRegex( + errors.FatalError, + r'An error occurred when connecting to "https://example.com/con1": \["error"\]', + ): + await self.export() + + self.assert_log_equals( + ("kickoff", None), + ("status_complete", None), + ("download_request", None), + ( + "download_error", + { + "fileUrl": "https://example.com/con1", + "body": '["error"]', + "code": 501, + "message": 'An error occurred when connecting to "https://example.com/con1": ["error"]', + "responseHeaders": {"content-length": "9"}, + }, + ), + ) + async def test_unexpected_status_code(self): """Verify that we bail if we see a successful code we don't understand""" - self.server.request.return_value = make_response(status_code=204) # "no content" + self.mock_kickoff(status_code=204) # "no content" + with self.assertRaisesRegex(errors.FatalError, "Unexpected status code 204"): await self.export() + self.assert_log_equals( + ( + "kickoff", + { + "exportUrl": f"{self.fhir_url}/$export?_type=Condition%2CPatient", + "softwareName": "Test", + "softwareVersion": "0.git", + "softwareReleaseDate": "today", + "fhirVersion": "4.0.1", + "requestParameters": {"_type": "Condition,Patient"}, + "errorCode": 204, + "errorBody": "", + "responseHeaders": { + "content-location": "https://example.com/poll", + "vendor-transaction-id": "1234", + }, + }, + ), + ) + @mock.patch("cumulus_etl.loaders.fhir.bulk_export.asyncio.sleep") async def test_delay(self, mock_sleep): """Verify that we wait the amount of time the server asks us to""" - self.server.request.side_effect = [ - # Kicking off bulk export - make_response(status_code=429, headers={"Retry-After": "3600"}), # one hour - make_response(status_code=202, headers={"Content-Location": "https://example.com/poll"}), # kickoff done - # Checking status of bulk export - make_response(status_code=429), # default of one minute - make_response(status_code=202, headers={"Retry-After": "18000"}), # five hours (gets limited to five min) - make_response(status_code=429, headers={"Retry-After": "82800"}), # 23 hours (putting us over a day) + self.mock_kickoff( + side_effect=[ + # Before returning a successful kickoff, pause for an hour + respx.MockResponse(status_code=429, headers={"Retry-After": "3600"}), + respx.MockResponse(status_code=202, headers={"Content-Location": "https://example.com/poll"}), + ] + ) + self.respx_mock.get("https://example.com/poll").side_effect = [ + # default of one minute + respx.MockResponse(status_code=429, headers={"X-Progress": "chill"}, content=b"{}"), + # five hours (though 202 responses will get limited to five min) + respx.MockResponse(status_code=202, headers={"Retry-After": "18000"}, content=b"..."), + # 23 hours (putting us over a day) + respx.MockResponse(status_code=429, headers={"Retry-After": "82800", "X-Progress": "plz wait"}), ] - exporter = self.make_exporter() 
with self.assertRaisesRegex(errors.FatalError, "Timed out waiting"): - await exporter.export() + await self.export() # 86760 == 24 hours + six minutes - self.assertEqual(86760, exporter._total_wait_time) # pylint: disable=protected-access + self.assertEqual(86760, self.exporter._total_wait_time) # pylint: disable=protected-access self.assertListEqual( [ @@ -216,87 +420,91 @@ async def test_delay(self, mock_sleep): mock_sleep.call_args_list, ) - async def test_delete_if_interrupted(self): - """Verify that we still delete the export on the server if we raise an exception during the middle of export""" - self.server.request.side_effect = [ - make_response(status_code=202, headers={"Content-Location": "https://example.com/poll"}), # kickoff done - errors.FatalError("Test Status Call Failed"), # status error - make_response(status_code=501), # also verify that an error during delete does not override the first - ] + self.assert_log_equals( + ("kickoff", None), + ("status_progress", {"body": {}, "xProgress": "chill", "retryAfter": None}), + ("status_progress", {"body": "...", "xProgress": None, "retryAfter": "18000"}), + ("status_progress", {"body": "", "xProgress": "plz wait", "retryAfter": "82800"}), + ( + "status_error", + { + "body": None, + "code": None, + "message": "Timed out waiting for the bulk FHIR export to finish.", + "responseHeaders": None, + }, + ), + ) + + async def test_no_delete_if_interrupted(self): + """Verify that we don't delete the export on the server if we raise an exception during the middle of export""" + self.mock_kickoff() + self.respx_mock.get("https://example.com/poll").respond( + status_code=500, + content=b"Test Status Call Failed", + ) with self.assertRaisesRegex(errors.FatalError, "Test Status Call Failed"): await self.export() - self.assertListEqual( - [ - mock.call( - "GET", - "https://localhost/$export?_type=Condition%2CPatient", - headers={"Prefer": "respond-async"}, - ), - mock.call("GET", "https://example.com/poll", headers={"Accept": "application/json"}), - mock.call("DELETE", "https://example.com/poll", headers=None), - ], - self.server.request.call_args_list, + self.assert_log_equals( + ("kickoff", None), + ( + "status_error", + { + "body": "Test Status Call Failed", + "code": 500, + "message": ( + 'An error occurred when connecting to "https://example.com/poll": ' "Test Status Call Failed" + ), + "responseHeaders": {"content-length": "23"}, + }, + ), ) + async def test_log_duration(self): + """Verify that we calculate the correct export duration for the logs""" -class TestBulkExportEndToEnd(AsyncTestCase): - """ - Test case for doing an entire bulk export loop, without mocking python code. - - Server responses are mocked, but that's it. This is more of a functional test case than a unit test case. 
- """ + def status_check(request): + del request + future = utils.FROZEN_TIME_UTC + datetime.timedelta(milliseconds=192) + self.time_machine.move_to(future) + return respx.MockResponse( + json={ + "transactionTime": "2015-02-07T13:28:17.239+02:00", + } + ) - def setUp(self) -> None: - super().setUp() + self.mock_kickoff() + self.mock_delete() + self.respx_mock.get("https://example.com/poll").mock(side_effect=status_check) - self.root = store.Root("http://localhost:9999/fhir") - self.input_url = self.root.joinpath("Group/MyGroup") - self.client_id = "test-client-id" + await self.export() - self.jwks_file = tempfile.NamedTemporaryFile() # pylint: disable=consider-using-with - jwk_token = jwk.JWK.generate(kty="EC", alg="ES384", curve="P-384", kid="a", key_ops=["sign", "verify"]).export( - as_dict=True - ) - jwks = {"keys": [jwk_token]} - self.jwks_file.write(dumps(jwks).encode("utf8")) - self.jwks_file.flush() - self.jwks_path = self.jwks_file.name - - def set_up_requests(self, respx_mock): - # /metadata - respx_mock.get( - f"{self.root.path}/metadata", - ).respond(json={}) - - # /.well-known/smart-configuration - respx_mock.get( - f"{self.root.path}/.well-known/smart-configuration", - headers={"Accept": "application/json"}, - ).respond( - json={ - "capabilities": ["client-confidential-asymmetric"], - "token_endpoint": f"{self.root.path}/token", - "token_endpoint_auth_methods_supported": ["private_key_jwt"], - }, + self.assert_log_equals( + ("kickoff", None), + ("status_complete", None), + ( + "export_complete", + {"attachments": None, "bytes": 0, "duration": 192, "files": 0, "resources": 0}, + ), ) - # /token - respx_mock.post( - f"{self.root.path}/token", - ).respond( - json={ - "access_token": "1234567890", - }, - ) +class TestBulkExportEndToEnd(utils.AsyncTestCase, utils.FhirClientMixin): + """ + Test case for doing an entire bulk export loop, without mocking python code. + + Server responses are mocked, but that's it. This is more of a functional test case than a unit test case. 
+ """ + + def set_up_requests(self): # /$export - respx_mock.get( - f"{self.input_url}/$export", + self.respx_mock.get( + f"{self.fhir_url}/$export", headers={ "Accept": "application/fhir+json", - "Authorization": "Bearer 1234567890", + "Authorization": f"Bearer {self.fhir_bearer}", "Prefer": "respond-async", }, params={ @@ -304,43 +512,43 @@ def set_up_requests(self, respx_mock): }, ).respond( status_code=202, - headers={"Content-Location": f"{self.root.path}/poll"}, + headers={"Content-Location": f"{self.fhir_base}/poll"}, ) # /poll - respx_mock.get( - f"{self.root.path}/poll", + self.respx_mock.get( + f"{self.fhir_base}/poll", headers={ "Accept": "application/json", - "Authorization": "Bearer 1234567890", + "Authorization": f"Bearer {self.fhir_bearer}", }, ).respond( json={ "transactionTime": "2015-02-07T13:28:17+02:00", - "output": [{"type": "Patient", "url": f"{self.root.path}/download/patient1"}], + "output": [{"type": "Patient", "url": f"{self.fhir_base}/download/patient1"}], }, ) # /download/patient1 - respx_mock.get( - f"{self.root.path}/download/patient1", + self.respx_mock.get( + f"{self.fhir_base}/download/patient1", headers={ "Accept": "application/fhir+ndjson", - "Authorization": "Bearer 1234567890", + "Authorization": f"Bearer {self.fhir_bearer}", }, ).respond( json={ # content doesn't really matter - "id": "testPatient1", "resourceType": "Patient", + "id": "testPatient1", }, ) # DELETE /poll - respx_mock.delete( - f"{self.root.path}/poll", + self.respx_mock.delete( + f"{self.fhir_base}/poll", headers={ "Accept": "application/fhir+json", - "Authorization": "Bearer 1234567890", + "Authorization": f"Bearer {self.fhir_bearer}", }, ).respond( status_code=202, @@ -349,22 +557,21 @@ def set_up_requests(self, respx_mock): async def test_successful_bulk_export(self): """Verify a happy path bulk export, from toe to tip""" with tempfile.TemporaryDirectory() as tmpdir: - with respx.mock(assert_all_called=True) as respx_mock: - self.set_up_requests(respx_mock) - - await cli.main( - [ - self.input_url, - f"{tmpdir}/output", - f"{tmpdir}/phi", - "--skip-init-checks", - "--output-format=ndjson", - "--task=patient", - f"--smart-client-id={self.client_id}", - f"--smart-jwks={self.jwks_path}", - "--write-completion", - ] - ) + self.set_up_requests() + + await cli.main( + [ + self.fhir_url, + f"{tmpdir}/output", + f"{tmpdir}/phi", + "--skip-init-checks", + "--output-format=ndjson", + "--task=patient", + f"--smart-client-id={self.fhir_client_id}", + f"--smart-jwks={self.fhir_jwks_path}", + "--write-completion", + ] + ) self.assertEqual( {"id": "4342abf315cf6f243e11f4d460303e36c6c3663a25c91cc6b1a8002476c850dd", "resourceType": "Patient"}, diff --git a/tests/loaders/ndjson/test_ndjson_loader.py b/tests/loaders/ndjson/test_ndjson_loader.py index 468a2927..5d02529f 100644 --- a/tests/loaders/ndjson/test_ndjson_loader.py +++ b/tests/loaders/ndjson/test_ndjson_loader.py @@ -276,13 +276,46 @@ async def test_fatal_errors_are_fatal(self): async def test_export_to_folder_happy_path(self): with tempfile.TemporaryDirectory() as tmpdir: + + async def fake_export() -> None: + output_dir = self.mock_exporter_class.call_args[0][3] + common.write_json(f"{output_dir}/Patient.ndjson", {"id": "A"}) + common.write_json(f"{output_dir}/log.ndjson", {"eventId": "kickoff"}) + + self.mock_exporter.export.side_effect = fake_export + target = f"{tmpdir}/target" loader = loaders.FhirNdjsonLoader(store.Root("http://localhost:9999"), mock.AsyncMock(), export_to=target) - folder = await loader.load_all([]) + folder = await 
loader.load_all(["Patient"]) - self.assertTrue(os.path.isdir(target)) # confirm it got created + # Confirm export folder still has the data (and log) we created above in the mock + self.assertTrue(os.path.isdir(target)) self.assertEqual(target, self.mock_exporter_class.call_args[0][3]) - self.assertEqual(target, folder.name) + self.assertEqual({"Patient.ndjson", "log.ndjson"}, set(os.listdir(target))) + self.assertEqual({"id": "A"}, common.read_json(f"{target}/Patient.ndjson")) + self.assertEqual({"eventId": "kickoff"}, common.read_json(f"{target}/log.ndjson")) + + # Confirm the returned dir has only the data (we don't want to confuse MS tool with logs) + self.assertNotEqual(folder.name, target) + self.assertEqual({"Patient.ndjson"}, set(os.listdir(folder.name))) + self.assertEqual({"id": "A"}, common.read_json(f"{folder.name}/Patient.ndjson")) + + async def test_export_internal_folder_happy_path(self): + """Test that we can also safely export without an export-to folder involved""" + + async def fake_export() -> None: + output_dir = self.mock_exporter_class.call_args[0][3] + common.write_json(f"{output_dir}/Patient.ndjson", {"id": "A"}) + common.write_json(f"{output_dir}/log.ndjson", {"eventId": "kickoff"}) + + self.mock_exporter.export.side_effect = fake_export + + loader = loaders.FhirNdjsonLoader(store.Root("http://localhost:9999"), mock.AsyncMock()) + folder = await loader.load_all(["Patient"]) + + # Confirm the returned dir has only the data (we don't want to confuse MS tool with logs) + self.assertEqual({"Patient.ndjson"}, set(os.listdir(folder.name))) + self.assertEqual({"id": "A"}, common.read_json(f"{folder.name}/Patient.ndjson")) async def test_export_to_folder_has_contents(self): """Verify we fail if an export folder already has contents""" diff --git a/tests/utils.py b/tests/utils.py index f1babbf9..fb99f7e0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,7 +16,9 @@ import httpx import respx import time_machine +from jwcrypto import jwk +from cumulus_etl import fhir from cumulus_etl.formats.deltalake import DeltaLakeFormat # Pass a non-UTC time to time-machine to help notice any bad timezone handling. @@ -25,8 +27,6 @@ FROZEN_TIME_UTC = _FROZEN_TIME.astimezone(datetime.timezone.utc) -# Several tests involve timestamps in some form, so just pick a standard time for all tests. -@time_machine.travel(_FROZEN_TIME, tick=False) class AsyncTestCase(unittest.IsolatedAsyncioTestCase): """ Test case to hold some common code (suitable for async *OR* sync tests) @@ -46,6 +46,11 @@ def setUp(self): # Make it easy to grab test data, regardless of where the test is self.datadir = os.path.join(os.path.dirname(__file__), "data") + # Several tests involve timestamps in some form, so just pick a standard time for all tests. 
+ traveller = time_machine.travel(_FROZEN_TIME, tick=False) + self.addCleanup(traveller.stop) + self.time_machine = traveller.start() + def make_tempdir(self) -> str: """Creates a temporary dir that will be automatically cleaned up""" tempdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with @@ -160,6 +165,80 @@ def assert_files_equal(self, left_path: str, right_path: str) -> None: self.assertEqual(left_contents, right_contents, f"{right_path} vs {left_path}") +class FhirClientMixin(unittest.TestCase): + """Mixin that provides a realistic FhirClient""" + + def setUp(self): + super().setUp() + + self.fhir_base = "http://localhost:9999/fhir" + self.fhir_url = f"{self.fhir_base}/Group/MyGroup" + self.fhir_client_id = "test-client-id" + self.fhir_bearer = "1234567890" # the provided oauth bearer token + + jwk_token = jwk.JWK.generate(kty="EC", alg="ES384", curve="P-384", kid="a", key_ops=["sign", "verify"]).export( + as_dict=True + ) + self.fhir_jwks = {"keys": [jwk_token]} + + self._fhir_jwks_file = tempfile.NamedTemporaryFile() # pylint: disable=consider-using-with + self._fhir_jwks_file.write(json.dumps(self.fhir_jwks).encode("utf8")) + self._fhir_jwks_file.flush() + self.addCleanup(self._fhir_jwks_file.close) + self.fhir_jwks_path = self._fhir_jwks_file.name + + # Do not unset assert_all_called - existing tests rely on it + self.respx_mock = respx.MockRouter(assert_all_called=True) + self.addCleanup(self.respx_mock.stop) + self.respx_mock.start() + + self.mock_fhir_auth() + + def mock_fhir_auth(self) -> None: + # /metadata + self.respx_mock.get( + f"{self.fhir_base}/metadata", + ).respond( + json={ + "fhirVersion": "4.0.1", + "software": { + "name": "Test", + "version": "0.git", + "releaseDate": "today", + }, + } + ) + + # /.well-known/smart-configuration + self.respx_mock.get( + f"{self.fhir_base}/.well-known/smart-configuration", + headers={"Accept": "application/json"}, + ).respond( + json={ + "capabilities": ["client-confidential-asymmetric"], + "token_endpoint": f"{self.fhir_base}/token", + "token_endpoint_auth_methods_supported": ["private_key_jwt"], + }, + ) + + # /token + self.respx_mock.post( + f"{self.fhir_base}/token", + ).respond( + json={ + "access_token": self.fhir_bearer, + }, + ) + + def fhir_client(self, resources: list[str]) -> fhir.FhirClient: + return fhir.FhirClient( + self.fhir_base, + resources, + smart_client_id=self.fhir_client_id, + smart_jwks=self.fhir_jwks, + ) + + def make_response(status_code=200, json_payload=None, text=None, reason=None, headers=None, stream=False): """ Makes a fake respx response for ease of testing.
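
As a rough illustration of how the pieces added above fit together, here is a short, hypothetical sketch (not taken from the repository) that writes a bulk export log with the new BulkExportLogWriter and reads it back with the existing BulkExportLogParser, mirroring what the updated tests assert. The folder path, URLs, and response payloads are invented for the example.

    import httpx

    from cumulus_etl import store
    from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser, BulkExportLogWriter

    # Hypothetical --export-to folder; the writer appends events to log.ndjson inside it.
    root = store.Root("/tmp/export-folder")
    writer = BulkExportLogWriter(root)

    # Kickoff: for a 202 response, the body and code are intentionally logged as null.
    writer.kickoff(
        "https://example.com/fhir/Group/MyGroup/$export?_type=Patient",
        {},  # the server's CapabilityStatement, if one was retrieved
        httpx.Response(202, headers={"Content-Location": "https://example.com/poll"}),
    )

    # Status and download events, as the exporter would emit them during a run.
    writer.status_complete(
        httpx.Response(200, json={"transactionTime": "2015-02-07T13:28:17.239+02:00", "output": []})
    )
    writer.download_request("https://example.com/pat1", "output", "Patient")
    writer.download_complete("https://example.com/pat1", 10, 1024)
    writer.export_complete()

    # The log can then be parsed back for the group name and export timestamp.
    parser = BulkExportLogParser(root)
    print(parser.group_name, parser.export_datetime)

Each writer call appends a single JSON line of the form {"exportId", "timestamp", "eventId", "eventDetail"} and reopens the file every time, so a partial log survives on disk even if the export later fails.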