chore(llmobs): add sampling for ragas skeleton code #10719

Draft · wants to merge 8 commits into base: evan.li/ragas-skeleton
4 changes: 2 additions & 2 deletions ddtrace/llmobs/_evaluators/ragas/faithfulness.py
@@ -5,7 +5,7 @@


 class RagasFaithfulnessEvaluator:
-    name = "ragas_faithfulness"
+    label = "ragas_faithfulness"
     metric_type = "score"

     @classmethod
@@ -17,5 +17,5 @@ def evaluate(cls, span):
             "ml_app": config._llmobs_ml_app,
             "timestamp_ms": math.floor(time.time() * 1000),
             "metric_type": cls.metric_type,
-            "label": cls.name,
+            "label": cls.label,
         }
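
For orientation, a rough sketch (not part of the PR) of the evaluation-metric event that evaluate() builds after this rename; only the fields visible in the diff are shown, and the ml_app value is a placeholder.

import math
import time

metric_event = {
    "ml_app": "<config._llmobs_ml_app>",  # placeholder for the configured ML app name
    "timestamp_ms": math.floor(time.time() * 1000),
    "metric_type": "score",               # cls.metric_type
    "label": "ragas_faithfulness",        # cls.label (previously cls.name)
}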
25 changes: 18 additions & 7 deletions ddtrace/llmobs/_evaluators/runner.py
@@ -2,12 +2,16 @@
 from concurrent import futures
 import os
 from typing import Dict
+from typing import List
+from typing import Tuple

+from ddtrace import Span
 from ddtrace.internal import forksafe
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.periodic import PeriodicService

 from .ragas.faithfulness import RagasFaithfulnessEvaluator
+from .sampler import EvaluatorSampler


 logger = get_logger(__name__)
@@ -23,7 +27,7 @@ class EvaluatorRunner(PeriodicService):
     def __init__(self, interval: float, _evaluation_metric_writer=None):
         super(EvaluatorRunner, self).__init__(interval=interval)
         self._lock = forksafe.RLock()
-        self._buffer = []  # type: list[Dict]
+        self._buffer = []  # type: List[Tuple[Dict, Span]]
         self._buffer_limit = 1000

         self._evaluation_metric_writer = _evaluation_metric_writer
@@ -38,6 +42,8 @@ def __init__(self, interval: float, _evaluation_metric_writer=None):
             if evaluator in SUPPORTED_EVALUATORS:
                 self.evaluators.append(SUPPORTED_EVALUATORS[evaluator])

+        self.sampler = EvaluatorSampler()
+
     def start(self, *args, **kwargs):
         super(EvaluatorRunner, self).start()
         logger.debug("started %r to %r", self.__class__.__name__)
@@ -46,35 +52,40 @@
     def on_shutdown(self):
         self.executor.shutdown()

-    def enqueue(self, span_event: Dict) -> None:
+    def enqueue(self, span_event: Dict, span: Span) -> None:
         with self._lock:
             if len(self._buffer) >= self._buffer_limit:
                 logger.warning(
                     "%r event buffer full (limit is %d), dropping event", self.__class__.__name__, self._buffer_limit
                 )
                 return
-            self._buffer.append(span_event)
+            self._buffer.append((span_event, span))

     def periodic(self) -> None:
         with self._lock:
             if not self._buffer:
                 return
-            events = self._buffer
+            span_batch = self._buffer  # type: List[Tuple[Dict, Span]]
             self._buffer = []

         try:
-            evaluation_metrics = self.run(events)
+            evaluation_metrics = self.run(span_batch)
             for metric in evaluation_metrics:
                 if metric is not None:
                     self._evaluation_metric_writer.enqueue(metric)
         except RuntimeError as e:
             logger.debug("failed to run evaluation: %s", e)

-    def run(self, spans):
+    def run(self, span_batch: List[Tuple[Dict, Span]]) -> List[Dict]:
         batches_of_results = []

         for evaluator in self.evaluators:
-            batches_of_results.append(self.executor.map(evaluator.evaluate, spans))
+            batches_of_results.append(
+                self.executor.map(
+                    evaluator.evaluate,
+                    [span_event for span_event, span in span_batch if self.sampler.sample(evaluator.label, span)],
+                )
+            )

         results = []
         for batch in batches_of_results:
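
The net effect in run() is that each evaluator is mapped over only the span events its sampling rules admit. A minimal standalone sketch of that gating pattern follows; run_one_evaluator is an illustrative name, not something added by this PR.

def run_one_evaluator(evaluator, span_batch, sampler, executor):
    # Keep only span events whose (evaluator label, span) pair is sampled in,
    # then evaluate them concurrently on the shared thread pool.
    sampled = [event for event, span in span_batch if sampler.sample(evaluator.label, span)]
    return list(executor.map(evaluator.evaluate, sampled))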
84 changes: 84 additions & 0 deletions ddtrace/llmobs/_evaluators/sampler.py
@@ -0,0 +1,84 @@
import json
import os
from typing import List
from typing import Optional

from ddtrace import config
from ddtrace.internal.logger import get_logger

# from ddtrace.internal.rate_limiter import RateLimiter
from ddtrace.sampling_rule import SamplingRule


try:
    from json.decoder import JSONDecodeError
except ImportError:
    # handling python 2.X import error
    JSONDecodeError = ValueError  # type: ignore

logger = get_logger(__name__)


class EvaluatorSamplingRule(SamplingRule):
    def __init__(self, sample_rate: float, evaluator: Optional[str] = None, span_name: Optional[str] = None):
        super(EvaluatorSamplingRule, self).__init__(sample_rate)
        self.evaluator_label = evaluator
        self.span_name = span_name

    def matches(self, span_event, evaluator_label):
        # A rule applies only when every configured pattern matches its property.
        for prop, pattern in [(span_event.get("name"), self.span_name), (evaluator_label, self.evaluator_label)]:
            if pattern != SamplingRule.NO_RULE and prop != pattern:
                return False
        return True

    def __repr__(self):
        return "EvaluatorSamplingRule(sample_rate={}, evaluator_label={}, span_name={})".format(
            self.sample_rate, self.evaluator_label, self.span_name
        )

    __str__ = __repr__


class EvaluatorSampler:
    DEFAULT_SAMPLING_RATE = 1.0
    SAMPLING_RULES_ENV_VAR = "_DD_LLMOBS_EVALUATOR_SAMPLING_RULES"

    def __init__(self):
        self.rules = self.parse_rules()
        self.default_sampling_rule = SamplingRule(
            float(os.getenv("_DD_LLMOBS_EVALUATOR_DEFAULT_SAMPLE_RATE", self.DEFAULT_SAMPLING_RATE))
        )

    def sample(self, evaluator_label, span):
        # Apply the first matching rule; otherwise fall back to the default sample rate.
        for rule in self.rules:
            if rule.matches(span, evaluator_label):
                return rule.sample(span)
        return self.default_sampling_rule.sample(span)

    def parse_rules(self) -> List[EvaluatorSamplingRule]:
        rules = []
        sampling_rules_str = os.getenv(self.SAMPLING_RULES_ENV_VAR)
        if not sampling_rules_str:
            return []
        try:
            json_rules = json.loads(sampling_rules_str)
        except JSONDecodeError:
            if config._raise:
                raise ValueError("Unable to parse {}".format(self.SAMPLING_RULES_ENV_VAR))
            logger.warning("Failed to parse evaluator sampling rules with error: ", exc_info=True)
            return []
        if not isinstance(json_rules, list):
            if config._raise:
                raise ValueError("Evaluator sampling rules must be a list of dictionaries")
            return []
        for rule in json_rules:
            if "sample_rate" not in rule:
                if config._raise:
                    raise KeyError("No sample_rate provided for sampling rule: {}".format(json.dumps(rule)))
                continue
            sample_rate = float(rule["sample_rate"])
            name = rule.get("span_name", SamplingRule.NO_RULE)
            evaluator_label = rule.get("evaluator_label", SamplingRule.NO_RULE)
            rules.append(EvaluatorSamplingRule(sample_rate, evaluator_label, name))
        return rules
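
For reference, a minimal sketch of how these rules could be supplied through the environment variable that parse_rules() reads. The rule values below are illustrative; only the keys ("sample_rate", and the optional "span_name" and "evaluator_label") come from the code above.

import json
import os

# Illustrative only: sample ragas_faithfulness evaluations on 25% of spans and
# skip spans named "health_check"; unmatched spans fall back to the default rate.
os.environ["_DD_LLMOBS_EVALUATOR_SAMPLING_RULES"] = json.dumps(
    [
        {"sample_rate": 0.25, "evaluator_label": "ragas_faithfulness"},
        {"sample_rate": 0.0, "span_name": "health_check"},
    ]
)
os.environ["_DD_LLMOBS_EVALUATOR_DEFAULT_SAMPLE_RATE"] = "1.0"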
2 changes: 1 addition & 1 deletion ddtrace/llmobs/_trace_processor.py
@@ -67,7 +67,7 @@ def submit_llmobs_span(self, span: Span) -> None:
         if not span_event:
             return
         if self._evaluator_runner:
-            self._evaluator_runner.enqueue(span_event)
+            self._evaluator_runner.enqueue(span_event, span)

     def _llmobs_span_event(self, span: Span) -> Dict[str, Any]:
         """Span event object structure."""