Skip to content

Commit

Permalink
Merge branch 'main' into zachg/fix_kinesis_none_return_
Browse files Browse the repository at this point in the history
  • Loading branch information
ZStriker19 authored Sep 19, 2024
2 parents 8759c3a + 9922df5 commit ab100eb
Show file tree
Hide file tree
Showing 20 changed files with 424 additions and 289 deletions.
37 changes: 37 additions & 0 deletions ddtrace/_trace/_span_pointer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from enum import Enum
from hashlib import sha256
import random
from typing import Any
from typing import Dict
from typing import NamedTuple
from typing import Optional

from ddtrace._trace._span_link import SpanLink
from ddtrace._trace._span_link import SpanLinkKind
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)


_SPAN_POINTER_SPAN_LINK_TRACE_ID = 0
Expand Down Expand Up @@ -53,3 +59,34 @@ def __init__(
def __post_init__(self):
# Do not want to do the trace_id and span_id checks that SpanLink does.
pass


_STANDARD_HASHING_FUNCTION_FAILURE_PREFIX = "HashingFailure"


def _standard_hashing_function(*elements: bytes) -> str:
try:
if not elements:
raise ValueError("elements must not be empty")

# Please see the tests for more details about this logic.
return sha256(b"|".join(elements)).hexdigest()[:32]

except Exception as e:
log.warning(
"failed to generate standard hash for span pointer: %s",
str(e),
)
return _add_random_suffix(
prefix=_STANDARD_HASHING_FUNCTION_FAILURE_PREFIX,
minimum_length=32,
)


def _add_random_suffix(*, prefix: str, minimum_length: int) -> str:
if len(prefix) >= minimum_length:
return prefix

suffix = "".join(random.choice("0123456789abcdef") for _ in range(minimum_length - len(prefix))) # nosec

return prefix + suffix
10 changes: 4 additions & 6 deletions ddtrace/contrib/internal/anthropic/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import json
from typing import Any
from typing import Optional

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs._utils import _get_attr
from ddtrace.llmobs._utils import _unserializable_default_repr
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
Expand Down Expand Up @@ -39,7 +38,7 @@ def tag_tool_use_input_on_span(integration, span, chat_input, message_idx, block
)
span.set_tag_str(
"anthropic.request.messages.%d.content.%d.tool_call.input" % (message_idx, block_idx),
integration.trunc(json.dumps(_get_attr(chat_input, "input", {}), default=_unserializable_default_repr)),
integration.trunc(safe_json(_get_attr(chat_input, "input", {}))),
)


Expand Down Expand Up @@ -80,8 +79,7 @@ def tag_tool_use_output_on_span(integration, span, chat_completion, idx):
span.set_tag_str("anthropic.response.completions.content.%d.tool_call.name" % idx, str(tool_name))
if tool_inputs:
span.set_tag_str(
"anthropic.response.completions.content.%d.tool_call.input" % idx,
integration.trunc(json.dumps(tool_inputs, default=_unserializable_default_repr)),
"anthropic.response.completions.content.%d.tool_call.input" % idx, integration.trunc(safe_json(tool_inputs))
)


Expand All @@ -92,7 +90,7 @@ def tag_params_on_span(span, kwargs, integration):
span.set_tag_str("anthropic.request.system", integration.trunc(str(v)))
elif k not in ("messages", "model"):
tagged_params[k] = v
span.set_tag_str("anthropic.request.parameters", json.dumps(tagged_params, default=_unserializable_default_repr))
span.set_tag_str("anthropic.request.parameters", safe_json(tagged_params))


def _extract_api_key(instance: Any) -> Optional[str]:
Expand Down
22 changes: 13 additions & 9 deletions ddtrace/contrib/internal/langchain/patch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os
import sys
from typing import Any
Expand Down Expand Up @@ -60,6 +59,7 @@
from ddtrace.internal.utils.formats import deep_getattr
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs._integrations import LangChainIntegration
from ddtrace.llmobs._utils import safe_json
from ddtrace.pin import Pin


Expand Down Expand Up @@ -467,9 +467,11 @@ def traced_chat_model_generate(langchain, pin, func, instance, args, kwargs):
"messages": [
[
{
"content": message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", "")),
"content": (
message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", ""))
),
"message_type": message.__class__.__name__,
}
for message in messages
Expand Down Expand Up @@ -597,9 +599,11 @@ async def traced_chat_model_agenerate(langchain, pin, func, instance, args, kwar
"messages": [
[
{
"content": message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", "")),
"content": (
message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", ""))
),
"message_type": message.__class__.__name__,
}
for message in messages
Expand Down Expand Up @@ -1011,7 +1015,7 @@ def traced_base_tool_invoke(langchain, pin, func, instance, args, kwargs):
if tool_input and integration.is_pc_sampled_span(span):
span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
if config:
span.set_tag_str("langchain.request.config", json.dumps(config))
span.set_tag_str("langchain.request.config", safe_json(config))

tool_output = func(*args, **kwargs)
if tool_output and integration.is_pc_sampled_span(span):
Expand Down Expand Up @@ -1072,7 +1076,7 @@ async def traced_base_tool_ainvoke(langchain, pin, func, instance, args, kwargs)
if tool_input and integration.is_pc_sampled_span(span):
span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
if config:
span.set_tag_str("langchain.request.config", json.dumps(config))
span.set_tag_str("langchain.request.config", safe_json(config))

tool_output = await func(*args, **kwargs)
if tool_output and integration.is_pc_sampled_span(span):
Expand Down
17 changes: 8 additions & 9 deletions ddtrace/llmobs/_integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
from ddtrace.llmobs._constants import OUTPUT_TOKENS_METRIC_KEY
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._utils import _get_attr

from .base import BaseLLMIntegration
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
Expand Down Expand Up @@ -71,18 +71,17 @@ def llmobs_set_tags(

span.set_tag_str(SPAN_KIND, "llm")
span.set_tag_str(MODEL_NAME, span.get_tag("anthropic.request.model") or "")
span.set_tag_str(INPUT_MESSAGES, json.dumps(input_messages))
span.set_tag_str(METADATA, json.dumps(parameters))
span.set_tag_str(INPUT_MESSAGES, safe_json(input_messages))
span.set_tag_str(METADATA, safe_json(parameters))
span.set_tag_str(MODEL_PROVIDER, "anthropic")
if err or resp is None:
span.set_tag_str(OUTPUT_MESSAGES, json.dumps([{"content": ""}]))
span.set_tag_str(OUTPUT_MESSAGES, safe_json([{"content": ""}]))
else:
output_messages = self._extract_output_message(resp)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))

span.set_tag_str(OUTPUT_MESSAGES, safe_json(output_messages))
usage = self._get_llmobs_metrics_tags(span)
if usage != {}:
span.set_tag_str(METRICS, json.dumps(usage))
if usage:
span.set_tag_str(METRICS, safe_json(usage))

def _extract_input_message(self, messages, system_prompt=None):
"""Extract input messages from the stored prompt.
Expand Down
13 changes: 7 additions & 6 deletions ddtrace/llmobs/_integrations/bedrock.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from typing import Any
from typing import Dict
from typing import Optional
Expand All @@ -19,6 +18,7 @@
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations import BaseLLMIntegration
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
Expand Down Expand Up @@ -50,14 +50,15 @@ def llmobs_set_tags(
span.set_tag_str(SPAN_KIND, "llm")
span.set_tag_str(MODEL_NAME, span.get_tag("bedrock.request.model") or "")
span.set_tag_str(MODEL_PROVIDER, span.get_tag("bedrock.request.model_provider") or "")
span.set_tag_str(INPUT_MESSAGES, json.dumps(input_messages))
span.set_tag_str(METADATA, json.dumps(parameters))
span.set_tag_str(INPUT_MESSAGES, safe_json(input_messages))
span.set_tag_str(METADATA, safe_json(parameters))
if err or formatted_response is None:
span.set_tag_str(OUTPUT_MESSAGES, json.dumps([{"content": ""}]))
span.set_tag_str(OUTPUT_MESSAGES, safe_json([{"content": ""}]))
else:
output_messages = self._extract_output_message(formatted_response)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))
span.set_tag_str(METRICS, json.dumps(self._llmobs_metrics(span, formatted_response)))
span.set_tag_str(OUTPUT_MESSAGES, safe_json(output_messages))
metrics = self._llmobs_metrics(span, formatted_response)
span.set_tag_str(METRICS, safe_json(metrics))

@staticmethod
def _llmobs_metrics(span: Span, formatted_response: Optional[Dict[str, Any]]) -> Dict[str, Any]:
Expand Down
13 changes: 6 additions & 7 deletions ddtrace/llmobs/_integrations/gemini.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from typing import Any
from typing import Dict
from typing import Iterable
Expand All @@ -19,7 +18,7 @@
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._utils import _get_attr
from ddtrace.llmobs._utils import _unserializable_default_repr
from ddtrace.llmobs._utils import safe_json


class GeminiIntegration(BaseLLMIntegration):
Expand All @@ -44,22 +43,22 @@ def llmobs_set_tags(
span.set_tag_str(MODEL_PROVIDER, span.get_tag("google_generativeai.request.provider") or "")

metadata = self._llmobs_set_metadata(kwargs, instance)
span.set_tag_str(METADATA, json.dumps(metadata, default=_unserializable_default_repr))
span.set_tag_str(METADATA, safe_json(metadata))

system_instruction = _get_attr(instance, "_system_instruction", None)
input_contents = get_argument_value(args, kwargs, 0, "contents")
input_messages = self._extract_input_message(input_contents, system_instruction)
span.set_tag_str(INPUT_MESSAGES, json.dumps(input_messages, default=_unserializable_default_repr))
span.set_tag_str(INPUT_MESSAGES, safe_json(input_messages))

if span.error or generations is None:
span.set_tag_str(OUTPUT_MESSAGES, json.dumps([{"content": ""}]))
span.set_tag_str(OUTPUT_MESSAGES, safe_json([{"content": ""}]))
else:
output_messages = self._extract_output_message(generations)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages, default=_unserializable_default_repr))
span.set_tag_str(OUTPUT_MESSAGES, safe_json(output_messages))

usage = self._get_llmobs_metrics_tags(span)
if usage:
span.set_tag_str(METRICS, json.dumps(usage, default=_unserializable_default_repr))
span.set_tag_str(METRICS, safe_json(usage))

@staticmethod
def _llmobs_set_metadata(kwargs, instance):
Expand Down
Loading

0 comments on commit ab100eb

Please sign in to comment.