diff --git a/CHANGELOG.md b/CHANGELOG.md
index 11dffa17a2a..c08027859a5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,7 +9,8 @@
 - to_timestamp_ltz
 - to_timestamp_ntz
 - to_timestamp_tz
-- Added support for the following local testing functions:
+- Added support for ASOF JOIN type.
+- Added support for the following local testing APIs:
 - to_timestamp
 - to_timestamp_ltz
 - to_timestamp_ntz
@@ -17,8 +18,6 @@
 - greatest
 - least
 - dateadd
-- Added support for ASOF JOIN type.
-- Added support for the following local testing APIs:
 - Session.get_current_account
 - Session.get_current_warehouse
 - Session.get_current_role
@@ -29,20 +28,21 @@

 ### Bug Fixes

-- Fixed a bug in Local Testing's implementation of LEFT ANTI and LEFT SEMI joins where rows with null values are dropped.
-- Fixed a bug in Local Testing's implementation of `count_distinct`.
-- Fixed a bug in Local Testing's implementation where VARIANT columns raise errors at `DataFrame.collect`.
+- Fixed a bug in `SnowflakePlanBuilder` that `save_as_table` does not filter column that name start with '$' and follow by number correctly.
+- Fixed a bug in local testing implementation of LEFT ANTI and LEFT SEMI joins where rows with null values are dropped.
+- Fixed a bug in local testing implementation of DataFrameReader.csv when the optional parameter `field_optionally_enclosed_by` is specified.
+- Fixed a bug in local testing implementation of Column.regexp where only the first entry is considered when `pattern` is a `Column`.
+- Fixed a bug in local testing implementation of Table.update in which null value in the rows to be updated causes `KeyError`.
+- Fixed a bug in local testing implementation where VARIANT columns raise errors at `DataFrame.collect`.
+- Fixed a bug in local testing implementation of `count_distinct`.

 ### Deprecations:

 - Deprecated `Session.get_fully_qualified_current_schema`. Consider using `Session.get_fully_qualified_name_if_possible` instead.

-### Bug Fixes
+### Improvements

-- Fixed a bug in `SnowflakePlanBuilder` that `save_as_table` does not filter column that name start with '$' and follow by number correctly.
-- Fixed a bug in local testing implementation of DataFrameReader.csv when the optional parameter `field_optionally_enclosed_by` is specified.
-- Fixed a bug in Local Testing implementation of Table.update in which null value in the rows to be updated causes `KeyError`.
-- Fixed a bug in local testing implementation of Column.regexp where only the first entry is considered when `pattern` is a `Column`.
+- Added telemetry to local testing.

 ## 1.13.0 (2024-02-26)

diff --git a/src/snowflake/snowpark/_internal/utils.py b/src/snowflake/snowpark/_internal/utils.py
index 1c5fe3c1be7..7219202b7a3 100644
--- a/src/snowflake/snowpark/_internal/utils.py
+++ b/src/snowflake/snowpark/_internal/utils.py
@@ -19,6 +19,7 @@
 import traceback
 import zipfile
 from enum import Enum
+from functools import lru_cache
 from json import JSONEncoder
 from random import choice
 from typing import (
@@ -196,22 +197,27 @@ def validate_object_name(name: str):
         raise SnowparkClientExceptionMessages.GENERAL_INVALID_OBJECT_NAME(name)


+@lru_cache
 def get_version() -> str:
     return ".".join([str(d) for d in snowpark_version if d is not None])


+@lru_cache
 def get_python_version() -> str:
     return platform.python_version()


+@lru_cache
 def get_connector_version() -> str:
     return ".".join([str(d) for d in connector_version if d is not None])


+@lru_cache
 def get_os_name() -> str:
     return platform.system()


+@lru_cache
 def get_application_name() -> str:
     return "PythonSnowpark"

diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py
index d74c45abf51..2c0c71fab16 100644
--- a/src/snowflake/snowpark/dataframe_reader.py
+++ b/src/snowflake/snowpark/dataframe_reader.py
@@ -648,8 +648,11 @@ def _read_semi_structured_file(self, path: str, format: str) -> DataFrame:
         from snowflake.snowpark.mock._connection import MockServerConnection

         if isinstance(self._session._conn, MockServerConnection):
-            raise NotImplementedError(
-                f"[Local Testing] Support for semi structured file {format} is not implemented."
+            self._session._conn.log_not_supported_error(
+                external_feature_name=f"Read semi structured {format} file",
+                internal_feature_name="DataFrameReader._read_semi_structured_file",
+                parameters_info={"format": str(format)},
+                raise_error=NotImplementedError,
             )

         if self._user_schema:
diff --git a/src/snowflake/snowpark/mock/_analyzer.py b/src/snowflake/snowpark/mock/_analyzer.py
index 513e5bb06f9..50894a4246c 100644
--- a/src/snowflake/snowpark/mock/_analyzer.py
+++ b/src/snowflake/snowpark/mock/_analyzer.py
@@ -139,6 +139,7 @@
     MockSelectExecutionPlan,
     MockSelectStatement,
 )
+from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService
 from snowflake.snowpark.types import _NumericType


@@ -148,7 +149,12 @@ def serialize_expression(exp: Expression):
     elif isinstance(exp, UnresolvedAttribute):
         return str(exp)
     else:
-        raise TypeError(f"{type(exp)} isn't supported yet in mocking.")
+        LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+            external_feature_name=f"Expression {type(exp).__name__}",
+            internal_feature_name="_analyzer.serialize_expression",
+            parameters_info={"exp": type(exp).__name__},
+            raise_error=TypeError,
+        )


 class MockAnalyzer:
@@ -158,6 +164,7 @@ def __init__(self, session: "snowflake.snowpark.session.Session") -> None:
         self.generated_alias_maps = {}
         self.subquery_plans = []
         self.alias_maps_to_use = None
+        self._conn = self.session._conn

     def analyze(
         self,
@@ -179,8 +186,9 @@ def analyze(
         if expr_to_alias is None:
             expr_to_alias = {}
         if isinstance(expr, GroupingSetsExpression):
-            raise NotImplementedError(
-                "[Local Testing] group by grouping sets is not implemented."
+            self._conn.log_not_supported_error(
+                external_feature_name="DataFrame.group_by_grouping_sets",
+                raise_error=NotImplementedError,
             )

         if isinstance(expr, Like):
@@ -243,8 +251,9 @@ def analyze(
             )

         if isinstance(expr, GroupingSet):
-            raise NotImplementedError(
-                "[Local Testing] group by grouping sets is not implemented."
+            self._conn.log_not_supported_error(
+                external_feature_name="DataFrame.group_by_grouping_sets",
+                raise_error=NotImplementedError,
             )

         if isinstance(expr, WindowExpression):
@@ -634,17 +643,22 @@ def do_resolve_with_resolved_children(
         if isinstance(logical_plan, MockExecutionPlan):
             return logical_plan
         if isinstance(logical_plan, TableFunctionJoin):
-            raise NotImplementedError(
-                "[Local Testing] Table function is currently not supported."
+            self._conn.log_not_supported_error(
+                external_feature_name="table_function.TableFunctionJoin",
+                raise_error=NotImplementedError,
             )

         if isinstance(logical_plan, TableFunctionRelation):
-            raise NotImplementedError(
-                "[Local Testing] table function is not implemented."
+            self._conn.log_not_supported_error(
+                external_feature_name="table_function.TableFunctionRelation",
+                raise_error=NotImplementedError,
             )

         if isinstance(logical_plan, Lateral):
-            raise NotImplementedError("[Local Testing] Lateral is not implemented.")
+            self._conn.log_not_supported_error(
+                external_feature_name="table_function.Lateral",
+                raise_error=NotImplementedError,
+            )

         if isinstance(logical_plan, Aggregate):
             return MockExecutionPlan(
@@ -713,19 +727,24 @@ def do_resolve_with_resolved_children(
             )

         if isinstance(logical_plan, Pivot):
-            raise NotImplementedError("[Local Testing] Pivot is not implemented.")
+            self._conn.log_not_supported_error(
+                external_feature_name="RelationalGroupedDataFrame.Pivot",
+                raise_error=NotImplementedError,
+            )

         if isinstance(logical_plan, Unpivot):
-            raise NotImplementedError(
-                "[Local Testing] DataFrame.unpivot is not currently supported."
+            self._conn.log_not_supported_error(
+                external_feature_name="RelationalGroupedDataFrame.Unpivot",
+                raise_error=NotImplementedError,
             )

         if isinstance(logical_plan, CreateViewCommand):
             return MockExecutionPlan(logical_plan, self.session)

         if isinstance(logical_plan, CopyIntoTableNode):
-            raise NotImplementedError(
-                "[Local Testing] Copy into table is currently not supported."
+            self._conn.log_not_supported_error(
+                external_feature_name="DataFrame.copy_into_table",
+                raise_error=NotImplementedError,
             )

         if isinstance(logical_plan, CopyIntoLocationNode):
@@ -749,8 +768,9 @@ def do_resolve_with_resolved_children(
             return MockExecutionPlan(logical_plan, self.session)

         if isinstance(logical_plan, CreateDynamicTableCommand):
-            raise NotImplementedError(
-                "[Local Testing] Dynamic tables are currently not supported."
+            self._conn.log_not_supported_error(
+                external_feature_name="DataFrame.create_or_replace_dynamic_table",
+                raise_error=NotImplementedError,
             )

         if isinstance(logical_plan, TableMerge):
diff --git a/src/snowflake/snowpark/mock/_connection.py b/src/snowflake/snowpark/mock/_connection.py
index fbc103b4ec0..e5d6a4c4acf 100644
--- a/src/snowflake/snowpark/mock/_connection.py
+++ b/src/snowflake/snowpark/mock/_connection.py
@@ -5,10 +5,12 @@

 import functools
 import json
+import logging
 import os
 import re
 import sys
 import time
+import uuid
 from copy import copy
 from decimal import Decimal
 from logging import getLogger
@@ -44,6 +46,7 @@
 from snowflake.snowpark.exceptions import SnowparkSQLException
 from snowflake.snowpark.mock._plan import MockExecutionPlan, execute_mock_plan
 from snowflake.snowpark.mock._snowflake_data_type import TableEmulator
+from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService
 from snowflake.snowpark.row import Row
 from snowflake.snowpark.types import (
     ArrayType,
@@ -62,7 +65,11 @@


 def _build_put_statement(*args, **kwargs):
-    raise NotImplementedError()
+    LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+        external_feature_name="PUT stream",
+        internal_feature_name="_connection._build_put_statement",
+        raise_error=NotImplementedError,
+    )


 def _build_target_path(stage_location: str, dest_prefix: str = "") -> str:
@@ -229,6 +236,47 @@ def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
         self._active_schema = self._options.get(
             "schema", snowflake.snowpark.mock._constants.CURRENT_SCHEMA
         )
+        self._connection_uuid = str(uuid.uuid4())
+        # by default, usage telemetry is collected
+        self._disable_local_testing_telemetry = self._options.get(
+            "disable_local_testing_telemetry", False
+        )
+        self._oob_telemetry = LocalTestOOBTelemetryService.get_instance()
+        if self._disable_local_testing_telemetry:
+            # after disabling, the log will basically be a no-op, not sending any telemetry
+            self._oob_telemetry.disable()
+        else:
+            self._oob_telemetry.log_session_creation(self._connection_uuid)
+
+    def log_not_supported_error(
+        self,
+        external_feature_name: Optional[str] = None,
+        internal_feature_name: Optional[str] = None,
+        error_message: Optional[str] = None,
+        parameters_info: Optional[dict] = None,
+        raise_error: Optional[type] = None,
+        warning_logger: Optional[logging.Logger] = None,
+    ):
+        """
+        Send telemetry to the OOB service; can raise an error or log a warning based upon the input.
+
+        Args:
+            external_feature_name: customer facing feature name, this information is used to raise error
+            internal_feature_name: optional internal api/feature name, this information is used to track internal api
+            error_message: optional error message overwrite the default message
+            parameters_info: optional parameters information related to the feature
+            raise_error: Set to an exception to raise exception
+            warning_logger: Set logger to log a warning message
+        """
+        self._oob_telemetry.log_not_supported_error(
+            external_feature_name=external_feature_name,
+            internal_feature_name=internal_feature_name,
+            parameters_info=parameters_info,
+            error_message=error_message,
+            connection_uuid=self._connection_uuid,
+            raise_error=raise_error,
+            warning_logger=warning_logger,
+        )

     def _get_client_side_session_parameter(self, name: str, default_value: Any) -> Any:
         # mock implementation
@@ -325,8 +373,10 @@ def upload_stream(
         overwrite: bool = False,
         is_in_udf: bool = False,
     ) -> Optional[Dict[str, Any]]:
-        raise NotImplementedError(
-            "[Local Testing] PUT stream is currently not supported."
+        self.log_not_supported_error(
+            external_feature_name="PUT stream",
+            internal_feature_name="MockServerConnection.upload_stream",
+            raise_error=NotImplementedError,
         )

     @_Decorator.wrap_exception
@@ -352,8 +402,10 @@ def run_query(
             setattr(self, f"_active_{object_type}", object_name)
             return {"data": [("Statement executed successfully.",)], "sfqid": None}
         else:
-            raise NotImplementedError(
-                "[Local Testing] Running SQL queries is not supported."
+            self.log_not_supported_error(
+                external_feature_name="Running SQL queries",
+                internal_feature_name="MockServerConnection.run_query",
+                raise_error=NotImplementedError,
             )

     def _to_data_or_iter(
@@ -407,8 +459,11 @@ def execute(
         List[Row], "pandas.DataFrame", Iterator[Row], Iterator["pandas.DataFrame"]
     ]:
         if not block:
-            raise NotImplementedError(
-                "[Local Testing] Async jobs are currently not supported."
+            self.log_not_supported_error(
+                external_feature_name="Async job",
+                internal_feature_name="MockServerConnection.execute",
+                parameters_info={"block": str(block)},
+                raise_error=NotImplementedError,
             )

         res = execute_mock_plan(plan)
diff --git a/src/snowflake/snowpark/mock/_file_operation.py b/src/snowflake/snowpark/mock/_file_operation.py
index 6a5ce85bbac..420290aafaf 100644
--- a/src/snowflake/snowpark/mock/_file_operation.py
+++ b/src/snowflake/snowpark/mock/_file_operation.py
@@ -113,8 +113,11 @@ def read_file(
     if format.lower() == "csv":
         for option in options:
             if option not in SUPPORTED_CSV_READ_OPTIONS:
-                _logger.warning(
-                    f"[Local Testing] read file option {option} is not supported."
+                analyzer.session._conn.log_not_supported_error(
+                    external_feature_name=f"Read file option {option}",
+                    internal_feature_name="_file_operation.read_file",
+                    parameters_info={"format": format, "option": option},
+                    warning_logger=_logger,
                 )
         skip_header = options.get("SKIP_HEADER", 0)
         skip_blank_lines = options.get("SKIP_BLANK_LINES", False)
@@ -140,8 +143,17 @@ def read_file(
                 result_df[column_name] = column_series
                 result_df_sf_types[column_name] = column_series.sf_type
                 if type(column_series.sf_type.datatype) not in CONVERT_MAP:
-                    _logger.warning(
-                        f"[Local Testing] Reading snowflake data type {type(column_series.sf_type.datatype)} is not supported. It will be treated as a raw string in the dataframe."
+                    analyzer.session._conn.log_not_supported_error(
+                        error_message=f"[Local Testing] Reading snowflake"
+                        f" data type {type(column_series.sf_type.datatype).__name__} is"
+                        f" not supported. It will be treated as a raw string in the dataframe.",
+                        internal_feature_name="_file_operation.read_file",
+                        parameters_info={
+                            "column_series.sf_type.datatype": str(
+                                type(column_series.sf_type.datatype).__name__
+                            )
+                        },
+                        warning_logger=_logger,
                     )
                     continue
                 converter = CONVERT_MAP[type(column_series.sf_type.datatype)]
@@ -187,4 +199,10 @@ def read_file(
             result_df = pd.concat([result_df, df], ignore_index=True)
         result_df.sf_types = result_df_sf_types
         return result_df
-    raise NotImplementedError(f"[Local Testing] File format {format} is not supported.")
+
+    analyzer.session._conn.log_not_supported_error(
+        external_feature_name=f"Read {format} file",
+        internal_feature_name="_file_operation.read_file",
+        parameters_info={"format": format},
+        raise_error=NotImplementedError,
+    )
diff --git a/src/snowflake/snowpark/mock/_functions.py b/src/snowflake/snowpark/mock/_functions.py
index 6a00fa86096..5713b841480 100644
--- a/src/snowflake/snowpark/mock/_functions.py
+++ b/src/snowflake/snowpark/mock/_functions.py
@@ -42,6 +42,7 @@
     _NumericType,
 )

+from ._telemetry import LocalTestOOBTelemetryService
 from ._util import (
     convert_snowflake_datetime_format,
     process_numeric_time,
@@ -732,17 +733,29 @@ def mock_to_char(
             try_convert, lambda x: datetime.datetime.strftime(x, date_format), try_cast
         )
     elif isinstance(source_datatype, TimeType):
-        raise NotImplementedError(
-            "[Local Testing] Use TO_CHAR on Time data is not supported yet"
+        LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+            external_feature_name="Use TO_CHAR on Time data",
+            internal_feature_name="mock_to_char",
+            parameters_info={"source_datatype": type(source_datatype).__name__},
+            raise_error=NotImplementedError,
         )
     elif isinstance(source_datatype, (DateType, TimeType, TimestampType)):
-        raise NotImplementedError(
-            "[Local Testing] Use TO_CHAR on Timestamp data is not supported yet"
+        LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+            external_feature_name="Use TO_CHAR on Timestamp data",
+            internal_feature_name="mock_to_char",
+            parameters_info={"source_datatype": type(source_datatype).__name__},
+            raise_error=NotImplementedError,
         )
     elif isinstance(source_datatype, _NumericType):
         if fmt:
-            raise NotImplementedError(
-                "[Local Testing] Use format strings with Numeric types in TO_CHAR is not supported yet."
+            LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+                external_feature_name="Use format strings with Numeric types in TO_CHAR",
+                internal_feature_name="mock_to_char",
+                parameters_info={
+                    "source_datatype": type(source_datatype).__name__,
+                    "fmt": str(fmt),
+                },
+                raise_error=NotImplementedError,
             )
         func = partial(try_convert, lambda x: str(x), try_cast)
     else:
@@ -776,18 +789,28 @@ def mock_to_double(
     Note that conversion of decimal fractions to binary and back is not precise (i.e. printing of a floating-point number converted from decimal representation might produce a slightly different number).
     """
     if fmt:
-        raise NotImplementedError(
-            "[Local Testing] Using format strings in to_double is not supported yet"
+        LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+            external_feature_name="Using format strings in TO_DOUBLE",
+            internal_feature_name="mock_to_double",
+            parameters_info={"fmt": str(fmt)},
+            raise_error=NotImplementedError,
         )
     if isinstance(column.sf_type.datatype, (_NumericType, StringType)):
         res = column.apply(lambda x: try_convert(float, try_cast, x))
         res.sf_type = ColumnType(DoubleType(), column.sf_type.nullable)
         return res
     elif isinstance(column.sf_type.datatype, VariantType):
-        raise NotImplementedError("[Local Testing] Variant is not supported yet")
+        LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+            external_feature_name="Use TO_DOUBLE on Variant data",
+            internal_feature_name="mock_to_double",
+            parameters_info={
+                "column.sf_type.datatype": type(column.sf_type.datatype).__name__
+            },
+            raise_error=NotImplementedError,
+        )
     else:
         raise NotImplementedError(
-            f"[Local Testing[ Invalid type {column.sf_type.datatype} for parameter 'TO_DOUBLE'"
+            f"[Local Testing] Invalid type {column.sf_type.datatype} for parameter 'TO_DOUBLE'"
         )
diff --git a/src/snowflake/snowpark/mock/_pandas_util.py b/src/snowflake/snowpark/mock/_pandas_util.py
index 349cff496a1..1d3217e9ed0 100644
--- a/src/snowflake/snowpark/mock/_pandas_util.py
+++ b/src/snowflake/snowpark/mock/_pandas_util.py
@@ -11,6 +11,7 @@
 )
 from snowflake.snowpark._internal.type_utils import infer_type
 from snowflake.snowpark.exceptions import SnowparkClientException
+from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService
 from snowflake.snowpark.table import Table
 from snowflake.snowpark.types import (
     ArrayType,
@@ -121,8 +122,11 @@ def convert_to_python_obj(obj):
         elif isinstance(obj, pd.Timestamp):
             return int(obj.value / 1000)
         else:
-            raise NotImplementedError(
-                f"[Local Testing] {type(obj)} within pandas.Interval is not supported."
+            LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+                external_feature_name=f"{type(obj).__name__} within pandas.Interval",
+                internal_feature_name="_pandas_util._extract_schema_and_data_from_pandas_df",
+                parameters_info={"obj": type(obj).__name__},
+                raise_error=NotImplementedError,
             )

     plain_data[row_idx][col_idx] = {
@@ -152,8 +156,15 @@ def convert_to_python_obj(obj):
                     scale = 0 if len(decimal_parts) == 1 else len(decimal_parts[1])
                     precision = integer_len + scale
                     if precision > 38:
-                        raise SnowparkClientException(
-                            f"[Local Testing] Column precision {precision} and scale {scale} are not supported."
+                        LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+                            external_feature_name=f"Column precision {precision} and scale {scale}",
+                            internal_feature_name="_pandas_util._extract_schema_and_data_from_pandas_df",
+                            parameters_info={
+                                "precision": str(precision),
+                                "scale": str(scale),
+                                "data_type": type(data_type).__name__,
+                            },
+                            raise_error=SnowparkClientException,
                         )
                     # handle integer and float separately
                     data_type = DecimalType(precision=precision, scale=scale)
@@ -161,9 +172,18 @@ def convert_to_python_obj(obj):
                 if isinstance(previous_inferred_type, NullType):
                     inferred_type_dict[col_idx] = data_type
                 if type(data_type) != type(previous_inferred_type):
-                    raise SnowparkClientException(
-                        f"[Local Testing] Detected type {type(data_type)} and type {type(previous_inferred_type)}"
-                        f" in column, coercion is not currently supported"
+                    LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+                        external_feature_name=f"Coercion of detected"
+                        f" type {type(data_type).__name__} "
+                        f"and type {type(previous_inferred_type).__name__} in column",
+                        internal_feature_name="_pandas_util._extract_schema_and_data_from_pandas_df",
+                        parameters_info={
+                            "data_type": type(data_type).__name__,
+                            "previous_inferred_type": str(
+                                type(previous_inferred_type).__name__
+                            ),
+                        },
+                        raise_error=SnowparkClientException,
                     )
                 if isinstance(inferred_type_dict[col_idx], DecimalType):
                     inferred_type_dict[col_idx] = DecimalType(
diff --git a/src/snowflake/snowpark/mock/_plan.py b/src/snowflake/snowpark/mock/_plan.py
index 375920488c7..d74cb1239ae 100644
--- a/src/snowflake/snowpark/mock/_plan.py
+++ b/src/snowflake/snowpark/mock/_plan.py
@@ -324,15 +324,21 @@ def handle_function_expression(
             importlib.import_module("snowflake.snowpark.functions"), func_name
         )
     except AttributeError:
-        raise NotImplementedError(
-            f"[Local Testing] Mocking function {func_name} is not supported."
+        # this is missing function in snowpark-python, need support for both live and local test
+        analyzer.session._conn.log_not_supported_error(
+            external_feature_name=func_name,
+            error_message=f"Function {func_name} is not supported in snowpark-python.",
+            raise_error=NotImplementedError,
         )

     signatures = inspect.signature(original_func)
     spec = inspect.getfullargspec(original_func)
     if func_name not in _MOCK_FUNCTION_IMPLEMENTATION_MAP:
-        raise NotImplementedError(
-            f"[Local Testing] Mocking function {func_name} is not implemented."
+        analyzer.session._conn.log_not_supported_error(
+            external_feature_name=func_name,
+            error_message=f"Function {func_name} is not implemented. You can implement and make a patch by "
+            f"using the `snowflake.snowpark.mock.patch` decorator.",
+            raise_error=NotImplementedError,
         )
     to_pass_args = []
     type_hints = typing.get_type_hints(original_func)
@@ -539,8 +545,11 @@ def execute_mock_plan(
             # Compute drop duplicates
             res_df = res_df.drop_duplicates()
         else:
-            raise NotImplementedError(
-                f"[Local Testing] SetStatement operator {operator} is currently not implemented."
+            analyzer.session._conn.log_not_supported_error(
+                external_feature_name=f"SetStatement operator {operator}",
+                internal_feature_name=type(source_plan).__name__,
+                parameters_info={"operator": str(operator)},
+                raise_error=NotImplementedError,
             )
         return res_df
     if isinstance(source_plan, MockSelectableEntity):
@@ -610,8 +619,14 @@ def execute_mock_plan(
                         ),
                     )
                 else:
-                    raise NotImplementedError(
-                        f"[Local Testing] Aggregate expression {type(agg_expr.child).__name__} is not implemented."
+                    analyzer.session._conn.log_not_supported_error(
+                        external_feature_name=f"Aggregate expression {type(agg_expr.child).__name__}",
+                        internal_feature_name=type(source_plan).__name__,
+                        parameters_info={
+                            "agg_expr": type(agg_expr).__name__,
+                            "agg_expr.child": type(agg_expr.child).__name__,
+                        },
+                        raise_error=NotImplementedError,
                     )
             elif isinstance(agg_expr, (Attribute, UnresolvedAlias)):
                 column_name = plan.session._analyzer.analyze(agg_expr)
@@ -626,8 +641,13 @@
                         f"[Local Testing] invalid identifier {column_name}"
                     )
             else:
-                raise NotImplementedError(
-                    f"[Local Testing] Aggregate expression {type(agg_expr).__name__} is not implemented."
+                analyzer.session._conn.log_not_supported_error(
+                    external_feature_name=f"Aggregate expression {type(agg_expr).__name__}",
+                    internal_feature_name=type(source_plan).__name__,
+                    parameters_info={
+                        "agg_expr": type(agg_expr).__name__,
+                    },
+                    raise_error=NotImplementedError,
                 )

         result_df_sf_Types = {}
@@ -899,8 +919,11 @@ def outer_join(base_df):
         return execute_file_operation(source_plan, analyzer)
     if isinstance(source_plan, SnowflakeCreateTable):
         if source_plan.column_names is not None:
-            raise NotImplementedError(
-                "[Local Testing] Inserting data into table by matching column names is currently not supported."
+            analyzer.session._conn.log_not_supported_error(
+                external_feature_name="Inserting data into table by matching columns",
+                internal_feature_name=type(source_plan).__name__,
+                parameters_info={"source_plan.column_names": "True"},
+                raise_error=NotImplementedError,
             )
         res_df = execute_mock_plan(source_plan.query)
         return entity_registry.write_table(
@@ -1209,8 +1232,10 @@ def outer_join(base_df):

         return [Row(*res)]

-    raise NotImplementedError(
-        f"[Local Testing] Mocking SnowflakePlan {type(source_plan).__name__} is not implemented."
+    analyzer.session._conn.log_not_supported_error(
+        external_feature_name=f"Mocking SnowflakePlan {type(source_plan).__name__}",
+        internal_feature_name=type(source_plan).__name__,
+        raise_error=NotImplementedError,
     )


@@ -1279,8 +1304,11 @@ def calculate_expression(
         return input_data[exp.name]
     if isinstance(exp, (UnresolvedAttribute, Attribute)):
         if exp.is_sql_text:
-            raise NotImplementedError(
-                "[Local Testing] SQL Text Expression is not supported."
+            analyzer.session._conn.log_not_supported_error(
+                external_feature_name="SQL Text Expression",
+                internal_feature_name=type(exp).__name__,
+                parameters_info={"exp.is_sql_text": str(exp.is_sql_text)},
+                raise_error=NotImplementedError,
             )
         try:
             return input_data[exp.name]
@@ -1420,8 +1448,10 @@ def calculate_expression(
         elif isinstance(exp, BitwiseAnd):
             new_column = left & right
         else:
-            raise NotImplementedError(
-                f"[Local Testing] Binary expression {type(exp)} is not implemented."
+            analyzer.session._conn.log_not_supported_error(
+                external_feature_name=f"Binary Expression {type(exp).__name__}",
+                internal_feature_name=type(exp).__name__,
+                raise_error=NotImplementedError,
             )
         return new_column
     if isinstance(exp, UnaryMinus):
@@ -1476,8 +1506,11 @@ def _match_pattern(row) -> bool:
             elif isinstance(rhs, TableEmulator):
                 res = res | lhs.isin(rhs.iloc[:, 0])
             else:
-                raise NotImplementedError(
-                    f"[Local Testing] IN expression does not support {type(rhs)} type on the right"
+                analyzer.session._conn.log_not_supported_error(
+                    external_feature_name=f"IN expression with type {type(rhs).__name__} on the right",
+                    internal_feature_name=type(exp).__name__,
+                    parameters_info={"rhs": type(rhs).__name__},
+                    raise_error=NotImplementedError,
                 )
         else:
             exists = lhs.apply(tuple, 1).isin(rhs.apply(tuple, 1))
@@ -1539,8 +1572,11 @@ def _match_pattern(row) -> bool:
         elif isinstance(exp.to, VariantType):
             return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_variant"](column)
         else:
-            raise NotImplementedError(
-                f"[Local Testing] Cast to {exp.to} is not supported yet"
+            analyzer.session._conn.log_not_supported_error(
+                external_feature_name=f"Cast to {type(exp.to).__name__}",
+                internal_feature_name=type(exp).__name__,
+                parameters_info={"exp.to": type(exp.to).__name__},
+                raise_error=NotImplementedError,
             )
     if isinstance(exp, CaseWhen):
         remaining = input_data
@@ -1643,8 +1679,17 @@ def _match_pattern(row) -> bool:
                 lower = window_spec.frame_spec.lower

                 if isinstance(upper, Literal) or isinstance(lower, Literal):
-                    raise SnowparkSQLException(
-                        "Range is not supported for sliding window frames."
+                    analyzer.session._conn.log_not_supported_error(
+                        external_feature_name="Range for sliding window frames",
+                        internal_feature_name=type(exp).__name__,
+                        parameters_info={
+                            "window_spec.frame_spec.frame_type": type(
+                                window_spec.frame_spec.frame_type
+                            ).__name__,
+                            "upper": type(upper).__name__,
+                            "lower": type(lower).__name__,
+                        },
+                        raise_error=SnowparkSQLException,
                     )

                 windows = handle_range_frame_indexing(
@@ -1699,9 +1744,21 @@ def _match_pattern(row) -> bool:
                     # the result calculated upon a windows can be None, this is still valid and we can keep
                     # the calculation
                     elif not isinstance(sub_window_res.sf_type.datatype, NullType):
-                        raise SnowparkSQLException(
-                            f"[Local Testing] Detected type {type(calculated_sf_type.datatype)} and type {type(sub_window_res.sf_type.datatype)}"
-                            f" in column, coercion is not currently supported"
+                        analyzer.session._conn.log_not_supported_error(
+                            external_feature_name=f"Coercion of detected type"
+                            f" {type(calculated_sf_type.datatype).__name__}"
+                            f" and type {type(sub_window_res.sf_type.datatype).__name__}",
+                            internal_feature_name=type(exp).__name__,
+                            parameters_info={
+                                "window_function": type(window_function).__name__,
+                                "sub_window_res.sf_type.datatype": str(
+                                    type(sub_window_res.sf_type.datatype).__name__
+                                ),
+                                "calculated_sf_type.datatype": str(
+                                    type(calculated_sf_type.datatype).__name__
+                                ),
+                            },
+                            raise_error=SnowparkSQLException,
                         )
                     res_cols.append(sub_window_res.iloc[0])
                 elif not ignore_nulls or offset == 0:
@@ -1731,9 +1788,21 @@ def _match_pattern(row) -> bool:
                     # the result calculated upon a windows can be None, this is still valid and we can keep
                     # the calculation
                     elif not isinstance(sub_window_res.sf_type.datatype, NullType):
-                        raise SnowparkSQLException(
-                            f"[Local Testing] Detected type {type(calculated_sf_type.datatype)} and type {type(cur_windows_sf_type.datatype)}"
-                            f" in column, coercion is not currently supported"
+                        analyzer.session._conn.log_not_supported_error(
+                            external_feature_name=f"Coercion of detected type"
+                            f" {type(calculated_sf_type.datatype).__name__}"
+                            f" and type {type(sub_window_res.sf_type.datatype).__name__}",
+                            internal_feature_name=type(exp).__name__,
+                            parameters_info={
+                                "window_function": type(window_function).__name__,
+                                "sub_window_res.sf_type.datatype": type(
+                                    sub_window_res.sf_type.datatype
+                                ).__name__,
+                                "calculated_sf_type.datatype": type(
+                                    calculated_sf_type.datatype
+                                ).__name__,
+                            },
+                            raise_error=SnowparkSQLException,
                         )
                     res_cols.append(sub_window_res.iloc[0])
                 else:
@@ -1851,8 +1920,11 @@ def _match_pattern(row) -> bool:
                 res_col.index = res_index
                 return res_col.sort_index()
             else:
-                raise NotImplementedError(
-                    f"[Local Testing] Window Function {window_function} is not implemented."
+                analyzer.session._conn.log_not_supported_error(
+                    external_feature_name=f"Window Function {type(window_function).__name__}",
+                    internal_feature_name=type(exp).__name__,
+                    parameters_info={"window_function": type(window_function).__name__},
+                    raise_error=NotImplementedError,
                 )
     elif isinstance(exp, SubfieldString):
         col = calculate_expression(exp.child, input_data, analyzer, expr_to_alias)
@@ -1874,8 +1946,11 @@ def _match_pattern(row) -> bool:
         res = col.apply(lambda x: None if x is None else x[exp.field])
         res.set_sf_type(ColumnType(VariantType(), col.sf_type.nullable))
         return res
-    raise NotImplementedError(
-        f"[Local Testing] Mocking Expression {type(exp).__name__} is not implemented."
+
+    analyzer.session._conn.log_not_supported_error(
+        external_feature_name=f"Mocking Expression {type(exp).__name__}",
+        internal_feature_name=type(exp).__name__,
+        raise_error=NotImplementedError,
     )


@@ -1892,6 +1967,7 @@ def execute_file_operation(source_plan: MockFileOperation, analyzer: "MockAnalyz
             analyzer,
             source_plan.options,
         )
-    raise NotImplementedError(
-        f"[Local Testing] File operation {source_plan.operator.value} is not implemented."
+    analyzer.session._conn.log_not_supported_error(
+        external_feature_name=f"File operation {source_plan.operator.value}",
+        raise_error=NotImplementedError,
     )
diff --git a/src/snowflake/snowpark/mock/_plan_builder.py b/src/snowflake/snowpark/mock/_plan_builder.py
index 52c6cb9c8a9..a6cbdbcc1bd 100644
--- a/src/snowflake/snowpark/mock/_plan_builder.py
+++ b/src/snowflake/snowpark/mock/_plan_builder.py
@@ -8,12 +8,15 @@
 from snowflake.snowpark._internal.analyzer.snowflake_plan import SnowflakePlanBuilder
 from snowflake.snowpark._internal.utils import is_single_quoted
 from snowflake.snowpark.mock._plan import MockExecutionPlan, MockFileOperation
+from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService


 class MockSnowflakePlanBuilder(SnowflakePlanBuilder):
     def create_temp_table(self, *args, **kwargs):
-        raise NotImplementedError(
-            "[Local Testing] DataFrame.cache_result is currently not implemented."
+        LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+            external_feature_name="DataFrame.cache_result",
+            internal_feature_name="MockSnowflakePlanBuilder.create_temp_table",
+            raise_error=NotImplementedError,
         )

     def read_file(
@@ -28,8 +31,11 @@ def read_file(
         metadata_schema: Optional[List[Attribute]] = None,
     ) -> MockExecutionPlan:
         if format.upper() != "CSV":
-            raise NotImplementedError(
-                "[Local Testing] Reading non CSV data into dataframe is not currently supported."
+            LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+                external_feature_name=f"Reading {format} data into dataframe",
+                internal_feature_name="MockSnowflakePlanBuilder.read_file",
+                parameters_info={"format": str(format)},
+                raise_error=NotImplementedError,
             )
         return MockExecutionPlan(
             source_plan=MockFileOperation(
@@ -47,11 +53,19 @@ def file_operation_plan(
         self, command: str, file_name: str, stage_location: str, options: Dict[str, str]
     ) -> MockExecutionPlan:
         if options.get("auto_compress", False):
-            raise NotImplementedError(
-                "[Local Testing] PUT with auto_compress=True is currently not supported."
+            LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+                external_feature_name="File operation PUT with auto_compress=True",
+                internal_feature_name="MockSnowflakePlanBuilder.file_operation_plan",
+                parameters_info={"auto_compress": "True", "command": str(command)},
+                raise_error=NotImplementedError,
             )
         if command == "get":
-            raise NotImplementedError("[Local Testing] GET is currently not supported.")
+            LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+                external_feature_name="File operation GET",
+                internal_feature_name="MockSnowflakePlanBuilder.file_operation_plan",
+                parameters_info={"command": str(command)},
+                raise_error=NotImplementedError,
+            )
         return MockExecutionPlan(
             source_plan=MockFileOperation(
                 session=self.session,
diff --git a/src/snowflake/snowpark/mock/_snowflake_data_type.py b/src/snowflake/snowpark/mock/_snowflake_data_type.py
index 610e048a974..ca8292511cd 100644
--- a/src/snowflake/snowpark/mock/_snowflake_data_type.py
+++ b/src/snowflake/snowpark/mock/_snowflake_data_type.py
@@ -4,6 +4,7 @@
 from typing import Dict, NamedTuple, Optional, Union

 from snowflake.connector.options import installed_pandas, pandas as pd
+from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService
 from snowflake.snowpark.types import (
     BooleanType,
     DataType,
@@ -187,8 +188,11 @@ def calculate_type(c1: ColumnType, c2: Optional[ColumnType], op: Union[str]):
             result_type = normalize_output_sf_type(DecimalType(new_decimal, new_scale))
             return ColumnType(result_type, nullable)
         else:
-            return NotImplementedError(
-                f"Type inference for operator {op} is implemented."
+            LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
+                external_feature_name=f"Type inference for operator {op}",
+                internal_feature_name="_snowflake_data_type.calculate_type",
+                parameters_info={"op": op},
+                raise_error=NotImplementedError,
             )
     elif isinstance(t1, (FloatType, DoubleType)) or isinstance(
         t2, (FloatType, DoubleType)
diff --git a/src/snowflake/snowpark/mock/_telemetry.py b/src/snowflake/snowpark/mock/_telemetry.py
new file mode 100644
index 00000000000..a7a632c44eb
--- /dev/null
+++ b/src/snowflake/snowpark/mock/_telemetry.py
@@ -0,0 +1,206 @@
+#
+# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
+#
+import atexit
+import json
+import logging
+import os
+import uuid
+from datetime import datetime
+from enum import Enum
+from typing import Optional
+
+from snowflake.connector.compat import OK
+from snowflake.connector.secret_detector import SecretDetector
+from snowflake.connector.telemetry_oob import REQUEST_TIMEOUT, TelemetryService
+from snowflake.connector.vendored import requests
+from snowflake.snowpark._internal.utils import (
+    get_os_name,
+    get_python_version,
+    get_version,
+)
+
+logger = logging.getLogger(__name__)
+
+OS_VERSION = get_os_name()
+PYTHON_VERSION = get_python_version()
+SNOWPARK_PYTHON_VERSION = get_version()
+
+TELEMETRY_VALUE_SNOWPARK_EVENT_TYPE = "Snowpark Python"
+TELEMETRY_KEY_TYPE = "Type"
+TELEMETRY_KEY_LOWER_TYPE = "type"
+TELEMETRY_KEY_UUID = "UUID"
+TELEMETRY_KEY_CREATED_ON = "Created_on"
+TELEMETRY_KEY_MESSAGE = "Message"
+TELEMETRY_KEY_TAGS = "Tags"
+TELEMETRY_KEY_PROPERTIES = "properties"
+TELEMETRY_KEY_SNOWPARK_VERSION = "Snowpark_Version"
+TELEMETRY_KEY_OS_VERSION = "OS_Version"
+TELEMETRY_KEY_PYTHON_VERSION = "Python_Version" +TELEMETRY_KEY_EVENT_TYPE = "Event_type" +TELEMETRY_KEY_CONN_UUID = "Connection_UUID" +TELEMETRY_KEY_FEATURE_NAME = "feature_name" +TELEMETRY_KEY_PARAMETERS_INFO = "parameters_info" +TELEMETRY_KEY_ERROR_MESSAGE = "error_message" +TELEMETRY_KEY_IS_INTERNAL = "is_internal" + + +def generate_base_oob_telemetry_data_dict( + connection_uuid: Optional[str] = None, +) -> dict: + return { + TELEMETRY_KEY_TYPE: TELEMETRY_VALUE_SNOWPARK_EVENT_TYPE, + TELEMETRY_KEY_UUID: str(uuid.uuid4()), + TELEMETRY_KEY_CREATED_ON: str(datetime.now().strftime("%Y-%m-%d %H:%M:%S")), + TELEMETRY_KEY_MESSAGE: {TELEMETRY_KEY_CONN_UUID: connection_uuid}, + TELEMETRY_KEY_TAGS: { + TELEMETRY_KEY_SNOWPARK_VERSION: SNOWPARK_PYTHON_VERSION, + TELEMETRY_KEY_OS_VERSION: OS_VERSION, + TELEMETRY_KEY_PYTHON_VERSION: PYTHON_VERSION, + }, + TELEMETRY_KEY_PROPERTIES: { + TELEMETRY_KEY_LOWER_TYPE: TELEMETRY_VALUE_SNOWPARK_EVENT_TYPE + }, + } + + +class LocalTestTelemetryEventType(Enum): + UNSUPPORTED = "unsupported" + SUPPORTED = "supported" + SESSION_CONNECTION = "session" + + +class LocalTestOOBTelemetryService(TelemetryService): + PROD = "https://client-telemetry.c1.us-west-2.aws.app.snowflake.com/enqueue" + + def __init__(self) -> None: + super().__init__() + self._is_internal_usage = bool( + os.getenv("SNOWPARK_LOCAL_TESTING_INTERNAL_TELEMETRY", False) + ) + self._deployment_url = self.PROD + + def _upload_payload(self, payload) -> None: + success = True + response = None + try: + with requests.Session() as session: + response = session.post( + self._deployment_url, + data=payload, + timeout=REQUEST_TIMEOUT, + headers={"Content-type": "application/json"}, + ) + if ( + response.status_code == OK + and json.loads(response.text).get("statusCode", 0) == OK + ): + logger.debug( + "telemetry server request success: %d", response.status_code + ) + else: + logger.debug( + "telemetry server request error: %d", response.status_code + ) + success = False + except 
Exception as e: + logger.debug( + "Telemetry request failed, Exception response: %s, exception: %s", + response, + str(e), + ) + success = False + finally: + logger.debug("Telemetry request success=%s", success) + + def add(self, event) -> None: + """Adds a telemetry event to the queue.""" + if not self.enabled: + return + + self.queue.put(event) + if self.queue.qsize() > self.batch_size: + payload = self.export_queue_to_string() + if payload is None: + return + self._upload_payload(payload) + + def export_queue_to_string(self): + logs = list() + while not self.queue.empty(): + logs.append(self.queue.get()) + # We may get an exception trying to serialize a python object to JSON + try: + payload = json.dumps(logs) + except Exception: + logger.debug( + "Failed to generate a JSON dump from the passed in telemetry OOB events. String representation of " + "logs: %s " % str(logs), + exc_info=True, + ) + payload = None + _, masked_text, _ = SecretDetector.mask_secrets(payload) + return masked_text + + def log_session_creation(self, connection_uuid: Optional[str] = None): + try: + telemetry_data = generate_base_oob_telemetry_data_dict( + connection_uuid=connection_uuid + ) + telemetry_data[TELEMETRY_KEY_TAGS][ + TELEMETRY_KEY_EVENT_TYPE + ] = LocalTestTelemetryEventType.SESSION_CONNECTION.value + telemetry_data[TELEMETRY_KEY_MESSAGE][TELEMETRY_KEY_IS_INTERNAL] = ( + 1 if self._is_internal_usage else 0 + ) + self.add(telemetry_data) + except Exception: + logger.debug("Failed to log session creation", exc_info=True) + + def log_not_supported_error( + self, + external_feature_name: Optional[str] = None, + internal_feature_name: Optional[str] = None, + parameters_info: Optional[dict] = None, + error_message: Optional[str] = None, + connection_uuid: Optional[str] = None, + raise_error: Optional[type] = None, + warning_logger: Optional[logging.Logger] = None, + ): + if not external_feature_name and not error_message: + raise ValueError( + "At least one of external_feature_name 
or" + " error_message should be provided to raise user facing error" + ) + + error_message = f"[Local Testing] {error_message or f'{external_feature_name} is not supported.'}" + try: + telemetry_data = generate_base_oob_telemetry_data_dict( + connection_uuid=connection_uuid + ) + telemetry_data[TELEMETRY_KEY_TAGS][ + TELEMETRY_KEY_EVENT_TYPE + ] = LocalTestTelemetryEventType.UNSUPPORTED.value + telemetry_data[TELEMETRY_KEY_MESSAGE][TELEMETRY_KEY_IS_INTERNAL] = ( + 1 if self._is_internal_usage else 0 + ) + telemetry_data[TELEMETRY_KEY_MESSAGE][TELEMETRY_KEY_FEATURE_NAME] = ( + internal_feature_name or external_feature_name + ) + telemetry_data[TELEMETRY_KEY_MESSAGE][ + TELEMETRY_KEY_PARAMETERS_INFO + ] = parameters_info + self.add(telemetry_data) + except Exception: + logger.debug( + "[Local Testing] Failed to log not supported feature call", + exc_info=True, + ) + + if warning_logger: + warning_logger.warning(error_message) + if raise_error: + raise raise_error(error_message) + + +atexit.register(LocalTestOOBTelemetryService.get_instance().close) diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index 126b10ff03d..3442aea46c0 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -673,8 +673,9 @@ def add_import( :meth:`session.udf.register() `. """ if isinstance(self._conn, MockServerConnection): - raise NotImplementedError( - "[Local Testing] Stored procedures are not currently supported." + self._conn.log_not_supported_error( + external_feature_name="Session.add_import", + raise_error=NotImplementedError, ) path, checksum, leading_path = self._resolve_import_path( path, import_path, chunk_size, whole_file_hash @@ -951,8 +952,9 @@ def add_packages( and the Snowflake server. """ if isinstance(self._conn, MockServerConnection): - raise NotImplementedError( - "[Local Testing] Add session packages is not currently supported." 
+ self._conn.log_not_supported_error( + external_feature_name="Session.add_packages", + raise_error=NotImplementedError, ) self._resolve_packages( parse_positional_args_to_list(*packages), @@ -1828,8 +1830,9 @@ def table_function( Generator functions are not supported with :meth:`Session.table_function`. """ if isinstance(self._conn, MockServerConnection): - raise NotImplementedError( - "[Local Testing] Table function is not currently supported." + self._conn.log_not_supported_error( + external_feature_name="Session.table_function", + raise_error=NotImplementedError, ) func_expr = _create_table_function_expression( func_name, *func_arguments, **func_named_arguments @@ -1900,8 +1903,9 @@ def generator( A new :class:`DataFrame` with data from calling the generator table function. """ if isinstance(self._conn, MockServerConnection): - raise NotImplementedError( - "[Local Testing] DataFrame.generator is currently not supported." + self._conn.log_not_supported_error( + external_feature_name="DataFrame.generator", + raise_error=NotImplementedError, ) if not columns: raise ValueError("Columns cannot be empty for generator table function") @@ -1960,8 +1964,9 @@ def sql(self, query: str, params: Optional[Sequence[Any]] = None) -> DataFrame: [Row(COLUMN1=1, COLUMN2='a'), Row(COLUMN1=2, COLUMN2='b')] """ if isinstance(self._conn, MockServerConnection): - raise NotImplementedError( - "[Local Testing] `Session.sql` is currently not supported." + self._conn.log_not_supported_error( + external_feature_name="Session.sql", + raise_error=NotImplementedError, ) if self.sql_simplifier_enabled: @@ -2564,8 +2569,9 @@ def create_async_job(self, query_id: str) -> AsyncJob: "Async query is not supported in stored procedure yet" ) if isinstance(self._conn, MockServerConnection): - raise NotImplementedError( - "[Local Testing] Async query is currently not supported." 
+ self._conn.log_not_supported_error( + external_feature_name="Session.create_async_job", + raise_error=NotImplementedError, ) return AsyncJob(query_id, None, self) @@ -2731,7 +2737,9 @@ def udf(self) -> UDFRegistration: See details of how to use this object in :class:`udf.UDFRegistration`. """ if isinstance(self._conn, MockServerConnection): - raise NotImplementedError("[Local Testing] UDF is not currently supported.") + self._conn.log_not_supported_error( + external_feature_name="Session.udf", raise_error=NotImplementedError + ) return self._udf_registration @property @@ -2741,8 +2749,8 @@ def udtf(self) -> UDTFRegistration: See details of how to use this object in :class:`udtf.UDTFRegistration`. """ if isinstance(self._conn, MockServerConnection): - raise NotImplementedError( - "[Local Testing] UDTF is not currently supported." + self._conn.log_not_supported_error( + external_feature_name="Session.udtf", raise_error=NotImplementedError ) return self._udtf_registration @@ -2762,8 +2770,9 @@ def sproc(self) -> StoredProcedureRegistration: See details of how to use this object in :class:`stored_procedure.StoredProcedureRegistration`. """ if isinstance(self, MockServerConnection): - raise NotImplementedError( - "[Local Testing] Stored procedures are not currently supported." + self._conn.log_not_supported_error( + external_feature_name="Session.sproc", + raise_error=NotImplementedError, ) return self._sp_registration @@ -2960,7 +2969,9 @@ def flatten( - :meth:`Session.table_function`, which can be used for any Snowflake table functions, including ``flatten``. 
""" if isinstance(self._conn, MockServerConnection): - raise NotImplementedError("[Local Testing] flatten is not implemented.") + self._conn.log_not_supported_error( + external_feature_name="Session.flatten", raise_error=NotImplementedError + ) mode = mode.upper() if mode not in ("OBJECT", "ARRAY", "BOTH"): raise ValueError("mode must be one of ('OBJECT', 'ARRAY', 'BOTH')") diff --git a/tests/conftest.py b/tests/conftest.py index 87a90ca1176..7a00cd67a91 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ # import logging +import os from pathlib import Path import pytest @@ -49,6 +50,21 @@ def local_testing_mode(pytestconfig): return pytestconfig.getoption("local_testing_mode") +@pytest.fixture(scope="function") +def local_testing_telemetry_setup(): + # the import here is because we want LocalTestOOBTelemetryService to be initialized + # after pytest_sessionstart is setup so that it can detect os.environ["SNOWPARK_LOCAL_TESTING_INTERNAL_TELEMETRY"] + # and set internal usage to be true + from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService + + LocalTestOOBTelemetryService.get_instance().enable() + yield + LocalTestOOBTelemetryService.get_instance().disable() + + @pytest.fixture(scope="session") def cte_optimization_enabled(pytestconfig): return pytestconfig.getoption("enable_cte_optimization") + +def pytest_sessionstart(session): + os.environ["SNOWPARK_LOCAL_TESTING_INTERNAL_TELEMETRY"] = "1" diff --git a/tests/integ/conftest.py b/tests/integ/conftest.py index 65f559aeb4b..59cf576ccb2 100644 --- a/tests/integ/conftest.py +++ b/tests/integ/conftest.py @@ -154,7 +154,7 @@ def resources_path() -> str: @pytest.fixture(scope="session") def connection(db_parameters, local_testing_mode): if local_testing_mode: - yield MockServerConnection() + yield MockServerConnection(options={"disable_local_testing_telemetry": True}) else: _keys = [ "user", diff --git a/tests/mock/test_oob_telemetry.py b/tests/mock/test_oob_telemetry.py new file 
mode 100644 index 00000000000..906e79cbcc3 --- /dev/null +++ b/tests/mock/test_oob_telemetry.py @@ -0,0 +1,231 @@ +# +# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. +# +import json +import logging +import re +import uuid +from datetime import datetime, timedelta + +import pytest + +from snowflake.snowpark._internal.utils import ( + get_os_name, + get_python_version, + get_version, +) +from snowflake.snowpark.mock._connection import MockServerConnection +from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService +from snowflake.snowpark.session import Session + + +def test_unit_oob_connection_telemetry(caplog, local_testing_telemetry_setup): + oob_service = LocalTestOOBTelemetryService.get_instance() + with caplog.at_level(logging.DEBUG, logger="snowflake.snowpark.mock._telemetry"): + connection_uuid = str(uuid.uuid4()) + oob_service.log_session_creation() + assert oob_service.size() == 1 + oob_service.log_session_creation(connection_uuid=connection_uuid) + assert oob_service.size() == 2 + payload = oob_service.export_queue_to_string() + unpacked_payload = json.loads(payload) + now_time = datetime.now().replace(microsecond=0) + + # test generated payload + for idx, event in enumerate(unpacked_payload): + assert len(event) == 6 + assert len(event["properties"]) == 1 + assert len(event["Message"]) == 2 + assert len(event["Tags"]) == 4 + assert event["Type"] == event["properties"]["type"] == "Snowpark Python" + assert event["UUID"] + assert datetime.strptime( + event["Created_on"], "%Y-%m-%d %H:%M:%S" + ) == pytest.approx(now_time, abs=timedelta(seconds=1)) + assert event["Message"]["Connection_UUID"] == ( + None if idx == 0 else connection_uuid + ) + assert event["Message"]["is_internal"] == 1 + assert event["Tags"]["Snowpark_Version"] == get_version() + assert event["Tags"]["OS_Version"] == get_os_name() + assert event["Tags"]["Python_Version"] == get_python_version() + assert event["Tags"]["Event_type"] == "session" + assert 
oob_service.size() == 0 + assert not caplog.record_tuples + + # test sending successfully + oob_service.log_session_creation() + oob_service.log_session_creation(connection_uuid=connection_uuid) + assert oob_service.size() == 2 + oob_service.flush() + assert oob_service.size() == 0 + assert len(caplog.record_tuples) == 2 + assert ( + "telemetry server request success: 200" in caplog.text + and "Telemetry request success=True" in caplog.text + ) + + +def test_unit_oob_log_not_implemented_error(caplog, local_testing_telemetry_setup): + oob_service = LocalTestOOBTelemetryService.get_instance() + connection_uuid = str(uuid.uuid4()) + logger = logging.getLogger("LocalTestLogger") + + unraise_feature_name = "Test Feature No Raise Error" + with caplog.at_level(logging.WARNING, logger="LocalTestLogger"): + # test basic case + warning + oob_service.log_not_supported_error( + external_feature_name=unraise_feature_name, + warning_logger=logger, + ) + assert ( + f"[Local Testing] {unraise_feature_name} is not supported." 
in caplog.text + ) + assert oob_service.size() == 1 + + # test raising error + raise_feature_name = "Test Feature With Raise Error" + with pytest.raises( + NotImplementedError, + match=re.escape(f"[Local Testing] {raise_feature_name} is not supported."), + ): + oob_service.log_not_supported_error( + external_feature_name=raise_feature_name, + internal_feature_name="module_a.function_b", + parameters_info={"param_c": "value_d"}, + connection_uuid=connection_uuid, + raise_error=NotImplementedError, + ) + assert oob_service.size() == 2 + payload = oob_service.export_queue_to_string() + unpacked_payload = json.loads(payload) + now_time = datetime.now().replace(microsecond=0) + + # test generated payload + for idx, event in enumerate(unpacked_payload): + assert len(event) == 6 + assert len(event["properties"]) == 1 + assert len(event["Message"]) == 4 + assert len(event["Tags"]) == 4 + assert event["Type"] == event["properties"]["type"] == "Snowpark Python" + assert event["UUID"] + assert datetime.strptime( + event["Created_on"], "%Y-%m-%d %H:%M:%S" + ) == pytest.approx(now_time, abs=timedelta(seconds=1)) + assert event["Message"]["Connection_UUID"] == ( + None if idx == 0 else connection_uuid + ) + assert event["Message"]["is_internal"] == 1 + assert event["Message"]["feature_name"] == ( + unraise_feature_name if idx == 0 else "module_a.function_b" + ) + assert event["Message"]["parameters_info"] == ( + None if idx == 0 else {"param_c": "value_d"} + ) + assert event["Tags"]["Snowpark_Version"] == get_version() + assert event["Tags"]["OS_Version"] == get_os_name() + assert event["Tags"]["Python_Version"] == get_python_version() + assert event["Tags"]["Event_type"] == "unsupported" + assert oob_service.size() == 0 + + # test sending successfully + caplog.clear() + with caplog.at_level(logging.DEBUG, logger="snowflake.snowpark.mock._telemetry"): + oob_service.log_not_supported_error( + external_feature_name=unraise_feature_name, + ) + try: + 
oob_service.log_not_supported_error( + external_feature_name=raise_feature_name, + internal_feature_name="module_a.function_b", + parameters_info={"param_c": "value_d"}, + connection_uuid=connection_uuid, + raise_error=NotImplementedError, + ) + except Exception: + pass + assert oob_service.size() == 2 + oob_service.flush() + assert oob_service.size() == 0 + assert len(caplog.record_tuples) == 2 + assert ( + "telemetry server request success: 200" in caplog.text + and "Telemetry request success=True" in caplog.text + ) + + # test sending empty raise error + with pytest.raises(ValueError): + assert oob_service.log_not_supported_error() + + +def test_unit_connection(caplog, local_testing_telemetry_setup): + conn = MockServerConnection() + # creating a mock connection will send connection telemetry + error_message = "Error Message" + external_feature_name = "Test Feature" + assert conn._oob_telemetry.get_instance().size() == 1 + with pytest.raises( + NotImplementedError, match=re.escape(error_message) + ), caplog.at_level(level=logging.DEBUG): + conn.log_not_supported_error( + external_feature_name=external_feature_name, + error_message=error_message, + internal_feature_name="module_a.function_b", + parameters_info={"param_c": "value_d"}, + raise_error=NotImplementedError, + ) + assert not caplog.record_tuples + conn._oob_telemetry.flush() + assert conn._oob_telemetry.get_instance().size() == 0 + + +def test_unit_connection_disable_telemetry(caplog, local_testing_telemetry_setup): + disabled_telemetry_conn = MockServerConnection( + options={"disable_local_testing_telemetry": True} + ) + # creating a mock connection will send connection telemetry, after disable, there should no telemetry event + assert disabled_telemetry_conn._oob_telemetry.get_instance().size() == 0 + + # test sending empty raise error + with pytest.raises(ValueError): + disabled_telemetry_conn.log_not_supported_error() + + # test error raised but no telemetry sent + error_message = "Error Message" + 
with pytest.raises(TypeError, match=re.escape(error_message)): + disabled_telemetry_conn.log_not_supported_error( + raise_error=TypeError, error_message=error_message + ) + assert disabled_telemetry_conn._oob_telemetry.get_instance().size() == 0 + + +def test_snowpark_telemetry(caplog, local_testing_telemetry_setup): + session = Session.builder.configs(options={"local_testing": True}).create() + assert session._conn._oob_telemetry.get_instance().size() == 1 + with pytest.raises( + NotImplementedError, + match=re.escape("[Local Testing] Session.sql is not supported."), + ): + session.sql("select 1") + assert session._conn._oob_telemetry.get_instance().size() == 2 + payload = session._conn._oob_telemetry.export_queue_to_string() + unpacked_payload = json.loads(payload) + assert unpacked_payload[0]["Tags"]["Event_type"] == "session" + assert unpacked_payload[1]["Tags"]["Event_type"] == "unsupported" + + # test sending successfully + with caplog.at_level(logging.DEBUG, logger="snowflake.snowpark.mock._telemetry"): + session = Session.builder.configs(options={"local_testing": True}).create() + assert session._conn._oob_telemetry.get_instance().size() == 1 + with pytest.raises( + NotImplementedError, + match=re.escape("[Local Testing] Session.sql is not supported."), + ): + session.sql("select 1") + + session._conn._oob_telemetry.flush() + assert session._conn._oob_telemetry.get_instance().size() == 0 + assert ( + "telemetry server request success: 200" in caplog.text + and "Telemetry request success=True" in caplog.text + ) diff --git a/tox.ini b/tox.ini index ce15f4635ae..e6263a3fae6 100644 --- a/tox.ini +++ b/tox.ini @@ -22,6 +22,7 @@ envlist = fix_lint, coverage nopandas skip_missing_interpreters = true +setenv = SNOWPARK_LOCAL_TESTING_INTERNAL_TELEMETRY=1 [testenv] allowlist_externals = bash