Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1805840: Augment telemetry with method_call_count #2804

Merged
merged 18 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
9810e24
SNOW-1805840: add method_call_count and interchange_call_count to tel…
sfc-gh-lmukhopadhyay Dec 20, 2024
d6d1a6e
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Dec 20, 2024
733f4b0
update changelog
sfc-gh-lmukhopadhyay Dec 20, 2024
ed2bd25
fix telem tests
sfc-gh-lmukhopadhyay Dec 20, 2024
0801060
fix unit telem tests
sfc-gh-lmukhopadhyay Jan 4, 2025
13c3f5e
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 4, 2025
69a2cf2
fix telem unit error test
sfc-gh-lmukhopadhyay Jan 4, 2025
cccc1e2
remove interchange call count and address comments
sfc-gh-lmukhopadhyay Jan 6, 2025
857e249
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 6, 2025
e04b091
update changelog
sfc-gh-lmukhopadhyay Jan 6, 2025
05204b0
add test for multiple funcs on same qc
sfc-gh-lmukhopadhyay Jan 7, 2025
b38e304
address comments
sfc-gh-lmukhopadhyay Jan 8, 2025
2509c2e
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 8, 2025
7062bd0
resolve conf
sfc-gh-lmukhopadhyay Jan 8, 2025
b3caf57
Merge branch 'lmukhopadhyay-SNOW-1805840-telem-call-count' of github.…
sfc-gh-lmukhopadhyay Jan 8, 2025
4cb5589
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 9, 2025
f157851
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 13, 2025
19282ee
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
- Updated integration testing for `session.lineage.trace` to exclude deleted objects
- Added documentation for `DataFrame.map`.
- Improve performance of `DataFrame.apply` by mapping numpy functions to snowpark functions if possible.
- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn
- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn.
- Added `call_count` to telemetry, which records how many times a method (including interchange protocol methods) has been called on the same query compiler.

## 1.26.0 (2024-12-05)

Expand Down
17 changes: 17 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_internal/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class SnowparkPandasTelemetryField(Enum):
ARGS = "argument"
# fallback flag
IS_FALLBACK = "is_fallback"
# number of times a method has been called on the same QC
sfc-gh-lmukhopadhyay marked this conversation as resolved.
Show resolved Hide resolved
CALL_COUNT = "call_count"
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved


# Argument truncating size after converted to str. Size amount can be later specified after analysis and needs.
Expand All @@ -58,6 +60,7 @@ def _send_snowpark_pandas_telemetry_helper(
func_name: str,
query_history: Optional[QueryHistory],
api_calls: Union[str, list[dict[str, Any]]],
method_call_count: str,
) -> None:
"""
A helper function that sends Snowpark pandas API telemetry data.
Expand All @@ -71,6 +74,7 @@ def _send_snowpark_pandas_telemetry_helper(
query_history: The query history context manager to record queries that are pushed down to the Snowflake
database in the session.
api_calls: Optional list of Snowpark pandas API calls made during the function execution.
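method_call_count: Number of times the method has been called on the same query compiler, or None if no count is available (the field is then omitted from the telemetry data).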

Returns:
None
Expand All @@ -79,6 +83,11 @@ def _send_snowpark_pandas_telemetry_helper(
TelemetryField.KEY_FUNC_NAME.value: func_name,
TelemetryField.KEY_CATEGORY.value: SnowparkPandasTelemetryField.FUNC_CATEGORY_SNOWPARK_PANDAS.value,
TelemetryField.KEY_ERROR_MSG.value: error_msg,
**(
{SnowparkPandasTelemetryField.CALL_COUNT.value: method_call_count}
if method_call_count is not None
else {}
),
}
if len(api_calls) > 0:
data[TelemetryField.KEY_API_CALLS.value] = api_calls
Expand Down Expand Up @@ -274,6 +283,7 @@ def _telemetry_helper(
# Moving existing api call out first can avoid to generate duplicates.
existing_api_calls = []
need_to_restore_args0_api_calls = False
method_call_count = None

# If the decorated func is a class method or a standalone function, we need to get an active session:
if is_standalone_function or (len(args) > 0 and isinstance(args[0], type)):
Expand All @@ -295,6 +305,11 @@ def _telemetry_helper(
need_to_restore_args0_api_calls = True
session = args[0]._query_compiler._modin_frame.ordered_dataframe.session
class_prefix = args[0].__class__.__name__
func_name = _gen_func_name(
class_prefix, func, property_name, property_method_type
)
args[0]._query_compiler._method_call_counts[func_name] += 1
method_call_count = args[0]._query_compiler._method_call_counts[func_name]
except (TypeError, IndexError, AttributeError):
# TypeError: args might not support indexing; IndexError: args is empty; AttributeError: args[0] might not
# have _query_compiler attribute.
Expand Down Expand Up @@ -337,6 +352,7 @@ def _telemetry_helper(
func_name=func_name,
query_history=query_history,
api_calls=existing_api_calls + [curr_api_call],
method_call_count=method_call_count,
)
raise e

Expand Down Expand Up @@ -371,6 +387,7 @@ def _telemetry_helper(
func_name=func_name,
query_history=query_history,
api_calls=existing_api_calls + [curr_api_call],
method_call_count=method_call_count,
)
if need_to_restore_args0_api_calls:
args[0]._query_compiler.snowpark_pandas_api_calls = existing_api_calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import json
import logging
import re
from collections import Counter
import typing
import uuid
from collections.abc import Hashable, Iterable, Mapping, Sequence
Expand Down Expand Up @@ -530,9 +531,11 @@ def __init__(self, frame: InternalFrame) -> None:
), "frame is None or not a InternalFrame"
self._modin_frame = frame
# self.snowpark_pandas_api_calls is a list of lazy Snowpark pandas telemetry api calls
# Copying and modifying self.snowpark_pandas_api_calls is taken care of in telemetry decorators
# Copying and modifying self.snowpark_pandas_api_calls and self._method_call_counts
# is taken care of in telemetry decorators
self.snowpark_pandas_api_calls: list = []
self._attrs: dict[Any, Any] = {}
self._method_call_counts: Counter[str] = Counter[str]()

def _raise_not_implemented_error_for_timedelta(
self, frame: InternalFrame = None
Expand Down
117 changes: 117 additions & 0 deletions tests/integ/modin/test_telemetry.py
sfc-gh-lmukhopadhyay marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def test_snowpark_pandas_telemetry_method_decorator(test_table_name):
"sfqids",
"func_name",
"error_msg",
"call_count",
}
assert data["category"] == "snowpark_pandas"
assert data["api_calls"] == df1_expected_api_calls + [
Expand Down Expand Up @@ -178,6 +179,7 @@ def test_send_snowpark_pandas_telemetry_helper(send_mock):
func_name="test_send_func",
query_history=None,
api_calls=[],
method_call_count=None,
)
send_mock.assert_called_with(
{
Expand Down Expand Up @@ -559,6 +561,121 @@ def test_telemetry_repr():
]


@sql_count_checker(query_count=6, join_count=4)
def test_telemetry_interchange_call_count():
    """Verify ``call_count`` telemetry for repeated ``__dataframe__()`` calls.

    Two frames call ``__dataframe__()`` before and after ``s`` is modified
    through ``iloc``. The assertions below pin the expected behavior: the
    count for ``s`` starts over after the modification, while the count for
    the untouched frame ``t`` keeps increasing.
    """
    s = pd.DataFrame([1, 2, 3, 4])
    t = pd.DataFrame([5])
    s.__dataframe__()
    s.__dataframe__()
    t.__dataframe__()

    s.iloc[0, 0] = 7
    s.__dataframe__()
    s.__dataframe__()
    t.__dataframe__()

    def _get_data(call):
        # Best-effort decoding: batch entries that do not have the expected
        # message/data layout are reported as None and filtered out below.
        try:
            return call.to_dict()["message"][TelemetryField.KEY_DATA.value]
        except Exception:
            return None

    # Decode every logged message exactly once, then keep only the entries
    # emitted for DataFrame.__dataframe__.
    decoded = (
        _get_data(call)
        for call in pd.session._conn._telemetry_client.telemetry._log_batch
    )
    telemetry_data = [
        data
        for data in decoded
        if data is not None
        and "func_name" in data
        and data["func_name"] == "DataFrame.__dataframe__"
    ]
    assert len(telemetry_data) == 6
    # s calls __dataframe__() for the first time.
    assert telemetry_data[0]["call_count"] == 1
    # s calls __dataframe__() for the second time.
    assert telemetry_data[1]["call_count"] == 2
    # t calls __dataframe__() for the first time.
    assert telemetry_data[2]["call_count"] == 1
    # the new version of s calls __dataframe__() for the first time.
    assert telemetry_data[3]["call_count"] == 1
    # the new version of s calls __dataframe__() for the second time.
    assert telemetry_data[4]["call_count"] == 2
    # t calls __dataframe__() for the second time.
    assert telemetry_data[5]["call_count"] == 2


@sql_count_checker(query_count=4)
def test_telemetry_func_call_count():
    """Verify ``call_count`` telemetry is kept separately for each DataFrame.

    ``s`` calls ``__repr__()`` three times and ``t`` calls it once; the last
    two ``DataFrame.__repr__`` telemetry entries must report 3 and 1.
    """
    s = pd.DataFrame([1, 2, np.nan, 4])
    t = pd.DataFrame([5])

    s.__repr__()
    s.__repr__()
    s.__repr__()

    t.__repr__()

    def _get_data(call):
        # Best-effort decoding: batch entries that do not have the expected
        # message/data layout are reported as None and filtered out below.
        try:
            return call.to_dict()["message"][TelemetryField.KEY_DATA.value]
        except Exception:
            return None

    # Decode every logged message exactly once, then keep only the entries
    # emitted for DataFrame.__repr__.
    decoded = (
        _get_data(call)
        for call in pd.session._conn._telemetry_client.telemetry._log_batch
    )
    telemetry_data = [
        data
        for data in decoded
        if data is not None
        and "func_name" in data
        and data["func_name"] == "DataFrame.__repr__"
    ]

    # second to last call from telemetry data
    # s called __repr__() 3 times.
    assert telemetry_data[-2]["call_count"] == 3

    # last call from telemetry data
    # t called __repr__() 1 time.
    assert telemetry_data[-1]["call_count"] == 1


@sql_count_checker(query_count=3)
def test_telemetry_multiple_func_call_count():
    """Verify ``call_count`` telemetry is kept separately for each method.

    The same DataFrame calls ``__repr__()`` twice and ``__dataframe__()``
    once; each method's latest telemetry entry must report its own count.
    """
    s = pd.DataFrame([1, 2, np.nan, 4])

    s.__repr__()
    s.__repr__()
    s.__dataframe__()

    def _get_data(call):
        # Best-effort decoding: batch entries that do not have the expected
        # message/data layout are reported as None and filtered out below.
        try:
            return call.to_dict()["message"][TelemetryField.KEY_DATA.value]
        except Exception:
            return None

    def _telemetry_for(func_name):
        # Decode every logged message exactly once and keep only the entries
        # recorded for ``func_name``.
        decoded = (
            _get_data(call)
            for call in pd.session._conn._telemetry_client.telemetry._log_batch
        )
        return [
            data
            for data in decoded
            if data is not None
            and "func_name" in data
            and data["func_name"] == func_name
        ]

    repr_telemetry_data = _telemetry_for("DataFrame.__repr__")
    dataframe_telemetry_data = _telemetry_for("DataFrame.__dataframe__")

    # last call from telemetry data
    # s called __repr__() 2 times.
    assert repr_telemetry_data[-1]["call_count"] == 2

    # last call from telemetry data
    # s called __dataframe__() 1 time.
    assert dataframe_telemetry_data[-1]["call_count"] == 1


@sql_count_checker(query_count=0)
def test_telemetry_copy():
# copy() is defined in upstream Modin's BasePandasDataset class, and not overridden by any
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/modin/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def snowpark_pandas_error_test_helper(
query_history=ANY,
telemetry_type=telemetry_type,
error_msg=error_msg,
method_call_count=ANY,
)


Expand Down Expand Up @@ -115,6 +116,7 @@ def raise_real_type_error(_):
query_history=ANY,
telemetry_type="snowpark_pandas_type_error",
error_msg=None,
method_call_count=ANY,
)
assert len(mock_arg2._query_compiler.snowpark_pandas_api_calls) == 0

Expand All @@ -133,6 +135,7 @@ def raise_real_type_error(_):
query_history=ANY,
telemetry_type="snowpark_pandas_type_error",
error_msg=None,
method_call_count=ANY,
)


Expand Down
Loading