Skip to content

Commit

Permalink
deltalake: enable liquid clustering for new tables
Browse files Browse the repository at this point in the history
  • Loading branch information
mikix committed Oct 3, 2024
1 parent 047b279 commit 1194196
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cumulus_etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Turns FHIR data into de-identified & aggregated records"""

__version__ = "1.3.0"
__version__ = "1.4.0"
1 change: 1 addition & 0 deletions cumulus_etl/formats/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def update_delta_table(
table = (
delta.DeltaTable.createIfNotExists(self.spark)
.addColumns(updates.schema)
.clusterBy(*self.uniqueness_fields)
.location(self._table_path(self.dbname))
.execute()
)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ requires-python = ">= 3.10"
dependencies = [
"ctakesclient >= 5.1, < 6",
"cumulus-fhir-support >= 1.2, < 2",
"delta-spark >= 3, < 4",
"delta-spark >= 3.2.1, < 4",
"httpx < 1",
"inscriptis < 3",
"jwcrypto < 2",
Expand Down
12 changes: 8 additions & 4 deletions tests/etl/test_etl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,20 +410,24 @@ async def test_etl_job_deltalake(self):
"_delta_log/.00000000000000000000.json.crc",
"_delta_log/00000000000000000001.json", # merge
"_delta_log/.00000000000000000001.json.crc",
"_delta_log/00000000000000000002.json", # vacuum start
"_delta_log/00000000000000000002.json", # optimize
"_delta_log/.00000000000000000002.json.crc",
"_delta_log/00000000000000000003.json", # vacuum end
"_delta_log/00000000000000000003.json", # vacuum start
"_delta_log/.00000000000000000003.json.crc",
"_delta_log/00000000000000000004.json", # vacuum end
"_delta_log/.00000000000000000004.json.crc",
"_symlink_format_manifest/manifest",
"_symlink_format_manifest/.manifest.crc",
},
metadata_files,
)

self.assertEqual(1, len(data_files))
# Expect two data files - one will be original (now marked as deleted from optimize call)
# and the other will be the new optimized file.
self.assertEqual(2, len(data_files))
self.assertRegex(data_files.pop(), r"part-00000-.*-c000.snappy.parquet")

self.assertEqual(1, len(data_crc_files))
self.assertEqual(2, len(data_crc_files))
self.assertRegex(data_crc_files.pop(), r".part-00000-.*-c000.snappy.parquet.crc")


Expand Down

0 comments on commit 1194196

Please sign in to comment.