diff --git a/ddtrace/contrib/langchain/patch.py b/ddtrace/contrib/langchain/patch.py index d38f389bdf..3bb94d0300 100644 --- a/ddtrace/contrib/langchain/patch.py +++ b/ddtrace/contrib/langchain/patch.py @@ -626,11 +626,13 @@ def traced_embedding(langchain, pin, func, instance, args, kwargs): span = integration.trace( pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), + submit_to_llmobs=True, interface_type="embedding", provider=provider, model=_extract_model_name(instance), api_key=_extract_api_key(instance), ) + embeddings = None try: if isinstance(input_texts, str): if integration.is_pc_sampled_span(span): @@ -654,6 +656,14 @@ def traced_embedding(langchain, pin, func, instance, args, kwargs): integration.metric(span, "incr", "request.error", 1) raise finally: + if integration.is_pc_sampled_llmobs(span): + integration.llmobs_set_tags( + "embedding", + span, + input_texts, + embeddings, + error=bool(span.error), + ) span.finish() integration.metric(span, "dist", "request.duration", span.duration_ns) if integration.is_pc_sampled_log(span): diff --git a/ddtrace/llmobs/_integrations/langchain.py b/ddtrace/llmobs/_integrations/langchain.py index c802b7737e..e50db2a1a1 100644 --- a/ddtrace/llmobs/_integrations/langchain.py +++ b/ddtrace/llmobs/_integrations/langchain.py @@ -10,6 +10,7 @@ from ddtrace.constants import ERROR_TYPE from ddtrace.internal.logger import get_logger from ddtrace.llmobs import LLMObs +from ddtrace.llmobs._constants import INPUT_DOCUMENTS from ddtrace.llmobs._constants import INPUT_MESSAGES from ddtrace.llmobs._constants import INPUT_VALUE from ddtrace.llmobs._constants import METADATA @@ -20,6 +21,7 @@ from ddtrace.llmobs._constants import OUTPUT_VALUE from ddtrace.llmobs._constants import SPAN_KIND +from ..utils import Document from .base import BaseLLMIntegration @@ -42,13 +44,15 @@ "system": "system", } +SUPPORTED_OPERATIONS = ["llm", "chat", "chain", "embedding"] + class LangChainIntegration(BaseLLMIntegration): _integration_name = "langchain" def llmobs_set_tags( self, - operation: str, # oneof "llm","chat","chain" + operation: str, # oneof "llm","chat","chain","embedding" span: Span, inputs: Any, response: Any = None, @@ -57,6 +61,10 @@ def llmobs_set_tags( """Sets meta tags and metrics for span events to be sent to LLMObs.""" if not self.llmobs_enabled: return + if operation not in SUPPORTED_OPERATIONS: + log.warning("Unsupported operation : %s", operation) + return + model_provider = span.get_tag(PROVIDER) self._llmobs_set_metadata(span, model_provider) @@ -79,6 +87,8 @@ def llmobs_set_tags( self._llmobs_set_meta_tags_from_chat_model(span, inputs, response, error, is_workflow=is_workflow) elif operation == "chain": self._llmobs_set_meta_tags_from_chain(span, inputs, response, error) + elif operation == "embedding": + self._llmobs_set_meta_tags_from_embedding(span, inputs, response, error, is_workflow=is_workflow) span.set_tag_str(METRICS, json.dumps({})) def _llmobs_set_metadata(self, span: Span, model_provider: Optional[str] = None) -> None: @@ -194,6 +204,62 @@ def _llmobs_set_meta_tags_from_chain( except TypeError: log.warning("Failed to serialize chain output data to JSON") + def _llmobs_set_meta_tags_from_embedding( + self, + span: Span, + input_texts: Union[str, List[str]], + output_embedding: Union[List[float], List[List[float]], None], + error: bool = False, + is_workflow: bool = False, + ) -> None: + span.set_tag_str(SPAN_KIND, "workflow" if is_workflow else "embedding") + span.set_tag_str(MODEL_NAME, span.get_tag(MODEL) or "") + span.set_tag_str(MODEL_PROVIDER, span.get_tag(PROVIDER) or "") + + input_tag_key = INPUT_VALUE if is_workflow else INPUT_DOCUMENTS + output_tag_key = OUTPUT_VALUE + + output_values: Any + + try: + if isinstance(input_texts, str) or ( + isinstance(input_texts, list) and all(isinstance(text, str) for text in input_texts) + ): + if is_workflow: + formatted_inputs = self.format_io(input_texts) + formatted_str = ( + formatted_inputs + if isinstance(formatted_inputs, str) + else json.dumps(self.format_io(input_texts)) + ) + span.set_tag_str(input_tag_key, formatted_str) + else: + if isinstance(input_texts, str): + input_texts = [input_texts] + input_documents = [Document(text=str(doc)) for doc in input_texts] + span.set_tag_str(input_tag_key, json.dumps(input_documents)) + except TypeError: + log.warning("Failed to serialize embedding input data to JSON") + if error: + span.set_tag_str(output_tag_key, "") + elif output_embedding is not None: + try: + if isinstance(output_embedding[0], float): + # single embedding through embed_query + output_values = [output_embedding] + embeddings_count = 1 + else: + # multiple embeddings through embed_documents + output_values = output_embedding + embeddings_count = len(output_embedding) + embedding_dim = len(output_values[0]) + span.set_tag_str( + output_tag_key, + "[{} embedding(s) returned with size {}]".format(embeddings_count, embedding_dim), + ) + except (TypeError, IndexError): + log.warning("Failed to write output vectors", output_embedding) + def _set_base_span_tags( # type: ignore[override] self, span: Span, diff --git a/releasenotes/notes/feat-llmobs-submit-langchain-embedding-spans-89c8704ef41cfee3.yaml b/releasenotes/notes/feat-llmobs-submit-langchain-embedding-spans-89c8704ef41cfee3.yaml new file mode 100644 index 0000000000..d1e3219fed --- /dev/null +++ b/releasenotes/notes/feat-llmobs-submit-langchain-embedding-spans-89c8704ef41cfee3.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + LLM Observability: The Langchain integration now submits embedding spans to LLM Observability. diff --git a/tests/contrib/langchain/cassettes/langchain_community/openai_embedding_query_integration.yaml b/tests/contrib/langchain/cassettes/langchain_community/openai_embedding_query_integration.yaml new file mode 100644 index 0000000000..102194b5d3 --- /dev/null +++ b/tests/contrib/langchain/cassettes/langchain_community/openai_embedding_query_integration.yaml @@ -0,0 +1,89 @@ +interactions: +- request: + body: '{"input": "", "model": "text-embedding-ada-002", "encoding_format": "base64"}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + content-length: + - '77' + content-type: + - application/json + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.30.3 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.30.3 + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.10.5 + method: POST + uri: https://api.openai.com/v1/embeddings + response: + content: "{\n \"object\": \"list\",\n \"data\": [\n {\n \"object\": + \"embedding\",\n \"index\": 0,\n \"embedding\": \"\"\n + \ }\n ],\n \"model\": \"text-embedding-ada-002\",\n \"usage\": {\n \"prompt_tokens\": + 1,\n \"total_tokens\": 1\n }\n}\n" + headers: + CF-Cache-Status: + - DYNAMIC + CF-RAY: + - 8ab773766b878298-IAD + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Tue, 30 Jul 2024 18:35:52 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=ggWVHFgioAT1pC5qtmqPKsAx5EYcmz03sJ14ffLSumE-1722364552-1.0.1.1-hQLT0WbxlShI3_4cRRp3AsHQfKlVmKcrGyAUki5OMG5ABAx3zlUdkqomhbmJtS9T8DH0T5fx8MKpn0kYv1nF.w; + path=/; expires=Tue, 30-Jul-24 19:05:52 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + - _cfuvid=65Hy9XC0Yv6_M0E8DlThaqw38AI.X7VeIW6BvODc3Ic-1722364552959-0.0.1.1-604800000; + path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-allow-origin: + - '*' + alt-svc: + - h3=":443"; ma=86400 + openai-model: + - text-embedding-ada-002 + openai-organization: + - datadog-4 + openai-processing-ms: + - '21' + openai-version: + - '2020-10-01' + strict-transport-security: + - max-age=15552000; includeSubDomains; preload + x-ratelimit-limit-requests: + - '10000' + x-ratelimit-remaining-requests: + - '9999' + x-ratelimit-reset-requests: + - 6ms + x-request-id: + - req_59eefe40e0302d1cd8eca8b6e780227c + http_version: HTTP/1.1 + status_code: 200 +version: 1 diff --git a/tests/contrib/langchain/test_langchain_llmobs.py b/tests/contrib/langchain/test_langchain_llmobs.py index 8c82318fbb..13bc4ae86d 100644 --- a/tests/contrib/langchain/test_langchain_llmobs.py +++ b/tests/contrib/langchain/test_langchain_llmobs.py @@ -127,6 +127,26 @@ def _invoke_chain(cls, chain, prompt, mock_tracer, cassette_name, batch=False): LLMObs.disable() return mock_tracer.pop_traces()[0] + def _embed_query(cls, embedding_model, query, mock_tracer, cassette_name): + LLMObs.enable(ml_app=cls.ml_app, integrations_enabled=False, _tracer=mock_tracer) + if cassette_name is not None: + with get_request_vcr(subdirectory_name=cls.cassette_subdirectory_name).use_cassette(cassette_name): + embedding_model.embed_query(query) + else: # FakeEmbeddings does not need a cassette + embedding_model.embed_query(query) + LLMObs.disable() + return mock_tracer.pop_traces()[0] + + def _embed_documents(cls, embedding_model, documents, mock_tracer, cassette_name): + LLMObs.enable(ml_app=cls.ml_app, integrations_enabled=False, _tracer=mock_tracer) + if cassette_name is not None: + with get_request_vcr(subdirectory_name=cls.cassette_subdirectory_name).use_cassette(cassette_name): + embedding_model.embed_documents(documents) + else: # FakeEmbeddings does not need a cassette + embedding_model.embed_documents(documents) + LLMObs.disable() + return mock_tracer.pop_traces()[0] + @pytest.mark.skipif(LANGCHAIN_VERSION >= (0, 1), reason="These tests are for langchain < 0.1.0") class TestLLMObsLangchain(BaseTestLLMObsLangchain): @@ -315,6 +335,56 @@ def test_llmobs_chain_schema_io(self, langchain, mock_llmobs_span_writer, mock_t ) _assert_expected_llmobs_llm_span(trace[1], mock_llmobs_span_writer, mock_io=True) + def test_llmobs_embedding_query(self, langchain, mock_llmobs_span_writer, mock_tracer): + embedding_model = langchain.embeddings.OpenAIEmbeddings() + with mock.patch("langchain.embeddings.OpenAIEmbeddings._get_len_safe_embeddings", return_value=[0.0] * 1536): + trace = self._embed_query( + embedding_model=embedding_model, + query="hello world", + mock_tracer=mock_tracer, + cassette_name="openai_embedding_query_39.yaml" if PY39 else "openai_embedding_query.yaml", + ) + assert mock_llmobs_span_writer.enqueue.call_count == 1 + span = trace[0] if isinstance(trace, list) else trace + mock_llmobs_span_writer.enqueue.assert_called_with( + _expected_llmobs_llm_span_event( + span, + span_kind="embedding", + model_name=embedding_model.model, + model_provider="openai", + input_documents=[{"text": "hello world"}], + output_value="[1 embedding(s) returned with size 1536]", + tags={"ml_app": "langchain_test"}, + integration="langchain", + ) + ) + + def test_llmobs_embedding_documents(self, langchain, mock_llmobs_span_writer, mock_tracer): + embedding_model = langchain.embeddings.OpenAIEmbeddings() + with mock.patch( + "langchain.embeddings.OpenAIEmbeddings._get_len_safe_embeddings", return_value=[[0.0] * 1536] * 2 + ): + trace = self._embed_documents( + embedding_model=embedding_model, + documents=["hello world", "goodbye world"], + mock_tracer=mock_tracer, + cassette_name="openai_embedding_document_39.yaml" if PY39 else "openai_embedding_document.yaml", + ) + assert mock_llmobs_span_writer.enqueue.call_count == 1 + span = trace[0] if isinstance(trace, list) else trace + mock_llmobs_span_writer.enqueue.assert_called_with( + _expected_llmobs_llm_span_event( + span, + span_kind="embedding", + model_name=embedding_model.model, + model_provider="openai", + input_documents=[{"text": "hello world"}, {"text": "goodbye world"}], + output_value="[2 embedding(s) returned with size 1536]", + tags={"ml_app": "langchain_test"}, + integration="langchain", + ) + ) + @pytest.mark.skipif(LANGCHAIN_VERSION < (0, 1), reason="These tests are for langchain >= 0.1.0") class TestLLMObsLangchainCommunity(BaseTestLLMObsLangchain): @@ -499,6 +569,59 @@ def test_llmobs_anthropic_chat_model(self, langchain_anthropic, mock_llmobs_span assert mock_llmobs_span_writer.enqueue.call_count == 1 _assert_expected_llmobs_llm_span(span, mock_llmobs_span_writer, input_role="user") + def test_llmobs_embedding_query(self, langchain_community, langchain_openai, mock_llmobs_span_writer, mock_tracer): + if langchain_openai is None: + pytest.skip("langchain_openai not installed which is required for this test.") + embedding_model = langchain_openai.embeddings.OpenAIEmbeddings() + with mock.patch("langchain_openai.OpenAIEmbeddings._get_len_safe_embeddings", return_value=[0.0] * 1536): + trace = self._embed_query( + embedding_model=embedding_model, + query="hello world", + mock_tracer=mock_tracer, + cassette_name="openai_embedding_query.yaml", + ) + assert mock_llmobs_span_writer.enqueue.call_count == 1 + span = trace[0] if isinstance(trace, list) else trace + mock_llmobs_span_writer.enqueue.assert_called_with( + _expected_llmobs_llm_span_event( + span, + span_kind="embedding", + model_name=embedding_model.model, + model_provider="openai", + input_documents=[{"text": "hello world"}], + output_value="[1 embedding(s) returned with size 1536]", + tags={"ml_app": "langchain_test"}, + integration="langchain", + ) + ) + + def test_llmobs_embedding_documents( + self, langchain_community, langchain_openai, mock_llmobs_span_writer, mock_tracer + ): + if langchain_community is None: + pytest.skip("langchain-community not installed which is required for this test.") + embedding_model = langchain_community.embeddings.FakeEmbeddings(size=1536) + trace = self._embed_documents( + embedding_model=embedding_model, + documents=["hello world", "goodbye world"], + mock_tracer=mock_tracer, + cassette_name=None, # FakeEmbeddings does not need a cassette + ) + assert mock_llmobs_span_writer.enqueue.call_count == 1 + span = trace[0] if isinstance(trace, list) else trace + mock_llmobs_span_writer.enqueue.assert_called_with( + _expected_llmobs_llm_span_event( + span, + span_kind="embedding", + model_name="", + model_provider="fake", + input_documents=[{"text": "hello world"}, {"text": "goodbye world"}], + output_value="[2 embedding(s) returned with size 1536]", + tags={"ml_app": "langchain_test"}, + integration="langchain", + ) + ) + @pytest.mark.skipif(LANGCHAIN_VERSION < (0, 1), reason="These tests are for langchain >= 0.1.0") class TestTraceStructureWithLLMIntegrations(SubprocessTestCase): @@ -550,6 +673,9 @@ def _assert_trace_structure_from_writer_call_args(self, span_kinds): elif span_kind == "llm": assert len(call_args["meta"]["input"]["messages"]) > 0 assert len(call_args["meta"]["output"]["messages"]) > 0 + elif span_kind == "embedding": + assert len(call_args["meta"]["input"]["documents"]) > 0 + assert len(call_args["meta"]["output"]["value"]) > 0 @staticmethod def _call_bedrock_chat_model(ChatBedrock, HumanMessage): @@ -578,6 +704,18 @@ def _call_openai_llm(OpenAI): with get_request_vcr(subdirectory_name="langchain_community").use_cassette("openai_completion_sync.yaml"): llm.invoke("Can you explain what Descartes meant by 'I think, therefore I am'?") + @staticmethod + def _call_openai_embedding(OpenAIEmbeddings): + embedding = OpenAIEmbeddings() + with mock.patch("langchain_openai.embeddings.base.tiktoken.encoding_for_model") as mock_encoding_for_model: + mock_encoding = mock.MagicMock() + mock_encoding_for_model.return_value = mock_encoding + mock_encoding.encode.return_value = [0.0] * 1536 + with get_request_vcr(subdirectory_name="langchain_community").use_cassette( + "openai_embedding_query_integration.yaml" + ): + embedding.embed_query("hello world") + @staticmethod def _call_anthropic_chat(Anthropic): llm = Anthropic(model="claude-3-opus-20240229", max_tokens=15) @@ -647,6 +785,24 @@ def test_llmobs_with_openai_disabled(self): self._call_openai_llm(OpenAI) self._assert_trace_structure_from_writer_call_args(["llm"]) + @run_in_subprocess(env_overrides=openai_env_config) + def test_llmobs_langchain_with_embedding_model_openai_enabled(self): + from langchain_openai import OpenAIEmbeddings + + patch(langchain=True, openai=True) + LLMObs.enable(ml_app="", integrations_enabled=False) + self._call_openai_embedding(OpenAIEmbeddings) + self._assert_trace_structure_from_writer_call_args(["workflow", "embedding"]) + + @run_in_subprocess(env_overrides=openai_env_config) + def test_llmobs_langchain_with_embedding_model_openai_disabled(self): + from langchain_openai import OpenAIEmbeddings + + patch(langchain=True) + LLMObs.enable(ml_app="", integrations_enabled=False) + self._call_openai_embedding(OpenAIEmbeddings) + self._assert_trace_structure_from_writer_call_args(["embedding"]) + @run_in_subprocess(env_overrides=anthropic_env_config) def test_llmobs_with_anthropic_enabled(self): from langchain_anthropic import ChatAnthropic