
Commit bf08a45
bug fix for writing empty df or null only cols
sungwy committed Feb 1, 2024
1 parent 29db67f commit bf08a45
Showing 3 changed files with 188 additions and 4 deletions.
5 changes: 4 additions & 1 deletion pyiceberg/io/pyarrow.py
@@ -1339,7 +1339,10 @@ def update_min(self, val: Any) -> None:
     def update_max(self, val: Any) -> None:
         self.current_max = val if self.current_max is None else max(val, self.current_max)
 
-    def min_as_bytes(self) -> bytes:
+    def min_as_bytes(self) -> Optional[bytes]:
+        if self.current_min is None:
+            return None
+
         return self.serialize(
             self.current_min
             if self.trunc_length is None
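Why the return type loosens to Optional[bytes]: for a column that contains only nulls, update_min/update_max are never fed a value, so current_min stays None and there is no lower bound to serialize. A minimal standalone sketch of that situation (an illustrative class, not the actual pyiceberg aggregator):

    from typing import Any, Optional

    class MinMaxSketch:
        """Illustrative stand-in for the statistics aggregator patched above."""

        def __init__(self) -> None:
            self.current_min: Optional[Any] = None

        def update_min(self, val: Any) -> None:
            self.current_min = val if self.current_min is None else min(val, self.current_min)

        def min_as_bytes(self) -> Optional[bytes]:
            # Mirrors the fix: with a null-only column no value was ever observed,
            # so report "no lower bound" instead of trying to serialize None.
            if self.current_min is None:
                return None
            return str(self.current_min).encode()

    agg = MinMaxSketch()           # simulate a column where every value is null
    assert agg.min_as_bytes() is None
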
9 changes: 6 additions & 3 deletions pyiceberg/table/__init__.py
@@ -946,10 +946,13 @@ def append(self, df: pa.Table) -> None:
         if len(self.sort_order().fields) > 0:
             raise ValueError("Cannot write to tables with a sort-order")
 
-        data_files = _dataframe_to_data_files(self, df=df)
         merge = _MergingSnapshotProducer(operation=Operation.APPEND, table=self)
-        for data_file in data_files:
-            merge.append_data_file(data_file)
+
+        # skip writing data files if the dataframe is empty
+        if df.shape[0] > 0:
+            data_files = _dataframe_to_data_files(self, df=df)
+            for data_file in data_files:
+                merge.append_data_file(data_file)
 
         merge.commit()

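The effect of the append() change, sketched from the caller's side (a hedged example; the schema and the existing `tbl` handle below are assumptions for illustration, not part of this commit): appending a zero-row table now commits an append snapshot without writing any data file, while a null-only table still writes a file whose column statistics simply omit min/max bounds.

    import pyarrow as pa

    pa_schema = pa.schema([("int", pa.int32()), ("string", pa.string())])
    empty_df = pa.Table.from_pylist([], schema=pa_schema)            # zero rows
    null_only_df = pa.Table.from_pylist([{}, {}], schema=pa_schema)  # two all-null rows

    # assuming `tbl` is an existing pyiceberg Table with a matching schema:
    # tbl.append(empty_df)      # commits an empty append snapshot; no data files written
    # tbl.append(null_only_df)  # writes one data file; null-only columns carry no bounds
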
178 changes: 178 additions & 0 deletions tests/integration/test_writes.py
@@ -142,6 +142,54 @@ def arrow_table_with_null() -> pa.Table:
    return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)


@pytest.fixture(scope="session")
def arrow_table_without_data() -> pa.Table:
"""PyArrow table with all kinds of columns"""
pa_schema = pa.schema([
("bool", pa.bool_()),
("string", pa.string()),
("string_long", pa.string()),
("int", pa.int32()),
("long", pa.int64()),
("float", pa.float32()),
("double", pa.float64()),
("timestamp", pa.timestamp(unit="us")),
("timestamptz", pa.timestamp(unit="us", tz="UTC")),
("date", pa.date32()),
# Not supported by Spark
# ("time", pa.time64("us")),
# Not natively supported by Arrow
# ("uuid", pa.fixed(16)),
("binary", pa.binary()),
("fixed", pa.binary(16)),
])
return pa.Table.from_pylist([], schema=pa_schema)


@pytest.fixture(scope="session")
def arrow_table_with_only_nulls() -> pa.Table:
"""PyArrow table with all kinds of columns"""
pa_schema = pa.schema([
("bool", pa.bool_()),
("string", pa.string()),
("string_long", pa.string()),
("int", pa.int32()),
("long", pa.int64()),
("float", pa.float32()),
("double", pa.float64()),
("timestamp", pa.timestamp(unit="us")),
("timestamptz", pa.timestamp(unit="us", tz="UTC")),
("date", pa.date32()),
# Not supported by Spark
# ("time", pa.time64("us")),
# Not natively supported by Arrow
# ("uuid", pa.fixed(16)),
("binary", pa.binary()),
("fixed", pa.binary(16)),
])
return pa.Table.from_pylist([{}, {}], schema=pa_schema)
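
As an aside (a minimal standalone sketch, not part of the commit): pa.Table.from_pylist([{}, {}], schema=...) yields two rows in which every column is null, which is exactly the shape that previously produced unset min/max statistics on the write path.

    import pyarrow as pa

    t = pa.Table.from_pylist([{}, {}], schema=pa.schema([("int", pa.int32())]))
    print(t.num_rows)                   # 2
    print(t.column("int").null_count)   # 2 -- every value is null
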


@pytest.fixture(scope="session", autouse=True)
def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v1_with_null"
Expand All @@ -157,6 +205,36 @@ def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table
assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"


@pytest.fixture(scope="session", autouse=True)
def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
identifier = "default.arrow_table_v1_without_data"

try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
tbl.append(arrow_table_without_data)

assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"


@pytest.fixture(scope="session", autouse=True)
def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
identifier = "default.arrow_table_v1_with_only_nulls"

try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
tbl.append(arrow_table_with_only_nulls)

assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"


@pytest.fixture(scope="session", autouse=True)
def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v1_appended_with_null"
Expand Down Expand Up @@ -189,6 +267,36 @@ def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table
assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"


@pytest.fixture(scope="session", autouse=True)
def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
identifier = "default.arrow_table_v2_without_data"

try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
tbl.append(arrow_table_without_data)

assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"


@pytest.fixture(scope="session", autouse=True)
def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
identifier = "default.arrow_table_v2_with_only_nulls"

try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
tbl.append(arrow_table_with_only_nulls)

assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"


@pytest.fixture(scope="session", autouse=True)
def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v2_appended_with_null"
Expand Down Expand Up @@ -279,6 +387,26 @@ def test_query_filter_null(spark: SparkSession, col: str, format_version: int) -
assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows for {col}"


@pytest.mark.integration
@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
@pytest.mark.parametrize("format_version", [1, 2])
def test_query_filter_without_data(spark: SparkSession, col: str, format_version: int) -> None:
    identifier = f"default.arrow_table_v{format_version}_without_data"
    df = spark.table(identifier)
    assert df.where(f"{col} is null").count() == 0, f"Expected 0 rows for {col}"
    assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows for {col}"


@pytest.mark.integration
@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
@pytest.mark.parametrize("format_version", [1, 2])
def test_query_filter_only_nulls(spark: SparkSession, col: str, format_version: int) -> None:
    identifier = f"default.arrow_table_v{format_version}_with_only_nulls"
    df = spark.table(identifier)
    assert df.where(f"{col} is null").count() == 2, f"Expected 2 rows for {col}"
    assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows for {col}"


@pytest.mark.integration
@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
@pytest.mark.parametrize("format_version", [1, 2])
@@ -409,3 +537,53 @@ def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_

    with pytest.raises(ValueError, match="Expected PyArrow table, got: not a df"):
        tbl.append("not a df")


@pytest.mark.integration
def test_summaries_with_only_nulls(
    spark: SparkSession, session_catalog: Catalog, arrow_table_without_data: pa.Table, arrow_table_with_only_nulls: pa.Table
) -> None:
    identifier = "default.arrow_table_summaries_with_only_nulls"

    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass
    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})

    tbl.append(arrow_table_without_data)
    tbl.append(arrow_table_with_only_nulls)

    rows = spark.sql(
        f"""
        SELECT operation, summary
        FROM {identifier}.snapshots
        ORDER BY committed_at ASC
        """
    ).collect()

    operations = [row.operation for row in rows]
    assert operations == ['append', 'append']

    summaries = [row.summary for row in rows]

    assert summaries[0] == {
        'total-data-files': '0',
        'total-delete-files': '0',
        'total-equality-deletes': '0',
        'total-files-size': '0',
        'total-position-deletes': '0',
        'total-records': '0',
    }

    assert summaries[1] == {
        'added-data-files': '1',
        'added-files-size': '4045',
        'added-records': '2',
        'total-data-files': '1',
        'total-delete-files': '0',
        'total-equality-deletes': '0',
        'total-files-size': '4045',
        'total-position-deletes': '0',
        'total-records': '2',
    }
