diff --git a/ddtrace/llmobs/_evaluators/runner.py b/ddtrace/llmobs/_evaluators/runner.py index 02fd2939dd7..7f08b258f62 100644 --- a/ddtrace/llmobs/_evaluators/runner.py +++ b/ddtrace/llmobs/_evaluators/runner.py @@ -3,12 +3,13 @@ import os from typing import Dict +from ddtrace import Span from ddtrace.internal import forksafe from ddtrace.internal import service from ddtrace.internal.logger import get_logger from ddtrace.internal.periodic import PeriodicService - -from .ragas.faithfulness import RagasFaithfulnessEvaluator +from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator +from ddtrace.llmobs._evaluators.sampler import EvaluatorRunnerSampler logger = get_logger(__name__) @@ -28,11 +29,12 @@ class EvaluatorRunner(PeriodicService): def __init__(self, interval: float, llmobs_service=None, evaluators=None): super(EvaluatorRunner, self).__init__(interval=interval) self._lock = forksafe.RLock() - self._buffer = [] # type: list[Dict] + self._buffer = [] # type: list[tuple[Dict, Span]] self._buffer_limit = 1000 self.llmobs_service = llmobs_service self.executor = futures.ThreadPoolExecutor() + self.sampler = EvaluatorRunnerSampler() self.evaluators = [] if evaluators is None else evaluators if len(self.evaluators) > 0: @@ -70,27 +72,34 @@ def recreate(self) -> "EvaluatorRunner": def on_shutdown(self): self.executor.shutdown() - def enqueue(self, span_event: Dict) -> None: + def enqueue(self, span_event: Dict, span: Span) -> None: with self._lock: if len(self._buffer) >= self._buffer_limit: logger.warning( "%r event buffer full (limit is %d), dropping event", self.__class__.__name__, self._buffer_limit ) return - self._buffer.append(span_event) + self._buffer.append((span_event, span)) def periodic(self) -> None: with self._lock: if not self._buffer: return - events = self._buffer + span_events_and_spans = self._buffer # type: list[tuple[Dict, Span]] self._buffer = [] try: - self.run(events) + self.run(span_events_and_spans) except RuntimeError as e: logger.debug("failed to run evaluation: %s", e) - def run(self, spans): + def run(self, span_events_and_spans): for evaluator in self.evaluators: - self.executor.map(lambda span: evaluator.run_and_submit_evaluation(span), spans) + self.executor.map( + lambda span: evaluator.run_and_submit_evaluation(span), + [ + span_event + for span_event, span in span_events_and_spans + if self.sampler.sample(evaluator.LABEL, span) + ], + ) diff --git a/ddtrace/llmobs/_evaluators/sampler.py b/ddtrace/llmobs/_evaluators/sampler.py new file mode 100644 index 00000000000..a3298603656 --- /dev/null +++ b/ddtrace/llmobs/_evaluators/sampler.py @@ -0,0 +1,94 @@ +import json +from json.decoder import JSONDecodeError +import os +from typing import List +from typing import Optional +from typing import Union + +from ddtrace import config +from ddtrace.internal.logger import get_logger +from ddtrace.sampling_rule import SamplingRule + + +logger = get_logger(__name__) + + +class EvaluatorRunnerSamplingRule(SamplingRule): + SAMPLE_RATE_KEY = "sample_rate" + EVALUATOR_LABEL_KEY = "evaluator_label" + SPAN_NAME_KEY = "span_name" + + def __init__( + self, + sample_rate: float, + evaluator_label: Optional[Union[str, object]] = None, + span_name: Optional[object] = None, + ): + super(EvaluatorRunnerSamplingRule, self).__init__(sample_rate) + self.evaluator_label = evaluator_label + self.span_name = span_name + + def matches(self, evaluator_label, span_name): + for prop, pattern in [(span_name, self.span_name), (evaluator_label, self.evaluator_label)]: 
+ if pattern != self.NO_RULE and prop != pattern: + return False + return True + + def __repr__(self): + return "EvaluatorRunnerSamplingRule(sample_rate={}, evaluator_label={}, span_name={})".format( + self.sample_rate, self.evaluator_label, self.span_name + ) + + __str__ = __repr__ + + +class EvaluatorRunnerSampler: + SAMPLING_RULES_ENV_VAR = "_DD_LLMOBS_EVALUATOR_SAMPLING_RULES" + + def __init__(self): + self.rules = self.parse_rules() + + def sample(self, evaluator_label, span): + for rule in self.rules: + if rule.matches(evaluator_label=evaluator_label, span_name=span.name): + return rule.sample(span) + return True + + def parse_rules(self) -> List[EvaluatorRunnerSamplingRule]: + rules = [] + sampling_rules_str = os.getenv(self.SAMPLING_RULES_ENV_VAR) + + def parsing_failed_because(msg, maybe_throw_this): + if config._raise: + raise maybe_throw_this(msg) + logger.warning(msg, exc_info=True) + + if not sampling_rules_str: + return [] + try: + json_rules = json.loads(sampling_rules_str) + except JSONDecodeError: + parsing_failed_because( + "Failed to parse evaluator sampling rules of: `{}`".format(sampling_rules_str), ValueError + ) + return [] + + if not isinstance(json_rules, list): + parsing_failed_because("Evaluator sampling rules must be a list of dictionaries", ValueError) + return [] + + for rule in json_rules: + if "sample_rate" not in rule: + parsing_failed_because( + "No sample_rate provided for sampling rule: {}".format(json.dumps(rule)), KeyError + ) + continue + try: + sample_rate = float(rule[EvaluatorRunnerSamplingRule.SAMPLE_RATE_KEY]) + except ValueError: + parsing_failed_because("sample_rate is not a float for rule: {}".format(json.dumps(rule)), KeyError) + continue + span_name = rule.get(EvaluatorRunnerSamplingRule.SPAN_NAME_KEY, SamplingRule.NO_RULE) + evaluator_label = rule.get(EvaluatorRunnerSamplingRule.EVALUATOR_LABEL_KEY, SamplingRule.NO_RULE) + rules.append(EvaluatorRunnerSamplingRule(sample_rate, evaluator_label, span_name)) + return rules diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py index d0a7c28f2c3..beca9684c6f 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -68,7 +68,7 @@ def submit_llmobs_span(self, span: Span) -> None: if not span_event: return if self._evaluator_runner: - self._evaluator_runner.enqueue(span_event) + self._evaluator_runner.enqueue(span_event, span) def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: """Span event object structure.""" diff --git a/tests/llmobs/_utils.py b/tests/llmobs/_utils.py index 308e420ddff..f29f9781721 100644 --- a/tests/llmobs/_utils.py +++ b/tests/llmobs/_utils.py @@ -438,3 +438,18 @@ def _oversized_retrieval_event(): }, "metrics": {"input_tokens": 64, "output_tokens": 128, "total_tokens": 192}, } + + +class DummyEvaluator: + LABEL = "dummy" + + def __init__(self, llmobs_service): + self.llmobs_service = llmobs_service + + def run_and_submit_evaluation(self, span): + self.llmobs_service.submit_evaluation( + span_context=span, + label=DummyEvaluator.LABEL, + value=1.0, + metric_type="score", + ) diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py index dd331c726d4..fbe38232cd5 100644 --- a/tests/llmobs/conftest.py +++ b/tests/llmobs/conftest.py @@ -95,6 +95,12 @@ def mock_evaluator_logs(): yield m +@pytest.fixture +def mock_evaluator_sampler_logs(): + with mock.patch("ddtrace.llmobs._evaluators.sampler.logger") as m: + yield m + + @pytest.fixture def mock_http_writer_logs(): with 
mock.patch("ddtrace.internal.writer.writer.log") as m: diff --git a/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_evaluator_runner.send_score_metric.yaml b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_evaluator_runner.send_score_metric.yaml new file mode 100644 index 00000000000..d715994c439 --- /dev/null +++ b/tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_evaluator_runner.send_score_metric.yaml @@ -0,0 +1,37 @@ +interactions: +- request: + body: '{"data": {"type": "evaluation_metric", "attributes": {"metrics": [{"span_id": + "123", "trace_id": "1234", "label": "dummy", "metric_type": "score", "timestamp_ms": + 1728480443772, "score_value": 1.0, "ml_app": "unnamed-ml-app", "tags": ["ddtrace.version:2.14.0.dev196+g7cf7989ab", + "ml_app:unnamed-ml-app"]}]}}}' + headers: + Content-Type: + - application/json + DD-API-KEY: + - XXXXXX + method: POST + uri: https://api.datad0g.com/api/intake/llm-obs/v1/eval-metric + response: + body: + string: '{"data":{"id":"ccf36d1a-6153-4042-ba2d-a5ec5896a6ac","type":"evaluation_metric","attributes":{"metrics":[{"id":"bYp4oTawxz","trace_id":"1234","span_id":"123","timestamp_ms":1728480443772,"ml_app":"unnamed-ml-app","metric_type":"score","label":"dummy","score_value":1,"tags":["ddtrace.version:2.14.0.dev196+g7cf7989ab","ml_app:unnamed-ml-app"]}]}}}' + headers: + content-length: + - '347' + content-security-policy: + - frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pub293163a918901030b79492fe1ab424cf&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatad0g.com + content-type: + - application/vnd.api+json + date: + - Wed, 09 Oct 2024 13:27:25 GMT + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + vary: + - Accept-Encoding + x-content-type-options: + - nosniff + x-frame-options: + - SAMEORIGIN + status: + code: 202 + message: Accepted +version: 1 diff --git a/tests/llmobs/test_llmobs_evaluator_runner.py b/tests/llmobs/test_llmobs_evaluator_runner.py index 88f336db99e..a846914b3ac 100644 --- a/tests/llmobs/test_llmobs_evaluator_runner.py +++ b/tests/llmobs/test_llmobs_evaluator_runner.py @@ -1,11 +1,21 @@ +import json +import os import time import mock import pytest import ddtrace +from ddtrace._trace.span import Span from ddtrace.llmobs._evaluators.runner import EvaluatorRunner +from ddtrace.llmobs._evaluators.sampler import EvaluatorRunnerSampler +from ddtrace.llmobs._evaluators.sampler import EvaluatorRunnerSamplingRule from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent +from tests.utils import override_env +from tests.utils import override_global_config + + +DUMMY_SPAN = Span("dummy_span") def _dummy_ragas_eval_metric_event(span_id, trace_id): @@ -31,7 +41,7 @@ def test_evaluator_runner_start(mock_evaluator_logs, mock_ragas_evaluator): def test_evaluator_runner_buffer_limit(mock_evaluator_logs): evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock()) for _ in range(1001): - evaluator_runner.enqueue({}) + evaluator_runner.enqueue({}, DUMMY_SPAN) mock_evaluator_logs.warning.assert_called_with( "%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000 ) @@ -40,7 +50,7 @@ def test_evaluator_runner_buffer_limit(mock_evaluator_logs): def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator): evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) 
evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs)) - evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}) + evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) evaluator_runner.periodic() mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with( _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234") @@ -53,7 +63,7 @@ def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_me evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs)) evaluator_runner.start() - evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}) + evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) time.sleep(0.1) @@ -63,37 +73,197 @@ def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_me def test_evaluator_runner_on_exit(mock_writer_logs, run_python_code_in_subprocess): + env = os.environ.copy() + pypath = [os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))] + if "PYTHONPATH" in env: + pypath.append(env["PYTHONPATH"]) + env.update( + { + "DD_API_KEY": "dummy-api-key", + "DD_SITE": "datad0g.com", + "PYTHONPATH": ":".join(pypath), + "DD_LLMOBS_ML_APP": "unnamed-ml-app", + "_DD_LLMOBS_WRITER_INTERVAL": "0.01", + } + ) out, err, status, pid = run_python_code_in_subprocess( """ import os import time -import mock - -from ddtrace.internal.utils.http import Response +import atexit from ddtrace.llmobs import LLMObs from ddtrace.llmobs._evaluators.runner import EvaluatorRunner -from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator - -with mock.patch( - "ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic", - return_value=Response( - status=200, - body="{}", - ), -): - LLMObs.enable( - site="datad0g.com", - api_key=os.getenv("DD_API_KEY"), - ml_app="unnamed-ml-app", - ) - evaluator_runner = EvaluatorRunner( - interval=0.01, llmobs_service=LLMObs - ) - evaluator_runner.evaluators.append(RagasFaithfulnessEvaluator(llmobs_service=LLMObs)) - evaluator_runner.start() - evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}) +from tests.llmobs._utils import logs_vcr +from tests.llmobs._utils import DummyEvaluator + +ctx = logs_vcr.use_cassette("tests.llmobs.test_llmobs_evaluator_runner.send_score_metric.yaml") +ctx.__enter__() +atexit.register(lambda: ctx.__exit__()) +LLMObs.enable() +evaluator_runner = EvaluatorRunner( + interval=0.01, llmobs_service=LLMObs +) +evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=LLMObs)) +evaluator_runner.start() +evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, None) +evaluator_runner.periodic() """, + env=env, ) assert status == 0, err assert out == b"" assert err == b"" + + +def test_evaluator_runner_sampler_single_rule(monkeypatch): + monkeypatch.setenv( + EvaluatorRunnerSampler.SAMPLING_RULES_ENV_VAR, + json.dumps([{"sample_rate": 0.5, "evaluator_label": "ragas_faithfulness", "span_name": "dummy_span"}]), + ) + sampling_rules = EvaluatorRunnerSampler().rules + assert len(sampling_rules) == 1 + assert sampling_rules[0].sample_rate == 0.5 + assert sampling_rules[0].evaluator_label == "ragas_faithfulness" + assert sampling_rules[0].span_name == "dummy_span" + + +def test_evaluator_runner_sampler_multiple_rules(monkeypatch): + monkeypatch.setenv( + EvaluatorRunnerSampler.SAMPLING_RULES_ENV_VAR, + json.dumps( + [ + {"sample_rate": 0.5, "evaluator_label": "ragas_faithfulness", "span_name": "dummy_span"}, + {"sample_rate": 
0.2, "evaluator_label": "ragas_faithfulness", "span_name": "dummy_span_2"}, + ] + ), + ) + sampling_rules = EvaluatorRunnerSampler().rules + assert len(sampling_rules) == 2 + assert sampling_rules[0].sample_rate == 0.5 + assert sampling_rules[0].evaluator_label == "ragas_faithfulness" + assert sampling_rules[0].span_name == "dummy_span" + + assert sampling_rules[1].sample_rate == 0.2 + assert sampling_rules[1].evaluator_label == "ragas_faithfulness" + assert sampling_rules[1].span_name == "dummy_span_2" + + +def test_evaluator_runner_sampler_no_rule_label_or_name(monkeypatch): + monkeypatch.setenv( + EvaluatorRunnerSampler.SAMPLING_RULES_ENV_VAR, + json.dumps([{"sample_rate": 0.5}]), + ) + sampling_rules = EvaluatorRunnerSampler().rules + assert len(sampling_rules) == 1 + assert sampling_rules[0].sample_rate == 0.5 + assert sampling_rules[0].evaluator_label == EvaluatorRunnerSamplingRule.NO_RULE + assert sampling_rules[0].span_name == EvaluatorRunnerSamplingRule.NO_RULE + + +def test_evaluator_sampler_invalid_json(monkeypatch, mock_evaluator_sampler_logs): + monkeypatch.setenv( + EvaluatorRunnerSampler.SAMPLING_RULES_ENV_VAR, + "not a json", + ) + + with override_global_config({"_raise": True}): + with pytest.raises(ValueError): + EvaluatorRunnerSampler().rules + + with override_global_config({"_raise": False}): + sampling_rules = EvaluatorRunnerSampler().rules + assert len(sampling_rules) == 0 + mock_evaluator_sampler_logs.warning.assert_called_once_with( + "Failed to parse evaluator sampling rules of: `not a json`", exc_info=True + ) + + +def test_evaluator_sampler_invalid_rule_not_a_list(monkeypatch, mock_evaluator_sampler_logs): + monkeypatch.setenv( + EvaluatorRunnerSampler.SAMPLING_RULES_ENV_VAR, + json.dumps({"sample_rate": 0.5, "evaluator_label": "ragas_faithfulness", "span_name": "dummy_span"}), + ) + + with override_global_config({"_raise": True}): + with pytest.raises(ValueError): + EvaluatorRunnerSampler().rules + + with override_global_config({"_raise": False}): + sampling_rules = EvaluatorRunnerSampler().rules + assert len(sampling_rules) == 0 + mock_evaluator_sampler_logs.warning.assert_called_once_with( + "Evaluator sampling rules must be a list of dictionaries", exc_info=True + ) + + +def test_evaluator_sampler_invalid_rule_missing_sample_rate(monkeypatch, mock_evaluator_sampler_logs): + monkeypatch.setenv( + EvaluatorRunnerSampler.SAMPLING_RULES_ENV_VAR, + json.dumps([{"sample_rate": 0.1, "span_name": "dummy"}, {"span_name": "dummy2"}]), + ) + + with override_global_config({"_raise": True}): + with pytest.raises(KeyError): + EvaluatorRunnerSampler().rules + + with override_global_config({"_raise": False}): + sampling_rules = EvaluatorRunnerSampler().rules + assert len(sampling_rules) == 1 + mock_evaluator_sampler_logs.warning.assert_called_once_with( + 'No sample_rate provided for sampling rule: {"span_name": "dummy2"}', exc_info=True + ) + + +def test_evaluator_runner_sampler_no_rules_samples_all(monkeypatch): + iterations = int(1e4) + + sampled = sum(EvaluatorRunnerSampler().sample("ragas_faithfulness", Span(name=str(i))) for i in range(iterations)) + + deviation = abs(sampled - (iterations)) / (iterations) + assert deviation < 0.05 + + +def test_evaluator_sampling_rule_matches(monkeypatch): + sample_rate = 0.5 + span_name_rule = "dummy_span" + evaluator_label_rule = "ragas_faithfulness" + + for rule in [ + {"evaluator_label": evaluator_label_rule}, + {"evaluator_label": evaluator_label_rule, "span_name": span_name_rule}, + {"span_name": span_name_rule}, + ]: + 
rule["sample_rate"] = sample_rate + with override_env({EvaluatorRunnerSampler.SAMPLING_RULES_ENV_VAR: json.dumps([rule])}): + iterations = int(1e4 / sample_rate) + sampled = sum( + EvaluatorRunnerSampler().sample(evaluator_label_rule, Span(name=span_name_rule)) + for i in range(iterations) + ) + + deviation = abs(sampled - (iterations * sample_rate)) / (iterations * sample_rate) + assert deviation < 0.05 + + +def test_evaluator_sampling_does_not_match_samples_all(monkeypatch): + sample_rate = 0.5 + span_name_rule = "dummy_span" + evaluator_label_rule = "ragas_faithfulness" + + for rule in [ + {"evaluator_label": evaluator_label_rule}, + {"evaluator_label": evaluator_label_rule, "span_name": span_name_rule}, + {"span_name": span_name_rule}, + ]: + rule["sample_rate"] = sample_rate + with override_env({EvaluatorRunnerSampler.SAMPLING_RULES_ENV_VAR: json.dumps([rule])}): + iterations = int(1e4 / sample_rate) + + label_and_span = "not a matching label", Span(name="not matching span name") + + assert EvaluatorRunnerSampler().rules[0].matches(*label_and_span) is False + + sampled = sum(EvaluatorRunnerSampler().sample(*label_and_span) for i in range(iterations)) + + deviation = abs(sampled - (iterations)) / (iterations) + assert deviation < 0.05 diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 61f7f72b4f2..2672c8a8921 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -1567,10 +1567,10 @@ def test_llmobs_fork_evaluator_runner_run(monkeypatch): llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app", api_key="test_api_key") pid = os.fork() if pid: # parent - llmobs_service._instance._evaluator_runner.enqueue({"span_id": "123", "trace_id": "456"}) + llmobs_service._instance._evaluator_runner.enqueue({"span_id": "123", "trace_id": "456"}, None) assert len(llmobs_service._instance._evaluator_runner._buffer) == 1 else: # child - llmobs_service._instance._evaluator_runner.enqueue({"span_id": "123", "trace_id": "456"}) + llmobs_service._instance._evaluator_runner.enqueue({"span_id": "123", "trace_id": "456"}, None) assert len(llmobs_service._instance._evaluator_runner._buffer) == 1 llmobs_service.disable() os._exit(12)