Merge branch 'main' into rsureshbabu-SNOW-976704-leadlagfunctions
sfc-gh-rsureshbabu committed Jan 31, 2024
2 parents af473ff + 638ce3d commit a0a6ef0
Showing 14 changed files with 177 additions and 35 deletions.
33 changes: 19 additions & 14 deletions CHANGELOG.md
@@ -1,51 +1,56 @@
# Release History

## 1.12.0 (TBD)
## 1.12.0 (2024-01-30)

### New Features

- Expose `statement_params` in `StoredProcedure.__call__`.
- Exposed `statement_params` in `StoredProcedure.__call__`.
- Added two optional arguments to `Session.add_import`.
- `chunk_size`: The number of bytes to hash per chunk of the uploaded files.
- `whole_file_hash`: By default, only the first chunk of the uploaded import is hashed to save time. When this is set to `True`, each uploaded file is fully hashed instead.
- Added parameters `external_access_integrations` and `secrets` when creating a UDAF from Snowpark Python to allow integration with external access.
- Added a new method `Session.append_query_tag`. Allows an additional tag to be added to the current query tag by appending it as a comma-separated value.
- Added a new method `Session.update_query_tag`. Allows updates to a json encoded dictionary query tag.
- Added a new method `Session.update_query_tag`. Allows updates to a JSON encoded dictionary query tag.
- `SessionBuilder.getOrCreate` will now attempt to replace the singleton it returns when token expiration has been detected.
- Added support for new function(s) in `snowflake.snowpark.functions`:
- Added support for new functions in `snowflake.snowpark.functions`:
- `array_except`
- `create_map`
- `sign`/`signum`
- Added following functions to DataFrame.analytics
- Added moving_agg function in DataFrame.analytics for enabling moving aggregations like sums and averages with multiple window sizes.
- Added cummulative_agg function in DataFrame.analytics for enabling commulative aggregations like sums and averages on multiple columns.
- Added compute_lag and compute_lead function in DataFrame.analytics for enabling lead and lag calculations on multiple columns.
- Added the following functions to `DataFrame.analytics`:
- Added the `moving_agg` function in `DataFrame.analytics` to enable moving aggregations like sums and averages with multiple window sizes.
- Added the `cummulative_agg` function in `DataFrame.analytics` to enable cumulative aggregations like sums and averages on multiple columns.
- Added the `compute_lag` and `compute_lead` functions in `DataFrame.analytics` to enable lead and lag calculations on multiple columns (see the usage sketch after this list).
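A usage sketch for a couple of these additions. The connection parameters, table name, and column names are placeholders, and the `compute_lag` parameter names (`cols`, `lags`, `order_by`, `group_by`) are assumed from this release's analytics API rather than confirmed by this diff:

```python
from snowflake.snowpark import Session

connection_parameters = {"account": "...", "user": "...", "password": "..."}  # placeholders
session = Session.builder.configs(connection_parameters).create()

# Append an extra value to the current query tag (comma-separated).
session.append_query_tag("team:analytics")

# Lead/lag over multiple columns, grouped and ordered as specified.
df = session.table("sales")  # hypothetical table
result = df.analytics.compute_lag(
    cols=["amount"],
    lags=[1, 2],
    order_by=["order_date"],
    group_by=["product_id"],
)
result.show()
```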

### Bug Fixes

- Fixed a bug in `DataFrame.na.fill` that caused Boolean values to erroneously override integer values.
- Fixed a bug in `Session.create_dataframe` where the snowpark dataframes created using pandas dataframes were not inferring the type for timestamp columns correctly. The behavior is as follows:
- Earlier timestamp columns without a timezone would be converted to nanosecond epochs and inferred as `LongType()`, but will now be correctly be maintained as timestamp values and be inferred as `TimestampType(TimestampTimeZone.NTZ)`.
- Fixed a bug in `Session.create_dataframe` where Snowpark DataFrames created from pandas DataFrames did not infer the type for timestamp columns correctly. The behavior is as follows:
- Earlier, timestamp columns without a timezone would be converted to nanosecond epochs and inferred as `LongType()`; they are now correctly maintained as timestamp values and inferred as `TimestampType(TimestampTimeZone.NTZ)`.
- Earlier, timestamp columns with a timezone would be inferred as `TimestampType(TimestampTimeZone.NTZ)` and lose timezone information; they are now correctly inferred as `TimestampType(TimestampTimeZone.LTZ)`, and timezone information is retained correctly.
- Set session parameter `PYTHON_SNOWPARK_USE_LOGICAL_TYPE_FOR_CREATE_DATAFRAME` to revert back to old behavior. It is recommended that you update your code soon to align with correct behavior as the parameter will be removed in the future.
- Set the session parameter `PYTHON_SNOWPARK_USE_LOGICAL_TYPE_FOR_CREATE_DATAFRAME` to revert to the old behavior. It is recommended that you update your code to align with the correct behavior because the parameter will be removed in the future.
- Fixed a bug where `DataFrame.to_pandas` produced a decimal type when the scale was not 0 and created an `object` dtype in `pandas`. Instead, the value is now cast to a float64 type.
- Fixed bugs that wrongly flattened the generated SQL when one of the following happens:
- `DataFrame.filter()` is called after `DataFrame.sort().limit()`.
- `DataFrame.sort()` or `filter()` is called on a DataFrame that already has a window function or sequence-dependent data generator column.
For instance, `df.select("a", seq1().alias("b")).select("a", "b").sort("a")` won't flatten the sort clause anymore.
- a window or sequence-dependent data generator column is used after `DataFrame.limit()`. For instance, `df.limit(10).select(row_number().over())` won't flatten the limit and select in the generated SQL.
- Fixed a bug that aliasing a DataFrame column raises an error when the DataFame is copied from another DataFrame with an aliased column. For instance,
- Fixed a bug where aliasing a DataFrame column raised an error when the DataFrame was copied from another DataFrame with an aliased column. For instance,

```python
df = df.select(col("a").alias("b"))
df = copy(df)
df.select(col("b").alias("c")) # threw an error. Now it's fixed.
```

- Fixed a bug in `Session.create_dataframe` where a non-nullable field in the schema was not respected for the boolean type (a short example follows this list). Note that this fix is only effective when the user has the privilege to create a temp table.
- Fixed a bug in SQL simplifier where non-select statements in `session.sql` dropped a SQL query when used with `limit()`.
- Fixed a bug that raised an exception when the session parameter `ERROR_ON_NONDETERMINISTIC_UPDATE` is set to true.
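A minimal sketch of the non-nullable schema fix mentioned above, assuming an existing `session` and the privilege to create a temp table:

```python
from snowflake.snowpark.types import BooleanType, IntegerType, StructField, StructType

schema = StructType(
    [
        StructField("id", IntegerType(), nullable=False),
        StructField("flag", BooleanType(), nullable=False),
    ]
)
# With the fix, the NOT NULL constraint on the boolean column is preserved
# instead of being silently dropped.
df = session.create_dataframe([[1, True], [2, False]], schema=schema)
print(df.schema)
```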

### Behavior Changes (API Compatible)

- When parsing datatype during `to_pandas` operation, we rely on GS precision value to fix precision issue for large integer values. This may affect users where a column that was earlier returned as `int8` gets returned as `int64`. Users can fix this by explicitly specifying precision values for their return column.
- When parsing data types during a `to_pandas` operation, we rely on GS precision value to fix precision issues for large integer values. This may affect users where a column that was earlier returned as `int8` gets returned as `int64`. Users can fix this by explicitly specifying precision values for their return column.
- Aligned behavior for `Session.call` in the case of table stored procedures, where running `Session.call` would not trigger the stored procedure unless a `collect()` operation was performed.
- `StoredProcedureRegistration` will now automatically add `snowflake-snowpark-python` as a package dependency. The added dependency will be on the clients local version of the library and an error is thrown if the server cannot support that version.
- `StoredProcedureRegistration` will now automatically add `snowflake-snowpark-python` as a package dependency (see the sketch after this list). The added dependency will be on the client's local version of the library, and an error is thrown if the server cannot support that version.
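A sketch of the new default, assuming an active Snowpark session has already been created; the procedure name is hypothetical:

```python
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc


# snowflake-snowpark-python no longer needs to be listed in `packages`;
# registration adds it automatically, pinned to the client's local version.
@sproc(name="get_server_version", replace=True)
def get_server_version(session: Session) -> str:
    return session.sql("SELECT CURRENT_VERSION()").collect()[0][0]
```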

## 1.11.1 (2023-12-07)

2 changes: 1 addition & 1 deletion recipe/meta.yaml
@@ -1,5 +1,5 @@
{% set name = "snowflake-snowpark-python" %}
{% set version = "1.11.1" %}
{% set version = "1.12.0" %}

package:
name: {{ name|lower }}
2 changes: 1 addition & 1 deletion setup.py
@@ -80,7 +80,7 @@
f"snowflake-connector-python[secure-local-storage]{CONNECTOR_DEPENDENCY_VERSION}",
],
"development": [
"pytest",
"pytest<8.0.0", # check SNOW-1022240 for more details on the pin here
"pytest-cov",
"coverage",
"sphinx==5.0.2",
11 changes: 9 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py
@@ -886,7 +886,11 @@ def do_resolve_with_resolved_children(
)

if isinstance(logical_plan, SnowflakeValues):
schema_query = schema_query_for_values_statement(logical_plan.output)
if logical_plan.schema_query:
schema_query = logical_plan.schema_query
else:
schema_query = schema_query_for_values_statement(logical_plan.output)

if logical_plan.data:
if (
len(logical_plan.output) * len(logical_plan.data)
@@ -899,7 +903,10 @@
)
else:
return self.plan_builder.large_local_relation_plan(
logical_plan.output, logical_plan.data, logical_plan
logical_plan.output,
logical_plan.data,
logical_plan,
schema_query=schema_query,
)
else:
return self.plan_builder.query(
@@ -422,6 +422,7 @@ def __init__(
limit_: Optional[int] = None,
offset: Optional[int] = None,
analyzer: "Analyzer",
schema_query: Optional[str] = None,
) -> None:
super().__init__(analyzer)
self.projection: Optional[List[Expression]] = projection
@@ -433,7 +434,7 @@
self.pre_actions = self.from_.pre_actions
self.post_actions = self.from_.post_actions
self._sql_query = None
self._schema_query = None
self._schema_query = schema_query
self._projection_in_str = None
self._query_params = None
self.expr_to_alias.update(self.from_.expr_to_alias)
@@ -453,6 +454,7 @@ def __copy__(self):
limit_=self.limit_,
offset=self.offset,
analyzer=self.analyzer,
schema_query=self.schema_query,
)
# The following values will change if they're None in the newly copied one so reset their values here
# to avoid problems.
@@ -809,6 +811,8 @@ def limit(self, n: int, *, offset: int = 0) -> "SelectStatement":
new.limit_ = min(self.limit_, n) if self.limit_ else n
new.offset = offset or self.offset
new.column_states = self.column_states
new.pre_actions = new.from_.pre_actions
new.post_actions = new.from_.post_actions
return new


7 changes: 5 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py
@@ -253,7 +253,9 @@ def with_subqueries(self, subquery_plans: List["SnowflakePlan"]) -> "SnowflakePl
@cached_property
def attributes(self) -> List[Attribute]:
output = analyze_attributes(self.schema_query, self.session)
self.schema_query = schema_value_statement(output)
# The no-simplifier case relies on this schema_query change to update SHOW TABLES to a nested-SQL-friendly query.
if not self.schema_query or not self.session.sql_simplifier_enabled:
self.schema_query = schema_value_statement(output)
return output

@cached_property
@@ -431,6 +433,7 @@ def large_local_relation_plan(
output: List[Attribute],
data: List[Row],
source_plan: Optional[LogicalPlan],
schema_query: Optional[str],
) -> SnowflakePlan:
temp_table_name = random_name_for_temp_object(TempObjectType.TABLE)
attributes = [
Expand All @@ -449,7 +452,7 @@ def large_local_relation_plan(
)
select_stmt = project_statement([], temp_table_name)
drop_table_stmt = drop_table_if_exists_statement(temp_table_name)
schema_query = schema_value_statement(attributes)
schema_query = schema_query or schema_value_statement(attributes)
queries = [
Query(create_table_stmt, is_ddl_on_temp_object=True),
BatchInsertQuery(insert_stmt, data),
@@ -47,10 +47,16 @@ def __init__(self, name: str) -> None:


class SnowflakeValues(LeafNode):
def __init__(self, output: List[Attribute], data: List[Row]) -> None:
def __init__(
self,
output: List[Attribute],
data: List[Row],
schema_query: Optional[str] = None,
) -> None:
super().__init__()
self.output = output
self.data = data
self.schema_query = schema_query


class SaveMode(Enum):
33 changes: 29 additions & 4 deletions src/snowflake/snowpark/session.py
@@ -26,6 +26,7 @@
from snowflake.connector import ProgrammingError, SnowflakeConnection
from snowflake.connector.options import installed_pandas, pandas
from snowflake.connector.pandas_tools import write_pandas
from snowflake.snowpark._internal.analyzer import analyzer_utils
from snowflake.snowpark._internal.analyzer.analyzer import Analyzer
from snowflake.snowpark._internal.analyzer.analyzer_utils import result_scan_statement
from snowflake.snowpark._internal.analyzer.datatype_mapper import str_to_sql
@@ -2257,8 +2258,31 @@ def create_dataframe(

# infer the schema based on the data
names = None
schema_query = None
if isinstance(schema, StructType):
new_schema = schema
# SELECT query has an undefined behavior for nullability, so if the schema requires non-nullable column and
# all columns are primitive type columns, we use a temp table to lock in the nullabilities.
# TODO(SNOW-1015527): Support non-primitive type
if (
not isinstance(self._conn, MockServerConnection)
and any([field.nullable is False for field in schema.fields])
and all([field.datatype.is_primitive() for field in schema.fields])
):
temp_table_name = random_name_for_temp_object(TempObjectType.TABLE)
schema_string = analyzer_utils.attribute_to_schema_string(
schema._to_attributes()
)
try:
self._run_query(
f"CREATE SCOPED TEMP TABLE {temp_table_name} ({schema_string})"
)
schema_query = f"SELECT * FROM {self.get_current_database()}.{self.get_current_schema()}.{temp_table_name}"
except ProgrammingError as e:
logging.debug(
f"Cannot create temp table for specified non-nullable schema, fall back to using schema "
f"string from select query. Exception: {str(e)}"
)
else:
if not data:
raise ValueError("Cannot infer schema from empty data")
@@ -2427,15 +2451,16 @@ def convert_row_to_list(
self,
self._analyzer.create_select_statement(
from_=self._analyzer.create_select_snowflake_plan(
SnowflakeValues(attrs, converted), analyzer=self._analyzer
SnowflakeValues(attrs, converted, schema_query=schema_query),
analyzer=self._analyzer,
),
analyzer=self._analyzer,
),
).select(project_columns)
else:
df = DataFrame(self, SnowflakeValues(attrs, converted)).select(
project_columns
)
df = DataFrame(
self, SnowflakeValues(attrs, converted, schema_query=schema_query)
).select(project_columns)
set_api_call_source(df, "Session.create_dataframe[values]")

if (
8 changes: 6 additions & 2 deletions src/snowflake/snowpark/table.py
@@ -41,7 +41,9 @@ class UpdateResult(NamedTuple):
"""Result of updating rows in a :class:`Table`."""

rows_updated: int #: The number of rows modified.
multi_joined_rows_updated: int #: The number of multi-joined rows modified.
multi_joined_rows_updated: Optional[
int
] = None #: The number of multi-joined rows modified. ``None`` if ERROR_ON_NONDETERMINISTIC_UPDATE is enabled.


class DeleteResult(NamedTuple):
@@ -230,7 +232,9 @@ def insert(


def _get_update_result(rows: List[Row]) -> UpdateResult:
return UpdateResult(int(rows[0][0]), int(rows[0][1]))
if len(rows[0]) == 2:
return UpdateResult(int(rows[0][0]), int(rows[0][1]))
return UpdateResult(int(rows[0][0]))


def _get_delete_result(rows: List[Row]) -> DeleteResult:
15 changes: 14 additions & 1 deletion src/snowflake/snowpark/types.py
@@ -38,6 +38,9 @@ def __ne__(self, other):
def __repr__(self):
return f"{self.__class__.__name__}()"

def is_primitive(self):
return True


# Data types
class NullType(DataType):
@@ -220,6 +223,9 @@ def __init__(self, element_type: Optional[DataType] = None) -> None:
def __repr__(self) -> str:
return f"ArrayType({repr(self.element_type) if self.element_type else ''})"

def is_primitive(self):
return False


class MapType(DataType):
"""Map data type. This maps to the OBJECT data type in Snowflake."""
@@ -233,6 +239,9 @@ def __init__(
def __repr__(self) -> str:
return f"MapType({repr(self.key_type) if self.key_type else ''}, {repr(self.value_type) if self.value_type else ''})"

def is_primitive(self):
return False


class VectorType(DataType):
"""Vector data type. This maps to the VECTOR data type in Snowflake."""
Expand All @@ -257,6 +266,9 @@ def __init__(
def __repr__(self) -> str:
return f"VectorType({self.element_type},{self.dimension})"

def is_primitive(self):
return False


class ColumnIdentifier:
"""Represents a column identifier."""
@@ -424,7 +436,8 @@ def names(self) -> List[str]:
class VariantType(DataType):
"""Variant data type. This maps to the VARIANT data type in Snowflake."""

pass
def is_primitive(self):
return False


class GeographyType(DataType):
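The `is_primitive` helper added above distinguishes scalar types from container and semi-structured types; a quick illustrative check based on the overrides in this diff:

```python
from snowflake.snowpark.types import ArrayType, IntegerType, VariantType

assert IntegerType().is_primitive()                 # scalar types inherit the default True
assert not ArrayType(IntegerType()).is_primitive()  # container types override to False
assert not VariantType().is_primitive()             # semi-structured types override to False
```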
2 changes: 1 addition & 1 deletion src/snowflake/snowpark/version.py
@@ -4,4 +4,4 @@
#

# Update this for the versions
VERSION = (1, 11, 1)
VERSION = (1, 12, 0)
12 changes: 12 additions & 0 deletions tests/integ/scala/test_update_delete_merge_suite.py
@@ -74,6 +74,18 @@ def test_update_rows_in_table(session):
assert "condition should also be provided if source is provided" in str(ex_info)


def test_update_rows_nondeterministic_update(session):
TestData.test_data2(session).write.save_as_table(
table_name, mode="overwrite", table_type="temporary"
)
table = session.table(table_name)
session.sql("alter session set ERROR_ON_NONDETERMINISTIC_UPDATE = true").collect()
try:
assert table.update({"b": 0}, col("a") == 1) == UpdateResult(2)
finally:
session.sql("alter session unset ERROR_ON_NONDETERMINISTIC_UPDATE").collect()


@pytest.mark.localtest
def test_delete_rows_in_table(session):
TestData.test_data2(session).write.save_as_table(