Skip to content

Commit

Permalink
SNOW-1041753: oob telemetry for snowpark python local testing (#1265)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-aling authored Mar 13, 2024
1 parent e695d3b commit 420a422
Show file tree
Hide file tree
Showing 17 changed files with 826 additions and 123 deletions.
22 changes: 11 additions & 11 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,15 @@
- to_timestamp_ltz
- to_timestamp_ntz
- to_timestamp_tz
- Added support for the following local testing functions:
- Added support for ASOF JOIN type.
- Added support for the following local testing APIs:
- to_timestamp
- to_timestamp_ltz
- to_timestamp_ntz
- to_timestamp_tz
- greatest
- least
- dateadd
- Added support for ASOF JOIN type.
- Added support for the following local testing APIs:
- Session.get_current_account
- Session.get_current_warehouse
- Session.get_current_role
Expand All @@ -29,20 +28,21 @@

### Bug Fixes

- Fixed a bug in Local Testing's implementation of LEFT ANTI and LEFT SEMI joins where rows with null values are dropped.
- Fixed a bug in Local Testing's implementation of `count_distinct`.
- Fixed a bug in Local Testing's implementation where VARIANT columns raise errors at `DataFrame.collect`.
- Fixed a bug in `SnowflakePlanBuilder` where `save_as_table` did not correctly filter out columns whose names start with '$' followed by a number.
- Fixed a bug in local testing implementation of LEFT ANTI and LEFT SEMI joins where rows with null values are dropped.
- Fixed a bug in local testing implementation of DataFrameReader.csv when the optional parameter `field_optionally_enclosed_by` is specified.
- Fixed a bug in local testing implementation of Column.regexp where only the first entry is considered when `pattern` is a `Column`.
- Fixed a bug in local testing implementation of Table.update in which null value in the rows to be updated causes `KeyError`.
- Fixed a bug in local testing implementation where VARIANT columns raise errors at `DataFrame.collect`.
- Fixed a bug in local testing implementation of `count_distinct`.

### Deprecations:

- Deprecated `Session.get_fully_qualified_current_schema`. Consider using `Session.get_fully_qualified_name_if_possible` instead.

### Bug Fixes
### Improvements

- Fixed a bug in `SnowflakePlanBuilder` where `save_as_table` did not correctly filter out columns whose names start with '$' followed by a number.
- Fixed a bug in local testing implementation of DataFrameReader.csv when the optional parameter `field_optionally_enclosed_by` is specified.
- Fixed a bug in Local Testing implementation of Table.update in which null value in the rows to be updated causes `KeyError`.
- Fixed a bug in local testing implementation of Column.regexp where only the first entry is considered when `pattern` is a `Column`.
- Added telemetry to local testing.

## 1.13.0 (2024-02-26)

Expand Down
6 changes: 6 additions & 0 deletions src/snowflake/snowpark/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import traceback
import zipfile
from enum import Enum
from functools import lru_cache
from json import JSONEncoder
from random import choice
from typing import (
Expand Down Expand Up @@ -196,22 +197,27 @@ def validate_object_name(name: str):
raise SnowparkClientExceptionMessages.GENERAL_INVALID_OBJECT_NAME(name)


@lru_cache
def get_version() -> str:
    """Return the Snowpark library version as a dotted string (result is cached)."""
    components = (str(part) for part in snowpark_version if part is not None)
    return ".".join(components)


@lru_cache
def get_python_version() -> str:
    """Return the running Python interpreter's version string (result is cached)."""
    interpreter_version = platform.python_version()
    return interpreter_version


@lru_cache
def get_connector_version() -> str:
    """Return the Snowflake connector version as a dotted string (result is cached)."""
    components = (str(part) for part in connector_version if part is not None)
    return ".".join(components)


@lru_cache
def get_os_name() -> str:
    """Return the operating system name, e.g. "Linux"/"Darwin"/"Windows" (result is cached)."""
    os_name = platform.system()
    return os_name


@lru_cache
def get_application_name() -> str:
    """Return the fixed application identifier reported by this client (result is cached)."""
    application_name = "PythonSnowpark"
    return application_name

Expand Down
7 changes: 5 additions & 2 deletions src/snowflake/snowpark/dataframe_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,11 @@ def _read_semi_structured_file(self, path: str, format: str) -> DataFrame:
from snowflake.snowpark.mock._connection import MockServerConnection

if isinstance(self._session._conn, MockServerConnection):
raise NotImplementedError(
f"[Local Testing] Support for semi structured file {format} is not implemented."
self._session._conn.log_not_supported_error(
external_feature_name=f"Read semi structured {format} file",
internal_feature_name="DataFrameReader._read_semi_structured_file",
parameters_info={"format": str(format)},
raise_error=NotImplementedError,
)

if self._user_schema:
Expand Down
54 changes: 37 additions & 17 deletions src/snowflake/snowpark/mock/_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
MockSelectExecutionPlan,
MockSelectStatement,
)
from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService
from snowflake.snowpark.types import _NumericType


Expand All @@ -148,7 +149,12 @@ def serialize_expression(exp: Expression):
elif isinstance(exp, UnresolvedAttribute):
return str(exp)
else:
raise TypeError(f"{type(exp)} isn't supported yet in mocking.")
LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
external_feature_name=f"Expression {type(exp).__name__}",
internal_feature_name="_analyzer.serialize_expression",
parameters_info={"exp": type(exp).__name__},
raise_error=TypeError,
)


class MockAnalyzer:
Expand All @@ -158,6 +164,7 @@ def __init__(self, session: "snowflake.snowpark.session.Session") -> None:
self.generated_alias_maps = {}
self.subquery_plans = []
self.alias_maps_to_use = None
self._conn = self.session._conn

def analyze(
self,
Expand All @@ -179,8 +186,9 @@ def analyze(
if expr_to_alias is None:
expr_to_alias = {}
if isinstance(expr, GroupingSetsExpression):
raise NotImplementedError(
"[Local Testing] group by grouping sets is not implemented."
self._conn.log_not_supported_error(
external_feature_name="DataFrame.group_by_grouping_sets",
raise_error=NotImplementedError,
)

if isinstance(expr, Like):
Expand Down Expand Up @@ -243,8 +251,9 @@ def analyze(
)

if isinstance(expr, GroupingSet):
raise NotImplementedError(
"[Local Testing] group by grouping sets is not implemented."
self._conn.log_not_supported_error(
external_feature_name="DataFrame.group_by_grouping_sets",
raise_error=NotImplementedError,
)

if isinstance(expr, WindowExpression):
Expand Down Expand Up @@ -634,17 +643,22 @@ def do_resolve_with_resolved_children(
if isinstance(logical_plan, MockExecutionPlan):
return logical_plan
if isinstance(logical_plan, TableFunctionJoin):
raise NotImplementedError(
"[Local Testing] Table function is currently not supported."
self._conn.log_not_supported_error(
external_feature_name="table_function.TableFunctionJoin",
raise_error=NotImplementedError,
)

if isinstance(logical_plan, TableFunctionRelation):
raise NotImplementedError(
"[Local Testing] table function is not implemented."
self._conn.log_not_supported_error(
external_feature_name="table_function.TableFunctionRelation",
raise_error=NotImplementedError,
)

if isinstance(logical_plan, Lateral):
raise NotImplementedError("[Local Testing] Lateral is not implemented.")
self._conn.log_not_supported_error(
external_feature_name="table_function.Lateral",
raise_error=NotImplementedError,
)

if isinstance(logical_plan, Aggregate):
return MockExecutionPlan(
Expand Down Expand Up @@ -713,19 +727,24 @@ def do_resolve_with_resolved_children(
)

if isinstance(logical_plan, Pivot):
raise NotImplementedError("[Local Testing] Pivot is not implemented.")
self._conn.log_not_supported_error(
external_feature_name="RelationalGroupedDataFrame.Pivot",
raise_error=NotImplementedError,
)

if isinstance(logical_plan, Unpivot):
raise NotImplementedError(
"[Local Testing] DataFrame.unpivot is not currently supported."
self._conn.log_not_supported_error(
external_feature_name="RelationalGroupedDataFrame.Unpivot",
raise_error=NotImplementedError,
)

if isinstance(logical_plan, CreateViewCommand):
return MockExecutionPlan(logical_plan, self.session)

if isinstance(logical_plan, CopyIntoTableNode):
raise NotImplementedError(
"[Local Testing] Copy into table is currently not supported."
self._conn.log_not_supported_error(
external_feature_name="DateFrame.copy_into_table",
raise_error=NotImplementedError,
)

if isinstance(logical_plan, CopyIntoLocationNode):
Expand All @@ -749,8 +768,9 @@ def do_resolve_with_resolved_children(
return MockExecutionPlan(logical_plan, self.session)

if isinstance(logical_plan, CreateDynamicTableCommand):
raise NotImplementedError(
"[Local Testing] Dynamic tables are currently not supported."
self._conn.log_not_supported_error(
external_feature_name="DateFrame.create_or_replace_dynamic_table",
raise_error=NotImplementedError,
)

if isinstance(logical_plan, TableMerge):
Expand Down
69 changes: 62 additions & 7 deletions src/snowflake/snowpark/mock/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

import functools
import json
import logging
import os
import re
import sys
import time
import uuid
from copy import copy
from decimal import Decimal
from logging import getLogger
Expand Down Expand Up @@ -44,6 +46,7 @@
from snowflake.snowpark.exceptions import SnowparkSQLException
from snowflake.snowpark.mock._plan import MockExecutionPlan, execute_mock_plan
from snowflake.snowpark.mock._snowflake_data_type import TableEmulator
from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService
from snowflake.snowpark.row import Row
from snowflake.snowpark.types import (
ArrayType,
Expand All @@ -62,7 +65,11 @@


def _build_put_statement(*args, **kwargs):
    """Report that PUT-from-stream is unsupported in local testing.

    Delegates to the out-of-band telemetry service, which records the
    unsupported-feature event and raises ``NotImplementedError`` (per
    ``raise_error``).  The stale bare ``raise NotImplementedError()`` that
    preceded the telemetry call was dead-code leftover and has been removed
    so the event is actually logged before the error is raised.
    """
    LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
        external_feature_name="PUT stream",
        internal_feature_name="_connection._build_put_statement",
        raise_error=NotImplementedError,
    )


def _build_target_path(stage_location: str, dest_prefix: str = "") -> str:
Expand Down Expand Up @@ -229,6 +236,47 @@ def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
self._active_schema = self._options.get(
"schema", snowflake.snowpark.mock._constants.CURRENT_SCHEMA
)
self._connection_uuid = str(uuid.uuid4())
# by default, usage telemetry is collected
self._disable_local_testing_telemetry = self._options.get(
"disable_local_testing_telemetry", False
)
self._oob_telemetry = LocalTestOOBTelemetryService.get_instance()
if self._disable_local_testing_telemetry:
# after disabling, the log will basically be a no-op, not sending any telemetry
self._oob_telemetry.disable()
else:
self._oob_telemetry.log_session_creation(self._connection_uuid)

def log_not_supported_error(
    self,
    external_feature_name: Optional[str] = None,
    internal_feature_name: Optional[str] = None,
    error_message: Optional[str] = None,
    parameters_info: Optional[dict] = None,
    raise_error: Optional[type] = None,
    warning_logger: Optional[logging.Logger] = None,
):
    """Send an unsupported-feature record to the OOB telemetry service.

    Depending on the arguments, the underlying service either raises the
    given exception type or logs a warning message.

    Args:
        external_feature_name: customer-facing feature name, used when raising the error
        internal_feature_name: optional internal API/feature name, used to track internal APIs
        error_message: optional message that overrides the default one
        parameters_info: optional parameter details related to the feature
        raise_error: exception class to raise, if any
        warning_logger: logger used to emit a warning message, if any
    """
    # Forward everything to the shared telemetry service, tagging the
    # record with this connection's UUID.
    forwarded_kwargs = {
        "external_feature_name": external_feature_name,
        "internal_feature_name": internal_feature_name,
        "parameters_info": parameters_info,
        "error_message": error_message,
        "connection_uuid": self._connection_uuid,
        "raise_error": raise_error,
        "warning_logger": warning_logger,
    }
    self._oob_telemetry.log_not_supported_error(**forwarded_kwargs)

def _get_client_side_session_parameter(self, name: str, default_value: Any) -> Any:
# mock implementation
Expand Down Expand Up @@ -325,8 +373,10 @@ def upload_stream(
overwrite: bool = False,
is_in_udf: bool = False,
) -> Optional[Dict[str, Any]]:
raise NotImplementedError(
"[Local Testing] PUT stream is currently not supported."
self.log_not_supported_error(
external_feature_name="PUT stream",
internal_feature_name="MockServerConnection.upload_stream",
raise_error=NotImplementedError,
)

@_Decorator.wrap_exception
Expand All @@ -352,8 +402,10 @@ def run_query(
setattr(self, f"_active_{object_type}", object_name)
return {"data": [("Statement executed successfully.",)], "sfqid": None}
else:
raise NotImplementedError(
"[Local Testing] Running SQL queries is not supported."
self.log_not_supported_error(
external_feature_name="Running SQL queries",
internal_feature_name="MockServerConnection.run_query",
raise_error=NotImplementedError,
)

def _to_data_or_iter(
Expand Down Expand Up @@ -407,8 +459,11 @@ def execute(
List[Row], "pandas.DataFrame", Iterator[Row], Iterator["pandas.DataFrame"]
]:
if not block:
raise NotImplementedError(
"[Local Testing] Async jobs are currently not supported."
self.log_not_supported_error(
external_feature_name="Async job",
internal_feature_name="MockServerConnection.execute",
parameters_info={"block": str(block)},
raise_error=NotImplementedError,
)

res = execute_mock_plan(plan)
Expand Down
28 changes: 23 additions & 5 deletions src/snowflake/snowpark/mock/_file_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,11 @@ def read_file(
if format.lower() == "csv":
for option in options:
if option not in SUPPORTED_CSV_READ_OPTIONS:
_logger.warning(
f"[Local Testing] read file option {option} is not supported."
analyzer.session._conn.log_not_supported_error(
external_feature_name=f"Read file option {option}",
internal_feature_name="_file_operation.read_file",
parameters_info={"format": format, "option": option},
warning_logger=_logger,
)
skip_header = options.get("SKIP_HEADER", 0)
skip_blank_lines = options.get("SKIP_BLANK_LINES", False)
Expand All @@ -140,8 +143,17 @@ def read_file(
result_df[column_name] = column_series
result_df_sf_types[column_name] = column_series.sf_type
if type(column_series.sf_type.datatype) not in CONVERT_MAP:
_logger.warning(
f"[Local Testing] Reading snowflake data type {type(column_series.sf_type.datatype)} is not supported. It will be treated as a raw string in the dataframe."
analyzer.session._conn.log_not_supported_error(
error_message=f"[Local Testing] Reading snowflake"
f" data type {type(column_series.sf_type.datatype).__name__} is"
f" not supported. It will be treated as a raw string in the dataframe.",
internal_feature_name="_file_operation.read_file",
parameters_info={
"column_series.sf_type.datatype": str(
type(column_series.sf_type.datatype).__name__
)
},
warning_logger=_logger,
)
continue
converter = CONVERT_MAP[type(column_series.sf_type.datatype)]
Expand Down Expand Up @@ -187,4 +199,10 @@ def read_file(
result_df = pd.concat([result_df, df], ignore_index=True)
result_df.sf_types = result_df_sf_types
return result_df
raise NotImplementedError(f"[Local Testing] File format {format} is not supported.")

analyzer.session._conn.log_not_supported_error(
external_feature_name=f"Read {format} file",
internal_feature_name="_file_operation.read_file",
parameters_info={"format": format},
raise_error=NotImplementedError,
)
Loading

0 comments on commit 420a422

Please sign in to comment.