From 571d317e860550d7a628e259927b61c4632d6de4 Mon Sep 17 00:00:00 2001 From: lievan Date: Fri, 13 Sep 2024 14:51:18 -0400 Subject: [PATCH 01/12] implement ragas faithfulness runner with dummy ragas score generator --- .../ragas/faithfulness/evaluator.py | 95 +++++++++++++++++++ ddtrace/llmobs/_llmobs.py | 25 ++++- ddtrace/llmobs/_trace_processor.py | 10 ++ ddtrace/llmobs/utils.py | 64 +++++++++++++ ddtrace/settings/config.py | 1 + 5 files changed, 194 insertions(+), 1 deletion(-) create mode 100644 ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py diff --git a/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py b/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py new file mode 100644 index 00000000000..85b7b1d41ae --- /dev/null +++ b/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py @@ -0,0 +1,95 @@ +import atexit +from concurrent import futures +import math +import time +from typing import Dict +from typing import List +from typing import Optional + +from ddtrace import config +from ddtrace.internal import forksafe +from ddtrace.internal.logger import get_logger +from ddtrace.internal.periodic import PeriodicService + +from ....utils import EvaluationMetric +from ....utils import LLMObsSpanContext + + +logger = get_logger(__name__) + + +class RagasFaithfulnessEvaluator(PeriodicService): + """Base class for evaluating LLM Observability span events""" + + def __init__(self, interval: float, _evaluation_metric_writer=None, _llmobs_instance=None): + super(RagasFaithfulnessEvaluator, self).__init__(interval=interval) + self.llmobs_instance = _llmobs_instance + self._lock = forksafe.RLock() + self._buffer = [] # type: List[LLMObsSpanContext] + self._buffer_limit = 1000 + + self._evaluation_metric_writer = _evaluation_metric_writer + self.spans = [] # type: List[LLMObsSpanContext] + + self.name = "ragas.faithfulness" + + self.executor = futures.ThreadPoolExecutor() + + def start(self, *args, **kwargs): + 
super(RagasFaithfulnessEvaluator, self).start() + logger.debug("started %r to %r", self.__class__.__name__) + atexit.register(self.on_shutdown) + + def on_shutdown(self): + self.executor.shutdown(cancel_futures=True) + + def recreate(self): + return self.__class__( + interval=self._interval, writer=self._llmobs_eval_metric_writer, llmobs_instance=self.llmobs_instance + ) + + def enqueue(self, raw_span_event: Dict) -> None: + with self._lock: + if len(self._buffer) >= self._buffer_limit: + logger.warning( + "%r event buffer full (limit is %d), dropping event", self.__class__.__name__, self._buffer_limit + ) + return + try: + self._buffer.append(LLMObsSpanContext(**raw_span_event)) + except Exception as e: + logger.error("Failed to validate span event for eval", e) + + def periodic(self) -> None: + with self._lock: + if not self._buffer: + return + events = self._buffer + self._buffer = [] + + try: + evaluation_metrics = self.run(events) + for metric in evaluation_metrics: + if metric is not None: + self._evaluation_metric_writer.enqueue(metric.model_dump()) + except RuntimeError as e: + logger.debug("failed to run evaluation: %s", e) + + def run(self, spans: List[LLMObsSpanContext]) -> List[EvaluationMetric]: + def dummy_score_and_return_evaluation_that_will_be_replaced(span) -> Optional[EvaluationMetric]: + try: + return EvaluationMetric( + label="dummy.ragas.faithfulness", + span_id=span.span_id, + trace_id=span.trace_id, + score_value=1, + ml_app=config._llmobs_ml_app, + timestamp_ms=math.floor(time.time() * 1000), + metric_type="score", + ) + except RuntimeError as e: + logger.error("Failed to run evaluation: %s", e) + return None + + results = self.executor.map(dummy_score_and_return_evaluation_that_will_be_replaced, spans) + return [result for result in results if result is not None] diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index cbc17981799..f5b502f6df7 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -39,6 
+39,7 @@ from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING from ddtrace.llmobs._constants import TAGS +from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.llmobs._utils import _get_ml_app @@ -55,7 +56,6 @@ log = get_logger(__name__) - SUPPORTED_LLMOBS_INTEGRATIONS = { "anthropic": "anthropic", "bedrock": "botocore", @@ -85,7 +85,16 @@ def __init__(self, tracer=None): interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)), timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)), ) + self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer) + self._ragas_evaluator = None + + if config._llmobs_ragas_faithfulness_enabled: + self._ragas_evaluator = RagasFaithfulnessEvaluator( + interval=1, _evaluation_metric_writer=self._llmobs_eval_metric_writer, _llmobs_instance=self + ) + self._trace_processor._register_ragas_evaluator(self._ragas_evaluator) + forksafe.register(self._child_after_fork) def _child_after_fork(self): @@ -99,6 +108,10 @@ def _child_after_fork(self): self._llmobs_span_writer.start() except ServiceStatusError: log.debug("Error starting LLMObs span writer after fork") + try: + self._ragas_evaluator.start() + except ServiceStatusError: + log.debug("Error starting LLMObs ragas evaluator after fork") def _start_service(self) -> None: tracer_filters = self.tracer._filters @@ -111,6 +124,11 @@ def _start_service(self) -> None: except ServiceStatusError: log.debug("Error starting LLMObs writers") + try: + self._ragas_evaluator.start() + except ServiceStatusError: + log.debug("Error starting LLMObs ragas evaluator") + def _stop_service(self) -> None: try: self._llmobs_span_writer.stop() @@ -118,6 +136,11 @@ def _stop_service(self) -> None: except ServiceStatusError: log.debug("Error stopping LLMObs 
writers") + try: + self._ragas_evaluator.stop() + except ServiceStatusError: + log.debug("Error stopping LLMObs ragas evaluator") + try: forksafe.unregister(self._child_after_fork) self.tracer.shutdown() diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py index ea86841657e..2cb43bca7f7 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -45,6 +45,7 @@ class LLMObsTraceProcessor(TraceProcessor): def __init__(self, llmobs_span_writer): self._span_writer = llmobs_span_writer + self.ragas_evaluator = None def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: if not trace: @@ -56,11 +57,19 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: def submit_llmobs_span(self, span: Span) -> None: """Generate and submit an LLMObs span event to be sent to LLMObs.""" + span_event = None try: span_event = self._llmobs_span_event(span) self._span_writer.enqueue(span_event) except (KeyError, TypeError): log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span) + finally: + if span_event and self._ragas_evaluator: + self._ragas_evaluator.enqueue(span_event) + + def _register_ragas_evaluator(self, ragas_evaluator): + """Register ragas evaluator""" + self._ragas_evaluator = ragas_evaluator def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: """Span event object structure.""" @@ -110,6 +119,7 @@ def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: "status": "error" if span.error else "ok", "meta": meta, "metrics": metrics, + "ml_app": ml_app, } session_id = _get_session_id(span) if session_id is not None: diff --git a/ddtrace/llmobs/utils.py b/ddtrace/llmobs/utils.py index c63a0e3f44c..80884070b13 100644 --- a/ddtrace/llmobs/utils.py +++ b/ddtrace/llmobs/utils.py @@ -1,7 +1,11 @@ from typing import Dict from typing import List +from typing import Optional from typing import Union +from pydantic import BaseModel +from pydantic 
import ConfigDict + # TypedDict was added to typing in python 3.8 try: @@ -21,6 +25,66 @@ Message = TypedDict("Message", {"content": str, "role": str}, total=False) +class ToolCall(BaseModel): + name: Optional[str] = "" + arguments: Optional[Dict] = {} + tool_id: Optional[str] = "" + type: Optional[str] = "" + + +class MetaIO(BaseModel): + value: Optional[str] = None + # (TODO()): lievan, let Messages and Documents inherit from BaseModel + documents: Optional[List[DocumentType]] = None + messages: Optional[List[Dict[str, str]]] = None + + +class SpanField(BaseModel): + kind: str = "" + + +class Error(BaseModel): + message: str = "" + stack: str = "" + + +class Meta(BaseModel): + # model_* is a protected namespace in pydantic, so we need to add this line to allow + # for model_* fields + model_config = ConfigDict(protected_namespaces=()) + span: SpanField = SpanField() + error: Error = Error() + input: MetaIO = MetaIO() + output: MetaIO = MetaIO() + metadata: Dict = {} + # (TODO()) lievan: validate model_* fields are only present on certain span types + model_name: str = "" + model_provider: str = "" + + +class LLMObsSpanContext(BaseModel): + span_id: str + trace_id: str + name: str + ml_app: str + meta: Meta = Meta() + session_id: str = "" + metrics: Dict[str, Union[int, float]] = {} + tags: List[str] = [] + + +class EvaluationMetric(BaseModel): + label: str + categorical_value: Optional[str] = "" + score_value: Optional[Union[int, float]] = None + metric_type: str + tags: Optional[List[str]] = [] + ml_app: str + timestamp_ms: int + span_id: str + trace_id: str + + class Messages: def __init__(self, messages: Union[List[Dict[str, str]], Dict[str, str], str]): self.messages = [] diff --git a/ddtrace/settings/config.py b/ddtrace/settings/config.py index b947e8d26c4..917296e4c32 100644 --- a/ddtrace/settings/config.py +++ b/ddtrace/settings/config.py @@ -569,6 +569,7 @@ def __init__(self): self._llmobs_sample_rate = float(os.getenv("DD_LLMOBS_SAMPLE_RATE", 1.0)) 
self._llmobs_ml_app = os.getenv("DD_LLMOBS_ML_APP") self._llmobs_agentless_enabled = asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", False)) + self._llmobs_ragas_faithfulness_enabled = asbool(os.getenv("_DD_LLMOBS_RAGAS_FAITHFULNESS_ENABLED", False)) self._inject_force = asbool(os.getenv("DD_INJECT_FORCE", False)) self._lib_was_injected = False From 4b3d840d10d116180e81b537c0d8b0f5d47a9592 Mon Sep 17 00:00:00 2001 From: lievan Date: Fri, 13 Sep 2024 14:53:25 -0400 Subject: [PATCH 02/12] remove newline --- ddtrace/llmobs/_llmobs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index f5b502f6df7..fca1bfce851 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -56,6 +56,7 @@ log = get_logger(__name__) + SUPPORTED_LLMOBS_INTEGRATIONS = { "anthropic": "anthropic", "bedrock": "botocore", From 7b9c929e46456c02a7589157818f09864bad683b Mon Sep 17 00:00:00 2001 From: lievan Date: Mon, 16 Sep 2024 08:55:53 -0400 Subject: [PATCH 03/12] pydantic v1 --- ddtrace/llmobs/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddtrace/llmobs/utils.py b/ddtrace/llmobs/utils.py index 80884070b13..ca739df6305 100644 --- a/ddtrace/llmobs/utils.py +++ b/ddtrace/llmobs/utils.py @@ -3,8 +3,8 @@ from typing import Optional from typing import Union -from pydantic import BaseModel -from pydantic import ConfigDict +from pydantic.v1 import BaseModel +from pydantic.v1 import ConfigDict # TypedDict was added to typing in python 3.8 From 2e883a0775dd6c0fb9021fd6e33f9a546bab12c0 Mon Sep 17 00:00:00 2001 From: lievan Date: Mon, 16 Sep 2024 10:07:40 -0400 Subject: [PATCH 04/12] refactor into evaluator list --- ddtrace/llmobs/_llmobs.py | 36 ++++++++++++++++++------------ ddtrace/llmobs/_trace_processor.py | 13 +++++------ 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index fca1bfce851..513aa7fb8b0 100644 --- 
a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -87,15 +87,16 @@ def __init__(self, tracer=None): timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)), ) - self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer) - self._ragas_evaluator = None + self.evaluators = [] if config._llmobs_ragas_faithfulness_enabled: - self._ragas_evaluator = RagasFaithfulnessEvaluator( - interval=1, _evaluation_metric_writer=self._llmobs_eval_metric_writer, _llmobs_instance=self + self.evaluators.append( + RagasFaithfulnessEvaluator( + interval=1, _evaluation_metric_writer=self._llmobs_eval_metric_writer, _llmobs_instance=self + ) ) - self._trace_processor._register_ragas_evaluator(self._ragas_evaluator) + self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._ragas_evaluator) forksafe.register(self._child_after_fork) def _child_after_fork(self): @@ -110,7 +111,8 @@ def _child_after_fork(self): except ServiceStatusError: log.debug("Error starting LLMObs span writer after fork") try: - self._ragas_evaluator.start() + for evaluator in self.evaluators: + evaluator.start() except ServiceStatusError: log.debug("Error starting LLMObs ragas evaluator after fork") @@ -125,10 +127,13 @@ def _start_service(self) -> None: except ServiceStatusError: log.debug("Error starting LLMObs writers") - try: - self._ragas_evaluator.start() - except ServiceStatusError: - log.debug("Error starting LLMObs ragas evaluator") + for evaluator in self.evaluators: + try: + evaluator.start() + except ServiceStatusError: + log.debug( + "Error starting evaluator: %n", evaluator.name if hasattr(evaluator, "name") else str(evaluator) + ) def _stop_service(self) -> None: try: @@ -137,10 +142,13 @@ def _stop_service(self) -> None: except ServiceStatusError: log.debug("Error stopping LLMObs writers") - try: - self._ragas_evaluator.stop() - except ServiceStatusError: - log.debug("Error stopping LLMObs ragas evaluator") + for evaluator in self.evaluators: + try: + 
evaluator.stop() + except ServiceStatusError: + log.debug( + "Error stopping evaluator: %n", evaluator.name if hasattr(evaluator, "name") else str(evaluator) + ) try: forksafe.unregister(self._child_after_fork) diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py index 2cb43bca7f7..fc972608b5e 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -43,9 +43,9 @@ class LLMObsTraceProcessor(TraceProcessor): Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability. """ - def __init__(self, llmobs_span_writer): + def __init__(self, llmobs_span_writer, evaluators): self._span_writer = llmobs_span_writer - self.ragas_evaluator = None + self.evaluators = evaluators def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: if not trace: @@ -64,12 +64,9 @@ def submit_llmobs_span(self, span: Span) -> None: except (KeyError, TypeError): log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span) finally: - if span_event and self._ragas_evaluator: - self._ragas_evaluator.enqueue(span_event) - - def _register_ragas_evaluator(self, ragas_evaluator): - """Register ragas evaluator""" - self._ragas_evaluator = ragas_evaluator + if span_event: + for evaluator in self.evaluators: + evaluator.enqueue(span_event) def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: """Span event object structure.""" From 7b314433c59ca08e3cfc7aa6a0c66c4e7cb23b73 Mon Sep 17 00:00:00 2001 From: lievan Date: Tue, 17 Sep 2024 12:35:41 -0400 Subject: [PATCH 05/12] add unit tests --- .../ragas/faithfulness/evaluator.py | 42 +++--- ddtrace/llmobs/_llmobs.py | 21 ++- ddtrace/llmobs/_trace_processor.py | 4 +- ddtrace/llmobs/utils.py | 64 --------- tests/llmobs/conftest.py | 29 ++++ ...est_llmobs_ragas_faithfulness_evaluator.py | 129 ++++++++++++++++++ tests/llmobs/test_llmobs_service.py | 47 +++++++ tests/utils.py | 1 + 8 files changed, 
232 insertions(+), 105 deletions(-) create mode 100644 tests/llmobs/test_llmobs_ragas_faithfulness_evaluator.py diff --git a/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py b/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py index 85b7b1d41ae..a7a17d4fb65 100644 --- a/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py +++ b/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py @@ -3,7 +3,6 @@ import math import time from typing import Dict -from typing import List from typing import Optional from ddtrace import config @@ -11,8 +10,7 @@ from ddtrace.internal.logger import get_logger from ddtrace.internal.periodic import PeriodicService -from ....utils import EvaluationMetric -from ....utils import LLMObsSpanContext +from ...._writer import LLMObsEvaluationMetricEvent logger = get_logger(__name__) @@ -25,11 +23,10 @@ def __init__(self, interval: float, _evaluation_metric_writer=None, _llmobs_inst super(RagasFaithfulnessEvaluator, self).__init__(interval=interval) self.llmobs_instance = _llmobs_instance self._lock = forksafe.RLock() - self._buffer = [] # type: List[LLMObsSpanContext] + self._buffer = [] # type: list[Dict] self._buffer_limit = 1000 self._evaluation_metric_writer = _evaluation_metric_writer - self.spans = [] # type: List[LLMObsSpanContext] self.name = "ragas.faithfulness" @@ -48,17 +45,14 @@ def recreate(self): interval=self._interval, writer=self._llmobs_eval_metric_writer, llmobs_instance=self.llmobs_instance ) - def enqueue(self, raw_span_event: Dict) -> None: + def enqueue(self, span_event: Dict) -> None: with self._lock: if len(self._buffer) >= self._buffer_limit: logger.warning( "%r event buffer full (limit is %d), dropping event", self.__class__.__name__, self._buffer_limit ) return - try: - self._buffer.append(LLMObsSpanContext(**raw_span_event)) - except Exception as e: - logger.error("Failed to validate span event for eval", e) + self._buffer.append(span_event) def periodic(self) -> None: with self._lock: 
@@ -71,25 +65,21 @@ def periodic(self) -> None: evaluation_metrics = self.run(events) for metric in evaluation_metrics: if metric is not None: - self._evaluation_metric_writer.enqueue(metric.model_dump()) + self._evaluation_metric_writer.enqueue(metric) except RuntimeError as e: logger.debug("failed to run evaluation: %s", e) - def run(self, spans: List[LLMObsSpanContext]) -> List[EvaluationMetric]: - def dummy_score_and_return_evaluation_that_will_be_replaced(span) -> Optional[EvaluationMetric]: - try: - return EvaluationMetric( - label="dummy.ragas.faithfulness", - span_id=span.span_id, - trace_id=span.trace_id, - score_value=1, - ml_app=config._llmobs_ml_app, - timestamp_ms=math.floor(time.time() * 1000), - metric_type="score", - ) - except RuntimeError as e: - logger.error("Failed to run evaluation: %s", e) - return None + def run(self, spans): + def dummy_score_and_return_evaluation_that_will_be_replaced(span) -> Optional[LLMObsEvaluationMetricEvent]: + return LLMObsEvaluationMetricEvent( + span_id=span.get("span_id"), + trace_id=span.get("trace_id"), + score_value=1, + ml_app=config._llmobs_ml_app, + timestamp_ms=math.floor(time.time() * 1000), + metric_type="score", + label="dummy.ragas.faithfulness", + ) results = self.executor.map(dummy_score_and_return_evaluation_that_will_be_replaced, spans) return [result for result in results if result is not None] diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 513aa7fb8b0..886a7593ecd 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -87,16 +87,17 @@ def __init__(self, tracer=None): timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)), ) - self.evaluators = [] - + self._evaluators = [] if config._llmobs_ragas_faithfulness_enabled: - self.evaluators.append( + self._evaluators.append( RagasFaithfulnessEvaluator( - interval=1, _evaluation_metric_writer=self._llmobs_eval_metric_writer, _llmobs_instance=self + 
interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", 1.0)), + _evaluation_metric_writer=self._llmobs_eval_metric_writer, + _llmobs_instance=self, ) ) - self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._ragas_evaluator) + self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluators) forksafe.register(self._child_after_fork) def _child_after_fork(self): @@ -110,11 +111,6 @@ def _child_after_fork(self): self._llmobs_span_writer.start() except ServiceStatusError: log.debug("Error starting LLMObs span writer after fork") - try: - for evaluator in self.evaluators: - evaluator.start() - except ServiceStatusError: - log.debug("Error starting LLMObs ragas evaluator after fork") def _start_service(self) -> None: tracer_filters = self.tracer._filters @@ -127,7 +123,7 @@ def _start_service(self) -> None: except ServiceStatusError: log.debug("Error starting LLMObs writers") - for evaluator in self.evaluators: + for evaluator in self._evaluators: try: evaluator.start() except ServiceStatusError: @@ -142,7 +138,7 @@ def _stop_service(self) -> None: except ServiceStatusError: log.debug("Error stopping LLMObs writers") - for evaluator in self.evaluators: + for evaluator in self._evaluators: try: evaluator.stop() except ServiceStatusError: @@ -185,7 +181,6 @@ def enable( if os.getenv("DD_LLMOBS_ENABLED") and not asbool(os.getenv("DD_LLMOBS_ENABLED")): log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service") - return # grab required values for LLMObs config._dd_site = site or config._dd_site diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py index fc972608b5e..743b67c12d0 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -45,7 +45,7 @@ class LLMObsTraceProcessor(TraceProcessor): def __init__(self, llmobs_span_writer, evaluators): self._span_writer = llmobs_span_writer - 
self.evaluators = evaluators + self._evaluators = evaluators def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: if not trace: @@ -65,7 +65,7 @@ def submit_llmobs_span(self, span: Span) -> None: log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span) finally: if span_event: - for evaluator in self.evaluators: + for evaluator in self._evaluators: evaluator.enqueue(span_event) def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: diff --git a/ddtrace/llmobs/utils.py b/ddtrace/llmobs/utils.py index ca739df6305..c63a0e3f44c 100644 --- a/ddtrace/llmobs/utils.py +++ b/ddtrace/llmobs/utils.py @@ -1,11 +1,7 @@ from typing import Dict from typing import List -from typing import Optional from typing import Union -from pydantic.v1 import BaseModel -from pydantic.v1 import ConfigDict - # TypedDict was added to typing in python 3.8 try: @@ -25,66 +21,6 @@ Message = TypedDict("Message", {"content": str, "role": str}, total=False) -class ToolCall(BaseModel): - name: Optional[str] = "" - arguments: Optional[Dict] = {} - tool_id: Optional[str] = "" - type: Optional[str] = "" - - -class MetaIO(BaseModel): - value: Optional[str] = None - # (TODO()): lievan, let Messages and Documents inherit from BaseModel - documents: Optional[List[DocumentType]] = None - messages: Optional[List[Dict[str, str]]] = None - - -class SpanField(BaseModel): - kind: str = "" - - -class Error(BaseModel): - message: str = "" - stack: str = "" - - -class Meta(BaseModel): - # model_* is a protected namespace in pydantic, so we need to add this line to allow - # for model_* fields - model_config = ConfigDict(protected_namespaces=()) - span: SpanField = SpanField() - error: Error = Error() - input: MetaIO = MetaIO() - output: MetaIO = MetaIO() - metadata: Dict = {} - # (TODO()) lievan: validate model_* fields are only present on certain span types - model_name: str = "" - model_provider: str = "" - - -class LLMObsSpanContext(BaseModel): - span_id: str 
- trace_id: str - name: str - ml_app: str - meta: Meta = Meta() - session_id: str = "" - metrics: Dict[str, Union[int, float]] = {} - tags: List[str] = [] - - -class EvaluationMetric(BaseModel): - label: str - categorical_value: Optional[str] = "" - score_value: Optional[Union[int, float]] = None - metric_type: str - tags: Optional[List[str]] = [] - ml_app: str - timestamp_ms: int - span_id: str - trace_id: str - - class Messages: def __init__(self, messages: Union[List[Dict[str, str]], Dict[str, str], str]): self.messages = [] diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py index 66c16ae180f..3ac479eb090 100644 --- a/tests/llmobs/conftest.py +++ b/tests/llmobs/conftest.py @@ -58,6 +58,16 @@ def mock_llmobs_eval_metric_writer(): patcher.stop() +@pytest.fixture +def mock_llmobs_ragas_evaluator(): + patcher = mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.RagasFaithfulnessEvaluator") + RagasEvaluator = patcher.start() + m = mock.MagicMock() + RagasEvaluator.return_value = m + yield m + patcher.stop() + + @pytest.fixture def mock_http_writer_send_payload_response(): with mock.patch( @@ -88,6 +98,12 @@ def mock_writer_logs(): yield m +@pytest.fixture +def mock_evaluator_logs(): + with mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.logger") as m: + yield m + + @pytest.fixture def mock_http_writer_logs(): with mock.patch("ddtrace.internal.writer.writer.log") as m: @@ -125,3 +141,16 @@ def AgentlessLLMObs(mock_llmobs_span_agentless_writer, mock_llmobs_eval_metric_w llmobs_service.enable(_tracer=dummy_tracer) yield llmobs_service llmobs_service.disable() + + +@pytest.fixture +def LLMObsWithRagas(monkeypatch, mock_llmobs_span_writer, mock_llmobs_eval_metric_writer, ddtrace_global_config): + global_config = default_global_config() + global_config.update(ddtrace_global_config) + global_config.update(dict(_llmobs_ragas_faithfulness_enabled=True)) + 
monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", "0.1") + with override_global_config(global_config): + dummy_tracer = DummyTracer() + llmobs_service.enable(_tracer=dummy_tracer) + yield llmobs_service + llmobs_service.disable() diff --git a/tests/llmobs/test_llmobs_ragas_faithfulness_evaluator.py b/tests/llmobs/test_llmobs_ragas_faithfulness_evaluator.py new file mode 100644 index 00000000000..a0e801fcf52 --- /dev/null +++ b/tests/llmobs/test_llmobs_ragas_faithfulness_evaluator.py @@ -0,0 +1,129 @@ +import os +import time + +import mock +import pytest + +from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator +from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent + + +INTAKE_ENDPOINT = "https://api.datad0g.com/api/intake/llm-obs/v1/eval-metric" +DD_SITE = "datad0g.com" +dd_api_key = os.getenv("DD_API_KEY", default="") + + +def _categorical_metric_event(): + return { + "span_id": "12345678901", + "trace_id": "98765432101", + "metric_type": "categorical", + "categorical_value": "very", + "label": "toxicity", + "ml_app": "dummy-ml-app", + "timestamp_ms": round(time.time() * 1000), + } + + +def _score_metric_event(): + return { + "span_id": "12345678902", + "trace_id": "98765432102", + "metric_type": "score", + "label": "sentiment", + "score_value": 0.9, + "ml_app": "dummy-ml-app", + "timestamp_ms": round(time.time() * 1000), + } + + +def _dummy_ragas_eval_metric_event(span_id, trace_id): + return LLMObsEvaluationMetricEvent( + span_id=span_id, + trace_id=trace_id, + score_value=1, + ml_app="unnamed-ml-app", + timestamp_ms=mock.ANY, + metric_type="score", + label="dummy.ragas.faithfulness", + ) + + +def test_ragas_faithfulness_evaluator_start(mock_evaluator_logs): + ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( + interval=0.01, _evaluation_metric_writer=mock.MagicMock(), _llmobs_instance=mock.MagicMock() + ) + ragas_faithfulness_evaluator.start() + 
mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r to %r", "RagasFaithfulnessEvaluator")]) + + +def test_ragas_faithfulness_evaluator_buffer_limit(mock_evaluator_logs): + ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( + interval=0.01, _evaluation_metric_writer=mock.MagicMock(), _llmobs_instance=mock.MagicMock() + ) + for _ in range(1001): + ragas_faithfulness_evaluator.enqueue({}) + mock_evaluator_logs.warning.assert_called_with( + "%r event buffer full (limit is %d), dropping event", "RagasFaithfulnessEvaluator", 1000 + ) + + +def test_ragas_faithfulness_evaluator_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer): + ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( + interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock() + ) + ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"}) + ragas_faithfulness_evaluator.periodic() + mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with( + _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234") + ) + + +@pytest.mark.vcr_logs +def test_ragas_faithfulness_evaluator_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer): + ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( + interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock() + ) + ragas_faithfulness_evaluator.start() + + ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"}) + + time.sleep(0.1) + + mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with( + _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234") + ) + + +def test_ragas_evaluator_on_exit(mock_writer_logs, run_python_code_in_subprocess): + out, err, status, pid = run_python_code_in_subprocess( + """ +import os +import time +import mock + +from ddtrace.internal.utils.http import Response +from ddtrace.llmobs._writer import LLMObsEvalMetricWriter +from 
ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator + +with mock.patch( + "ddtrace.internal.writer.HTTPWriter._send_payload", + return_value=Response( + status=200, + body="{}", + ), +): + llmobs_eval_metric_writer = LLMObsEvalMetricWriter( + site="datad0g.com", api_key=os.getenv("DD_API_KEY_STAGING"), interval=0.01, timeout=1 + ) + llmobs_eval_metric_writer.start() + ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( + interval=0.01, _evaluation_metric_writer=llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock() + ) + ragas_faithfulness_evaluator.start() + ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"}) +""", + ) + assert status == 0, err + assert out == b"" + assert err == b"" diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 4e605de8708..f80d2f9fc45 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -1,5 +1,6 @@ import json import os +import time import mock import pytest @@ -88,6 +89,27 @@ def test_service_enable_with_apm_disabled(monkeypatch): llmobs_service.disable() +def test_service_enable_with_ragas_evaluator_enabled(): + with override_global_config( + dict( + _dd_api_key="", + _llmobs_ml_app="", + _llmobs_ragas_faithfulness_enabled="true", + ) + ): + dummy_tracer = DummyTracer() + llmobs_service.enable(_tracer=dummy_tracer) + llmobs_instance = llmobs_service._instance + assert llmobs_instance is not None + assert llmobs_service.enabled + assert llmobs_instance.tracer == dummy_tracer + assert any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in dummy_tracer._filters) + assert run_llmobs_trace_filter(dummy_tracer) is not None + assert len(llmobs_service._instance._evaluators) == 1 + assert llmobs_service._instance._evaluators[0].status.value == "running" + llmobs_service.disable() + + def test_service_disable(): with override_global_config(dict(_dd_api_key="", 
_llmobs_ml_app="")): dummy_tracer = DummyTracer() @@ -98,6 +120,24 @@ def test_service_disable(): assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped" +def test_service_disable_with_ragas_evaluator_enabled(): + with override_global_config( + dict( + _dd_api_key="", + _llmobs_ml_app="", + _llmobs_ragas_faithfulness_enabled="true", + ) + ): + dummy_tracer = DummyTracer() + llmobs_service.enable(_tracer=dummy_tracer) + llmobs_service.disable() + assert llmobs_service.enabled is False + assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped" + assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped" + assert len(llmobs_service._instance._evaluators) == 1 + assert llmobs_service._instance._evaluators[0].status.value == "stopped" + + def test_service_enable_no_api_key(): with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="")): dummy_tracer = DummyTracer() @@ -1504,3 +1544,10 @@ def process_trace(self, trace): exit_code = os.WEXITSTATUS(status) assert exit_code == 12 llmobs_service.disable() + + +def test_llm_span_ragas_evaluator(LLMObsWithRagas): + with LLMObsWithRagas.llm(model_name="test_model"): + pass + time.sleep(0.1) + LLMObsWithRagas._instance._llmobs_eval_metric_writer.enqueue.call_count = 1 diff --git a/tests/utils.py b/tests/utils.py index 6c5edac85d7..a4434371349 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -156,6 +156,7 @@ def override_global_config(values): "_llmobs_sample_rate", "_llmobs_ml_app", "_llmobs_agentless_enabled", + "_llmobs_ragas_faithfulness_enabled", "_data_streams_enabled", ] From 13229bd92aa60bfe2adc43a91e15589e61e37ea2 Mon Sep 17 00:00:00 2001 From: lievan Date: Tue, 17 Sep 2024 12:49:48 -0400 Subject: [PATCH 06/12] fix expected span event --- ddtrace/llmobs/_trace_processor.py | 4 ++-- tests/llmobs/_utils.py | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/ddtrace/llmobs/_trace_processor.py 
b/ddtrace/llmobs/_trace_processor.py index 743b67c12d0..bd267494340 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -43,7 +43,7 @@ class LLMObsTraceProcessor(TraceProcessor): Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability. """ - def __init__(self, llmobs_span_writer, evaluators): + def __init__(self, llmobs_span_writer, evaluators=None): self._span_writer = llmobs_span_writer self._evaluators = evaluators @@ -64,7 +64,7 @@ def submit_llmobs_span(self, span: Span) -> None: except (KeyError, TypeError): log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span) finally: - if span_event: + if span_event and self._evaluators: for evaluator in self._evaluators: evaluator.enqueue(span_event) diff --git a/tests/llmobs/_utils.py b/tests/llmobs/_utils.py index 47d8891950e..3d61d7403ef 100644 --- a/tests/llmobs/_utils.py +++ b/tests/llmobs/_utils.py @@ -210,6 +210,10 @@ def _llmobs_base_span_event( span_event["meta"]["error.type"] = error span_event["meta"]["error.message"] = error_message span_event["meta"]["error.stack"] = error_stack + if tags: + span_event["ml_app"] = tags.get("ml_app", "unnamed-ml-app") + else: + span_event["ml_app"] = "unnamed-ml-app" return span_event From d290dcd38e0ad241092473d9deb4f992619fac23 Mon Sep 17 00:00:00 2001 From: lievan Date: Tue, 17 Sep 2024 13:18:59 -0400 Subject: [PATCH 07/12] remove config option, use only env var --- ddtrace/llmobs/_llmobs.py | 24 ++++++++++++++++-------- ddtrace/settings/config.py | 1 - tests/llmobs/conftest.py | 2 +- tests/llmobs/test_llmobs_service.py | 8 ++++---- tests/utils.py | 1 - 5 files changed, 21 insertions(+), 15 deletions(-) diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 95aa1c2e532..14565a97653 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -65,6 +65,10 @@ "langchain": "langchain", } 
+SUPPORTED_RAGAS_EVALUATORS = { + "faithfulness": RagasFaithfulnessEvaluator, +} + class LLMObs(Service): _instance = None # type: LLMObs @@ -89,14 +93,18 @@ def __init__(self, tracer=None): ) self._evaluators = [] - if config._llmobs_ragas_faithfulness_enabled: - self._evaluators.append( - RagasFaithfulnessEvaluator( - interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", 1.0)), - _evaluation_metric_writer=self._llmobs_eval_metric_writer, - _llmobs_instance=self, - ) - ) + + if os.getenv("_DD_LLMOBS_EVALUATORS_RAGAS"): + ragas_evaluators = os.getenv("_DD_LLMOBS_EVALUATORS_RAGAS").split(",") + for evaluator in ragas_evaluators: + if evaluator in SUPPORTED_RAGAS_EVALUATORS: + self._evaluators.append( + SUPPORTED_RAGAS_EVALUATORS[evaluator]( + interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", 1.0)), + _evaluation_metric_writer=self._llmobs_eval_metric_writer, + _llmobs_instance=self, + ) + ) self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluators) forksafe.register(self._child_after_fork) diff --git a/ddtrace/settings/config.py b/ddtrace/settings/config.py index 917296e4c32..b947e8d26c4 100644 --- a/ddtrace/settings/config.py +++ b/ddtrace/settings/config.py @@ -569,7 +569,6 @@ def __init__(self): self._llmobs_sample_rate = float(os.getenv("DD_LLMOBS_SAMPLE_RATE", 1.0)) self._llmobs_ml_app = os.getenv("DD_LLMOBS_ML_APP") self._llmobs_agentless_enabled = asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", False)) - self._llmobs_ragas_faithfulness_enabled = asbool(os.getenv("_DD_LLMOBS_RAGAS_FAITHFULNESS_ENABLED", False)) self._inject_force = asbool(os.getenv("DD_INJECT_FORCE", False)) self._lib_was_injected = False diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py index 3ac479eb090..81eb80a833a 100644 --- a/tests/llmobs/conftest.py +++ b/tests/llmobs/conftest.py @@ -147,7 +147,7 @@ def AgentlessLLMObs(mock_llmobs_span_agentless_writer, mock_llmobs_eval_metric_w def 
LLMObsWithRagas(monkeypatch, mock_llmobs_span_writer, mock_llmobs_eval_metric_writer, ddtrace_global_config): global_config = default_global_config() global_config.update(ddtrace_global_config) - global_config.update(dict(_llmobs_ragas_faithfulness_enabled=True)) + monkeypatch.setenv("_DD_LLMOBS_EVALUATORS_RAGAS", "faithfulness") monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", "0.1") with override_global_config(global_config): dummy_tracer = DummyTracer() diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 0df5e97fbda..3fdeb9bc41e 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -89,14 +89,14 @@ def test_service_enable_with_apm_disabled(monkeypatch): llmobs_service.disable() -def test_service_enable_with_ragas_evaluator_enabled(): +def test_service_enable_with_ragas_evaluator_enabled(monkeypatch): with override_global_config( dict( _dd_api_key="", _llmobs_ml_app="", - _llmobs_ragas_faithfulness_enabled="true", ) ): + monkeypatch.setenv("_DD_LLMOBS_EVALUATORS_RAGAS", "faithfulness") dummy_tracer = DummyTracer() llmobs_service.enable(_tracer=dummy_tracer) llmobs_instance = llmobs_service._instance @@ -120,14 +120,14 @@ def test_service_disable(): assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped" -def test_service_disable_with_ragas_evaluator_enabled(): +def test_service_disable_with_ragas_evaluator_enabled(monkeypatch): with override_global_config( dict( _dd_api_key="", _llmobs_ml_app="", - _llmobs_ragas_faithfulness_enabled="true", ) ): + monkeypatch.setenv("_DD_LLMOBS_EVALUATORS_RAGAS", "faithfulness") dummy_tracer = DummyTracer() llmobs_service.enable(_tracer=dummy_tracer) llmobs_service.disable() diff --git a/tests/utils.py b/tests/utils.py index a4434371349..6c5edac85d7 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -156,7 +156,6 @@ def override_global_config(values): "_llmobs_sample_rate", "_llmobs_ml_app", 
"_llmobs_agentless_enabled", - "_llmobs_ragas_faithfulness_enabled", "_data_streams_enabled", ] From 6e49cca7c60c61c7f085e8eb2e469bbee9fdda51 Mon Sep 17 00:00:00 2001 From: lievan Date: Tue, 17 Sep 2024 14:33:28 -0400 Subject: [PATCH 08/12] address comments --- .../ragas/faithfulness/evaluator.py | 27 +++++++++---------- ddtrace/llmobs/_trace_processor.py | 9 ++++--- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py b/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py index a7a17d4fb65..63e0ef1f3bf 100644 --- a/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py +++ b/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py @@ -3,15 +3,12 @@ import math import time from typing import Dict -from typing import Optional from ddtrace import config from ddtrace.internal import forksafe from ddtrace.internal.logger import get_logger from ddtrace.internal.periodic import PeriodicService -from ...._writer import LLMObsEvaluationMetricEvent - logger = get_logger(__name__) @@ -19,6 +16,8 @@ class RagasFaithfulnessEvaluator(PeriodicService): """Base class for evaluating LLM Observability span events""" + name = "ragas.faithfulness" + def __init__(self, interval: float, _evaluation_metric_writer=None, _llmobs_instance=None): super(RagasFaithfulnessEvaluator, self).__init__(interval=interval) self.llmobs_instance = _llmobs_instance @@ -28,8 +27,6 @@ def __init__(self, interval: float, _evaluation_metric_writer=None, _llmobs_inst self._evaluation_metric_writer = _evaluation_metric_writer - self.name = "ragas.faithfulness" - self.executor = futures.ThreadPoolExecutor() def start(self, *args, **kwargs): @@ -70,16 +67,16 @@ def periodic(self) -> None: logger.debug("failed to run evaluation: %s", e) def run(self, spans): - def dummy_score_and_return_evaluation_that_will_be_replaced(span) -> Optional[LLMObsEvaluationMetricEvent]: - return LLMObsEvaluationMetricEvent( - 
span_id=span.get("span_id"), - trace_id=span.get("trace_id"), - score_value=1, - ml_app=config._llmobs_ml_app, - timestamp_ms=math.floor(time.time() * 1000), - metric_type="score", - label="dummy.ragas.faithfulness", - ) + def dummy_score_and_return_evaluation_that_will_be_replaced(span): + return { + "span_id": span.get("span_id"), + "trace_id": span.get("trace_id"), + "score_value": 1, + "ml_app": config._llmobs_ml_app, + "timestamp_ms": math.floor(time.time() * 1000), + "metric_type": "score", + "label": "dummy.ragas.faithfulness", + } results = self.executor.map(dummy_score_and_return_evaluation_that_will_be_replaced, spans) return [result for result in results if result is not None] diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py index bd267494340..52cf30e6261 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -45,7 +45,7 @@ class LLMObsTraceProcessor(TraceProcessor): def __init__(self, llmobs_span_writer, evaluators=None): self._span_writer = llmobs_span_writer - self._evaluators = evaluators + self._evaluators = evaluators if evaluators is not None else [] def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: if not trace: @@ -64,9 +64,10 @@ def submit_llmobs_span(self, span: Span) -> None: except (KeyError, TypeError): log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span) finally: - if span_event and self._evaluators: - for evaluator in self._evaluators: - evaluator.enqueue(span_event) + if not span_event: + return + for evaluator in self._evaluators: + evaluator.enqueue(span_event) def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: """Span event object structure.""" From fcf9991202878f9641fe5717ff455665aea5b15e Mon Sep 17 00:00:00 2001 From: lievan Date: Wed, 18 Sep 2024 09:37:34 -0400 Subject: [PATCH 09/12] refactor into one evaluator service --- .../llmobs/_evaluators/ragas/faithfulness.py | 16 ++++++ 
.../evaluator.py => _evaluators/runner.py} | 53 +++++++++-------- ddtrace/llmobs/_llmobs.py | 49 +++++----------- ddtrace/llmobs/_trace_processor.py | 8 +-- tests/llmobs/conftest.py | 8 +-- ...tor.py => test_llmobs_evaluator_runner.py} | 57 +++++++++---------- tests/llmobs/test_llmobs_service.py | 43 ++------------ 7 files changed, 100 insertions(+), 134 deletions(-) create mode 100644 ddtrace/llmobs/_evaluators/ragas/faithfulness.py rename ddtrace/llmobs/{_evaluations/ragas/faithfulness/evaluator.py => _evaluators/runner.py} (66%) rename tests/llmobs/{test_llmobs_ragas_faithfulness_evaluator.py => test_llmobs_evaluator_runner.py} (56%) diff --git a/ddtrace/llmobs/_evaluators/ragas/faithfulness.py b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py new file mode 100644 index 00000000000..54c7af2dfaa --- /dev/null +++ b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py @@ -0,0 +1,16 @@ +import math +import time + +from ddtrace import config + + +def dummy_run(span): + return { + "span_id": span.get("span_id"), + "trace_id": span.get("trace_id"), + "score_value": 1, + "ml_app": config._llmobs_ml_app, + "timestamp_ms": math.floor(time.time() * 1000), + "metric_type": "score", + "label": "dummy.ragas.faithfulness", + } diff --git a/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py b/ddtrace/llmobs/_evaluators/runner.py similarity index 66% rename from ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py rename to ddtrace/llmobs/_evaluators/runner.py index 63e0ef1f3bf..cc1d74bb8c8 100644 --- a/ddtrace/llmobs/_evaluations/ragas/faithfulness/evaluator.py +++ b/ddtrace/llmobs/_evaluators/runner.py @@ -1,26 +1,27 @@ import atexit from concurrent import futures -import math -import time +import os from typing import Dict -from ddtrace import config from ddtrace.internal import forksafe from ddtrace.internal.logger import get_logger from ddtrace.internal.periodic import PeriodicService +from .ragas.faithfulness import dummy_run + logger = get_logger(__name__) 
+SUPPORTED_EVALUATORS = { + "ragas_faithfulness": dummy_run, +} -class RagasFaithfulnessEvaluator(PeriodicService): - """Base class for evaluating LLM Observability span events""" - name = "ragas.faithfulness" +class EvaluatorRunner(PeriodicService): + """Base class for evaluating LLM Observability span events""" - def __init__(self, interval: float, _evaluation_metric_writer=None, _llmobs_instance=None): - super(RagasFaithfulnessEvaluator, self).__init__(interval=interval) - self.llmobs_instance = _llmobs_instance + def __init__(self, interval: float, _evaluation_metric_writer=None): + super(EvaluatorRunner, self).__init__(interval=interval) self._lock = forksafe.RLock() self._buffer = [] # type: list[Dict] self._buffer_limit = 1000 @@ -28,9 +29,17 @@ def __init__(self, interval: float, _evaluation_metric_writer=None, _llmobs_inst self._evaluation_metric_writer = _evaluation_metric_writer self.executor = futures.ThreadPoolExecutor() + self.evaluators = [] + + evaluator_str = os.getenv("_DD_LLMOBS_EVALUATORS") + if evaluator_str is not None: + evaluators = evaluator_str.split(",") + for evaluator in evaluators: + if evaluator in SUPPORTED_EVALUATORS: + self.evaluators.append(SUPPORTED_EVALUATORS[evaluator]) def start(self, *args, **kwargs): - super(RagasFaithfulnessEvaluator, self).start() + super(EvaluatorRunner, self).start() logger.debug("started %r to %r", self.__class__.__name__) atexit.register(self.on_shutdown) @@ -67,16 +76,14 @@ def periodic(self) -> None: logger.debug("failed to run evaluation: %s", e) def run(self, spans): - def dummy_score_and_return_evaluation_that_will_be_replaced(span): - return { - "span_id": span.get("span_id"), - "trace_id": span.get("trace_id"), - "score_value": 1, - "ml_app": config._llmobs_ml_app, - "timestamp_ms": math.floor(time.time() * 1000), - "metric_type": "score", - "label": "dummy.ragas.faithfulness", - } - - results = self.executor.map(dummy_score_and_return_evaluation_that_will_be_replaced, spans) - return [result 
for result in results if result is not None] + batches_of_results = [] + + for evaluator in self.evaluators: + batches_of_results.append(self.executor.map(evaluator, spans)) + + results = [] + for batch in batches_of_results: + for result in batch: + results.append(result) + + return results diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 14565a97653..804d721e1ba 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -39,7 +39,7 @@ from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING from ddtrace.llmobs._constants import TAGS -from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator +from ddtrace.llmobs._evaluators.runner import EvaluatorRunner from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor from ddtrace.llmobs._utils import AnnotationContext from ddtrace.llmobs._utils import _get_llmobs_parent_id @@ -65,10 +65,6 @@ "langchain": "langchain", } -SUPPORTED_RAGAS_EVALUATORS = { - "faithfulness": RagasFaithfulnessEvaluator, -} - class LLMObs(Service): _instance = None # type: LLMObs @@ -92,21 +88,12 @@ def __init__(self, tracer=None): timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)), ) - self._evaluators = [] - - if os.getenv("_DD_LLMOBS_EVALUATORS_RAGAS"): - ragas_evaluators = os.getenv("_DD_LLMOBS_EVALUATORS_RAGAS").split(",") - for evaluator in ragas_evaluators: - if evaluator in SUPPORTED_RAGAS_EVALUATORS: - self._evaluators.append( - SUPPORTED_RAGAS_EVALUATORS[evaluator]( - interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", 1.0)), - _evaluation_metric_writer=self._llmobs_eval_metric_writer, - _llmobs_instance=self, - ) - ) - - self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluators) + self._evaluator_runner = EvaluatorRunner( + interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)), + 
_evaluation_metric_writer=self._llmobs_eval_metric_writer, + ) + + self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluator_runner) forksafe.register(self._child_after_fork) def _child_after_fork(self): @@ -132,13 +119,10 @@ def _start_service(self) -> None: except ServiceStatusError: log.debug("Error starting LLMObs writers") - for evaluator in self._evaluators: - try: - evaluator.start() - except ServiceStatusError: - log.debug( - "Error starting evaluator: %n", evaluator.name if hasattr(evaluator, "name") else str(evaluator) - ) + try: + self._evaluator_runner.start() + except ServiceStatusError: + log.debug("Error starting evaluator runner") def _stop_service(self) -> None: try: @@ -147,13 +131,10 @@ def _stop_service(self) -> None: except ServiceStatusError: log.debug("Error stopping LLMObs writers") - for evaluator in self._evaluators: - try: - evaluator.stop() - except ServiceStatusError: - log.debug( - "Error stopping evaluator: %n", evaluator.name if hasattr(evaluator, "name") else str(evaluator) - ) + try: + self._evaluator_runner.stop() + except ServiceStatusError: + log.debug("Error stopping evaluator runner") try: forksafe.unregister(self._child_after_fork) diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py index 52cf30e6261..c4017364c7f 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -43,9 +43,9 @@ class LLMObsTraceProcessor(TraceProcessor): Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability. 
""" - def __init__(self, llmobs_span_writer, evaluators=None): + def __init__(self, llmobs_span_writer, evaluator_runner=None): self._span_writer = llmobs_span_writer - self._evaluators = evaluators if evaluators is not None else [] + self._evaluator_runner = evaluator_runner def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: if not trace: @@ -66,8 +66,8 @@ def submit_llmobs_span(self, span: Span) -> None: finally: if not span_event: return - for evaluator in self._evaluators: - evaluator.enqueue(span_event) + if self._evaluator_runner: + self._evaluator_runner.enqueue(span_event) def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: """Span event object structure.""" diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py index 81eb80a833a..48d34215e41 100644 --- a/tests/llmobs/conftest.py +++ b/tests/llmobs/conftest.py @@ -60,7 +60,7 @@ def mock_llmobs_eval_metric_writer(): @pytest.fixture def mock_llmobs_ragas_evaluator(): - patcher = mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.RagasFaithfulnessEvaluator") + patcher = mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.run") RagasEvaluator = patcher.start() m = mock.MagicMock() RagasEvaluator.return_value = m @@ -100,7 +100,7 @@ def mock_writer_logs(): @pytest.fixture def mock_evaluator_logs(): - with mock.patch("ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator.logger") as m: + with mock.patch("ddtrace.llmobs._evaluators.runner.logger") as m: yield m @@ -147,8 +147,8 @@ def AgentlessLLMObs(mock_llmobs_span_agentless_writer, mock_llmobs_eval_metric_w def LLMObsWithRagas(monkeypatch, mock_llmobs_span_writer, mock_llmobs_eval_metric_writer, ddtrace_global_config): global_config = default_global_config() global_config.update(ddtrace_global_config) - monkeypatch.setenv("_DD_LLMOBS_EVALUATORS_RAGAS", "faithfulness") - monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_RAGAS_FAITHFULNESS_INTERVAL", "0.1") + 
monkeypatch.setenv("_DD_LLMOBS_EVALUATORS", "ragas_faithfulness") + monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_INTERVAL", "0.1") with override_global_config(global_config): dummy_tracer = DummyTracer() llmobs_service.enable(_tracer=dummy_tracer) diff --git a/tests/llmobs/test_llmobs_ragas_faithfulness_evaluator.py b/tests/llmobs/test_llmobs_evaluator_runner.py similarity index 56% rename from tests/llmobs/test_llmobs_ragas_faithfulness_evaluator.py rename to tests/llmobs/test_llmobs_evaluator_runner.py index a0e801fcf52..83ef2b7ff26 100644 --- a/tests/llmobs/test_llmobs_ragas_faithfulness_evaluator.py +++ b/tests/llmobs/test_llmobs_evaluator_runner.py @@ -4,7 +4,8 @@ import mock import pytest -from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator +from ddtrace.llmobs._evaluators.ragas.faithfulness import dummy_run +from ddtrace.llmobs._evaluators.runner import EvaluatorRunner from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent @@ -49,31 +50,26 @@ def _dummy_ragas_eval_metric_event(span_id, trace_id): ) -def test_ragas_faithfulness_evaluator_start(mock_evaluator_logs): - ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( - interval=0.01, _evaluation_metric_writer=mock.MagicMock(), _llmobs_instance=mock.MagicMock() - ) - ragas_faithfulness_evaluator.start() - mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r to %r", "RagasFaithfulnessEvaluator")]) +def test_evaluator_runner_start(mock_evaluator_logs): + evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock.MagicMock()) + evaluator_runner.start() + mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r to %r", "EvaluatorRunner")]) -def test_ragas_faithfulness_evaluator_buffer_limit(mock_evaluator_logs): - ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( - interval=0.01, _evaluation_metric_writer=mock.MagicMock(), _llmobs_instance=mock.MagicMock() - ) +def 
test_evaluator_runner_buffer_limit(mock_evaluator_logs): + evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock.MagicMock()) for _ in range(1001): - ragas_faithfulness_evaluator.enqueue({}) + evaluator_runner.enqueue({}) mock_evaluator_logs.warning.assert_called_with( - "%r event buffer full (limit is %d), dropping event", "RagasFaithfulnessEvaluator", 1000 + "%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000 ) -def test_ragas_faithfulness_evaluator_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer): - ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( - interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock() - ) - ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"}) - ragas_faithfulness_evaluator.periodic() +def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer): + evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer) + evaluator_runner.evaluators.append(dummy_run) + evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}) + evaluator_runner.periodic() mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with( _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234") ) @@ -81,12 +77,11 @@ def test_ragas_faithfulness_evaluator_periodic_enqueues_eval_metric(LLMObs, mock @pytest.mark.vcr_logs def test_ragas_faithfulness_evaluator_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer): - ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( - interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock() - ) - ragas_faithfulness_evaluator.start() + evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer) + evaluator_runner.evaluators.append(dummy_run) + evaluator_runner.start() - 
ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"}) + evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}) time.sleep(0.1) @@ -95,7 +90,7 @@ def test_ragas_faithfulness_evaluator_timed_enqueues_eval_metric(LLMObs, mock_ll ) -def test_ragas_evaluator_on_exit(mock_writer_logs, run_python_code_in_subprocess): +def test_evaluator_runner_on_exit(mock_writer_logs, run_python_code_in_subprocess): out, err, status, pid = run_python_code_in_subprocess( """ import os @@ -104,7 +99,8 @@ def test_ragas_evaluator_on_exit(mock_writer_logs, run_python_code_in_subprocess from ddtrace.internal.utils.http import Response from ddtrace.llmobs._writer import LLMObsEvalMetricWriter -from ddtrace.llmobs._evaluations.ragas.faithfulness.evaluator import RagasFaithfulnessEvaluator +from ddtrace.llmobs._evaluators.runner import EvaluatorRunner +from ddtrace.llmobs._evaluators.ragas.faithfulness import dummy_run with mock.patch( "ddtrace.internal.writer.HTTPWriter._send_payload", @@ -117,11 +113,12 @@ def test_ragas_evaluator_on_exit(mock_writer_logs, run_python_code_in_subprocess site="datad0g.com", api_key=os.getenv("DD_API_KEY_STAGING"), interval=0.01, timeout=1 ) llmobs_eval_metric_writer.start() - ragas_faithfulness_evaluator = RagasFaithfulnessEvaluator( - interval=0.01, _evaluation_metric_writer=llmobs_eval_metric_writer, _llmobs_instance=mock.MagicMock() + evaluator_runner = EvaluatorRunner( + interval=0.01, _evaluation_metric_writer=llmobs_eval_metric_writer ) - ragas_faithfulness_evaluator.start() - ragas_faithfulness_evaluator.enqueue({"span_id": "123", "trace_id": "1234"}) + evaluator_runner.evaluators.append(dummy_run) + evaluator_runner.start() + evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}) """, ) assert status == 0, err diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 3fdeb9bc41e..5ddb82cfc65 100644 --- a/tests/llmobs/test_llmobs_service.py +++ 
b/tests/llmobs/test_llmobs_service.py @@ -89,27 +89,6 @@ def test_service_enable_with_apm_disabled(monkeypatch): llmobs_service.disable() -def test_service_enable_with_ragas_evaluator_enabled(monkeypatch): - with override_global_config( - dict( - _dd_api_key="", - _llmobs_ml_app="", - ) - ): - monkeypatch.setenv("_DD_LLMOBS_EVALUATORS_RAGAS", "faithfulness") - dummy_tracer = DummyTracer() - llmobs_service.enable(_tracer=dummy_tracer) - llmobs_instance = llmobs_service._instance - assert llmobs_instance is not None - assert llmobs_service.enabled - assert llmobs_instance.tracer == dummy_tracer - assert any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in dummy_tracer._filters) - assert run_llmobs_trace_filter(dummy_tracer) is not None - assert len(llmobs_service._instance._evaluators) == 1 - assert llmobs_service._instance._evaluators[0].status.value == "running" - llmobs_service.disable() - - def test_service_disable(): with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="")): dummy_tracer = DummyTracer() @@ -118,24 +97,7 @@ def test_service_disable(): assert llmobs_service.enabled is False assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped" assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped" - - -def test_service_disable_with_ragas_evaluator_enabled(monkeypatch): - with override_global_config( - dict( - _dd_api_key="", - _llmobs_ml_app="", - ) - ): - monkeypatch.setenv("_DD_LLMOBS_EVALUATORS_RAGAS", "faithfulness") - dummy_tracer = DummyTracer() - llmobs_service.enable(_tracer=dummy_tracer) - llmobs_service.disable() - assert llmobs_service.enabled is False - assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped" - assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped" - assert len(llmobs_service._instance._evaluators) == 1 - assert llmobs_service._instance._evaluators[0].status.value == "stopped" + assert 
llmobs_service._instance._evaluator_runner.status.value == "stopped" def test_service_enable_no_api_key(): @@ -146,6 +108,7 @@ def test_service_enable_no_api_key(): assert llmobs_service.enabled is False assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped" assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped" + assert llmobs_service._instance._evaluator_runner.status.value == "stopped" def test_service_enable_no_ml_app_specified(): @@ -156,6 +119,7 @@ def test_service_enable_no_ml_app_specified(): assert llmobs_service.enabled is False assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped" assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped" + assert llmobs_service._instance._evaluator_runner.status.value == "stopped" def test_service_enable_deprecated_ml_app_name(monkeypatch, mock_logs): @@ -166,6 +130,7 @@ def test_service_enable_deprecated_ml_app_name(monkeypatch, mock_logs): assert llmobs_service.enabled is True assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "running" assert llmobs_service._instance._llmobs_span_writer.status.value == "running" + assert llmobs_service._instance._evaluator_runner.status.value == "running" mock_logs.warning.assert_called_once_with("`DD_LLMOBS_APP_NAME` is deprecated. 
Use `DD_LLMOBS_ML_APP` instead.") llmobs_service.disable() From 10b276f0967e16250b6cfb640952f4a015365841 Mon Sep 17 00:00:00 2001 From: lievan Date: Wed, 18 Sep 2024 11:10:49 -0400 Subject: [PATCH 10/12] dont cancel futures --- ddtrace/llmobs/_evaluators/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/llmobs/_evaluators/runner.py b/ddtrace/llmobs/_evaluators/runner.py index cc1d74bb8c8..90e72d76db8 100644 --- a/ddtrace/llmobs/_evaluators/runner.py +++ b/ddtrace/llmobs/_evaluators/runner.py @@ -44,7 +44,7 @@ def start(self, *args, **kwargs): atexit.register(self.on_shutdown) def on_shutdown(self): - self.executor.shutdown(cancel_futures=True) + self.executor.shutdown() def recreate(self): return self.__class__( From b6fa4e0496a2e383e30252b9f81bf88a0c66b602 Mon Sep 17 00:00:00 2001 From: lievan Date: Thu, 19 Sep 2024 08:46:38 -0400 Subject: [PATCH 11/12] refactor dummy faithfulness into class --- .../llmobs/_evaluators/ragas/faithfulness.py | 25 +++++++++++-------- ddtrace/llmobs/_evaluators/runner.py | 11 +++----- tests/llmobs/test_llmobs_evaluator_runner.py | 12 ++++----- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/ddtrace/llmobs/_evaluators/ragas/faithfulness.py b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py index 54c7af2dfaa..515493f891c 100644 --- a/ddtrace/llmobs/_evaluators/ragas/faithfulness.py +++ b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py @@ -4,13 +4,18 @@ from ddtrace import config -def dummy_run(span): - return { - "span_id": span.get("span_id"), - "trace_id": span.get("trace_id"), - "score_value": 1, - "ml_app": config._llmobs_ml_app, - "timestamp_ms": math.floor(time.time() * 1000), - "metric_type": "score", - "label": "dummy.ragas.faithfulness", - } +class RagasFaithfulnessEvaluator: + name = "ragas_faithfulness" + metric_type = "score" + + @classmethod + def evaluate(cls, span): + return { + "span_id": span.get("span_id"), + "trace_id": span.get("trace_id"), + "score_value": 1, + 
"ml_app": config._llmobs_ml_app, + "timestamp_ms": math.floor(time.time() * 1000), + "metric_type": cls.metric_type, + "label": cls.name, + } diff --git a/ddtrace/llmobs/_evaluators/runner.py b/ddtrace/llmobs/_evaluators/runner.py index 90e72d76db8..91d07568c9f 100644 --- a/ddtrace/llmobs/_evaluators/runner.py +++ b/ddtrace/llmobs/_evaluators/runner.py @@ -7,13 +7,13 @@ from ddtrace.internal.logger import get_logger from ddtrace.internal.periodic import PeriodicService -from .ragas.faithfulness import dummy_run +from .ragas.faithfulness import RagasFaithfulnessEvaluator logger = get_logger(__name__) SUPPORTED_EVALUATORS = { - "ragas_faithfulness": dummy_run, + "ragas_faithfulness": RagasFaithfulnessEvaluator, } @@ -46,11 +46,6 @@ def start(self, *args, **kwargs): def on_shutdown(self): self.executor.shutdown() - def recreate(self): - return self.__class__( - interval=self._interval, writer=self._llmobs_eval_metric_writer, llmobs_instance=self.llmobs_instance - ) - def enqueue(self, span_event: Dict) -> None: with self._lock: if len(self._buffer) >= self._buffer_limit: @@ -79,7 +74,7 @@ def run(self, spans): batches_of_results = [] for evaluator in self.evaluators: - batches_of_results.append(self.executor.map(evaluator, spans)) + batches_of_results.append(self.executor.map(evaluator.evaluate, spans)) results = [] for batch in batches_of_results: diff --git a/tests/llmobs/test_llmobs_evaluator_runner.py b/tests/llmobs/test_llmobs_evaluator_runner.py index 83ef2b7ff26..603a2cb1505 100644 --- a/tests/llmobs/test_llmobs_evaluator_runner.py +++ b/tests/llmobs/test_llmobs_evaluator_runner.py @@ -4,7 +4,7 @@ import mock import pytest -from ddtrace.llmobs._evaluators.ragas.faithfulness import dummy_run +from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator from ddtrace.llmobs._evaluators.runner import EvaluatorRunner from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent @@ -46,7 +46,7 @@ def _dummy_ragas_eval_metric_event(span_id, 
trace_id): ml_app="unnamed-ml-app", timestamp_ms=mock.ANY, metric_type="score", - label="dummy.ragas.faithfulness", + label="ragas_faithfulness", ) @@ -67,7 +67,7 @@ def test_evaluator_runner_buffer_limit(mock_evaluator_logs): def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer): evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer) - evaluator_runner.evaluators.append(dummy_run) + evaluator_runner.evaluators.append(RagasFaithfulnessEvaluator) evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}) evaluator_runner.periodic() mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with( @@ -78,7 +78,7 @@ def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval @pytest.mark.vcr_logs def test_ragas_faithfulness_evaluator_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer): evaluator_runner = EvaluatorRunner(interval=0.01, _evaluation_metric_writer=mock_llmobs_eval_metric_writer) - evaluator_runner.evaluators.append(dummy_run) + evaluator_runner.evaluators.append(RagasFaithfulnessEvaluator) evaluator_runner.start() evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}) @@ -100,7 +100,7 @@ def test_evaluator_runner_on_exit(mock_writer_logs, run_python_code_in_subproces from ddtrace.internal.utils.http import Response from ddtrace.llmobs._writer import LLMObsEvalMetricWriter from ddtrace.llmobs._evaluators.runner import EvaluatorRunner -from ddtrace.llmobs._evaluators.ragas.faithfulness import dummy_run +from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator with mock.patch( "ddtrace.internal.writer.HTTPWriter._send_payload", @@ -116,7 +116,7 @@ def test_evaluator_runner_on_exit(mock_writer_logs, run_python_code_in_subproces evaluator_runner = EvaluatorRunner( interval=0.01, _evaluation_metric_writer=llmobs_eval_metric_writer ) - evaluator_runner.evaluators.append(dummy_run) + 
evaluator_runner.evaluators.append(RagasFaithfulnessEvaluator) evaluator_runner.start() evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}) """, From be893a13fa307533f9f203b3427aa66d4aabec29 Mon Sep 17 00:00:00 2001 From: lievan Date: Thu, 19 Sep 2024 10:39:41 -0400 Subject: [PATCH 12/12] rename field to label --- ddtrace/llmobs/_evaluators/ragas/faithfulness.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddtrace/llmobs/_evaluators/ragas/faithfulness.py b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py index 515493f891c..3be0700c1c1 100644 --- a/ddtrace/llmobs/_evaluators/ragas/faithfulness.py +++ b/ddtrace/llmobs/_evaluators/ragas/faithfulness.py @@ -5,7 +5,7 @@ class RagasFaithfulnessEvaluator: - name = "ragas_faithfulness" + label = "ragas_faithfulness" metric_type = "score" @classmethod @@ -17,5 +17,5 @@ def evaluate(cls, span): "ml_app": config._llmobs_ml_app, "timestamp_ms": math.floor(time.time() * 1000), "metric_type": cls.metric_type, - "label": cls.name, + "label": cls.label, }