Commit

Merge branch 'main' into taegyunkim/endpoint-test
taegyunkim committed Sep 19, 2024
2 parents 049e570 + 58049cb commit a4e8295
Showing 37 changed files with 688 additions and 344 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
@@ -114,7 +114,7 @@ ddtrace/profiling @DataDog/profiling-python
ddtrace/settings/profiling.py @DataDog/profiling-python
ddtrace/internal/datadog/profiling @DataDog/profiling-python
tests/profiling @DataDog/profiling-python
tests/profiling-v2 @DataDog/profiling-python
tests/profiling_v2 @DataDog/profiling-python
.gitlab/tests/profiling.yml @DataDog/profiling-python

# MLObs
44 changes: 0 additions & 44 deletions ddtrace/_trace/_span_link.py
@@ -26,17 +26,11 @@

import dataclasses
from enum import Enum
from typing import Any
from typing import Dict
from typing import Optional

from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.formats import flatten_key_value


log = get_logger(__name__)


class SpanLinkKind(Enum):
"""
A collection of standard SpanLink kinds. It's possible to use others, but these should be used when possible.
@@ -132,41 +126,3 @@ def __str__(self) -> str:
f"trace_id={self.trace_id} span_id={self.span_id} attributes={attrs_str} "
f"tracestate={self.tracestate} flags={self.flags} dropped_attributes={self._dropped_attributes}"
)


# Span Pointers are currently private, so let's put them here for now


_SPAN_POINTER_SPAN_LINK_TRACE_ID = 0
_SPAN_POINTER_SPAN_LINK_SPAN_ID = 0


class _SpanPointerDirection(Enum):
UPSTREAM = "u"
DOWNSTREAM = "d"


class _SpanPointer(SpanLink):
def __init__(
self,
pointer_kind: str,
pointer_direction: _SpanPointerDirection,
pointer_hash: str,
extra_attributes: Optional[Dict[str, Any]] = None,
):
super().__init__(
trace_id=_SPAN_POINTER_SPAN_LINK_TRACE_ID,
span_id=_SPAN_POINTER_SPAN_LINK_SPAN_ID,
attributes={
"ptr.kind": pointer_kind,
"ptr.dir": pointer_direction.value,
"ptr.hash": pointer_hash,
**(extra_attributes or {}),
},
)

self.kind = SpanLinkKind.SPAN_POINTER.value

def __post_init__(self):
# Do not want to do the trace_id and span_id checks that SpanLink does.
pass
92 changes: 92 additions & 0 deletions ddtrace/_trace/_span_pointer.py
@@ -0,0 +1,92 @@
from enum import Enum
from hashlib import sha256
import random
from typing import Any
from typing import Dict
from typing import NamedTuple
from typing import Optional

from ddtrace._trace._span_link import SpanLink
from ddtrace._trace._span_link import SpanLinkKind
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)


_SPAN_POINTER_SPAN_LINK_TRACE_ID = 0
_SPAN_POINTER_SPAN_LINK_SPAN_ID = 0


class _SpanPointerDirection(Enum):
UPSTREAM = "u"
DOWNSTREAM = "d"


class _SpanPointerDescription(NamedTuple):
# Not to be confused with _SpanPointer. This class describes the parameters
# required to attach a span pointer to a Span. It lets us decouple code
# that calculates span pointers from code that actually attaches them to
# the right Span.

pointer_kind: str
pointer_direction: _SpanPointerDirection
pointer_hash: str
extra_attributes: Dict[str, Any]


class _SpanPointer(SpanLink):
def __init__(
self,
pointer_kind: str,
pointer_direction: _SpanPointerDirection,
pointer_hash: str,
extra_attributes: Optional[Dict[str, Any]] = None,
):
super().__init__(
trace_id=_SPAN_POINTER_SPAN_LINK_TRACE_ID,
span_id=_SPAN_POINTER_SPAN_LINK_SPAN_ID,
attributes={
"ptr.kind": pointer_kind,
"ptr.dir": pointer_direction.value,
"ptr.hash": pointer_hash,
**(extra_attributes or {}),
},
)

self.kind = SpanLinkKind.SPAN_POINTER.value

def __post_init__(self):
# Do not want to do the trace_id and span_id checks that SpanLink does.
pass


_STANDARD_HASHING_FUNCTION_FAILURE_PREFIX = "HashingFailure"


def _standard_hashing_function(*elements: bytes) -> str:
try:
if not elements:
raise ValueError("elements must not be empty")

# Please see the tests for more details about this logic.
return sha256(b"|".join(elements)).hexdigest()[:32]

except Exception as e:
log.warning(
"failed to generate standard hash for span pointer: %s",
str(e),
)
return _add_random_suffix(
prefix=_STANDARD_HASHING_FUNCTION_FAILURE_PREFIX,
minimum_length=32,
)


def _add_random_suffix(*, prefix: str, minimum_length: int) -> str:
if len(prefix) >= minimum_length:
return prefix

suffix = "".join(random.choice("0123456789abcdef") for _ in range(minimum_length - len(prefix))) # nosec

return prefix + suffix
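
For reference, here is a minimal usage sketch of the new module. It is not part of the diff, and the pointer kind and hash inputs are illustrative values rather than ones defined by this commit:

# Sketch only: build a span pointer with the helpers added above.
from ddtrace._trace._span_pointer import _SpanPointer
from ddtrace._trace._span_pointer import _SpanPointerDescription
from ddtrace._trace._span_pointer import _SpanPointerDirection
from ddtrace._trace._span_pointer import _standard_hashing_function

# Hash the identifying elements of the linked resource; inputs are bytes, and
# the result is the first 32 hex characters of a sha256 over "|"-joined parts.
pointer_hash = _standard_hashing_function(b"some-bucket", b"some-key.data", b"ab12ef34")

# A description decouples computing a pointer from attaching it to a Span.
description = _SpanPointerDescription(
    pointer_kind="example.resource",  # illustrative kind, not from this commit
    pointer_direction=_SpanPointerDirection.DOWNSTREAM,
    pointer_hash=pointer_hash,
    extra_attributes={},
)

# The pointer itself is a SpanLink whose trace_id and span_id are fixed to 0;
# the pointer details are carried in the "ptr.*" attributes.
pointer = _SpanPointer(
    pointer_kind=description.pointer_kind,
    pointer_direction=description.pointer_direction,
    pointer_hash=description.pointer_hash,
    extra_attributes=description.extra_attributes,
)
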
4 changes: 2 additions & 2 deletions ddtrace/_trace/span.py
@@ -16,8 +16,8 @@
from ddtrace import config
from ddtrace._trace._span_link import SpanLink
from ddtrace._trace._span_link import SpanLinkKind
from ddtrace._trace._span_link import _SpanPointer
from ddtrace._trace._span_link import _SpanPointerDirection
from ddtrace._trace._span_pointer import _SpanPointer
from ddtrace._trace._span_pointer import _SpanPointerDirection
from ddtrace._trace.context import Context
from ddtrace._trace.types import _MetaDictType
from ddtrace._trace.types import _MetricDictType
10 changes: 4 additions & 6 deletions ddtrace/contrib/internal/anthropic/utils.py
@@ -1,10 +1,9 @@
import json
from typing import Any
from typing import Optional

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs._utils import _get_attr
from ddtrace.llmobs._utils import _unserializable_default_repr
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
@@ -39,7 +38,7 @@ def tag_tool_use_input_on_span(integration, span, chat_input, message_idx, block
)
span.set_tag_str(
"anthropic.request.messages.%d.content.%d.tool_call.input" % (message_idx, block_idx),
integration.trunc(json.dumps(_get_attr(chat_input, "input", {}), default=_unserializable_default_repr)),
integration.trunc(safe_json(_get_attr(chat_input, "input", {}))),
)


@@ -80,8 +79,7 @@ def tag_tool_use_output_on_span(integration, span, chat_completion, idx):
span.set_tag_str("anthropic.response.completions.content.%d.tool_call.name" % idx, str(tool_name))
if tool_inputs:
span.set_tag_str(
"anthropic.response.completions.content.%d.tool_call.input" % idx,
integration.trunc(json.dumps(tool_inputs, default=_unserializable_default_repr)),
"anthropic.response.completions.content.%d.tool_call.input" % idx, integration.trunc(safe_json(tool_inputs))
)


@@ -92,7 +90,7 @@ def tag_params_on_span(span, kwargs, integration):
span.set_tag_str("anthropic.request.system", integration.trunc(str(v)))
elif k not in ("messages", "model"):
tagged_params[k] = v
span.set_tag_str("anthropic.request.parameters", json.dumps(tagged_params, default=_unserializable_default_repr))
span.set_tag_str("anthropic.request.parameters", safe_json(tagged_params))


def _extract_api_key(instance: Any) -> Optional[str]:
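
The recurring change in this file (and in the LLM Obs integrations below) is replacing direct json.dumps(..., default=_unserializable_default_repr) calls with the shared safe_json helper from ddtrace.llmobs._utils. That helper's implementation is not shown in this diff; the following sketch only illustrates the general shape such a wrapper typically has, not the actual ddtrace code:

import json

def safe_json_sketch(obj):
    # Hypothetical stand-in for ddtrace.llmobs._utils.safe_json: serialize to
    # JSON, fall back to repr() for objects json cannot encode, and never let
    # tagging raise.
    try:
        return json.dumps(obj, default=repr)
    except Exception:
        return str(obj)
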
22 changes: 13 additions & 9 deletions ddtrace/contrib/internal/langchain/patch.py
@@ -1,4 +1,3 @@
import json
import os
import sys
from typing import Any
@@ -60,6 +59,7 @@
from ddtrace.internal.utils.formats import deep_getattr
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs._integrations import LangChainIntegration
from ddtrace.llmobs._utils import safe_json
from ddtrace.pin import Pin


@@ -467,9 +467,11 @@ def traced_chat_model_generate(langchain, pin, func, instance, args, kwargs):
"messages": [
[
{
"content": message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", "")),
"content": (
message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", ""))
),
"message_type": message.__class__.__name__,
}
for message in messages
@@ -597,9 +599,11 @@ async def traced_chat_model_agenerate(langchain, pin, func, instance, args, kwar
"messages": [
[
{
"content": message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", "")),
"content": (
message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", ""))
),
"message_type": message.__class__.__name__,
}
for message in messages
@@ -1011,7 +1015,7 @@ def traced_base_tool_invoke(langchain, pin, func, instance, args, kwargs):
if tool_input and integration.is_pc_sampled_span(span):
span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
if config:
span.set_tag_str("langchain.request.config", json.dumps(config))
span.set_tag_str("langchain.request.config", safe_json(config))

tool_output = func(*args, **kwargs)
if tool_output and integration.is_pc_sampled_span(span):
@@ -1072,7 +1076,7 @@ async def traced_base_tool_ainvoke(langchain, pin, func, instance, args, kwargs)
if tool_input and integration.is_pc_sampled_span(span):
span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
if config:
span.set_tag_str("langchain.request.config", json.dumps(config))
span.set_tag_str("langchain.request.config", safe_json(config))

tool_output = await func(*args, **kwargs)
if tool_output and integration.is_pc_sampled_span(span):
17 changes: 8 additions & 9 deletions ddtrace/llmobs/_integrations/anthropic.py
@@ -17,9 +17,9 @@
from ddtrace.llmobs._constants import OUTPUT_TOKENS_METRIC_KEY
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._utils import _get_attr

from .base import BaseLLMIntegration
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
@@ -71,18 +71,17 @@ def llmobs_set_tags(

span.set_tag_str(SPAN_KIND, "llm")
span.set_tag_str(MODEL_NAME, span.get_tag("anthropic.request.model") or "")
span.set_tag_str(INPUT_MESSAGES, json.dumps(input_messages))
span.set_tag_str(METADATA, json.dumps(parameters))
span.set_tag_str(INPUT_MESSAGES, safe_json(input_messages))
span.set_tag_str(METADATA, safe_json(parameters))
span.set_tag_str(MODEL_PROVIDER, "anthropic")
if err or resp is None:
span.set_tag_str(OUTPUT_MESSAGES, json.dumps([{"content": ""}]))
span.set_tag_str(OUTPUT_MESSAGES, safe_json([{"content": ""}]))
else:
output_messages = self._extract_output_message(resp)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))

span.set_tag_str(OUTPUT_MESSAGES, safe_json(output_messages))
usage = self._get_llmobs_metrics_tags(span)
if usage != {}:
span.set_tag_str(METRICS, json.dumps(usage))
if usage:
span.set_tag_str(METRICS, safe_json(usage))

def _extract_input_message(self, messages, system_prompt=None):
"""Extract input messages from the stored prompt.
9 changes: 5 additions & 4 deletions ddtrace/llmobs/_integrations/base.py
@@ -59,17 +59,18 @@ def __init__(self, integration_config: IntegrationConfig) -> None:

@property
def metrics_enabled(self) -> bool:
"""Return whether submitting metrics is enabled for this integration, or global config if not set."""
env_metrics_enabled = asbool(os.getenv("DD_{}_METRICS_ENABLED".format(self._integration_name.upper())))
if not env_metrics_enabled and config._llmobs_agentless_enabled:
"""
Return whether submitting metrics is enabled for this integration. Agentless mode disables submitting metrics.
"""
if config._llmobs_agentless_enabled:
return False
if hasattr(self.integration_config, "metrics_enabled"):
return asbool(self.integration_config.metrics_enabled)
return False

@property
def logs_enabled(self) -> bool:
"""Return whether submitting logs is enabled for this integration, or global config if not set."""
"""Return whether submitting logs is enabled for this integration."""
if hasattr(self.integration_config, "logs_enabled"):
return asbool(self.integration_config.logs_enabled)
return False
13 changes: 7 additions & 6 deletions ddtrace/llmobs/_integrations/bedrock.py
@@ -1,4 +1,3 @@
import json
from typing import Any
from typing import Dict
from typing import Optional
@@ -19,6 +18,7 @@
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations import BaseLLMIntegration
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
@@ -50,14 +50,15 @@ def llmobs_set_tags(
span.set_tag_str(SPAN_KIND, "llm")
span.set_tag_str(MODEL_NAME, span.get_tag("bedrock.request.model") or "")
span.set_tag_str(MODEL_PROVIDER, span.get_tag("bedrock.request.model_provider") or "")
span.set_tag_str(INPUT_MESSAGES, json.dumps(input_messages))
span.set_tag_str(METADATA, json.dumps(parameters))
span.set_tag_str(INPUT_MESSAGES, safe_json(input_messages))
span.set_tag_str(METADATA, safe_json(parameters))
if err or formatted_response is None:
span.set_tag_str(OUTPUT_MESSAGES, json.dumps([{"content": ""}]))
span.set_tag_str(OUTPUT_MESSAGES, safe_json([{"content": ""}]))
else:
output_messages = self._extract_output_message(formatted_response)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))
span.set_tag_str(METRICS, json.dumps(self._llmobs_metrics(span, formatted_response)))
span.set_tag_str(OUTPUT_MESSAGES, safe_json(output_messages))
metrics = self._llmobs_metrics(span, formatted_response)
span.set_tag_str(METRICS, safe_json(metrics))

@staticmethod
def _llmobs_metrics(span: Span, formatted_response: Optional[Dict[str, Any]]) -> Dict[str, Any]: