Skip to content

Commit

Permalink
SNOW-1805840: Augment telemetry with method_call_count (#2804)
Browse files Browse the repository at this point in the history
<!---
Please answer these questions before creating your pull request. Thanks!
--->

1. Which Jira issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.

   <!---
   In this section, please add a Snowflake Jira issue number.

Note that if a corresponding GitHub issue exists, you should still
include
   the Snowflake Jira issue number. For example, for GitHub issue
#1400, you should
   add "SNOW-1335071" here.
    --->

   Fixes SNOW-1805840

2. Fill out the following pre-review checklist:

- [x] I am adding a new automated test(s) to verify correctness of my
new code
- [ ] If this test skips Local Testing mode, I'm requesting review from
@snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
- [ ] If this is a new feature/behavior, I'm adding the Local Testing
parity changes.
- [x] I acknowledge that I have ensured my changes to be thread-safe.
Follow the link for more information: [Thread-safe Developer
Guidelines](https://github.com/snowflakedb/snowpark-python/blob/main/CONTRIBUTING.md#thread-safe-development)

3. Please describe how your code solves the related issue.

Adds `method_call_count`, the number of times a pandas API method has been
called on the same query compiler.
See more info in the interchange protocol design doc here:
https://docs.google.com/document/d/1EfqQwejVbF5_36hnOP-ap0t3NaCWmDz62iAcR0PtX20/edit?tab=t.0#heading=h.4uu48icmuq7z

---------

Signed-off-by: Labanya Mukhopadhyay <[email protected]>
  • Loading branch information
sfc-gh-lmukhopadhyay authored Jan 14, 2025
1 parent 9ab8318 commit 80d722e
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@
- Updated integration testing for `session.lineage.trace` to exclude deleted objects
- Added documentation for `DataFrame.map`.
- Improve performance of `DataFrame.apply` by mapping numpy functions to snowpark functions if possible.
- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn
- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn.
- Infer return type of functions in `Series.map`, `Series.apply` and `DataFrame.map` if type-hint is not provided.
- Added `call_count` to telemetry that counts method calls including interchange protocol calls.

## 1.26.0 (2024-12-05)

Expand Down
17 changes: 17 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_internal/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class SnowparkPandasTelemetryField(Enum):
ARGS = "argument"
# fallback flag
IS_FALLBACK = "is_fallback"
# number of times a method has been called on the same query compiler
CALL_COUNT = "call_count"


# Argument truncating size after converted to str. Size amount can be later specified after analysis and needs.
Expand All @@ -59,6 +61,7 @@ def _send_snowpark_pandas_telemetry_helper(
func_name: str,
query_history: Optional[QueryHistory],
api_calls: Union[str, list[dict[str, Any]]],
method_call_count: str,
) -> None:
"""
A helper function that sends Snowpark pandas API telemetry data.
Expand All @@ -72,6 +75,7 @@ def _send_snowpark_pandas_telemetry_helper(
query_history: The query history context manager to record queries that are pushed down to the Snowflake
database in the session.
api_calls: Optional list of Snowpark pandas API calls made during the function execution.
method_call_count: Number of times a method has been called.
Returns:
None
Expand All @@ -80,6 +84,11 @@ def _send_snowpark_pandas_telemetry_helper(
TelemetryField.KEY_FUNC_NAME.value: func_name,
TelemetryField.KEY_CATEGORY.value: SnowparkPandasTelemetryField.FUNC_CATEGORY_SNOWPARK_PANDAS.value,
TelemetryField.KEY_ERROR_MSG.value: error_msg,
**(
{SnowparkPandasTelemetryField.CALL_COUNT.value: method_call_count}
if method_call_count is not None
else {}
),
}
if len(api_calls) > 0:
data[TelemetryField.KEY_API_CALLS.value] = api_calls
Expand Down Expand Up @@ -275,6 +284,7 @@ def _telemetry_helper(
# Moving existing api call out first can avoid to generate duplicates.
existing_api_calls = []
need_to_restore_args0_api_calls = False
method_call_count = None

# If the decorated func is a class method or a standalone function, we need to get an active session:
if is_standalone_function or (len(args) > 0 and isinstance(args[0], type)):
Expand All @@ -296,6 +306,11 @@ def _telemetry_helper(
need_to_restore_args0_api_calls = True
session = args[0]._query_compiler._modin_frame.ordered_dataframe.session
class_prefix = args[0].__class__.__name__
func_name = _gen_func_name(
class_prefix, func, property_name, property_method_type
)
args[0]._query_compiler._method_call_counts[func_name] += 1
method_call_count = args[0]._query_compiler._method_call_counts[func_name]
except (TypeError, IndexError, AttributeError):
# TypeError: args might not support indexing; IndexError: args is empty; AttributeError: args[0] might not
# have _query_compiler attribute.
Expand Down Expand Up @@ -338,6 +353,7 @@ def _telemetry_helper(
func_name=func_name,
query_history=query_history,
api_calls=existing_api_calls + [curr_api_call],
method_call_count=method_call_count,
)
raise e

Expand Down Expand Up @@ -372,6 +388,7 @@ def _telemetry_helper(
func_name=func_name,
query_history=query_history,
api_calls=existing_api_calls + [curr_api_call],
method_call_count=method_call_count,
)
if need_to_restore_args0_api_calls:
args[0]._query_compiler.snowpark_pandas_api_calls = existing_api_calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import json
import logging
import re
from collections import Counter
import typing
import uuid
from collections.abc import Hashable, Iterable, Mapping, Sequence
Expand Down Expand Up @@ -531,9 +532,11 @@ def __init__(self, frame: InternalFrame) -> None:
), "frame is None or not a InternalFrame"
self._modin_frame = frame
# self.snowpark_pandas_api_calls a list of lazy Snowpark pandas telemetry api calls
# Copying and modifying self.snowpark_pandas_api_calls is taken care of in telemetry decorators
# Copying and modifying self.snowpark_pandas_api_calls and self._method_call_counts
# is taken care of in telemetry decorators
self.snowpark_pandas_api_calls: list = []
self._attrs: dict[Any, Any] = {}
self._method_call_counts: Counter[str] = Counter[str]()

def _raise_not_implemented_error_for_timedelta(
self, frame: InternalFrame = None
Expand Down
115 changes: 115 additions & 0 deletions tests/integ/modin/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def test_snowpark_pandas_telemetry_method_decorator(test_table_name):
"sfqids",
"func_name",
"error_msg",
"call_count",
}
assert data["category"] == "snowpark_pandas"
assert data["api_calls"] == df1_expected_api_calls + [
Expand Down Expand Up @@ -179,6 +180,7 @@ def test_send_snowpark_pandas_telemetry_helper(send_mock):
func_name="test_send_func",
query_history=None,
api_calls=[],
method_call_count=None,
)
send_mock.assert_called_with(
{
Expand Down Expand Up @@ -560,6 +562,119 @@ def test_telemetry_repr():
]


@sql_count_checker(query_count=6, join_count=4)
def test_telemetry_interchange_call_count():
    """Verify the ``call_count`` reported for repeated ``__dataframe__`` calls.

    Two frames call ``__dataframe__`` in an interleaved order, with an
    in-place write to the first frame in between. The test then checks the
    ``call_count`` values found in the telemetry log batch.
    """
    target_func = "DataFrame.__dataframe__"

    def _extract_payload(record):
        # Best effort: records that do not have the expected layout yield None
        # and are filtered out below.
        try:
            return record.to_dict()["message"][TelemetryField.KEY_DATA.value]
        except Exception:
            return None

    frame_a = pd.DataFrame([1, 2, 3, 4])
    frame_b = pd.DataFrame([5])
    frame_a.__dataframe__()
    frame_a.__dataframe__()
    frame_b.__dataframe__()

    # In-place write; the expectations below treat frame_a as a "new version"
    # afterwards, i.e. its count starts again at 1.
    frame_a.iloc[0, 0] = 7
    frame_a.__dataframe__()
    frame_a.__dataframe__()
    frame_b.__dataframe__()

    matching = []
    for record in pd.session._conn._telemetry_client.telemetry._log_batch:
        payload = _extract_payload(record)
        if payload is None or "func_name" not in payload:
            continue
        if payload["func_name"] == target_func:
            matching.append(payload)

    assert len(matching) == 6

    expected_counts = [
        1,  # frame_a calls __dataframe__() for the first time
        2,  # frame_a calls __dataframe__() for the second time
        1,  # frame_b calls __dataframe__() for the first time
        1,  # the new version of frame_a calls it for the first time
        2,  # the new version of frame_a calls it for the second time
        2,  # frame_b calls __dataframe__() for the second time
    ]
    observed_counts = [payload["call_count"] for payload in matching]
    assert observed_counts == expected_counts


@sql_count_checker(query_count=4)
def test_telemetry_func_call_count():
    """Verify ``call_count`` for ``__repr__`` is kept separately per frame.

    One frame calls ``__repr__`` three times and another calls it once; the
    last two matching telemetry entries must report 3 and 1 respectively.
    """
    target_func = "DataFrame.__repr__"

    def _extract_payload(record):
        # Best effort: records that do not have the expected layout yield None
        # and are filtered out below.
        try:
            return record.to_dict()["message"][TelemetryField.KEY_DATA.value]
        except Exception:
            return None

    frame_a = pd.DataFrame([1, 2, np.nan, 4])
    frame_b = pd.DataFrame([5])

    for _ in range(3):
        frame_a.__repr__()

    frame_b.__repr__()

    matching = []
    for record in pd.session._conn._telemetry_client.telemetry._log_batch:
        payload = _extract_payload(record)
        if payload is None or "func_name" not in payload:
            continue
        if payload["func_name"] == target_func:
            matching.append(payload)

    # Second-to-last entry: frame_a's third __repr__() call.
    second_to_last = matching[-2]
    assert second_to_last["call_count"] == 3

    # Last entry: frame_b's only __repr__() call.
    last = matching[-1]
    assert last["call_count"] == 1


@sql_count_checker(query_count=3)
def test_telemetry_multiple_func_call_count():
    """Verify that ``call_count`` is kept separately per method name.

    A single frame calls ``__repr__`` twice and ``__dataframe__`` once; the
    latest telemetry entry for each method must report its own count.
    """
    s = pd.DataFrame([1, 2, np.nan, 4])

    s.__repr__()
    s.__repr__()
    s.__dataframe__()

    def _get_data(call):
        # Best effort: records that do not have the expected layout yield None
        # and are filtered out by the caller.
        try:
            return call.to_dict()["message"][TelemetryField.KEY_DATA.value]
        except Exception:
            return None

    def _telemetry_for(func_name):
        # Collect, in log order, the telemetry payloads recorded for func_name.
        selected = []
        for call in pd.session._conn._telemetry_client.telemetry._log_batch:
            data = _get_data(call)
            if (
                data is not None
                and "func_name" in data
                and data["func_name"] == func_name
            ):
                selected.append(data)
        return selected

    repr_telemetry_data = _telemetry_for("DataFrame.__repr__")
    dataframe_telemetry_data = _telemetry_for("DataFrame.__dataframe__")

    # last call from telemetry data
    # s called __repr__() 2 times.
    assert repr_telemetry_data[-1]["call_count"] == 2

    # last call from telemetry data
    # s called __dataframe__() 1 time.
    assert dataframe_telemetry_data[-1]["call_count"] == 1


@sql_count_checker(query_count=0)
def test_telemetry_copy():
# copy() is defined in upstream Modin's BasePandasDataset class, and not overridden by any
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/modin/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def snowpark_pandas_error_test_helper(
query_history=ANY,
telemetry_type=telemetry_type,
error_msg=error_msg,
method_call_count=ANY,
)


Expand Down Expand Up @@ -116,6 +117,7 @@ def raise_real_type_error(_):
query_history=ANY,
telemetry_type="snowpark_pandas_type_error",
error_msg=None,
method_call_count=ANY,
)
assert len(mock_arg2._query_compiler.snowpark_pandas_api_calls) == 0

Expand All @@ -134,6 +136,7 @@ def raise_real_type_error(_):
query_history=ANY,
telemetry_type="snowpark_pandas_type_error",
error_msg=None,
method_call_count=ANY,
)


Expand Down

0 comments on commit 80d722e

Please sign in to comment.