Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1000284: Add schema support for structure types #1323

Merged
merged 7 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### New Features
- Support stored procedure register with packages given as Python modules.
- Added support for structured type schema parsing.

## 1.15.0 (2024-04-24)

Expand Down
2 changes: 1 addition & 1 deletion recipe/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ requirements:
- python
- cloudpickle >=1.6.0,<=2.0.0 # [py<=310]
- cloudpickle==2.2.1 # [py==311]
- snowflake-connector-python
- snowflake-connector-python >=3.10.0,<4.0.0
- typing-extensions >=4.1.0
# need to pin libffi because of problems in cryptography.
# This might no longer hold true but keep it just to avoid it from biting us again
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
MODIN_DEPENDENCY_VERSION = (
"==0.28.1" # Snowpark pandas requires modin 0.28.1, which depends on pandas 2.2.1
)
CONNECTOR_DEPENDENCY_VERSION = ">=3.6.0, <4.0.0"
CONNECTOR_DEPENDENCY_VERSION = ">=3.10.0, <4.0.0"
INSTALL_REQ_LIST = [
"setuptools>=40.6.0",
"wheel",
Expand Down
52 changes: 49 additions & 3 deletions src/snowflake/snowpark/_internal/type_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,37 @@ def convert_metadata_to_sp_type(
raise ValueError(
f"Invalid result metadata for vector type: invalid element type: {element_type_name}"
)
elif column_type_name in {"ARRAY", "MAP", "OBJECT"} and getattr(
metadata, "fields", None
):
# If `fields` is undefined or empty, fall back to the legacy (unstructured) type instead
if column_type_name == "ARRAY":
assert (
len(metadata.fields) == 1
), "ArrayType columns should have one metadata field."
return ArrayType(
convert_metadata_to_sp_type(metadata.fields[0]), structured=True
)
elif column_type_name == "MAP":
assert (
len(metadata.fields) == 2
), "MapType columns should have two metadata fields."
return MapType(
convert_metadata_to_sp_type(metadata.fields[0]),
convert_metadata_to_sp_type(metadata.fields[1]),
structured=True,
)
else:
assert all(
getattr(field, "name", None) for field in metadata.fields
), "All fields of a StructType should be named."
return StructType(
[
StructField(field.name, convert_metadata_to_sp_type(field))
for field in metadata.fields
],
structured=True,
)
else:
return convert_sf_to_sp_type(
column_type_name,
Expand All @@ -142,7 +173,7 @@ def convert_sf_to_sp_type(
return ArrayType(StringType())
if column_type_name == "VARIANT":
return VariantType()
if column_type_name == "OBJECT":
if column_type_name in {"OBJECT", "MAP"}:
return MapType(StringType(), StringType())
if column_type_name == "GEOGRAPHY":
return GeographyType()
Expand Down Expand Up @@ -235,9 +266,24 @@ def convert_sp_to_sf_type(datatype: DataType) -> str:
if isinstance(datatype, BinaryType):
return "BINARY"
if isinstance(datatype, ArrayType):
return "ARRAY"
if datatype.structured:
return f"ARRAY({convert_sp_to_sf_type(datatype.element_type)})"
else:
return "ARRAY"
if isinstance(datatype, MapType):
return "OBJECT"
if datatype.structured:
return f"MAP({convert_sp_to_sf_type(datatype.key_type)}, {convert_sp_to_sf_type(datatype.value_type)})"
else:
return "OBJECT"
if isinstance(datatype, StructType):
if datatype.structured:
fields = ", ".join(
f"{field.name.upper()} {convert_sp_to_sf_type(field.datatype)}"
for field in datatype.fields
)
return f"OBJECT({fields})"
else:
return "OBJECT"
if isinstance(datatype, VariantType):
return "VARIANT"
if isinstance(datatype, GeographyType):
Expand Down
2 changes: 1 addition & 1 deletion src/snowflake/snowpark/_internal/udf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ def create_python_udf_or_sp(

if replace and if_not_exists:
raise ValueError("options replace and if_not_exists are incompatible")
if isinstance(return_type, StructType):
if isinstance(return_type, StructType) and not return_type.structured:
return_sql = f'RETURNS TABLE ({",".join(f"{field.name} {convert_sp_to_sf_type(field.datatype)}" for field in return_type.fields)})'
elif installed_pandas and isinstance(return_type, PandasDataFrameType):
return_sql = f'RETURNS TABLE ({",".join(f"{name} {convert_sp_to_sf_type(datatype)}" for name, datatype in zip(return_type.col_names, return_type.col_types))})'
Expand Down
20 changes: 15 additions & 5 deletions src/snowflake/snowpark/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ def __repr__(self) -> str:
class ArrayType(DataType):
"""Array data type. This maps to the ARRAY data type in Snowflake."""

def __init__(self, element_type: Optional[DataType] = None) -> None:
def __init__(
self, element_type: Optional[DataType] = None, structured: bool = False
) -> None:
self.structured = structured
self.element_type = element_type if element_type else StringType()

def __repr__(self) -> str:
Expand All @@ -228,11 +231,15 @@ def is_primitive(self):


class MapType(DataType):
"""Map data type. This maps to the OBJECT data type in Snowflake."""
"""Map data type. This maps to the OBJECT data type in Snowflake if key and value types are not defined; otherwise it maps to the MAP data type."""

def __init__(
self, key_type: Optional[DataType] = None, value_type: Optional[DataType] = None
self,
key_type: Optional[DataType] = None,
value_type: Optional[DataType] = None,
structured: bool = False,
) -> None:
self.structured = structured
self.key_type = key_type if key_type else StringType()
self.value_type = value_type if value_type else StringType()

Expand Down Expand Up @@ -366,9 +373,12 @@ def __eq__(self, other):


class StructType(DataType):
"""Represents a table schema. Contains :class:`StructField` for each column."""
"""Represents a table schema or structured column. Contains :class:`StructField` for each field."""

def __init__(self, fields: Optional[List["StructField"]] = None) -> None:
def __init__(
self, fields: Optional[List["StructField"]] = None, structured=False
) -> None:
self.structured = structured
if fields is None:
fields = []
self.fields = fields
Expand Down
Loading
Loading