diff --git a/ddtrace/llmobs/_evaluators/ragas/faithfulness.py b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py
new file mode 100644
index 00000000000..d25eddf8bb8
--- /dev/null
+++ b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py
@@ -0,0 +1,36 @@
+import math
+import time
+from typing import Optional
+
+
+class RagasFaithfulnessEvaluator:
+    """A class used by EvaluatorRunner to conduct ragas faithfulness evaluations
+    on LLM Observability span events. The job of an Evaluator is to take a span and
+    submit evaluation metrics based on the span's attributes.
+    """
+
+    LABEL = "ragas_faithfulness"
+    METRIC_TYPE = "score"
+
+    def __init__(self, llmobs_service):
+        self.llmobs_service = llmobs_service
+
+    def run_and_submit_evaluation(self, span: dict) -> None:
+        if not span:
+            return
+        score_result = self.evaluate(span)
+        if score_result is not None:
+            self.llmobs_service.submit_evaluation(
+                span_context=span,
+                label=RagasFaithfulnessEvaluator.LABEL,
+                metric_type=RagasFaithfulnessEvaluator.METRIC_TYPE,
+                value=score_result,
+                timestamp_ms=math.floor(time.time() * 1000),
+            )
+
+    def evaluate(self, span: dict) -> Optional[float]:
+        """Placeholder scoring logic: returns a constant score for any span with IDs."""
+        span_id, trace_id = span.get("span_id"), span.get("trace_id")
+        if not span_id or not trace_id:
+            return None
+        return 1.0
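For context on the evaluator contract: the runner only requires that an evaluator expose a `LABEL`, a `METRIC_TYPE`, an `__init__(llmobs_service)` constructor, and a `run_and_submit_evaluation(span)` method. A minimal sketch of a second evaluator following the same shape — the `ExactMatchEvaluator` name, its scoring rule, and the assumption that the span event carries input/output text under `meta` are all illustrative, not part of this change:

import math
import time
from typing import Optional


class ExactMatchEvaluator:
    """Illustrative evaluator mirroring the RagasFaithfulnessEvaluator interface."""

    LABEL = "exact_match"
    METRIC_TYPE = "score"

    def __init__(self, llmobs_service):
        self.llmobs_service = llmobs_service

    def run_and_submit_evaluation(self, span: dict) -> None:
        score = self.evaluate(span)
        # "is not None" matters: a legitimate 0.0 score must still be submitted
        if score is not None:
            self.llmobs_service.submit_evaluation(
                span_context=span,
                label=self.LABEL,
                metric_type=self.METRIC_TYPE,
                value=score,
                timestamp_ms=math.floor(time.time() * 1000),
            )

    def evaluate(self, span: dict) -> Optional[float]:
        # assumes (illustratively) that the span event carries input/output
        # text under "meta"; missing fields mean "not evaluable", not failure
        meta = span.get("meta") or {}
        expected = (meta.get("input") or {}).get("value")
        actual = (meta.get("output") or {}).get("value")
        if expected is None or actual is None:
            return None
        return 1.0 if expected == actual else 0.0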
diff --git a/ddtrace/llmobs/_evaluators/runner.py b/ddtrace/llmobs/_evaluators/runner.py
new file mode 100644
index 00000000000..02fd2939dd7
--- /dev/null
+++ b/ddtrace/llmobs/_evaluators/runner.py
@@ -0,0 +1,97 @@
+import atexit
+from concurrent import futures
+import os
+from typing import Dict
+from typing import List
+
+from ddtrace.internal import forksafe
+from ddtrace.internal import service
+from ddtrace.internal.logger import get_logger
+from ddtrace.internal.periodic import PeriodicService
+
+from .ragas.faithfulness import RagasFaithfulnessEvaluator
+
+
+logger = get_logger(__name__)
+
+SUPPORTED_EVALUATORS = {
+    "ragas_faithfulness": RagasFaithfulnessEvaluator,
+}
+
+
+class EvaluatorRunner(PeriodicService):
+    """Periodic service for running evaluators on LLM Observability span events.
+    This class:
+    1. parses active evaluators from the environment and initializes them
+    2. triggers evaluator runs over buffered finished spans on each `periodic` call
+    """
+
+    def __init__(self, interval: float, llmobs_service=None, evaluators=None):
+        super(EvaluatorRunner, self).__init__(interval=interval)
+        self._lock = forksafe.RLock()
+        self._buffer = []  # type: List[Dict]
+        self._buffer_limit = 1000
+
+        self.llmobs_service = llmobs_service
+        self.executor = futures.ThreadPoolExecutor()
+        self.evaluators = [] if evaluators is None else evaluators
+
+        if len(self.evaluators) > 0:
+            return
+
+        evaluator_str = os.getenv("_DD_LLMOBS_EVALUATORS")
+        if evaluator_str is None:
+            return
+
+        evaluators = evaluator_str.split(",")
+        for evaluator in evaluators:
+            if evaluator in SUPPORTED_EVALUATORS:
+                self.evaluators.append(SUPPORTED_EVALUATORS[evaluator](llmobs_service=llmobs_service))
+
+    def start(self, *args, **kwargs):
+        if not self.evaluators:
+            logger.debug("no evaluators configured, not starting %r", self.__class__.__name__)
+            return
+        super(EvaluatorRunner, self).start()
+        logger.debug("started %r", self.__class__.__name__)
+        atexit.register(self.on_shutdown)
+
+    def stop(self, *args, **kwargs):
+        if self.status == service.ServiceStatus.STOPPED:
+            return
+        super(EvaluatorRunner, self).stop()
+
+    def recreate(self) -> "EvaluatorRunner":
+        return self.__class__(
+            interval=self._interval,
+            llmobs_service=self.llmobs_service,
+            evaluators=self.evaluators,
+        )
+
+    def on_shutdown(self):
+        self.executor.shutdown()
+
+    def enqueue(self, span_event: Dict) -> None:
+        with self._lock:
+            if len(self._buffer) >= self._buffer_limit:
+                logger.warning(
+                    "%r event buffer full (limit is %d), dropping event", self.__class__.__name__, self._buffer_limit
+                )
+                return
+            self._buffer.append(span_event)
+
+    def periodic(self) -> None:
+        with self._lock:
+            if not self._buffer:
+                return
+            events = self._buffer
+            self._buffer = []
+
+        try:
+            self.run(events)
+        except RuntimeError as e:
+            logger.debug("failed to run evaluation: %s", e)
+
+    def run(self, spans):
+        for evaluator in self.evaluators:
+            self.executor.map(evaluator.run_and_submit_evaluation, spans)
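The runner reuses the buffered producer/consumer pattern of the LLMObs writers: `enqueue` is called on the trace-flush path and must stay cheap, so it only appends under a lock and drops events once the 1000-event cap is hit, while `periodic` drains the buffer on the worker thread and fans evaluations out to a thread pool. A stripped-down sketch of that pattern with the ddtrace internals removed (class and attribute names here are illustrative):

import threading
from concurrent import futures


class BufferedEvaluatorRunner:
    """Illustrative skeleton of the enqueue/periodic split used by EvaluatorRunner."""

    def __init__(self, evaluators, limit=1000):
        self._lock = threading.RLock()
        self._buffer = []
        self._limit = limit
        self._evaluators = evaluators
        self._executor = futures.ThreadPoolExecutor()

    def enqueue(self, span_event):
        # hot path: only append under the lock, never evaluate inline
        with self._lock:
            if len(self._buffer) >= self._limit:
                return  # drop rather than block the tracer
            self._buffer.append(span_event)

    def periodic(self):
        # worker thread: swap the buffer out under the lock, evaluate off-thread
        with self._lock:
            span_events, self._buffer = self._buffer, []
        for evaluator in self._evaluators:
            self._executor.map(evaluator.run_and_submit_evaluation, span_events)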
diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py
index 5f24d932c58..bbd1ba5cffa 100644
--- a/ddtrace/llmobs/_llmobs.py
+++ b/ddtrace/llmobs/_llmobs.py
@@ -40,6 +40,7 @@
 from ddtrace.llmobs._constants import SPAN_KIND
 from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
 from ddtrace.llmobs._constants import TAGS
+from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
 from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor
 from ddtrace.llmobs._utils import AnnotationContext
 from ddtrace.llmobs._utils import _get_llmobs_parent_id
@@ -89,13 +90,21 @@ def __init__(self, tracer=None):
             interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
             timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
         )
-        self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer)
+
+        self._evaluator_runner = EvaluatorRunner(
+            interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)),
+            llmobs_service=self,
+        )
+
+        self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluator_runner)
         forksafe.register(self._child_after_fork)
 
     def _child_after_fork(self):
         self._llmobs_span_writer = self._llmobs_span_writer.recreate()
         self._llmobs_eval_metric_writer = self._llmobs_eval_metric_writer.recreate()
+        self._evaluator_runner = self._evaluator_runner.recreate()
         self._trace_processor._span_writer = self._llmobs_span_writer
+        self._trace_processor._evaluator_runner = self._evaluator_runner
         tracer_filters = self.tracer._filters
         if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
             tracer_filters += [self._trace_processor]
@@ -106,6 +115,11 @@ def _child_after_fork(self):
         except ServiceStatusError:
             log.debug("Error starting LLMObs writers after fork")
 
+        try:
+            self._evaluator_runner.start()
+        except ServiceStatusError:
+            log.debug("Error starting evaluator runner after fork")
+
     def _start_service(self) -> None:
         tracer_filters = self.tracer._filters
         if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
@@ -117,6 +131,11 @@ def _start_service(self) -> None:
         except ServiceStatusError:
             log.debug("Error starting LLMObs writers")
 
+        try:
+            self._evaluator_runner.start()
+        except ServiceStatusError:
+            log.debug("Error starting evaluator runner")
+
     def _stop_service(self) -> None:
         try:
             self._llmobs_span_writer.stop()
@@ -124,6 +143,11 @@ def _stop_service(self) -> None:
         except ServiceStatusError:
             log.debug("Error stopping LLMObs writers")
 
+        try:
+            self._evaluator_runner.stop()
+        except ServiceStatusError:
+            log.debug("Error stopping evaluator runner")
+
         try:
             forksafe.unregister(self._child_after_fork)
             self.tracer.shutdown()
diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py
index 5a654a8fb96..3273d418b41 100644
--- a/ddtrace/llmobs/_trace_processor.py
+++ b/ddtrace/llmobs/_trace_processor.py
@@ -44,8 +44,9 @@ class LLMObsTraceProcessor(TraceProcessor):
     Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability.
     """
 
-    def __init__(self, llmobs_span_writer):
+    def __init__(self, llmobs_span_writer, evaluator_runner=None):
         self._span_writer = llmobs_span_writer
+        self._evaluator_runner = evaluator_runner
 
     def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
         if not trace:
@@ -57,11 +58,15 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
 
     def submit_llmobs_span(self, span: Span) -> None:
         """Generate and submit an LLMObs span event to be sent to LLMObs."""
+        span_event = None
         try:
            span_event = self._llmobs_span_event(span)
            self._span_writer.enqueue(span_event)
        except (KeyError, TypeError):
            log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span)
+        finally:
+            if span_event and self._evaluator_runner:
+                self._evaluator_runner.enqueue(span_event)
 
     def _llmobs_span_event(self, span: Span) -> Dict[str, Any]:
         """Span event object structure."""
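Worth noting for reviewers: worker threads do not survive `os.fork()`, so `_child_after_fork` rebuilds each component in the child via its `recreate()` method, re-wires the new runner into the existing trace processor, and restarts it. A condensed sketch of that recreate-after-fork pattern (`forksafe.register` is the real ddtrace hook; the `Component` class is a toy stand-in):

from ddtrace.internal import forksafe


class Component:
    def __init__(self, interval):
        self._interval = interval

    def recreate(self):
        # fresh instance with the parent's config: the child process must
        # rebuild and restart its own worker threads after a fork
        return self.__class__(interval=self._interval)


component = Component(interval=1.0)


def _child_after_fork():
    global component
    component = component.recreate()  # then start() it and re-wire any consumers


forksafe.register(_child_after_fork)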
diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py
index 66c16ae180f..dd331c726d4 100644
--- a/tests/llmobs/conftest.py
+++ b/tests/llmobs/conftest.py
@@ -5,6 +5,7 @@
 from ddtrace.internal.utils.http import Response
 from ddtrace.llmobs import LLMObs as llmobs_service
+from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator
 from tests.llmobs._utils import logs_vcr
 from tests.utils import DummyTracer
 from tests.utils import override_global_config
@@ -88,6 +89,12 @@ def mock_writer_logs():
         yield m
 
 
+@pytest.fixture
+def mock_evaluator_logs():
+    with mock.patch("ddtrace.llmobs._evaluators.runner.logger") as m:
+        yield m
+
+
 @pytest.fixture
 def mock_http_writer_logs():
     with mock.patch("ddtrace.internal.writer.writer.log") as m:
@@ -125,3 +132,10 @@ def AgentlessLLMObs(mock_llmobs_span_agentless_writer, mock_llmobs_eval_metric_w
     llmobs_service.enable(_tracer=dummy_tracer)
     yield llmobs_service
     llmobs_service.disable()
+
+
+@pytest.fixture
+def mock_ragas_evaluator(mock_llmobs_eval_metric_writer):
+    with mock.patch("ddtrace.llmobs._evaluators.ragas.faithfulness.RagasFaithfulnessEvaluator.evaluate") as m:
+        m.return_value = 1.0
+        yield RagasFaithfulnessEvaluator
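The `mock_ragas_evaluator` fixture patches the evaluator's (placeholder) `evaluate` method to return a constant 1.0 and yields the class itself, so each test builds its own instance against whatever `llmobs_service` it needs. Roughly how a test consumes it — this exact test does not exist in the PR and is only a usage illustration:

def test_runner_submits_metric(LLMObs, mock_ragas_evaluator, mock_llmobs_eval_metric_writer):
    # evaluate() is patched to return 1.0, so no real ragas scoring runs
    evaluator = mock_ragas_evaluator(llmobs_service=LLMObs)
    evaluator.run_and_submit_evaluation({"span_id": "123", "trace_id": "456"})
    # the evaluation metric lands in the (mocked) eval metric writer
    mock_llmobs_eval_metric_writer.enqueue.assert_called_once()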
diff --git a/tests/llmobs/test_llmobs_evaluator_runner.py b/tests/llmobs/test_llmobs_evaluator_runner.py
new file mode 100644
index 00000000000..88f336db99e
--- /dev/null
+++ b/tests/llmobs/test_llmobs_evaluator_runner.py
@@ -0,0 +1,99 @@
+import time
+
+import mock
+import pytest
+
+import ddtrace
+from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
+from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent
+
+
+def _dummy_ragas_eval_metric_event(span_id, trace_id):
+    return LLMObsEvaluationMetricEvent(
+        span_id=span_id,
+        trace_id=trace_id,
+        score_value=1.0,
+        ml_app="unnamed-ml-app",
+        timestamp_ms=mock.ANY,
+        metric_type="score",
+        label="ragas_faithfulness",
+        tags=["ddtrace.version:{}".format(ddtrace.__version__), "ml_app:unnamed-ml-app"],
+    )
+
+
+def test_evaluator_runner_start(mock_evaluator_logs, mock_ragas_evaluator):
+    evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock())
+    evaluator_runner.evaluators.append(mock_ragas_evaluator)
+    evaluator_runner.start()
+    mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r", "EvaluatorRunner")])
+
+
+def test_evaluator_runner_buffer_limit(mock_evaluator_logs):
+    evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock())
+    for _ in range(1001):
+        evaluator_runner.enqueue({})
+    mock_evaluator_logs.warning.assert_called_with(
+        "%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000
+    )
+
+
+def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator):
+    evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
+    evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs))
+    evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"})
+    evaluator_runner.periodic()
+    mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
+        _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
+    )
+
+
+@pytest.mark.vcr_logs
+def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator):
+    evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
+    evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs))
+    evaluator_runner.start()
+
+    evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"})
+
+    time.sleep(0.1)
+
+    mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
+        _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
+    )
+
+
+def test_evaluator_runner_on_exit(mock_writer_logs, run_python_code_in_subprocess):
+    out, err, status, pid = run_python_code_in_subprocess(
+        """
+import os
+import time
+import mock
+
+from ddtrace.internal.utils.http import Response
+from ddtrace.llmobs import LLMObs
+from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
+from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator
+
+with mock.patch(
+    "ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic",
+    return_value=Response(
+        status=200,
+        body="{}",
+    ),
+):
+    LLMObs.enable(
+        site="datad0g.com",
+        api_key=os.getenv("DD_API_KEY"),
+        ml_app="unnamed-ml-app",
+    )
+    evaluator_runner = EvaluatorRunner(
+        interval=0.01, llmobs_service=LLMObs
+    )
+    evaluator_runner.evaluators.append(RagasFaithfulnessEvaluator(llmobs_service=LLMObs))
+    evaluator_runner.start()
+    evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"})
+""",
+    )
+    assert status == 0, err
+    assert out == b""
+    assert err == b""
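The subprocess test above leans on `EvaluatorRunner.start()` registering `on_shutdown` with `atexit`: when the child interpreter exits, the thread pool is drained before teardown, so the script produces no output and exits cleanly. The underlying pattern in isolation (a minimal sketch, not ddtrace code):

import atexit
from concurrent import futures

executor = futures.ThreadPoolExecutor()


def on_shutdown():
    # block until in-flight evaluations finish before interpreter teardown
    executor.shutdown()


atexit.register(on_shutdown)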
diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py
index 4abe99d4f03..3e90cb2063d 100644
--- a/tests/llmobs/test_llmobs_service.py
+++ b/tests/llmobs/test_llmobs_service.py
@@ -34,6 +34,7 @@
 from tests.llmobs._utils import _expected_llmobs_llm_span_event
 from tests.llmobs._utils import _expected_llmobs_non_llm_span_event
 from tests.utils import DummyTracer
+from tests.utils import override_env
 from tests.utils import override_global_config
 
 
@@ -89,6 +90,7 @@ def test_service_disable():
     assert llmobs_service.enabled is False
     assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped"
     assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped"
+    assert llmobs_service._instance._evaluator_runner.status.value == "stopped"
 
 
 def test_service_enable_no_api_key():
@@ -99,6 +101,7 @@ def test_service_enable_no_api_key():
     assert llmobs_service.enabled is False
     assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped"
     assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped"
+    assert llmobs_service._instance._evaluator_runner.status.value == "stopped"
 
 
 def test_service_enable_no_ml_app_specified():
@@ -109,6 +112,7 @@ def test_service_enable_no_ml_app_specified():
     assert llmobs_service.enabled is False
     assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped"
     assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped"
+    assert llmobs_service._instance._evaluator_runner.status.value == "stopped"
 
 
 def test_service_enable_deprecated_ml_app_name(monkeypatch, mock_logs):
@@ -1453,7 +1457,7 @@ def test_llmobs_fork_recreates_and_restarts_span_writer():
 
 
 def test_llmobs_fork_recreates_and_restarts_eval_metric_writer():
-    """Test that forking a process correctly recreates and restarts the LLMObsSpanWriter."""
+    """Test that forking a process correctly recreates and restarts the LLMObsEvalMetricWriter."""
     with mock.patch("ddtrace.llmobs._writer.BaseLLMObsWriter.periodic"):
         llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
         original_pid = llmobs_service._instance.tracer._pid
@@ -1476,6 +1480,39 @@ def test_llmobs_fork_recreates_and_restarts_eval_metric_writer():
         llmobs_service.disable()
 
 
+def test_llmobs_fork_recreates_and_restarts_evaluator_runner(mock_ragas_evaluator):
+    """Test that forking a process correctly recreates and restarts the EvaluatorRunner."""
+    with override_env(dict(_DD_LLMOBS_EVALUATORS="ragas_faithfulness")):
+        with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"):
+            llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
+            original_pid = llmobs_service._instance.tracer._pid
+            original_evaluator_runner = llmobs_service._instance._evaluator_runner
+            pid = os.fork()
+            if pid:  # parent
+                assert llmobs_service._instance.tracer._pid == original_pid
+                assert llmobs_service._instance._evaluator_runner == original_evaluator_runner
+                assert (
+                    llmobs_service._instance._trace_processor._evaluator_runner
+                    == llmobs_service._instance._evaluator_runner
+                )
+                assert llmobs_service._instance._evaluator_runner.status == ServiceStatus.RUNNING
+            else:  # child
+                assert llmobs_service._instance.tracer._pid != original_pid
+                assert llmobs_service._instance._evaluator_runner != original_evaluator_runner
+                assert (
+                    llmobs_service._instance._trace_processor._evaluator_runner
+                    == llmobs_service._instance._evaluator_runner
+                )
+                assert llmobs_service._instance._evaluator_runner.status == ServiceStatus.RUNNING
+                llmobs_service.disable()
+                os._exit(12)
+
+            _, status = os.waitpid(pid, 0)
+            exit_code = os.WEXITSTATUS(status)
+            assert exit_code == 12
+            llmobs_service.disable()
+
+
 def test_llmobs_fork_create_span(monkeypatch):
     """Test that forking a process correctly encodes new spans created in each process."""
     monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", 5.0)
@@ -1531,6 +1568,27 @@ def test_llmobs_fork_submit_evaluation(monkeypatch):
     llmobs_service.disable()
 
 
+def test_llmobs_fork_evaluator_runner_run(monkeypatch):
+    """Test that forking a process correctly enqueues span events to the evaluator runner in each process."""
+    monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 5.0)
+    with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"):
+        llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app", api_key="test_api_key")
+        pid = os.fork()
+        if pid:  # parent
+            llmobs_service._instance._evaluator_runner.enqueue({"span_id": "123", "trace_id": "456"})
+            assert len(llmobs_service._instance._evaluator_runner._buffer) == 1
+        else:  # child
+            llmobs_service._instance._evaluator_runner.enqueue({"span_id": "123", "trace_id": "456"})
+            assert len(llmobs_service._instance._evaluator_runner._buffer) == 1
+            llmobs_service.disable()
+            os._exit(12)
+
+        _, status = os.waitpid(pid, 0)
+        exit_code = os.WEXITSTATUS(status)
+        assert exit_code == 12
+        llmobs_service.disable()
+
+
 def test_llmobs_fork_custom_filter(monkeypatch):
     """Test that forking a process correctly keeps any custom filters."""
 
@@ -1605,3 +1663,29 @@ async def test_annotation_context_async_nested(LLMObs):
     async with LLMObs.annotation_context(tags={"car": "car"}):
         with LLMObs.agent(name="test_agent") as span:
             assert json.loads(span.get_tag(TAGS)) == {"foo": "bar", "boo": "bar", "car": "car"}
+
+
+def test_service_enable_starts_evaluator_runner_when_evaluators_exist():
+    with override_global_config(dict(_dd_api_key="<not-a-real-api-key>", _llmobs_ml_app="<ml-app-name>")):
+        with override_env(dict(_DD_LLMOBS_EVALUATORS="ragas_faithfulness")):
+            dummy_tracer = DummyTracer()
+            llmobs_service.enable(_tracer=dummy_tracer)
+            llmobs_instance = llmobs_service._instance
+            assert llmobs_instance is not None
+            assert llmobs_service.enabled
+            assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "running"
+            assert llmobs_service._instance._evaluator_runner.status.value == "running"
+            llmobs_service.disable()
+
+
+def test_service_enable_does_not_start_evaluator_runner():
+    with override_global_config(dict(_dd_api_key="<not-a-real-api-key>", _llmobs_ml_app="<ml-app-name>")):
+        dummy_tracer = DummyTracer()
+        llmobs_service.enable(_tracer=dummy_tracer)
+        llmobs_instance = llmobs_service._instance
+        assert llmobs_instance is not None
+        assert llmobs_service.enabled
+        assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "running"
+        assert llmobs_service._instance._llmobs_span_writer.status.value == "running"
+        assert llmobs_service._instance._evaluator_runner.status.value == "stopped"
+        llmobs_service.disable()
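End to end, the feature stays opt-in: `EvaluatorRunner.__init__` only instantiates evaluators named in `_DD_LLMOBS_EVALUATORS`, and `start()` is a no-op otherwise. A minimal way to exercise it in an application — the ml_app value is a placeholder, and the environment variables must be set before `enable()` runs:

import os

# opt in to the ragas faithfulness evaluator; the runner flushes every
# second by default (tunable via _DD_LLMOBS_EVALUATOR_INTERVAL)
os.environ["_DD_LLMOBS_EVALUATORS"] = "ragas_faithfulness"

from ddtrace.llmobs import LLMObs

LLMObs.enable(ml_app="my-ml-app", api_key=os.getenv("DD_API_KEY"))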