Skip to content

Commit

Permalink
Merge branch '2.13' into backport-10681-to-2.13
Browse files Browse the repository at this point in the history
  • Loading branch information
sabrenner authored Sep 20, 2024
2 parents 9220ba1 + 4efcaa4 commit ab7fb99
Show file tree
Hide file tree
Showing 20 changed files with 511 additions and 265 deletions.
10 changes: 4 additions & 6 deletions ddtrace/contrib/internal/anthropic/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import json
from typing import Any
from typing import Optional

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs._integrations.anthropic import _get_attr
from ddtrace.llmobs._utils import _unserializable_default_repr
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
Expand Down Expand Up @@ -39,7 +38,7 @@ def tag_tool_use_input_on_span(integration, span, chat_input, message_idx, block
)
span.set_tag_str(
"anthropic.request.messages.%d.content.%d.tool_call.input" % (message_idx, block_idx),
integration.trunc(json.dumps(_get_attr(chat_input, "input", {}), default=_unserializable_default_repr)),
integration.trunc(safe_json(_get_attr(chat_input, "input", {}))),
)


Expand Down Expand Up @@ -80,8 +79,7 @@ def tag_tool_use_output_on_span(integration, span, chat_completion, idx):
span.set_tag_str("anthropic.response.completions.content.%d.tool_call.name" % idx, str(tool_name))
if tool_inputs:
span.set_tag_str(
"anthropic.response.completions.content.%d.tool_call.input" % idx,
integration.trunc(json.dumps(tool_inputs, default=_unserializable_default_repr)),
"anthropic.response.completions.content.%d.tool_call.input" % idx, integration.trunc(safe_json(tool_inputs))
)


Expand All @@ -92,7 +90,7 @@ def tag_params_on_span(span, kwargs, integration):
span.set_tag_str("anthropic.request.system", integration.trunc(str(v)))
elif k not in ("messages", "model"):
tagged_params[k] = v
span.set_tag_str("anthropic.request.parameters", json.dumps(tagged_params, default=_unserializable_default_repr))
span.set_tag_str("anthropic.request.parameters", safe_json(tagged_params))


def _extract_api_key(instance: Any) -> Optional[str]:
Expand Down
22 changes: 13 additions & 9 deletions ddtrace/contrib/internal/langchain/patch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os
import sys
from typing import Any
Expand Down Expand Up @@ -60,6 +59,7 @@
from ddtrace.internal.utils.formats import deep_getattr
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs._integrations import LangChainIntegration
from ddtrace.llmobs._utils import safe_json
from ddtrace.pin import Pin


Expand Down Expand Up @@ -467,9 +467,11 @@ def traced_chat_model_generate(langchain, pin, func, instance, args, kwargs):
"messages": [
[
{
"content": message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", "")),
"content": (
message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", ""))
),
"message_type": message.__class__.__name__,
}
for message in messages
Expand Down Expand Up @@ -597,9 +599,11 @@ async def traced_chat_model_agenerate(langchain, pin, func, instance, args, kwar
"messages": [
[
{
"content": message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", "")),
"content": (
message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", ""))
),
"message_type": message.__class__.__name__,
}
for message in messages
Expand Down Expand Up @@ -1004,7 +1008,7 @@ def traced_base_tool_invoke(langchain, pin, func, instance, args, kwargs):
if tool_input and integration.is_pc_sampled_span(span):
span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
if config:
span.set_tag_str("langchain.request.config", json.dumps(config))
span.set_tag_str("langchain.request.config", safe_json(config))

tool_output = func(*args, **kwargs)
if tool_output and integration.is_pc_sampled_span(span):
Expand Down Expand Up @@ -1046,7 +1050,7 @@ async def traced_base_tool_ainvoke(langchain, pin, func, instance, args, kwargs)
if tool_input and integration.is_pc_sampled_span(span):
span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
if config:
span.set_tag_str("langchain.request.config", json.dumps(config))
span.set_tag_str("langchain.request.config", safe_json(config))

tool_output = await func(*args, **kwargs)
if tool_output and integration.is_pc_sampled_span(span):
Expand Down
17 changes: 8 additions & 9 deletions ddtrace/llmobs/_integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from ddtrace.llmobs._constants import OUTPUT_TOKENS_METRIC_KEY
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY

from .base import BaseLLMIntegration
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
Expand Down Expand Up @@ -70,18 +70,17 @@ def llmobs_set_tags(

span.set_tag_str(SPAN_KIND, "llm")
span.set_tag_str(MODEL_NAME, span.get_tag("anthropic.request.model") or "")
span.set_tag_str(INPUT_MESSAGES, json.dumps(input_messages))
span.set_tag_str(METADATA, json.dumps(parameters))
span.set_tag_str(INPUT_MESSAGES, safe_json(input_messages))
span.set_tag_str(METADATA, safe_json(parameters))
span.set_tag_str(MODEL_PROVIDER, "anthropic")
if err or resp is None:
span.set_tag_str(OUTPUT_MESSAGES, json.dumps([{"content": ""}]))
span.set_tag_str(OUTPUT_MESSAGES, safe_json([{"content": ""}]))
else:
output_messages = self._extract_output_message(resp)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))

span.set_tag_str(OUTPUT_MESSAGES, safe_json(output_messages))
usage = self._get_llmobs_metrics_tags(span)
if usage != {}:
span.set_tag_str(METRICS, json.dumps(usage))
if usage:
span.set_tag_str(METRICS, safe_json(usage))

def _extract_input_message(self, messages, system_prompt=None):
"""Extract input messages from the stored prompt.
Expand Down
9 changes: 5 additions & 4 deletions ddtrace/llmobs/_integrations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,18 @@ def __init__(self, integration_config: IntegrationConfig) -> None:

@property
def metrics_enabled(self) -> bool:
"""Return whether submitting metrics is enabled for this integration, or global config if not set."""
env_metrics_enabled = asbool(os.getenv("DD_{}_METRICS_ENABLED".format(self._integration_name.upper())))
if not env_metrics_enabled and config._llmobs_agentless_enabled:
"""
Return whether submitting metrics is enabled for this integration. Agentless mode disables submitting metrics.
"""
if config._llmobs_agentless_enabled:
return False
if hasattr(self.integration_config, "metrics_enabled"):
return asbool(self.integration_config.metrics_enabled)
return False

@property
def logs_enabled(self) -> bool:
"""Return whether submitting logs is enabled for this integration, or global config if not set."""
"""Return whether submitting logs is enabled for this integration."""
if hasattr(self.integration_config, "logs_enabled"):
return asbool(self.integration_config.logs_enabled)
return False
Expand Down
13 changes: 7 additions & 6 deletions ddtrace/llmobs/_integrations/bedrock.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from typing import Any
from typing import Dict
from typing import Optional
Expand All @@ -19,6 +18,7 @@
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations import BaseLLMIntegration
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
Expand Down Expand Up @@ -50,14 +50,15 @@ def llmobs_set_tags(
span.set_tag_str(SPAN_KIND, "llm")
span.set_tag_str(MODEL_NAME, span.get_tag("bedrock.request.model") or "")
span.set_tag_str(MODEL_PROVIDER, span.get_tag("bedrock.request.model_provider") or "")
span.set_tag_str(INPUT_MESSAGES, json.dumps(input_messages))
span.set_tag_str(METADATA, json.dumps(parameters))
span.set_tag_str(INPUT_MESSAGES, safe_json(input_messages))
span.set_tag_str(METADATA, safe_json(parameters))
if err or formatted_response is None:
span.set_tag_str(OUTPUT_MESSAGES, json.dumps([{"content": ""}]))
span.set_tag_str(OUTPUT_MESSAGES, safe_json([{"content": ""}]))
else:
output_messages = self._extract_output_message(formatted_response)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))
span.set_tag_str(METRICS, json.dumps(self._llmobs_metrics(span, formatted_response)))
span.set_tag_str(OUTPUT_MESSAGES, safe_json(output_messages))
metrics = self._llmobs_metrics(span, formatted_response)
span.set_tag_str(METRICS, safe_json(metrics))

@staticmethod
def _llmobs_metrics(span: Span, formatted_response: Optional[Dict[str, Any]]) -> Dict[str, Any]:
Expand Down
93 changes: 32 additions & 61 deletions ddtrace/llmobs/_integrations/langchain.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from typing import Any
from typing import Dict
from typing import List
Expand All @@ -21,9 +20,9 @@
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import SPAN_KIND

from ..utils import Document
from .base import BaseLLMIntegration
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._utils import safe_json
from ddtrace.llmobs.utils import Document


log = get_logger(__name__)
Expand Down Expand Up @@ -92,7 +91,7 @@ def llmobs_set_tags(
self._llmobs_set_meta_tags_from_embedding(span, inputs, response, error, is_workflow=is_workflow)
elif operation == "retrieval":
self._llmobs_set_meta_tags_from_similarity_search(span, inputs, response, error, is_workflow=is_workflow)
span.set_tag_str(METRICS, json.dumps({}))
span.set_tag_str(METRICS, safe_json({}))

def _llmobs_set_metadata(self, span: Span, model_provider: Optional[str] = None) -> None:
if not model_provider:
Expand All @@ -113,7 +112,7 @@ def _llmobs_set_metadata(self, span: Span, model_provider: Optional[str] = None)
if max_tokens is not None and max_tokens != "None":
metadata["max_tokens"] = int(max_tokens)
if metadata:
span.set_tag_str(METADATA, json.dumps(metadata))
span.set_tag_str(METADATA, safe_json(metadata))

def _llmobs_set_meta_tags_from_llm(
self, span: Span, prompts: List[Any], completions: Any, err: bool = False, is_workflow: bool = False
Expand All @@ -128,12 +127,12 @@ def _llmobs_set_meta_tags_from_llm(
if isinstance(prompts, str):
prompts = [prompts]

span.set_tag_str(input_tag_key, json.dumps([{"content": str(prompt)} for prompt in prompts]))
span.set_tag_str(input_tag_key, safe_json([{"content": str(prompt)} for prompt in prompts]))

message_content = [{"content": ""}]
if not err:
message_content = [{"content": completion[0].text} for completion in completions.generations]
span.set_tag_str(output_tag_key, json.dumps(message_content))
span.set_tag_str(output_tag_key, safe_json(message_content))

def _llmobs_set_meta_tags_from_chat_model(
self,
Expand All @@ -160,7 +159,7 @@ def _llmobs_set_meta_tags_from_chat_model(
"role": getattr(message, "role", ROLE_MAPPING.get(message.type, "")),
}
)
span.set_tag_str(input_tag_key, json.dumps(input_messages))
span.set_tag_str(input_tag_key, safe_json(input_messages))

output_messages = [{"content": ""}]
if not err:
Expand All @@ -173,13 +172,11 @@ def _llmobs_set_meta_tags_from_chat_model(
"content": str(chat_completion.text),
"role": role,
}

tool_calls_info = self._extract_tool_calls(chat_completion_msg)
if tool_calls_info:
output_message["tool_calls"] = tool_calls_info

output_messages.append(output_message)
span.set_tag_str(output_tag_key, json.dumps(output_messages))
span.set_tag_str(output_tag_key, safe_json(output_messages))

def _extract_tool_calls(self, chat_completion_msg: Any) -> List[Dict[str, Any]]:
"""Extracts tool calls from a langchain chat completion."""
Expand Down Expand Up @@ -207,25 +204,13 @@ def _llmobs_set_meta_tags_from_chain(
span.set_tag_str(SPAN_KIND, "workflow")

if inputs is not None:
try:
formatted_inputs = self.format_io(inputs)
if isinstance(formatted_inputs, str):
span.set_tag_str(INPUT_VALUE, formatted_inputs)
else:
span.set_tag_str(INPUT_VALUE, json.dumps(self.format_io(inputs)))
except TypeError:
log.warning("Failed to serialize chain input data to JSON")
formatted_inputs = self.format_io(inputs)
span.set_tag_str(INPUT_VALUE, safe_json(formatted_inputs))
if error:
span.set_tag_str(OUTPUT_VALUE, "")
elif outputs is not None:
try:
formatted_outputs = self.format_io(outputs)
if isinstance(formatted_outputs, str):
span.set_tag_str(OUTPUT_VALUE, formatted_outputs)
else:
span.set_tag_str(OUTPUT_VALUE, json.dumps(self.format_io(outputs)))
except TypeError:
log.warning("Failed to serialize chain output data to JSON")
formatted_outputs = self.format_io(outputs)
span.set_tag_str(OUTPUT_VALUE, safe_json(formatted_outputs))

def _llmobs_set_meta_tags_from_embedding(
self,
Expand All @@ -250,17 +235,12 @@ def _llmobs_set_meta_tags_from_embedding(
):
if is_workflow:
formatted_inputs = self.format_io(input_texts)
formatted_str = (
formatted_inputs
if isinstance(formatted_inputs, str)
else json.dumps(self.format_io(input_texts))
)
span.set_tag_str(input_tag_key, formatted_str)
span.set_tag_str(input_tag_key, safe_json(formatted_inputs))
else:
if isinstance(input_texts, str):
input_texts = [input_texts]
input_documents = [Document(text=str(doc)) for doc in input_texts]
span.set_tag_str(input_tag_key, json.dumps(input_documents))
span.set_tag_str(input_tag_key, safe_json(input_documents))
except TypeError:
log.warning("Failed to serialize embedding input data to JSON")
if error:
Expand Down Expand Up @@ -296,33 +276,24 @@ def _llmobs_set_meta_tags_from_similarity_search(
span.set_tag_str(MODEL_PROVIDER, span.get_tag(PROVIDER) or "")

if input_query is not None:
try:
formatted_inputs = self.format_io(input_query)
if isinstance(formatted_inputs, str):
span.set_tag_str(INPUT_VALUE, formatted_inputs)
else:
span.set_tag_str(INPUT_VALUE, json.dumps(formatted_inputs))
except TypeError:
log.warning("Failed to serialize similarity search input to JSON")
if error:
formatted_inputs = self.format_io(input_query)
span.set_tag_str(INPUT_VALUE, safe_json(formatted_inputs))
if error or output_documents is None:
span.set_tag_str(OUTPUT_VALUE, "")
elif isinstance(output_documents, list):
if is_workflow:
span.set_tag_str(OUTPUT_VALUE, "[{} document(s) retrieved]".format(len(output_documents)))
else:
documents = []
for d in output_documents:
doc = Document(text=d.page_content)
doc["id"] = getattr(d, "id", "")
metadata = getattr(d, "metadata", {})
doc["name"] = metadata.get("name", doc["id"])
documents.append(doc)
try:
span.set_tag_str(OUTPUT_DOCUMENTS, json.dumps(self.format_io(documents)))
# we set the value as well to ensure that the UI would display it in case the span was the root
span.set_tag_str(OUTPUT_VALUE, "[{} document(s) retrieved]".format(len(documents)))
except TypeError:
log.warning("Failed to serialize similarity output documents to JSON")
return
if is_workflow:
span.set_tag_str(OUTPUT_VALUE, "[{} document(s) retrieved]".format(len(output_documents)))
return
documents = []
for d in output_documents:
doc = Document(text=d.page_content)
doc["id"] = getattr(d, "id", "")
metadata = getattr(d, "metadata", {})
doc["name"] = metadata.get("name", doc["id"])
documents.append(doc)
span.set_tag_str(OUTPUT_DOCUMENTS, safe_json(self.format_io(documents)))
# we set the value as well to ensure that the UI would display it in case the span was the root
span.set_tag_str(OUTPUT_VALUE, "[{} document(s) retrieved]".format(len(documents)))

def _set_base_span_tags( # type: ignore[override]
self,
Expand Down
Loading

0 comments on commit ab7fb99

Please sign in to comment.