Skip to content

Commit

Permalink
feat: add new "cumulus-etl export" command for bulk exporting
Browse files Browse the repository at this point in the history
This new command is quite simple. It just takes a URL and an output
folder. The usual auth and export args (like --since) are accepted.

You can also filter exported resources by task with --task.

But even if you don't use --since or --task, you can provide an export
URL that already carries whatever parameters you want (such as _since,
_type, or _typeFilter).

In addition:
- "--task=help" will now print a list of task names and exit.
- The traditional way to kick off a bulk export when doing an ETL job
  (by providing a URL as the input path) now also lets you specify
  custom parameters like _since or _typeFilter directly in the URL.
- Handle non-compliant servers that give us transactionTime values that
  aren't formatted correctly (as bulk-data-server does).
  • Loading branch information
mikix committed Jul 31, 2024
1 parent 100e317 commit fe809cb
Show file tree
Hide file tree
Showing 20 changed files with 438 additions and 134 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,17 @@ jobs:
run: |
python -m pytest --cov=cumulus_etl --cov-report=xml
- name: Log missing coverage
run: |
coverage report -m --skip-covered
- name: Check coverage report
if: github.ref != 'refs/heads/main'
uses: orgoro/coverage@v3.1
uses: orgoro/coverage@v3.2
with:
coverageFile: coverage.xml
token: ${{ secrets.GITHUB_TOKEN }}
thresholdAll: .93
thresholdAll: .97
thresholdNew: 1
thresholdModified: 1

Expand Down
4 changes: 2 additions & 2 deletions cumulus_etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Cumulus public entry point"""
"""Turns FHIR data into de-identified & aggregated records"""

__version__ = "1.1.1"
__version__ = "1.2.0"
8 changes: 6 additions & 2 deletions cumulus_etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import rich.logging

from cumulus_etl import common, etl, upload_notes
from cumulus_etl import common, etl, export, upload_notes
from cumulus_etl.etl import convert


Expand All @@ -22,6 +22,7 @@ class Command(enum.Enum):
CHART_REVIEW = "chart-review"
CONVERT = "convert"
ETL = "etl"
EXPORT = "export"
UPLOAD_NOTES = "upload-notes"

# Why isn't this part of Enum directly...?
Expand Down Expand Up @@ -67,12 +68,15 @@ async def main(argv: list[str]) -> None:
run_method = upload_notes.run_upload_notes
elif subcommand == Command.CONVERT.value:
run_method = convert.run_convert
elif subcommand == Command.EXPORT.value:
run_method = export.run_export
else:
parser.description = "Extract, transform, and load FHIR data."
if not subcommand:
# Add a note about other subcommands we offer, and tell argparse not to wrap our formatting
parser.formatter_class = argparse.RawDescriptionHelpFormatter
parser.description += "\n\nother commands available:\n convert\n upload-notes"
parser.description += "\n\nother commands available:\n"
parser.description += " convert\n export\n upload-notes"
run_method = etl.run_etl

with tempfile.TemporaryDirectory() as tempdir:
Expand Down
53 changes: 40 additions & 13 deletions cumulus_etl/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,74 @@
from cumulus_etl import common, errors, store


def add_auth(parser: argparse.ArgumentParser) -> None:
def add_auth(parser: argparse.ArgumentParser, *, use_fhir_url: bool = True):
group = parser.add_argument_group("authentication")
group.add_argument("--smart-client-id", metavar="ID", help="Client ID for SMART authentication")
group.add_argument("--smart-client-id", metavar="ID", help="client ID for SMART authentication")
group.add_argument("--smart-jwks", metavar="PATH", help="JWKS file for SMART authentication")
group.add_argument("--basic-user", metavar="USER", help="Username for Basic authentication")
group.add_argument("--basic-user", metavar="USER", help="username for Basic authentication")
group.add_argument(
"--basic-passwd", metavar="PATH", help="Password file for Basic authentication"
"--basic-passwd", metavar="PATH", help="password file for Basic authentication"
)
group.add_argument(
"--bearer-token", metavar="PATH", help="Token file for Bearer authentication"
)
group.add_argument(
"--fhir-url",
metavar="URL",
help="FHIR server base URL, only needed if you exported separately",
"--bearer-token", metavar="PATH", help="token file for Bearer authentication"
)
if use_fhir_url:
group.add_argument(
"--fhir-url",
metavar="URL",
help="FHIR server base URL, only needed if you exported separately",
)


def add_aws(parser: argparse.ArgumentParser) -> None:
group = parser.add_argument_group("AWS")
group.add_argument(
"--s3-region", metavar="REGION", help="If using S3 paths (s3://...), this is their region"
"--s3-region", metavar="REGION", help="if using S3 paths (s3://...), this is their region"
)
group.add_argument(
"--s3-kms-key",
metavar="KEY",
help="If using S3 paths (s3://...), this is the KMS key ID to use",
help="if using S3 paths (s3://...), this is the KMS key ID to use",
)


def add_bulk_export(parser: argparse.ArgumentParser, *, as_subgroup: bool = True):
    """
    Registers the bulk export date-range options (--since and --until).

    :param parser: the parser to add the options to
    :param as_subgroup: if True, the options go into a new "bulk export" argument group;
                        otherwise they are added directly to the given parser
    :returns: whichever object received the options, so callers can add related options to it
    """
    target = parser.add_argument_group("bulk export") if as_subgroup else parser
    target.add_argument("--since", help="start date for export from the FHIR server")
    # "Until" is not an official part of the bulk FHIR API, but some custom servers support it
    target.add_argument("--until", help="end date for export from the FHIR server")
    return target


def add_nlp(parser: argparse.ArgumentParser):
group = parser.add_argument_group("NLP")
group.add_argument(
"--ctakes-overrides",
metavar="DIR",
default="/ctakes-overrides",
help="Path to cTAKES overrides dir (default is /ctakes-overrides)",
help="path to cTAKES overrides dir (default is /ctakes-overrides)",
)
return group


def add_task_selection(parser: argparse.ArgumentParser):
    """
    Registers the options that pick which tasks to consider (--task and --task-filter).

    Both options may be repeated, and both accept comma separated values.
    Values are stored unsplit, exactly as the user typed them.

    :param parser: the parser that gets a new "task selection" argument group
    """
    known_filters = ("covid_symptom", "cpu", "gpu")

    def task_filter_value(value: str) -> str:
        # argparse's `choices` compares the *whole* argument against the allowed values, so it
        # would reject "cpu,gpu" even though the help text promises comma separated values work.
        # Validate each piece instead, and hand back the original string untouched.
        for piece in value.split(","):
            if piece not in known_filters:
                valid = ", ".join(repr(name) for name in known_filters)
                raise argparse.ArgumentTypeError(
                    f"invalid choice: {piece!r} (choose from {valid})"
                )
        return value

    task = parser.add_argument_group("task selection")
    task.add_argument(
        "--task",
        action="append",
        help="only consider these tasks (comma separated, "
        "default is all supported FHIR resources, "
        "use '--task help' to see full list)",
    )
    task.add_argument(
        "--task-filter",
        action="append",
        type=task_filter_value,
        # Keep the same placeholder in --help that `choices` used to generate
        metavar="{" + ",".join(known_filters) + "}",
        help="restrict tasks to only the given sets (comma separated)",
    )


def add_debugging(parser: argparse.ArgumentParser):
group = parser.add_argument_group("debugging")
group.add_argument("--skip-init-checks", action="store_true", help=argparse.SUPPRESS)
Expand Down
1 change: 1 addition & 0 deletions cumulus_etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
FHIR_AUTH_FAILED = 32
SERVICE_MISSING = 33 # generic init-check service is missing
COMPLETION_ARG_MISSING = 34
TASK_HELP = 35


class FatalError(Exception):
Expand Down
27 changes: 4 additions & 23 deletions cumulus_etl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import argparse
import datetime
import itertools
import os
import shutil
import sys
Expand Down Expand Up @@ -124,14 +123,12 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
cli_utils.add_aws(parser)
cli_utils.add_auth(parser)

export = parser.add_argument_group("bulk export")
export = cli_utils.add_bulk_export(parser)
export.add_argument(
"--export-to",
metavar="DIR",
help="Where to put exported files (default is to delete after use)",
help="where to put exported files (default is to delete after use)",
)
export.add_argument("--since", help="Start date for export from the FHIR server")
export.add_argument("--until", help="End date for export from the FHIR server")

group = parser.add_argument_group("external export identification")
group.add_argument("--export-group", help=argparse.SUPPRESS)
Expand All @@ -142,18 +139,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
)

cli_utils.add_nlp(parser)

task = parser.add_argument_group("task selection")
task.add_argument(
"--task", action="append", help="Only update the given output tables (comma separated)"
)
task.add_argument(
"--task-filter",
action="append",
choices=["covid_symptom", "cpu", "gpu"],
help="Restrict tasks to only the given sets (comma separated)",
)

cli_utils.add_task_selection(parser)
cli_utils.add_debugging(parser)


Expand Down Expand Up @@ -241,12 +227,7 @@ async def etl_main(args: argparse.Namespace) -> None:
job_context = context.JobContext(root_phi.joinpath("context.json"))
job_datetime = common.datetime_now() # grab timestamp before we do anything

# Check which tasks are being run, allowing comma-separated values
task_names = args.task and set(itertools.chain.from_iterable(t.split(",") for t in args.task))
task_filters = args.task_filter and list(
itertools.chain.from_iterable(t.split(",") for t in args.task_filter)
)
selected_tasks = task_factory.get_selected_tasks(task_names, task_filters)
selected_tasks = task_factory.get_selected_tasks(args.task, args.task_filter)

# Print configuration
print_config(args, job_datetime, selected_tasks)
Expand Down
25 changes: 19 additions & 6 deletions cumulus_etl/etl/tasks/task_factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Finds and creates ETL tasks"""

import itertools
import sys
from collections.abc import Iterable
from typing import TypeVar
Expand Down Expand Up @@ -64,9 +65,17 @@ def get_selected_tasks(
:param filter_tags: only tasks that have all the listed tags will be eligible for selection
:returns: a list of selected EtlTask subclasses, to instantiate and run
"""
names = names and set(names)
names = names and set(itertools.chain.from_iterable(t.lower().split(",") for t in names))
filter_tags = filter_tags and list(
itertools.chain.from_iterable(t.lower().split(",") for t in filter_tags)
)
filter_tag_set = set(filter_tags or [])

if names and "help" in names:
# OK, we actually are just going to print the list of all task names and be done.
_print_task_names()
raise SystemExit(errors.TASK_HELP) # not an *error* exactly, but not successful ETL either

# Just give back the default set if the user didn't specify any constraints
if not names and not filter_tag_set:
return get_default_tasks()
Expand All @@ -88,11 +97,8 @@ def get_selected_tasks(
# Check for unknown names the user gave us
all_task_names = {t.name for t in all_tasks}
if unknown_names := names - all_task_names:
print_names = "\n".join(sorted(f" {key}" for key in all_task_names))
print(
f"Unknown task '{unknown_names.pop()}' requested. Valid task names:\n{print_names}",
file=sys.stderr,
)
print(f"Unknown task '{unknown_names.pop()}' requested.", file=sys.stderr)
_print_task_names(file=sys.stderr)
raise SystemExit(errors.TASK_UNKNOWN)

# Check for names that conflict with the chosen filters
Expand All @@ -106,3 +112,10 @@ def get_selected_tasks(
raise SystemExit(errors.TASK_FILTERED_OUT)

return [task for task in filtered_tasks if task.name in names]


def _print_task_names(*, file=None) -> None:
    """
    Prints the name of every known task, one per line, under a "Valid task names:" heading.

    :param file: the stream to print to; None (the default) means sys.stdout as it is at call time
    """
    # A default of `file=sys.stdout` would be evaluated only once, when this function is defined,
    # and so would ignore any later replacement of sys.stdout. print() resolves file=None to the
    # current sys.stdout on every call.
    task_names = sorted({task.name for task in get_all_tasks()})
    listing = "\n".join(f" {name}" for name in task_names)
    print(f"Valid task names:\n{listing}", file=file)
3 changes: 3 additions & 0 deletions cumulus_etl/export/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Bulk export"""

from .cli import run_export
49 changes: 49 additions & 0 deletions cumulus_etl/export/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Do a standalone bulk export from an EHR"""

import argparse

from cumulus_etl import cli_utils, fhir, loaders, store
from cumulus_etl.etl.tasks import task_factory


def define_export_parser(parser: argparse.ArgumentParser) -> None:
    """Fills in the given parser with the arguments of the "export" command."""
    parser.usage = "cumulus-etl export [OPTION]... FHIR_URL DIR"

    # The two positional arguments, in order: (attribute name, example shown in help)
    positionals = (
        ("url_input", "https://fhir.example.com/Group/ABC"),
        ("export_to", "/path/to/output"),
    )
    for dest, example in positionals:
        parser.add_argument(dest, metavar=example)

    # Ask for the bulk export options directly on the parser, not in their own group
    cli_utils.add_bulk_export(parser, as_subgroup=False)

    cli_utils.add_auth(parser, use_fhir_url=False)
    cli_utils.add_task_selection(parser)


async def export_main(args: argparse.Namespace) -> None:
    """
    Runs a bulk export from the FHIR URL in args.url_input into the folder in args.export_to.

    :param args: the parsed command line of the "export" command
    """
    # Filesystem options must be recorded before any Root is created
    store.set_user_fs_options(vars(args))

    # True only when the user gave neither --task nor --task-filter
    no_selection_given = not (args.task or args.task_filter)

    tasks = task_factory.get_selected_tasks(args.task, args.task_filter)
    resources = {task.resource for task in tasks}

    source = store.Root(args.url_input)
    client = fhir.create_fhir_client_for_cli(args, source, resources)

    async with client:
        loader = loaders.FhirNdjsonLoader(
            source,
            client=client,
            export_to=args.export_to,
            since=args.since,
            until=args.until,
        )
        # Sorted, so that the resources are always requested in the same order
        await loader.load_from_bulk_export(
            sorted(resources), prefer_url_resources=no_selection_given
        )


async def run_export(parser: argparse.ArgumentParser, argv: list[str]) -> None:
    """
    Entry point of the "export" command.

    :param parser: the parser to define the export arguments on
    :param argv: the command line arguments to parse with that parser
    """
    define_export_parser(parser)
    await export_main(parser.parse_args(argv))
4 changes: 2 additions & 2 deletions cumulus_etl/fhir/fhir_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,9 @@ def create_fhir_client_for_cli(
The usual FHIR server authentication options should be represented in args.
"""
client_base_url = args.fhir_url
client_base_url = getattr(args, "fhir_url", None)
if root_input.protocol in {"http", "https"}:
if args.fhir_url and not root_input.path.startswith(args.fhir_url):
if client_base_url and not root_input.path.startswith(client_base_url):
print(
"You provided both an input FHIR server and a different --fhir-url. Try dropping --fhir-url.",
file=sys.stderr,
Expand Down
Loading

0 comments on commit fe809cb

Please sign in to comment.