From bf08a455cfa5883b77cfc5ce006181ed038c9f23 Mon Sep 17 00:00:00 2001
From: Sung Yun <107272191+syun64@users.noreply.github.com>
Date: Thu, 1 Feb 2024 18:40:45 +0000
Subject: [PATCH] bug fix for writing empty df or null only cols

---
 pyiceberg/io/pyarrow.py          |   5 +-
 pyiceberg/table/__init__.py      |   9 +-
 tests/integration/test_writes.py | 178 +++++++++++++++++++++++++++++++
 3 files changed, 188 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 7a94ce4c7d..4271c3137b 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1339,7 +1339,10 @@ def update_min(self, val: Any) -> None:
     def update_max(self, val: Any) -> None:
         self.current_max = val if self.current_max is None else max(val, self.current_max)
 
-    def min_as_bytes(self) -> bytes:
+    def min_as_bytes(self) -> Optional[bytes]:
+        if self.current_min is None:
+            return None
+
         return self.serialize(
             self.current_min
             if self.trunc_length is None
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 16ed9ed292..9a9b5dad0a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -946,10 +946,13 @@ def append(self, df: pa.Table) -> None:
         if len(self.sort_order().fields) > 0:
             raise ValueError("Cannot write to tables with a sort-order")
 
-        data_files = _dataframe_to_data_files(self, df=df)
         merge = _MergingSnapshotProducer(operation=Operation.APPEND, table=self)
-        for data_file in data_files:
-            merge.append_data_file(data_file)
+
+        # skip writing data files if the dataframe is empty
+        if df.shape[0] > 0:
+            data_files = _dataframe_to_data_files(self, df=df)
+            for data_file in data_files:
+                merge.append_data_file(data_file)
 
         merge.commit()
 
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 17dc997163..3540db53ac 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -142,6 +142,54 @@ def arrow_table_with_null() -> pa.Table:
     return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
 
 
+@pytest.fixture(scope="session")
+def arrow_table_without_data() -> pa.Table:
+    """PyArrow table with all kinds of columns, but no rows."""
+    pa_schema = pa.schema([
+        ("bool", pa.bool_()),
+        ("string", pa.string()),
+        ("string_long", pa.string()),
+        ("int", pa.int32()),
+        ("long", pa.int64()),
+        ("float", pa.float32()),
+        ("double", pa.float64()),
+        ("timestamp", pa.timestamp(unit="us")),
+        ("timestamptz", pa.timestamp(unit="us", tz="UTC")),
+        ("date", pa.date32()),
+        # Not supported by Spark
+        # ("time", pa.time64("us")),
+        # Not natively supported by Arrow
+        # ("uuid", pa.fixed(16)),
+        ("binary", pa.binary()),
+        ("fixed", pa.binary(16)),
+    ])
+    return pa.Table.from_pylist([], schema=pa_schema)
+
+
+@pytest.fixture(scope="session")
+def arrow_table_with_only_nulls() -> pa.Table:
+    """PyArrow table with all kinds of columns, holding only null values."""
+    pa_schema = pa.schema([
+        ("bool", pa.bool_()),
+        ("string", pa.string()),
+        ("string_long", pa.string()),
+        ("int", pa.int32()),
+        ("long", pa.int64()),
+        ("float", pa.float32()),
+        ("double", pa.float64()),
+        ("timestamp", pa.timestamp(unit="us")),
+        ("timestamptz", pa.timestamp(unit="us", tz="UTC")),
+        ("date", pa.date32()),
+        # Not supported by Spark
+        # ("time", pa.time64("us")),
+        # Not natively supported by Arrow
+        # ("uuid", pa.fixed(16)),
+        ("binary", pa.binary()),
+        ("fixed", pa.binary(16)),
+    ])
+    return pa.Table.from_pylist([{}, {}], schema=pa_schema)
+
+
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v1_with_null"
@@ -157,6 +205,36 @@ def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_without_data"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
+    tbl.append(arrow_table_without_data)
+
+    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_with_only_nulls"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
+    tbl.append(arrow_table_with_only_nulls)
+
+    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v1_appended_with_null"
@@ -189,6 +267,36 @@ def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
+@pytest.fixture(scope="session", autouse=True)
+def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
+    identifier = "default.arrow_table_v2_without_data"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
+    tbl.append(arrow_table_without_data)
+
+    assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
+    identifier = "default.arrow_table_v2_with_only_nulls"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
+    tbl.append(arrow_table_with_only_nulls)
+
+    assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v2_appended_with_null"
@@ -279,6 +387,26 @@ def test_query_filter_null(spark: SparkSession, col: str, format_version: int) -
     assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows for {col}"
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_query_filter_without_data(spark: SparkSession, col: str, format_version: int) -> None:
+    identifier = f"default.arrow_table_v{format_version}_without_data"
+    df = spark.table(identifier)
+    assert df.where(f"{col} is null").count() == 0, f"Expected 0 rows for {col}"
+    assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows for {col}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_query_filter_only_nulls(spark: SparkSession, col: str, format_version: int) -> None:
+    identifier = f"default.arrow_table_v{format_version}_with_only_nulls"
+    df = spark.table(identifier)
+    assert df.where(f"{col} is null").count() == 2, f"Expected 2 rows for {col}"
+    assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows for {col}"
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
 @pytest.mark.parametrize("format_version", [1, 2])
@@ -409,3 +537,53 @@ def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_
 
     with pytest.raises(ValueError, match="Expected PyArrow table, got: not a df"):
         tbl.append("not a df")
+
+
+@pytest.mark.integration
+def test_summaries_with_only_nulls(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_without_data: pa.Table, arrow_table_with_only_nulls: pa.Table
+) -> None:
+    identifier = "default.arrow_table_summaries_with_only_nulls"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
+
+    tbl.append(arrow_table_without_data)
+    tbl.append(arrow_table_with_only_nulls)
+
+    rows = spark.sql(
+        f"""
+        SELECT operation, summary
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
+    ).collect()
+
+    operations = [row.operation for row in rows]
+    assert operations == ['append', 'append']
+
+    summaries = [row.summary for row in rows]
+
+    assert summaries[0] == {
+        'total-data-files': '0',
+        'total-delete-files': '0',
+        'total-equality-deletes': '0',
+        'total-files-size': '0',
+        'total-position-deletes': '0',
+        'total-records': '0',
+    }
+
+    assert summaries[1] == {
+        'added-data-files': '1',
+        'added-files-size': '4045',
+        'added-records': '2',
+        'total-data-files': '1',
+        'total-delete-files': '0',
+        'total-equality-deletes': '0',
+        'total-files-size': '4045',
+        'total-position-deletes': '0',
+        'total-records': '2',
+    }