chore(llmobs): implement skeleton code for ragas faithfulness evaluator #10662

Open
wants to merge 15 commits into base: main
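A rough sketch of how the skeleton in this diff could be exercised once merged; the two underscore-prefixed environment variables come from this change, while the application snippet itself is purely illustrative and not part of the PR:

import os

# internal flags added in this diff; set them before ddtrace is imported so config picks them up
os.environ["_DD_LLMOBS_RAGAS_FAITHFULNESS_ENABLED"] = "1"
os.environ["_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL"] = "1.0"

from ddtrace.llmobs import LLMObs

# enabling LLMObs now also constructs and starts RagasFaithfulnessEvaluator,
# which receives every finished LLMObs span event and periodically flushes
# dummy "dummy.ragas.faithfulness" score metrics through the eval metric writer
LLMObs.enable(ml_app="my-ml-app")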
85 changes: 85 additions & 0 deletions ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py
@@ -0,0 +1,85 @@
import atexit
from concurrent import futures
import math
import time
from typing import Dict
from typing import Optional

from ddtrace import config
from ddtrace.internal import forksafe
from ddtrace.internal.logger import get_logger
from ddtrace.internal.periodic import PeriodicService

from ...._writer import LLMObsEvaluationMetricEvent


logger = get_logger(__name__)


class RagasFaithfulnessEvaluator(PeriodicService):
"""Base class for evaluating LLM Observability span events"""

def __init__(self, interval: float, _evaluation_metric_writer=None, _llmobs_instance=None):
super(RagasFaithfulnessEvaluator, self).__init__(interval=interval)
self.llmobs_instance = _llmobs_instance
self._lock = forksafe.RLock()
self._buffer = [] # type: list[Dict]
self._buffer_limit = 1000

self._evaluation_metric_writer = _evaluation_metric_writer

self.name = "ragas.faithfulness"

self.executor = futures.ThreadPoolExecutor()

def start(self, *args, **kwargs):
super(RagasFaithfulnessEvaluator, self).start()
logger.debug("started %r to %r", self.__class__.__name__)
atexit.register(self.on_shutdown)

def on_shutdown(self):
self.executor.shutdown(cancel_futures=True)

def recreate(self):
return self.__class__(
interval=self._interval, _evaluation_metric_writer=self._evaluation_metric_writer, _llmobs_instance=self.llmobs_instance
)

def enqueue(self, span_event: Dict) -> None:
with self._lock:
if len(self._buffer) >= self._buffer_limit:
logger.warning(
"%r event buffer full (limit is %d), dropping event", self.__class__.__name__, self._buffer_limit
)
return
self._buffer.append(span_event)

def periodic(self) -> None:
with self._lock:
if not self._buffer:
return
events = self._buffer
self._buffer = []

try:
evaluation_metrics = self.run(events)
for metric in evaluation_metrics:
if metric is not None:
self._evaluation_metric_writer.enqueue(metric)
except RuntimeError as e:
logger.debug("failed to run evaluation: %s", e)

def run(self, spans):
def dummy_score_and_return_evaluation_that_will_be_replaced(span) -> Optional[LLMObsEvaluationMetricEvent]:
return LLMObsEvaluationMetricEvent(
span_id=span.get("span_id"),
trace_id=span.get("trace_id"),
score_value=1,
ml_app=config._llmobs_ml_app,
timestamp_ms=math.floor(time.time() * 1000),
metric_type="score",
label="dummy.ragas.faithfulness",
)

results = self.executor.map(dummy_score_and_return_evaluation_that_will_be_replaced, spans)
return [result for result in results if result is not None]
31 changes: 29 additions & 2 deletions ddtrace/llmobs/_llmobs.py
@@ -39,6 +39,7 @@
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
from ddtrace.llmobs._constants import TAGS
from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator
from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor
from ddtrace.llmobs._utils import AnnotationContext
from ddtrace.llmobs._utils import _get_llmobs_parent_id
@@ -86,7 +87,18 @@ def __init__(self, tracer=None):
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
)
self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer)

self._evaluators = []
if config._llmobs_ragas_faithfulness_enabled:
self._evaluators.append(
RagasFaithfulnessEvaluator(
interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", 1.0)),
_evaluation_metric_writer=self._llmobs_eval_metric_writer,
_llmobs_instance=self,
)
)

self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluators)
forksafe.register(self._child_after_fork)

def _child_after_fork(self):
@@ -112,13 +124,29 @@ def _start_service(self) -> None:
except ServiceStatusError:
log.debug("Error starting LLMObs writers")

for evaluator in self._evaluators:
try:
evaluator.start()
except ServiceStatusError:
log.debug(
"Error starting evaluator: %n", evaluator.name if hasattr(evaluator, "name") else str(evaluator)
)

def _stop_service(self) -> None:
try:
self._llmobs_span_writer.stop()
self._llmobs_eval_metric_writer.stop()
except ServiceStatusError:
log.debug("Error stopping LLMObs writers")

for evaluator in self._evaluators:
try:
evaluator.stop()
except ServiceStatusError:
log.debug(
"Error stopping evaluator: %n", evaluator.name if hasattr(evaluator, "name") else str(evaluator)
)

try:
forksafe.unregister(self._child_after_fork)
self.tracer.shutdown()
@@ -154,7 +182,6 @@ def enable(

if os.getenv("DD_LLMOBS_ENABLED") and not asbool(os.getenv("DD_LLMOBS_ENABLED")):
log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service")
return

# grab required values for LLMObs
config._dd_site = site or config._dd_site
9 changes: 8 additions & 1 deletion ddtrace/llmobs/_trace_processor.py
@@ -43,8 +43,9 @@ class LLMObsTraceProcessor(TraceProcessor):
Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability.
"""

def __init__(self, llmobs_span_writer):
def __init__(self, llmobs_span_writer, evaluators=None):
self._span_writer = llmobs_span_writer
self._evaluators = evaluators
Contributor

If we're going to have evaluators be a part of the trace processor, we're going to need to ensure we do the same thing on _child_after_fork() as we do for the span writer, i.e. something like

self._evaluator = self._evaluator.recreate()
self._trace_processor._evaluator = self._evaluator
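
A minimal sketch of what that could look like with the _evaluators list this PR introduces; the writer recreation lines are assumed from the existing fork handling and are not part of this diff:

def _child_after_fork(self):
    # recreate writers and evaluators so the forked child gets fresh worker threads and queues
    self._llmobs_span_writer = self._llmobs_span_writer.recreate()
    self._llmobs_eval_metric_writer = self._llmobs_eval_metric_writer.recreate()
    self._evaluators = [evaluator.recreate() for evaluator in self._evaluators]
    # re-attach the recreated instances to the trace processor
    self._trace_processor._span_writer = self._llmobs_span_writer
    self._trace_processor._evaluators = self._evaluators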


def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
if not trace:
@@ -56,11 +57,16 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:

def submit_llmobs_span(self, span: Span) -> None:
"""Generate and submit an LLMObs span event to be sent to LLMObs."""
span_event = None
try:
span_event = self._llmobs_span_event(span)
self._span_writer.enqueue(span_event)
except (KeyError, TypeError):
log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span)
finally:
if span_event and self._evaluators:
for evaluator in self._evaluators:
evaluator.enqueue(span_event)

def _llmobs_span_event(self, span: Span) -> Dict[str, Any]:
"""Span event object structure."""
@@ -110,6 +116,7 @@ def _llmobs_span_event(self, span: Span) -> Dict[str, Any]:
"status": "error" if span.error else "ok",
"meta": meta,
"metrics": metrics,
"ml_app": ml_app,
}
session_id = _get_session_id(span)
if session_id is not None:
1 change: 1 addition & 0 deletions ddtrace/settings/config.py
@@ -569,6 +569,7 @@ def __init__(self):
self._llmobs_sample_rate = float(os.getenv("DD_LLMOBS_SAMPLE_RATE", 1.0))
self._llmobs_ml_app = os.getenv("DD_LLMOBS_ML_APP")
self._llmobs_agentless_enabled = asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", False))
self._llmobs_ragas_faithfulness_enabled = asbool(os.getenv("_DD_LLMOBS_RAGAS_FAITHFULNESS_ENABLED", False))

self._inject_force = asbool(os.getenv("DD_INJECT_FORCE", False))
self._lib_was_injected = False
4 changes: 4 additions & 0 deletions tests/llmobs/_utils.py
@@ -199,6 +199,10 @@ def _llmobs_base_span_event(
span_event["meta"]["error.type"] = error
span_event["meta"]["error.message"] = error_message
span_event["meta"]["error.stack"] = error_stack
if tags:
span_event["ml_app"] = tags.get("ml_app", "unnamed-ml-app")
else:
span_event["ml_app"] = "unnamed-ml-app"
return span_event


29 changes: 29 additions & 0 deletions tests/llmobs/conftest.py
@@ -58,6 +58,16 @@ def mock_llmobs_eval_metric_writer():
patcher.stop()


@pytest.fixture
def mock_llmobs_ragas_evaluator():
patcher = mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.RagasFaithfulnessEvaluator")
RagasEvaluator = patcher.start()
m = mock.MagicMock()
RagasEvaluator.return_value = m
yield m
patcher.stop()


@pytest.fixture
def mock_http_writer_send_payload_response():
with mock.patch(
@@ -88,6 +98,12 @@ def mock_writer_logs():
yield m


@pytest.fixture
def mock_evaluator_logs():
with mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.logger") as m:
yield m


@pytest.fixture
def mock_http_writer_logs():
with mock.patch("ddtrace.internal.writer.writer.log") as m:
@@ -125,3 +141,16 @@ def AgentlessLLMObs(mock_llmobs_span_agentless_writer, mock_llmobs_eval_metric_w
llmobs_service.enable(_tracer=dummy_tracer)
yield llmobs_service
llmobs_service.disable()


@pytest.fixture
def LLMObsWithRagas(monkeypatch, mock_llmobs_span_writer, mock_llmobs_eval_metric_writer, ddtrace_global_config):
global_config = default_global_config()
global_config.update(ddtrace_global_config)
global_config.update(dict(_llmobs_ragas_faithfulness_enabled=True))
monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", "0.1")
with override_global_config(global_config):
dummy_tracer = DummyTracer()
llmobs_service.enable(_tracer=dummy_tracer)
yield llmobs_service
llmobs_service.disable()
129 changes: 129 additions & 0 deletions tests/llmobs/test_llmobs_ragas_faithfulness_evaluator.py
@@ -0,0 +1,129 @@
import os
import time

import mock
import pytest

from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator
from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent


INTAKE_ENDPOINT = "https://api.datad0g.com/api/intake/llm-obs/v1/eval-metric"
DD_SITE = "datad0g.com"
dd_api_key = os.getenv("DD_API_KEY", default="<not-a-real-api-key>")


def _categorical_metric_event():
return {
"span_id": "12345678901",
"trace_id": "98765432101",
"metric_type": "categorical",
"categorical_value": "very",
"label": "toxicity",
"ml_app": "dummy-ml-app",
"timestamp_ms": round(time.time() * 1000),
}


def _score_metric_event():
return {
"span_id": "12345678902",
"trace_id": "98765432102",
"metric_type": "score",
"label": "sentiment",
"score_value": 0.9,
"ml_app": "dummy-ml-app",
"timestamp_ms": round(time.time() * 1000),
}


def _dummy_ragas_eval_metric_event(span_id, trace_id):
return LLMObsEvaluationMetricEvent(
span_id=span_id,
trace_id=trace_id,
score_value=1,
ml_app="unnamed-ml-app",
timestamp_ms=mock.ANY,
metric_type="score",
label="dummy.ragas.faithfulness",
)


def test_ragas_faithfulness_evaluator_start(mock_evaluator_logs):
ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator(
interval=0.01, _evaluation_metric_writer=mock.MagicMock(), _llmobs_instance=mock.MagicMock()
)
ragas_faithfulness_evaluator.start()
mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r evaluator", "RagasFaithfulnessEvaluator")])


def test_ragas_faithfulness_evaluator_buffer_limit(mock_evaluator_logs):
ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator(
interval=0.01, _evaluation_metric_writer=mock.MagicMock(), _llmobs_instance=mock.MagicMock()
)
for _ in range(1001):
ragas_faithfulness_evaluator.enqueue({})
mock_evaluator_logs.warning.assert_called_with(
"%r event buffer full (limit is %d), dropping event", "RagasFaithfulnessEvaluator", 1000
)


def test_ragas_faithfulness_evaluator_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer):
ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator(
interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock()
)
ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"})
ragas_faithfulness_evaluator.periodic()
mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
_dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
)


@pytest.mark.vcr_logs
def test_ragas_faithfulness_evaluator_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer):
ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator(
interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock()
)
ragas_faithfulness_evaluator.start()

ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"})

time.sleep(0.1)

mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
_dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
)


def test_ragas_evaluator_on_exit(mock_writer_logs, run_python_code_in_subprocess):
out, err, status, pid = run_python_code_in_subprocess(
"""
import os
import time
import mock

from ddtrace.internal.utils.http import Response
from ddtrace.llmobs._writer import LLMObsEvalMetricWriter
from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator

with mock.patch(
"ddtrace.internal.writer.HTTPWriter._send_payload",
return_value=Response(
status=200,
body="{}",
),
):
llmobs_eval_metric_writer = LLMObsEvalMetricWriter(
site="datad0g.com", api_key=os.getenv("DD_API_KEY_STAGING"), interval=0.01, timeout=1
)
llmobs_eval_metric_writer.start()
ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator(
interval=0.01, _evaluation_metric_writer=llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock()
)
ragas_faithfulness_evaluator.start()
ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"})
""",
)
assert status == 0, err
assert out == b""
assert err == b""