
Commit

Merge branch 'main' into brettlangdon/system-tests.all
brettlangdon authored Sep 19, 2024
2 parents b13c16a + bd0db4c commit 3cf0048
Showing 29 changed files with 951 additions and 293 deletions.
37 changes: 37 additions & 0 deletions ddtrace/_trace/_span_pointer.py
@@ -1,11 +1,17 @@
from enum import Enum
from hashlib import sha256
import random
from typing import Any
from typing import Dict
from typing import NamedTuple
from typing import Optional

from ddtrace._trace._span_link import SpanLink
from ddtrace._trace._span_link import SpanLinkKind
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)


_SPAN_POINTER_SPAN_LINK_TRACE_ID = 0
@@ -53,3 +59,34 @@ def __init__(
def __post_init__(self):
# Do not want to do the trace_id and span_id checks that SpanLink does.
pass


_STANDARD_HASHING_FUNCTION_FAILURE_PREFIX = "HashingFailure"


def _standard_hashing_function(*elements: bytes) -> str:
try:
if not elements:
raise ValueError("elements must not be empty")

# Please see the tests for more details about this logic.
return sha256(b"|".join(elements)).hexdigest()[:32]

except Exception as e:
log.warning(
"failed to generate standard hash for span pointer: %s",
str(e),
)
return _add_random_suffix(
prefix=_STANDARD_HASHING_FUNCTION_FAILURE_PREFIX,
minimum_length=32,
)


def _add_random_suffix(*, prefix: str, minimum_length: int) -> str:
if len(prefix) >= minimum_length:
return prefix

suffix = "".join(random.choice("0123456789abcdef") for _ in range(minimum_length - len(prefix))) # nosec

return prefix + suffix
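

For reference, a minimal usage sketch of the helpers added above (not part of the commit; the example byte strings are made up):

from ddtrace._trace._span_pointer import _standard_hashing_function

# Deterministic hash: sha256 of the b"|"-joined elements, truncated to 32 hex characters.
pointer_hash = _standard_hashing_function(b"some-bucket", b"some-key.data", b"ab12ef34")
assert len(pointer_hash) == 32

# On failure (for example, no elements), the helper logs a warning and returns a
# "HashingFailure"-prefixed value padded with random hex digits to the same 32-character length.
fallback = _standard_hashing_function()
assert fallback.startswith("HashingFailure") and len(fallback) == 32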
Empty file.
101 changes: 101 additions & 0 deletions ddtrace/_trace/utils_botocore/span_pointers.py
@@ -0,0 +1,101 @@
from typing import Any
from typing import Dict
from typing import List

from ddtrace._trace._span_pointer import _SpanPointerDescription
from ddtrace._trace._span_pointer import _SpanPointerDirection
from ddtrace._trace._span_pointer import _standard_hashing_function
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)


def extract_span_pointers_from_successful_botocore_response(
endpoint_name: str,
operation_name: str,
request_parameters: Dict[str, Any],
response: Dict[str, Any],
) -> List[_SpanPointerDescription]:
if endpoint_name == "s3":
return _extract_span_pointers_for_s3_response(operation_name, request_parameters, response)

return []


def _extract_span_pointers_for_s3_response(
operation_name: str,
request_parameters: Dict[str, Any],
response: Dict[str, Any],
) -> List[_SpanPointerDescription]:
if operation_name == "PutObject":
return _extract_span_pointers_for_s3_put_object_response(request_parameters, response)

return []


def _extract_span_pointers_for_s3_put_object_response(
request_parameters: Dict[str, Any],
response: Dict[str, Any],
) -> List[_SpanPointerDescription]:
# Endpoint Reference:
# https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html

try:
bucket = request_parameters["Bucket"]
key = request_parameters["Key"]
etag = response["ETag"]

# The ETag is surrounded by double quotes for some reason.
if etag.startswith('"') and etag.endswith('"'):
etag = etag[1:-1]

except KeyError as e:
log.warning(
"missing a parameter or response field required to make span pointer for S3.PutObject: %s",
str(e),
)
return []

try:
return [
_aws_s3_object_span_pointer_description(
pointer_direction=_SpanPointerDirection.DOWNSTREAM,
bucket=bucket,
key=key,
etag=etag,
)
]
except Exception as e:
log.warning(
"failed to generate S3.PutObject span pointer: %s",
str(e),
)
return []


def _aws_s3_object_span_pointer_description(
pointer_direction: _SpanPointerDirection,
bucket: str,
key: str,
etag: str,
) -> _SpanPointerDescription:
return _SpanPointerDescription(
pointer_kind="aws.s3.object",
pointer_direction=pointer_direction,
pointer_hash=_aws_s3_object_span_pointer_hash(bucket, key, etag),
extra_attributes={},
)


def _aws_s3_object_span_pointer_hash(bucket: str, key: str, etag: str) -> str:
if '"' in etag:
# Some AWS API endpoints put the ETag in double quotes. We expect the
# calling code to have correctly fixed this already.
raise ValueError(f"ETag should not have double quotes: {etag}")

return _standard_hashing_function(
bucket.encode("ascii"),
key.encode("utf-8"),
etag.encode("ascii"),
)
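
A rough illustration (not from the commit) of how the extractor above is meant to be called; the request and response dicts below are fabricated to mirror botocore's S3 PutObject shapes:

from ddtrace._trace.utils_botocore.span_pointers import extract_span_pointers_from_successful_botocore_response

pointers = extract_span_pointers_from_successful_botocore_response(
    endpoint_name="s3",
    operation_name="PutObject",
    request_parameters={"Bucket": "some-bucket", "Key": "some-key.data", "Body": b"content"},
    response={"ETag": '"ab12ef34"'},  # S3 returns the ETag wrapped in double quotes
)
# Expect a single DOWNSTREAM pointer with pointer_kind "aws.s3.object" and a 32-character hash.
assert len(pointers) == 1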
10 changes: 4 additions & 6 deletions ddtrace/contrib/internal/anthropic/utils.py
@@ -1,10 +1,9 @@
import json
from typing import Any
from typing import Optional

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs._utils import _get_attr
from ddtrace.llmobs._utils import _unserializable_default_repr
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
@@ -39,7 +38,7 @@ def tag_tool_use_input_on_span(integration, span, chat_input, message_idx, block
)
span.set_tag_str(
"anthropic.request.messages.%d.content.%d.tool_call.input" % (message_idx, block_idx),
integration.trunc(json.dumps(_get_attr(chat_input, "input", {}), default=_unserializable_default_repr)),
integration.trunc(safe_json(_get_attr(chat_input, "input", {}))),
)


@@ -80,8 +79,7 @@ def tag_tool_use_output_on_span(integration, span, chat_completion, idx):
span.set_tag_str("anthropic.response.completions.content.%d.tool_call.name" % idx, str(tool_name))
if tool_inputs:
span.set_tag_str(
"anthropic.response.completions.content.%d.tool_call.input" % idx,
integration.trunc(json.dumps(tool_inputs, default=_unserializable_default_repr)),
"anthropic.response.completions.content.%d.tool_call.input" % idx, integration.trunc(safe_json(tool_inputs))
)


@@ -92,7 +90,7 @@ def tag_params_on_span(span, kwargs, integration):
span.set_tag_str("anthropic.request.system", integration.trunc(str(v)))
elif k not in ("messages", "model"):
tagged_params[k] = v
span.set_tag_str("anthropic.request.parameters", json.dumps(tagged_params, default=_unserializable_default_repr))
span.set_tag_str("anthropic.request.parameters", safe_json(tagged_params))


def _extract_api_key(instance: Any) -> Optional[str]:
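
The recurring change in this file (and in the integrations below) replaces direct json.dumps(..., default=_unserializable_default_repr) calls with the shared safe_json helper from ddtrace.llmobs._utils. That helper's body is not shown in this diff; the following is only an assumed sketch of the pattern, for orientation:

import json

def safe_json_sketch(obj):
    # Assumed behavior: serialize to JSON, falling back to repr() for objects the
    # encoder cannot handle, so span tagging never raises. The real safe_json may differ.
    try:
        return json.dumps(obj, default=repr)
    except Exception:
        return ""

# A payload containing a non-JSON-serializable value still yields a usable tag string.
print(safe_json_sketch({"temperature": 0.7, "client": object()}))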
22 changes: 13 additions & 9 deletions ddtrace/contrib/internal/langchain/patch.py
@@ -1,4 +1,3 @@
import json
import os
import sys
from typing import Any
@@ -60,6 +59,7 @@
from ddtrace.internal.utils.formats import deep_getattr
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs._integrations import LangChainIntegration
from ddtrace.llmobs._utils import safe_json
from ddtrace.pin import Pin


@@ -467,9 +467,11 @@ def traced_chat_model_generate(langchain, pin, func, instance, args, kwargs):
"messages": [
[
{
"content": message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", "")),
"content": (
message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", ""))
),
"message_type": message.__class__.__name__,
}
for message in messages
@@ -597,9 +599,11 @@ async def traced_chat_model_agenerate(langchain, pin, func, instance, args, kwar
"messages": [
[
{
"content": message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", "")),
"content": (
message.get("content", "")
if isinstance(message, dict)
else str(getattr(message, "content", ""))
),
"message_type": message.__class__.__name__,
}
for message in messages
@@ -1011,7 +1015,7 @@ def traced_base_tool_invoke(langchain, pin, func, instance, args, kwargs):
if tool_input and integration.is_pc_sampled_span(span):
span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
if config:
span.set_tag_str("langchain.request.config", json.dumps(config))
span.set_tag_str("langchain.request.config", safe_json(config))

tool_output = func(*args, **kwargs)
if tool_output and integration.is_pc_sampled_span(span):
@@ -1072,7 +1076,7 @@ async def traced_base_tool_ainvoke(langchain, pin, func, instance, args, kwargs)
if tool_input and integration.is_pc_sampled_span(span):
span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
if config:
span.set_tag_str("langchain.request.config", json.dumps(config))
span.set_tag_str("langchain.request.config", safe_json(config))

tool_output = await func(*args, **kwargs)
if tool_output and integration.is_pc_sampled_span(span):
3 changes: 3 additions & 0 deletions ddtrace/internal/core/crashtracking.py
@@ -28,11 +28,14 @@ def start() -> bool:
if not is_available:
return False

import platform

crashtracker.set_url(crashtracker_config.debug_url or agent.get_trace_url())
crashtracker.set_service(config.service)
crashtracker.set_version(config.version)
crashtracker.set_env(config.env)
crashtracker.set_runtime_id(get_runtime_id())
crashtracker.set_runtime_version(platform.python_version())
crashtracker.set_library_version(version.get_version())
crashtracker.set_alt_stack(bool(crashtracker_config.alt_stack))
crashtracker.set_wait_for_receiver(bool(crashtracker_config.wait_for_receiver))
17 changes: 8 additions & 9 deletions ddtrace/llmobs/_integrations/anthropic.py
@@ -17,9 +17,9 @@
from ddtrace.llmobs._constants import OUTPUT_TOKENS_METRIC_KEY
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._utils import _get_attr

from .base import BaseLLMIntegration
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
@@ -71,18 +71,17 @@ def llmobs_set_tags(

span.set_tag_str(SPAN_KIND, "llm")
span.set_tag_str(MODEL_NAME, span.get_tag("anthropic.request.model") or "")
span.set_tag_str(INPUT_MESSAGES, json.dumps(input_messages))
span.set_tag_str(METADATA, json.dumps(parameters))
span.set_tag_str(INPUT_MESSAGES, safe_json(input_messages))
span.set_tag_str(METADATA, safe_json(parameters))
span.set_tag_str(MODEL_PROVIDER, "anthropic")
if err or resp is None:
span.set_tag_str(OUTPUT_MESSAGES, json.dumps([{"content": ""}]))
span.set_tag_str(OUTPUT_MESSAGES, safe_json([{"content": ""}]))
else:
output_messages = self._extract_output_message(resp)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))

span.set_tag_str(OUTPUT_MESSAGES, safe_json(output_messages))
usage = self._get_llmobs_metrics_tags(span)
if usage != {}:
span.set_tag_str(METRICS, json.dumps(usage))
if usage:
span.set_tag_str(METRICS, safe_json(usage))

def _extract_input_message(self, messages, system_prompt=None):
"""Extract input messages from the stored prompt.
9 changes: 5 additions & 4 deletions ddtrace/llmobs/_integrations/base.py
@@ -59,17 +59,18 @@ def __init__(self, integration_config: IntegrationConfig) -> None:

@property
def metrics_enabled(self) -> bool:
"""Return whether submitting metrics is enabled for this integration, or global config if not set."""
env_metrics_enabled = asbool(os.getenv("DD_{}_METRICS_ENABLED".format(self._integration_name.upper())))
if not env_metrics_enabled and config._llmobs_agentless_enabled:
"""
Return whether submitting metrics is enabled for this integration. Agentless mode disables submitting metrics.
"""
if config._llmobs_agentless_enabled:
return False
if hasattr(self.integration_config, "metrics_enabled"):
return asbool(self.integration_config.metrics_enabled)
return False

@property
def logs_enabled(self) -> bool:
"""Return whether submitting logs is enabled for this integration, or global config if not set."""
"""Return whether submitting logs is enabled for this integration."""
if hasattr(self.integration_config, "logs_enabled"):
return asbool(self.integration_config.logs_enabled)
return False
13 changes: 7 additions & 6 deletions ddtrace/llmobs/_integrations/bedrock.py
@@ -1,4 +1,3 @@
import json
from typing import Any
from typing import Dict
from typing import Optional
@@ -19,6 +18,7 @@
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations import BaseLLMIntegration
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.llmobs._utils import safe_json


log = get_logger(__name__)
@@ -50,14 +50,15 @@ def llmobs_set_tags(
span.set_tag_str(SPAN_KIND, "llm")
span.set_tag_str(MODEL_NAME, span.get_tag("bedrock.request.model") or "")
span.set_tag_str(MODEL_PROVIDER, span.get_tag("bedrock.request.model_provider") or "")
span.set_tag_str(INPUT_MESSAGES, json.dumps(input_messages))
span.set_tag_str(METADATA, json.dumps(parameters))
span.set_tag_str(INPUT_MESSAGES, safe_json(input_messages))
span.set_tag_str(METADATA, safe_json(parameters))
if err or formatted_response is None:
span.set_tag_str(OUTPUT_MESSAGES, json.dumps([{"content": ""}]))
span.set_tag_str(OUTPUT_MESSAGES, safe_json([{"content": ""}]))
else:
output_messages = self._extract_output_message(formatted_response)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))
span.set_tag_str(METRICS, json.dumps(self._llmobs_metrics(span, formatted_response)))
span.set_tag_str(OUTPUT_MESSAGES, safe_json(output_messages))
metrics = self._llmobs_metrics(span, formatted_response)
span.set_tag_str(METRICS, safe_json(metrics))

@staticmethod
def _llmobs_metrics(span: Span, formatted_response: Optional[Dict[str, Any]]) -> Dict[str, Any]:
