diff --git a/cumulus_etl/__init__.py b/cumulus_etl/__init__.py
index 4845145..c45782d 100644
--- a/cumulus_etl/__init__.py
+++ b/cumulus_etl/__init__.py
@@ -1,3 +1,3 @@
 """Turns FHIR data into de-identified & aggregated records"""
 
-__version__ = "1.4.0"
+__version__ = "1.5.0"
diff --git a/cumulus_etl/cli.py b/cumulus_etl/cli.py
index 35f61e4..02c2395 100644
--- a/cumulus_etl/cli.py
+++ b/cumulus_etl/cli.py
@@ -10,7 +10,7 @@
 import rich.logging
 
 from cumulus_etl import common, etl, export, upload_notes
-from cumulus_etl.etl import convert
+from cumulus_etl.etl import convert, init
 
 
 class Command(enum.Enum):
@@ -23,6 +23,7 @@ class Command(enum.Enum):
     CONVERT = "convert"
     ETL = "etl"
     EXPORT = "export"
+    INIT = "init"
     UPLOAD_NOTES = "upload-notes"
 
     # Why isn't this part of Enum directly...?
@@ -70,13 +71,15 @@ async def main(argv: list[str]) -> None:
         run_method = convert.run_convert
     elif subcommand == Command.EXPORT.value:
         run_method = export.run_export
+    elif subcommand == Command.INIT.value:
+        run_method = init.run_init
     else:
         parser.description = "Extract, transform, and load FHIR data."
         if not subcommand:
             # Add a note about other subcommands we offer, and tell argparse not to wrap our formatting
             parser.formatter_class = argparse.RawDescriptionHelpFormatter
             parser.description += "\n\nother commands available:\n"
-            parser.description += "  convert\n  export\n  upload-notes"
+            parser.description += "  convert\n  export\n  init\n  upload-notes"
         run_method = etl.run_etl
 
     with tempfile.TemporaryDirectory() as tempdir:
diff --git a/cumulus_etl/cli_utils.py b/cumulus_etl/cli_utils.py
index 5402df9..0805599 100644
--- a/cumulus_etl/cli_utils.py
+++ b/cumulus_etl/cli_utils.py
@@ -69,6 +69,15 @@ def add_nlp(parser: argparse.ArgumentParser):
     return group
 
 
+def add_output_format(parser: argparse.ArgumentParser) -> None:
+    parser.add_argument(
+        "--output-format",
+        default="deltalake",
+        choices=["deltalake", "ndjson"],
+        help="output format (default is deltalake)",
+    )
+
+
 def add_task_selection(parser: argparse.ArgumentParser):
     task = parser.add_argument_group("task selection")
     task.add_argument(
diff --git a/cumulus_etl/etl/cli.py b/cumulus_etl/etl/cli.py
index 7f0f65e..3569d78 100644
--- a/cumulus_etl/etl/cli.py
+++ b/cumulus_etl/etl/cli.py
@@ -102,12 +102,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
         choices=["i2b2", "ndjson"],
         help="input format (default is ndjson)",
     )
-    parser.add_argument(
-        "--output-format",
-        default="deltalake",
-        choices=["deltalake", "ndjson"],
-        help="output format (default is deltalake)",
-    )
+    cli_utils.add_output_format(parser)
     parser.add_argument(
         "--batch-size",
         type=int,
diff --git a/cumulus_etl/etl/init/__init__.py b/cumulus_etl/etl/init/__init__.py
new file mode 100644
index 0000000..f2bc7d8
--- /dev/null
+++ b/cumulus_etl/etl/init/__init__.py
@@ -0,0 +1,3 @@
+"""Subcommand to initialize basic tables"""
+
+from .cli import run_init
diff --git a/cumulus_etl/etl/init/cli.py b/cumulus_etl/etl/init/cli.py
new file mode 100644
index 0000000..705916a
--- /dev/null
+++ b/cumulus_etl/etl/init/cli.py
@@ -0,0 +1,70 @@
+"""
+Initializes basic resource tables.
+
+Creates the tables if they don't exist and pushes up a basic schema.
+"""
+
+import argparse
+from collections.abc import Iterable
+
+from cumulus_etl import cli_utils, formats, store
+from cumulus_etl.etl import tasks
+from cumulus_etl.etl.tasks import task_factory
+
+
+def define_init_parser(parser: argparse.ArgumentParser) -> None:
+    parser.usage = "%(prog)s [OPTION]... OUTPUT"
+    parser.description = (
+        "Initialize all basic output tables. "
+        "After this command is run, you will be ready to set up Cumulus Library. "
+        "This command is safe to run multiple times on the same folder, "
+        "or even on an existing folder with data already in it."
+    )
+
+    parser.add_argument("dir_output", metavar="/path/to/output")
+    cli_utils.add_output_format(parser)
+
+    cli_utils.add_aws(parser)
+
+
+def get_task_tables() -> Iterable[tuple[type[tasks.EtlTask], tasks.OutputTable]]:
+    for task_class in task_factory.get_default_tasks():
+        for output in task_class.outputs:
+            if not output.get_name(task_class).startswith("etl__"):
+                yield task_class, output
+
+
+async def init_main(args: argparse.Namespace) -> None:
+    """Main logic for initialization"""
+    # record filesystem options like --s3-region before creating Roots
+    store.set_user_fs_options(vars(args))
+
+    output_root = store.Root(args.dir_output)
+
+    with cli_utils.make_progress_bar() as progress:
+        # Set up progress bar
+        total_steps = len(list(get_task_tables())) + 1  # extra 1 is initializing the formatter
+        task = progress.add_task("Initializing tables", total=total_steps)
+
+        # Initialize formatter (which can take a moment with deltalake)
+        format_class = formats.get_format_class(args.output_format)
+        format_class.initialize_class(output_root)
+        progress.update(task, advance=1)
+
+        # Create an empty JobConfig/ folder, so that the 'convert' command will recognize this
+        # folder as an ETL folder.
+        output_root.makedirs(output_root.joinpath("JobConfig"))
+
+        # Now iterate through, pushing to each output table
+        for task_class, output in get_task_tables():
+            batch = task_class.make_batch_from_rows(output.get_resource_type(task_class), [])
+            formatter = format_class(output_root, output.get_name(task_class))
+            formatter.write_records(batch)
+            progress.update(task, advance=1)
+
+
+async def run_init(parser: argparse.ArgumentParser, argv: list[str]) -> None:
+    """Parse arguments and do the work"""
+    define_init_parser(parser)
+    args = parser.parse_args(argv)
+    await init_main(args)
diff --git a/docs/setup/sample-runs.md b/docs/setup/sample-runs.md
index 68956fc..282e167 100644
--- a/docs/setup/sample-runs.md
+++ b/docs/setup/sample-runs.md
@@ -135,17 +135,33 @@ Congratulations! You've run your first Cumulus ETL process. The first of many!
 
 ### AWS Test Run
 
-Let's do the same thing, but now pointing at S3 buckets.
+Let's do that again, but now pointing at S3 buckets.
 This assumes you've followed the [S3 setup guide](aws.md).
 
+We didn't do this above, but now that we're getting more serious,
+let's run `cumulus-etl init` first, which will create all the basic tables for us.
+
 When using S3 buckets, you'll need to set the `--s3-region` argument to the correct region.
 
-Run this command, but replace:
+Run the command below, but replace:
 * `us-east-2` with the region your buckets are in
 * `99999999999` with your account ID
 * `my-cumulus-prefix` with the bucket prefix you used when setting up AWS
 * and `subdir1` with the ETL subdirectory you used when setting up AWS
 
+```sh
+docker compose -f $CUMULUS_REPO_PATH/compose.yaml \
+  run --rm \
+  cumulus-etl init \
+  --s3-region=us-east-2 \
+  s3://my-cumulus-prefix-99999999999-us-east-2/subdir1/
+```
+
+This will create empty tables for all the core resources that Cumulus works with.
+You should now even be able to see some (very small) output files in your S3 buckets!
+
+Let's go one step further and put some actual (fake) test data in there too.
+
 ```sh
 docker compose -f $CUMULUS_REPO_PATH/compose.yaml \
   run --volume $CUMULUS_REPO_PATH:/cumulus-etl --rm \
@@ -156,7 +172,8 @@ docker compose -f $CUMULUS_REPO_PATH/compose.yaml \
   s3://my-cumulus-prefix-phi-99999999999-us-east-2/subdir1/
 ```
 
-You should now be able to see some (very small) output files in your S3 buckets!
+(Though, note that your S3 bucket now has test data in it.
+Before you put any real data in there, you should delete the S3 folder and start fresh.)
 
 Obviously, this was just example data.
 But if you'd prefer to keep PHI off of AWS when you deploy for real,
diff --git a/tests/init/__init__.py b/tests/init/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/init/test_init_cli.py b/tests/init/test_init_cli.py
new file mode 100644
index 0000000..f765769
--- /dev/null
+++ b/tests/init/test_init_cli.py
@@ -0,0 +1,41 @@
+"""Tests for etl/init/cli.py"""
+
+import os
+
+import ddt
+
+from cumulus_etl import cli, common
+from tests import utils
+
+
+@ddt.ddt
+class TestInit(utils.AsyncTestCase):
+    """Tests for high-level init support."""
+
+    def setUp(self):
+        super().setUp()
+        self.output_path = self.make_tempdir()
+
+    async def run_init(self, output_path: str | None = None) -> None:
+        args = [
+            "init",
+            output_path or self.output_path,
+            "--output-format=ndjson",
+        ]
+        await cli.main(args)
+
+    async def test_happy_path(self):
+        """Verify that we can do a simple init"""
+        await self.run_init()
+
+        # Do some spot checks
+        dirs = set(os.listdir(self.output_path))
+        self.assertIn("device", dirs)
+        self.assertIn("patient", dirs)
+        self.assertIn("medicationrequest", dirs)
+        self.assertIn("medication", dirs)  # secondary table
+        self.assertIn("JobConfig", dirs)  # so that the dir is flagged as an ETL dir by 'convert'
+
+        # Are folder contents what we expect?
+        self.assertEqual(["patient.000.ndjson"], os.listdir(f"{self.output_path}/patient"))
+        self.assertEqual("", common.read_text(f"{self.output_path}/patient/patient.000.ndjson"))
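For reviewers who want to exercise the new subcommand locally, here is a minimal sketch. Everything in it comes from this diff except the output path, which is hypothetical: the `init` subcommand and its positional output directory come from `define_init_parser`, and `--output-format=ndjson` mirrors what the test suite above uses.

```sh
# Hypothetical local run of the new subcommand (./cumulus-output/ is a made-up path).
# Per the tests above, this should leave one empty file per table, e.g.
# patient/patient.000.ndjson, plus a JobConfig/ marker folder for 'convert'.
cumulus-etl init --output-format=ndjson ./cumulus-output/
```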