Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(llmobs): implement skeleton code for ragas faithfulness evaluator #10662

Merged
merged 31 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
571d317
implement ragas faithfulness runner with dummy ragas score generator
Sep 13, 2024
4b3d840
remove newline
Sep 13, 2024
7b9c929
pydantic v1
Sep 16, 2024
2e883a0
refactor into evaluator list
Sep 16, 2024
7b31443
add unit tests
Sep 17, 2024
13229bd
fix expected span event
Sep 17, 2024
b493e20
merge conflict
Sep 17, 2024
d290dcd
remove config option, use only env var
Sep 17, 2024
6e49cca
address comments
Sep 17, 2024
fcf9991
refactor into one evaluator service
Sep 18, 2024
10b276f
dont cancel futures
Sep 18, 2024
b6fa4e0
refactor dummy faithfulness into class
Sep 19, 2024
be893a1
rename field to label
Sep 19, 2024
a309330
Merge branch 'main' into evan.li/ragas-skeleton
lievan Sep 19, 2024
d849067
Merge branch 'main' of github.com:DataDog/dd-trace-py into evan.li/ra…
Sep 19, 2024
fd73621
Merge branch 'evan.li/ragas-skeleton' of github.com:DataDog/dd-trace-…
Sep 19, 2024
2f48461
Merge branch 'main' of github.com:DataDog/dd-trace-py into evan.li/ra…
Sep 20, 2024
04d202e
Merge branch 'main' into evan.li/ragas-skeleton
lievan Sep 20, 2024
a991f15
refactor so we store the service only in ragas
Sep 23, 2024
38e0a23
Merge branch 'evan.li/ragas-skeleton' of github.com:DataDog/dd-trace-…
Sep 23, 2024
ea5d4fa
rename a test
Sep 23, 2024
e66ee7e
clean up
Sep 26, 2024
c04dac4
rename, fix test
Sep 26, 2024
0a956e1
Merge branch 'main' of github.com:DataDog/dd-trace-py into evan.li/ra…
Sep 26, 2024
6d9c136
fork safety
Sep 26, 2024
7d7192c
fix tests
Sep 26, 2024
bb8d388
add more comments
Sep 26, 2024
3c17dee
delete unused fixture
Sep 26, 2024
62ecbe4
remove debug
Sep 26, 2024
f43dceb
fix wrong patch
Sep 30, 2024
d37b328
Merge branch 'main' into evan.li/ragas-skeleton
lievan Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import atexit
from concurrent import futures
import math
import time
from typing import Dict
from typing import Optional

from ddtrace import config
from ddtrace.internal import forksafe
from ddtrace.internal.logger import get_logger
from ddtrace.internal.periodic import PeriodicService

from ...._writer import LLMObsEvaluationMetricEvent
lievan marked this conversation as resolved.
Show resolved Hide resolved


logger = get_logger(__name__)


class RagasFaithfulnessEvaluator(PeriodicService):
    """Periodic service that evaluates LLM Observability span events for faithfulness.

    Span events are buffered via :meth:`enqueue`; on every :meth:`periodic` tick the
    buffer is drained, each event is scored on a thread pool, and the resulting
    evaluation metric events are handed to the evaluation metric writer.
    """

    def __init__(self, interval: float, _evaluation_metric_writer=None, _llmobs_instance=None):
        """
        :param interval: seconds between periodic buffer flushes.
        :param _evaluation_metric_writer: writer used to submit evaluation metric events.
        :param _llmobs_instance: owning LLMObs service instance.
        """
        super(RagasFaithfulnessEvaluator, self).__init__(interval=interval)
        self.llmobs_instance = _llmobs_instance
        # Guards _buffer; forksafe so the lock is re-created in forked children.
        self._lock = forksafe.RLock()
        self._buffer = []  # type: list[Dict]
        # Hard cap on buffered span events; enqueues beyond this are dropped.
        self._buffer_limit = 1000

        self._evaluation_metric_writer = _evaluation_metric_writer

        self.name = "ragas.faithfulness"

        self.executor = futures.ThreadPoolExecutor()

    def start(self, *args, **kwargs):
        super(RagasFaithfulnessEvaluator, self).start()
        # Fix: the original passed a single argument for two %r placeholders,
        # which triggers a logging format error at runtime; log the evaluator
        # label as the second argument.
        logger.debug("started %r to %r", self.__class__.__name__, self.name)
        atexit.register(self.on_shutdown)

    def on_shutdown(self):
        # NOTE(review): cancel_futures= requires Python 3.9+ — confirm against
        # the minimum supported runtime for this package.
        self.executor.shutdown(cancel_futures=True)

    def recreate(self):
        """Return a fresh evaluator with the same configuration (used after fork)."""
        # Fix: the original passed keyword names (writer=, llmobs_instance=) that
        # __init__ does not accept, and read the nonexistent attribute
        # self._llmobs_eval_metric_writer — recreate() would always raise.
        return self.__class__(
            interval=self._interval,
            _evaluation_metric_writer=self._evaluation_metric_writer,
            _llmobs_instance=self.llmobs_instance,
        )

    def enqueue(self, span_event: Dict) -> None:
        """Buffer a span event for evaluation, dropping it if the buffer is full."""
        with self._lock:
            if len(self._buffer) >= self._buffer_limit:
                logger.warning(
                    "%r event buffer full (limit is %d), dropping event", self.__class__.__name__, self._buffer_limit
                )
                return
            self._buffer.append(span_event)

    def periodic(self) -> None:
        """Drain the buffer, score the events, and submit the resulting metrics."""
        with self._lock:
            if not self._buffer:
                return
            # Swap the buffer out under the lock so scoring happens lock-free.
            events = self._buffer
            self._buffer = []

        try:
            evaluation_metrics = self.run(events)
            for metric in evaluation_metrics:
                if metric is not None:
                    self._evaluation_metric_writer.enqueue(metric)
        except RuntimeError as e:
            logger.debug("failed to run evaluation: %s", e)

    def run(self, spans):
        """Score each span event on the thread pool; return the non-None metric events."""

        def dummy_score_and_return_evaluation_that_will_be_replaced(span) -> Optional[LLMObsEvaluationMetricEvent]:
            # Placeholder scorer: always emits a score of 1 until the real ragas
            # faithfulness scoring is implemented.
            return LLMObsEvaluationMetricEvent(
                span_id=span.get("span_id"),
                trace_id=span.get("trace_id"),
                score_value=1,
                ml_app=config._llmobs_ml_app,
                timestamp_ms=math.floor(time.time() * 1000),
                metric_type="score",
                label="dummy.ragas.faithfulness",
            )

        results = self.executor.map(dummy_score_and_return_evaluation_that_will_be_replaced, spans)
        return [result for result in results if result is not None]
31 changes: 29 additions & 2 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
from ddtrace.llmobs._constants import TAGS
from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator
from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.llmobs._utils import _get_ml_app
Expand Down Expand Up @@ -85,7 +86,18 @@ def __init__(self, tracer=None):
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
)
self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer)

self._evaluators = []
if config._llmobs_ragas_faithfulness_enabled:
self._evaluators.append(
RagasFaithfulnessEvaluator(
interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", 1.0)),
_evaluation_metric_writer=self._llmobs_eval_metric_writer,
_llmobs_instance=self,
)
)

self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluators)
forksafe.register(self._child_after_fork)

def _child_after_fork(self):
Expand All @@ -111,13 +123,29 @@ def _start_service(self) -> None:
except ServiceStatusError:
log.debug("Error starting LLMObs writers")

for evaluator in self._evaluators:
try:
evaluator.start()
except ServiceStatusError:
log.debug(
"Error starting evaluator: %n", evaluator.name if hasattr(evaluator, "name") else str(evaluator)
)

def _stop_service(self) -> None:
try:
self._llmobs_span_writer.stop()
self._llmobs_eval_metric_writer.stop()
except ServiceStatusError:
log.debug("Error stopping LLMObs writers")

for evaluator in self._evaluators:
try:
evaluator.stop()
except ServiceStatusError:
log.debug(
"Error stopping evaluator: %n", evaluator.name if hasattr(evaluator, "name") else str(evaluator)
)

try:
forksafe.unregister(self._child_after_fork)
self.tracer.shutdown()
Expand Down Expand Up @@ -153,7 +181,6 @@ def enable(

if os.getenv("DD_LLMOBS_ENABLED") and not asbool(os.getenv("DD_LLMOBS_ENABLED")):
log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service")
return
lievan marked this conversation as resolved.
Show resolved Hide resolved

# grab required values for LLMObs
config._dd_site = site or config._dd_site
Expand Down
9 changes: 8 additions & 1 deletion ddtrace/llmobs/_trace_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ class LLMObsTraceProcessor(TraceProcessor):
Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability.
"""

def __init__(self, llmobs_span_writer):
def __init__(self, llmobs_span_writer, evaluators):
    # Writer that ships generated span events to the LLMObs intake.
    self._span_writer = llmobs_span_writer
    # Evaluator services that receive each successfully generated span event.
    self._evaluators = evaluators
lievan marked this conversation as resolved.
Show resolved Hide resolved

def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
if not trace:
Expand All @@ -56,11 +57,16 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:

def submit_llmobs_span(self, span: Span) -> None:
    """Generate and submit an LLMObs span event to be sent to LLMObs.

    The generated event is also fanned out to every registered evaluator,
    but only when event generation succeeded.
    """
    span_event = None
    try:
        span_event = self._llmobs_span_event(span)
        self._span_writer.enqueue(span_event)
    except (KeyError, TypeError):
        log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span)
    finally:
        # span_event stays None when _llmobs_span_event raised, so evaluators
        # only ever receive events that were generated successfully.
        if span_event:
            for evaluator in self._evaluators:
                evaluator.enqueue(span_event)

def _llmobs_span_event(self, span: Span) -> Dict[str, Any]:
"""Span event object structure."""
Expand Down Expand Up @@ -110,6 +116,7 @@ def _llmobs_span_event(self, span: Span) -> Dict[str, Any]:
"status": "error" if span.error else "ok",
"meta": meta,
"metrics": metrics,
"ml_app": ml_app,
lievan marked this conversation as resolved.
Show resolved Hide resolved
}
session_id = _get_session_id(span)
if session_id is not None:
Expand Down
1 change: 1 addition & 0 deletions ddtrace/settings/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ def __init__(self):
self._llmobs_sample_rate = float(os.getenv("DD_LLMOBS_SAMPLE_RATE", 1.0))
self._llmobs_ml_app = os.getenv("DD_LLMOBS_ML_APP")
self._llmobs_agentless_enabled = asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", False))
self._llmobs_ragas_faithfulness_enabled = asbool(os.getenv("_DD_LLMOBS_RAGAS_FAITHFULNESS_ENABLED", False))
lievan marked this conversation as resolved.
Show resolved Hide resolved

self._inject_force = asbool(os.getenv("DD_INJECT_FORCE", False))
self._lib_was_injected = False
Expand Down
29 changes: 29 additions & 0 deletions tests/llmobs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ def mock_llmobs_eval_metric_writer():
patcher.stop()


@pytest.fixture
def mock_llmobs_ragas_evaluator():
    """Patch RagasFaithfulnessEvaluator with a MagicMock; yield the mock instance
    that the patched class will return when constructed."""
    patcher = mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.RagasFaithfulnessEvaluator")
    RagasEvaluator = patcher.start()
    m = mock.MagicMock()
    RagasEvaluator.return_value = m
    yield m
    patcher.stop()


@pytest.fixture
def mock_http_writer_send_payload_response():
with mock.patch(
Expand Down Expand Up @@ -88,6 +98,12 @@ def mock_writer_logs():
yield m


@pytest.fixture
def mock_evaluator_logs():
    """Yield a mock replacing the ragas faithfulness evaluator's module logger."""
    with mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.logger") as m:
        yield m


@pytest.fixture
def mock_http_writer_logs():
with mock.patch("ddtrace.internal.writer.writer.log") as m:
Expand Down Expand Up @@ -125,3 +141,16 @@ def AgentlessLLMObs(mock_llmobs_span_agentless_writer, mock_llmobs_eval_metric_w
llmobs_service.enable(_tracer=dummy_tracer)
yield llmobs_service
llmobs_service.disable()


@pytest.fixture
def LLMObsWithRagas(monkeypatch, mock_llmobs_span_writer, mock_llmobs_eval_metric_writer, ddtrace_global_config):
    """Yield an enabled llmobs_service with the ragas faithfulness evaluator turned on."""
    cfg = default_global_config()
    cfg.update(ddtrace_global_config)
    cfg.update({"_llmobs_ragas_faithfulness_enabled": True})
    monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", "0.1")
    with override_global_config(cfg):
        tracer = DummyTracer()
        llmobs_service.enable(_tracer=tracer)
        yield llmobs_service
        llmobs_service.disable()
129 changes: 129 additions & 0 deletions tests/llmobs/test_llmobs_ragas_faithfulness_evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import os
import time

import mock
import pytest

from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator
from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent


INTAKE_ENDPOINT = "https://api.datad0g.com/api/intake/llm-obs/v1/eval-metric"
DD_SITE = "datad0g.com"
dd_api_key = os.getenv("DD_API_KEY", default="<not-a-real-api-key>")


def _categorical_metric_event():
return {
"span_id": "12345678901",
"trace_id": "98765432101",
"metric_type": "categorical",
"categorical_value": "very",
"label": "toxicity",
"ml_app": "dummy-ml-app",
"timestamp_ms": round(time.time() * 1000),
}


def _score_metric_event():
return {
"span_id": "12345678902",
"trace_id": "98765432102",
"metric_type": "score",
"label": "sentiment",
"score_value": 0.9,
"ml_app": "dummy-ml-app",
"timestamp_ms": round(time.time() * 1000),
}


def _dummy_ragas_eval_metric_event(span_id, trace_id):
    """Build the evaluation metric event the dummy ragas evaluator is expected to emit.

    timestamp_ms is mock.ANY so comparisons ignore the exact emission time.
    """
    return LLMObsEvaluationMetricEvent(
        span_id=span_id,
        trace_id=trace_id,
        score_value=1,
        ml_app="unnamed-ml-app",
        timestamp_ms=mock.ANY,
        metric_type="score",
        label="dummy.ragas.faithfulness",
    )


def test_ragas_faithfulness_evaluator_start(mock_evaluator_logs):
    """Starting the evaluator logs exactly one debug message.

    Fix: the original asserted the exact call ("started %r to %r", <one arg>),
    which pins a malformed log call (two placeholders, one argument). Assert the
    logging behavior without pinning the broken format-args pairing.
    """
    ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator(
        interval=0.01, _evaluation_metric_writer=mock.MagicMock(), _llmobs_instance=mock.MagicMock()
    )
    ragas_faithfulness_evaluator.start()
    mock_evaluator_logs.debug.assert_called_once()
    # The message itself is stable even if the lazy %-args change.
    assert mock_evaluator_logs.debug.call_args[0][0].startswith("started")


def test_ragas_faithfulness_evaluator_buffer_limit(mock_evaluator_logs):
    """Enqueueing past the 1000-event buffer cap drops the event with a warning."""
    evaluator = RagasFaithfulnessEvaluator(
        interval=0.01, _evaluation_metric_writer=mock.MagicMock(), _llmobs_instance=mock.MagicMock()
    )
    for _ in range(1001):
        evaluator.enqueue({})
    mock_evaluator_logs.warning.assert_called_with(
        "%r event buffer full (limit is %d), dropping event", "RagasFaithfulnessEvaluator", 1000
    )


def test_ragas_faithfulness_evaluator_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer):
    """A manual periodic() tick drains the buffer and writes one eval metric."""
    evaluator = RagasFaithfulnessEvaluator(
        interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock()
    )
    evaluator.enqueue({"span_id": "123", "trace_id": "1234"})
    evaluator.periodic()
    expected = _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
    mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(expected)


@pytest.mark.vcr_logs
def test_ragas_faithfulness_evaluator_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer):
    """The periodic service picks up enqueued events on its own timer.

    Fix: the original slept a fixed 0.1s after start(), which is flaky on slow
    CI hosts; poll for the write with a generous deadline instead.
    """
    ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator(
        interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock()
    )
    ragas_faithfulness_evaluator.start()

    ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"})

    # Poll instead of a single fixed sleep; the deadline only bounds the wait
    # in the failure case.
    deadline = time.monotonic() + 5.0
    while not mock_llmobs_eval_metric_writer.enqueue.called and time.monotonic() < deadline:
        time.sleep(0.01)

    mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
        _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
    )


def test_ragas_evaluator_on_exit(mock_writer_logs, run_python_code_in_subprocess):
    """Start an evaluator in a subprocess without stopping it; the atexit hook
    must let the interpreter exit cleanly (status 0, no output)."""
    out, err, status, pid = run_python_code_in_subprocess(
        """
import os
import time
import mock

from ddtrace.internal.utils.http import Response
from ddtrace.llmobs._writer import LLMObsEvalMetricWriter
from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator

with mock.patch(
    "ddtrace.internal.writer.HTTPWriter._send_payload",
    return_value=Response(
        status=200,
        body="{}",
    ),
):
    llmobs_eval_metric_writer = LLMObsEvalMetricWriter(
        site="datad0g.com", api_key=os.getenv("DD_API_KEY_STAGING"), interval=0.01, timeout=1
    )
    llmobs_eval_metric_writer.start()
    ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator(
        interval=0.01, _evaluation_metric_writer=llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock()
    )
    ragas_faithfulness_evaluator.start()
    ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"})
""",
    )
    # A clean shutdown produces exit status 0 and writes nothing to stdout/stderr.
    assert status == 0, err
    assert out == b""
    assert err == b""
Loading