forked from explodinggradients/ragas
-
Notifications
You must be signed in to change notification settings - Fork 0
/
_analytics.py
134 lines (101 loc) · 3.89 KB
/
_analytics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from __future__ import annotations
import json
import logging
import os
import typing as t
import uuid
from functools import lru_cache, wraps
import requests
from appdirs import user_data_dir
from langchain_core.pydantic_v1 import BaseModel, Field
from ragas.utils import get_debug_mode
if t.TYPE_CHECKING:
    # Typing-only helpers: they are defined solely when a static type checker
    # sets TYPE_CHECKING. Annotations in this module may still name them because
    # `from __future__ import annotations` stores annotations as strings instead
    # of evaluating them when a function is defined.
    P = t.ParamSpec("P")
    T = t.TypeVar("T")
    # Callable taking parameters P and returning a coroutine.
    AsyncFunc = t.Callable[P, t.Coroutine[t.Any, t.Any, t.Any]]
# Module-level logger; tracking errors and debug payloads are reported through it.
logger = logging.getLogger(__name__)
# Endpoint that track() POSTs event payloads to.
USAGE_TRACKING_URL = "https://t.explodinggradients.com"
# Timeout passed to requests.post() in track().
USAGE_REQUESTS_TIMEOUT_SEC = 1
# Application name passed to appdirs.user_data_dir() to locate the user-id file.
USER_DATA_DIR_NAME = "ragas"
# Any time you change this, also change the variable in our ci.yaml file
RAGAS_DO_NOT_TRACK = "RAGAS_DO_NOT_TRACK"
# Name of the environment variable read by _usage_event_debugging().
RAGAS_DEBUG_TRACKING = "__RAGAS_DEBUG_TRACKING"
@lru_cache(maxsize=1)
def do_not_track() -> bool:  # pragma: no cover
    """
    Report whether the user has opted out of usage tracking.

    Returns True only when the RAGAS_DO_NOT_TRACK environment variable is set
    and its value, compared case-insensitively, is "true". The answer is cached
    after the first call, so later changes to the environment are not seen.
    """
    opt_out_value = os.environ.get(RAGAS_DO_NOT_TRACK, "False")
    return opt_out_value.lower() == "true"
@lru_cache(maxsize=1)
def _usage_event_debugging() -> bool:
    """
    Report whether tracking debug mode is on (for Ragas developers only).

    Returns True only when the __RAGAS_DEBUG_TRACKING environment variable is
    set and its value, compared case-insensitively, is "true". The answer is
    cached after the first call.
    """
    debug_value = os.environ.get(RAGAS_DEBUG_TRACKING, "False")
    return debug_value.lower() == "true"
def silent(func: t.Callable[P, T]) -> t.Callable[P, T]:  # pragma: no cover
    """
    Decorate ``func`` so that exceptions it raises are logged, not propagated.

    The wrapped callable returns whatever ``func`` returns. When ``func``
    raises, the error is handled according to the debugging switches:

    - tracking debugging off: logged at DEBUG level, ``None`` is returned;
    - tracking debugging on, debug mode off: logged at INFO level, ``None``
      is returned;
    - tracking debugging on and debug mode on: logged at ERROR level with
      stack info, then the exception is re-raised.
    """

    @wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any:
        try:
            outcome = func(*args, **kwargs)
        except Exception as exc:  # pylint: disable=broad-except
            if not _usage_event_debugging():
                logger.debug("Tracking Error: %s", exc)
            elif not get_debug_mode():
                logger.info("Tracking Error: %s", exc)
            else:
                logger.error(
                    "Tracking Error: %s", exc, stack_info=True, stacklevel=3
                )
                raise exc
            # The error was swallowed: the caller gets None.
            return None
        return outcome

    return wrapper
@lru_cache(maxsize=1)
@silent
def get_userid() -> str:
    """
    Return the anonymous user id, creating and persisting one if needed.

    The id is stored as ``{"userid": ...}`` in ``uuid.json`` inside the
    directory returned by ``user_data_dir(appname=USER_DATA_DIR_NAME)``. If the
    file exists its stored id is returned; otherwise a new id of the form
    ``"a-" + <uuid4 hex>`` is generated, written to the file, and returned.

    Because the function is wrapped by ``silent``, an exception raised here is
    logged and swallowed (the call then yields ``None``) unless the debugging
    switches make ``silent`` re-raise. The result is cached by ``lru_cache``.
    """
    user_id_path = user_data_dir(appname=USER_DATA_DIR_NAME)
    uuid_filepath = os.path.join(user_id_path, "uuid.json")

    if os.path.exists(uuid_filepath):
        # Use a context manager so the file handle is closed deterministically.
        with open(uuid_filepath) as f:
            return json.load(f)["userid"]

    user_id = "a-" + uuid.uuid4().hex
    # The data directory may already exist without uuid.json in it; without
    # exist_ok=True that case raises FileExistsError and no id is ever stored.
    os.makedirs(user_id_path, exist_ok=True)
    with open(uuid_filepath, "w") as f:
        json.dump({"userid": user_id}, f)
    return user_id
class BaseEvent(BaseModel):
    """Base model for tracking events; carries the fields every event shares."""

    # Label for the event; track_was_completed passes the wrapped function's
    # name here.
    event_type: str
    # When not supplied, the value is produced by calling get_userid().
    user_id: str = Field(default_factory=get_userid)
class EvaluationEvent(BaseEvent):
    """
    Event model with evaluation-related fields.

    All fields are required and supplied by the caller; their exact meaning is
    not defined in this module (NOTE(review): confirm against the call sites).
    """

    metrics: t.List[str]  # presumably the names of the metrics used
    evaluation_mode: str
    num_rows: int  # presumably the number of rows evaluated
    language: str
    in_ci: bool  # presumably whether the run happened in CI
class TestsetGenerationEvent(BaseEvent):
    """
    Event model with test-set-generation-related fields.

    Field values are supplied by the caller; their exact meaning is not defined
    in this module (NOTE(review): confirm against the call sites).
    """

    evolution_names: t.List[str]
    # presumably parallel to evolution_names, one percentage per name -- verify
    evolution_percentages: t.List[float]
    num_rows: int
    language: str
    # Optional; defaults to False when not provided.
    is_experiment: bool = False
@silent
def track(event_properties: BaseEvent):
    """
    Send one usage event, unless tracking is disabled.

    Nothing happens when ``do_not_track()`` is true. Otherwise the event is
    converted to a dict; in tracking-debug mode that payload is only logged,
    and in normal mode it is POSTed as JSON to ``USAGE_TRACKING_URL`` with a
    timeout of ``USAGE_REQUESTS_TIMEOUT_SEC``. Always returns ``None``; errors
    are handled by the ``silent`` decorator.
    """
    if not do_not_track():
        event_payload = dict(event_properties)
        if _usage_event_debugging():
            # For internal debugging purpose
            logger.info("Tracking Payload: %s", event_payload)
        else:
            requests.post(
                USAGE_TRACKING_URL,
                json=event_payload,
                timeout=USAGE_REQUESTS_TIMEOUT_SEC,
            )
class IsCompleteEvent(BaseEvent):
    """
    Event recording whether a tracked function ran to completion.

    track_was_completed sends one with is_completed=False before calling the
    wrapped function and one with is_completed=True after it returns.
    """

    is_completed: bool = True  # True if the event was completed, False otherwise
@silent
def track_was_completed(func: t.Callable[P, T]) -> t.Callable[P, T]:  # pragma: no cover
    """
    Track if the function was completed. This helps us understand failure cases and improve the user experience. Disable tracking by setting the environment variable RAGAS_DO_NOT_TRACK to True as usual.

    The returned wrapper sends an ``IsCompleteEvent`` with
    ``is_completed=False`` before calling ``func`` and another with
    ``is_completed=True`` after ``func`` returns; ``event_type`` is
    ``func.__name__``. If ``func`` raises, the second event is never sent and
    the exception propagates unchanged. The wrapper returns ``func``'s result.
    """

    @silent
    def _report(is_completed: bool) -> None:
        # Build AND send the event under `silent`: constructing the event can
        # itself fail (e.g. validation of the default user id), and a telemetry
        # failure must not break the wrapped function.
        track(IsCompleteEvent(event_type=func.__name__, is_completed=is_completed))

    @wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any:
        _report(False)
        result = func(*args, **kwargs)
        _report(True)

        return result

    return wrapper