feat(langfuse/): support langfuse prompt management (BerriAI#7073)
* feat(langfuse/): support langfuse prompt management

Initial working commit for langfuse prompt management support

Closes BerriAI#6269

* test: update test

* fix(litellm_logging.py): suppress linting error
krrishdholakia authored Dec 7, 2024
1 parent e449324 commit 19a4273
Showing 6 changed files with 186 additions and 2 deletions.
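For orientation before the diffs, here is a minimal sketch of how the new hook is exercised from a client, assuming a locally running LiteLLM proxy with the langfuse callback enabled; the API key, prompt name, and variables below are hypothetical placeholders, and the metadata keys mirror those read by LangfusePromptManagement._compile_prompt:

import openai

client = openai.OpenAI(api_key="sk-1234", base_url="http://0.0.0.0:4000")

response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "What's the weather like today?"}],
    extra_body={
        "metadata": {
            "langfuse_prompt_id": "weather-assistant",  # hypothetical Langfuse prompt name
            "langfuse_prompt_variables": {"region": "SF"},  # substituted by prompt.compile()
        }
    },
)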
1 change: 1 addition & 0 deletions litellm/__init__.py
@@ -63,6 +63,7 @@
    "opik",
    "argilla",
    "mlflow",
    "langfuse",
]
logged_real_time_event_types: Optional[Union[List[str], Literal["*"]]] = None
_known_custom_logger_compatible_callbacks: List = list(
3 changes: 3 additions & 0 deletions litellm/integrations/langfuse/langfuse_handler.py
@@ -6,8 +6,11 @@
Handles Key/Team Based Langfuse Logging
"""

import os
from typing import TYPE_CHECKING, Any, Dict, Optional

from packaging.version import Version

from litellm.litellm_core_utils.litellm_logging import StandardCallbackDynamicParams

from .langfuse import LangFuseLogger, LangfuseLoggingConfig
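The new Version import backs the SDK version gate used in the prompt-management file below; for reference, packaging compares version components numerically rather than as strings:

from packaging.version import Version

assert Version("2.10.0") > Version("2.9.1")  # a plain string comparison would get this wrong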
163 changes: 163 additions & 0 deletions litellm/integrations/langfuse/langfuse_prompt_management.py
@@ -0,0 +1,163 @@
"""
Call Hook for LiteLLM Proxy which allows Langfuse prompt management.
"""

import os
import traceback
from typing import Literal, Optional, Union

from packaging.version import Version

from litellm._logging import verbose_proxy_logger
from litellm.caching.dual_cache import DualCache
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth
from litellm.secret_managers.main import str_to_bool


class LangfusePromptManagement(CustomLogger):
    def __init__(
        self,
        langfuse_public_key=None,
        langfuse_secret=None,
        langfuse_host=None,
        flush_interval=1,
    ):
        try:
            import langfuse
            from langfuse import Langfuse
        except Exception as e:
            raise Exception(
                f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\n{traceback.format_exc()}\033[0m"
            )
        # Instance variables
        self.secret_key = langfuse_secret or os.getenv("LANGFUSE_SECRET_KEY")
        self.public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
        self.langfuse_host = langfuse_host or os.getenv(
            "LANGFUSE_HOST", "https://cloud.langfuse.com"
        )
        if not (
            self.langfuse_host.startswith("http://")
            or self.langfuse_host.startswith("https://")
        ):
            # add http:// if unset, assume communicating over private network - e.g. render
            self.langfuse_host = "http://" + self.langfuse_host
        self.langfuse_release = os.getenv("LANGFUSE_RELEASE")
        self.langfuse_debug = os.getenv("LANGFUSE_DEBUG")
        self.langfuse_flush_interval = (
            os.getenv("LANGFUSE_FLUSH_INTERVAL") or flush_interval
        )

        parameters = {
            "public_key": self.public_key,
            "secret_key": self.secret_key,
            "host": self.langfuse_host,
            "release": self.langfuse_release,
            "debug": self.langfuse_debug,
            "flush_interval": self.langfuse_flush_interval,  # flush interval in seconds
        }

        if Version(langfuse.version.__version__) >= Version("2.6.0"):
            parameters["sdk_integration"] = "litellm"

        self.Langfuse = Langfuse(**parameters)

        # set the current langfuse project id in the environ
        # this is used by Alerting to link to the correct project
        try:
            project_id = self.Langfuse.client.projects.get().data[0].id
            os.environ["LANGFUSE_PROJECT_ID"] = project_id
        except Exception:
            project_id = None

        if os.getenv("UPSTREAM_LANGFUSE_SECRET_KEY") is not None:
            self.upstream_langfuse_secret_key = os.getenv(
                "UPSTREAM_LANGFUSE_SECRET_KEY"
            )
            self.upstream_langfuse_public_key = os.getenv(
                "UPSTREAM_LANGFUSE_PUBLIC_KEY"
            )
            self.upstream_langfuse_host = os.getenv("UPSTREAM_LANGFUSE_HOST")
            self.upstream_langfuse_release = os.getenv("UPSTREAM_LANGFUSE_RELEASE")
            self.upstream_langfuse_debug = os.getenv("UPSTREAM_LANGFUSE_DEBUG")
            # parse the debug flag only after it has been read from the env
            upstream_langfuse_debug = (
                str_to_bool(self.upstream_langfuse_debug)
                if self.upstream_langfuse_debug is not None
                else None
            )
            self.upstream_langfuse = Langfuse(
                public_key=self.upstream_langfuse_public_key,
                secret_key=self.upstream_langfuse_secret_key,
                host=self.upstream_langfuse_host,
                release=self.upstream_langfuse_release,
                debug=(
                    upstream_langfuse_debug
                    if upstream_langfuse_debug is not None
                    else False
                ),
            )
        else:
            self.upstream_langfuse = None

    def _compile_prompt(
        self,
        metadata: dict,
        call_type: Union[Literal["completion"], Literal["text_completion"]],
    ) -> Optional[Union[str, list]]:
        compiled_prompt: Optional[Union[str, list]] = None
        if isinstance(metadata, dict):
            langfuse_prompt_id = metadata.get("langfuse_prompt_id")

            langfuse_prompt_variables = metadata.get("langfuse_prompt_variables") or {}
            if (
                langfuse_prompt_id
                and isinstance(langfuse_prompt_id, str)
                and isinstance(langfuse_prompt_variables, dict)
            ):
                langfuse_prompt = self.Langfuse.get_prompt(langfuse_prompt_id)
                compiled_prompt = langfuse_prompt.compile(**langfuse_prompt_variables)

        return compiled_prompt

    async def async_pre_call_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        cache: DualCache,
        data: dict,
        call_type: Union[
            Literal["completion"],
            Literal["text_completion"],
            Literal["embeddings"],
            Literal["image_generation"],
            Literal["moderation"],
            Literal["audio_transcription"],
            Literal["pass_through_endpoint"],
            Literal["rerank"],
        ],
    ) -> Union[Exception, str, dict, None]:
        metadata = data.get("metadata") or {}
        compiled_prompt: Optional[Union[str, list]] = None
        if call_type == "completion" or call_type == "text_completion":
            compiled_prompt = self._compile_prompt(metadata, call_type)
        if compiled_prompt is None:
            return await super().async_pre_call_hook(
                user_api_key_dict, cache, data, call_type
            )
        if call_type == "completion":
            if isinstance(compiled_prompt, list):
                data["messages"] = compiled_prompt + data["messages"]
            else:
                data["messages"] = [
                    {"role": "system", "content": compiled_prompt}
                ] + data["messages"]
        elif call_type == "text_completion" and isinstance(compiled_prompt, str):
            data["prompt"] = compiled_prompt + "\n" + data["prompt"]

        verbose_proxy_logger.debug(
            f"LangfusePromptManagement.async_pre_call_hook compiled_prompt: {compiled_prompt}, type: {type(compiled_prompt)}"
        )

        return await super().async_pre_call_hook(
            user_api_key_dict, cache, data, call_type
        )
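To make the compile step concrete, here is a minimal standalone sketch of the two Langfuse SDK calls the hook wraps; the prompt name and variable are hypothetical placeholders:

from langfuse import Langfuse

langfuse = Langfuse()  # reads LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_HOST

prompt = langfuse.get_prompt("weather-assistant")  # hypothetical prompt name
compiled = prompt.compile(region="SF")  # text prompts compile to a str, chat prompts to a list of messages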
15 changes: 14 additions & 1 deletion litellm/litellm_core_utils/litellm_logging.py
@@ -75,6 +75,7 @@
from ..integrations.lago import LagoLogger
from ..integrations.langfuse.langfuse import LangFuseLogger
from ..integrations.langfuse.langfuse_handler import LangFuseHandler
from ..integrations.langfuse.langfuse_prompt_management import LangfusePromptManagement
from ..integrations.langsmith import LangsmithLogger
from ..integrations.literal_ai import LiteralAILogger
from ..integrations.logfire_logger import LogfireLevel, LogfireLogger
@@ -2349,9 +2350,17 @@ def _init_custom_logger_compatible_class(  # noqa: PLR0915
        _mlflow_logger = MlflowLogger()
        _in_memory_loggers.append(_mlflow_logger)
        return _mlflow_logger  # type: ignore
    elif logging_integration == "langfuse":
        for callback in _in_memory_loggers:
            if isinstance(callback, LangfusePromptManagement):
                return callback

        langfuse_logger = LangfusePromptManagement()
        _in_memory_loggers.append(langfuse_logger)
        return langfuse_logger  # type: ignore


def get_custom_logger_compatible_class(  # noqa: PLR0915
    logging_integration: litellm._custom_logger_compatible_callbacks_literal,
) -> Optional[CustomLogger]:
    if logging_integration == "lago":
@@ -2402,6 +2411,10 @@ def get_custom_logger_compatible_class(
        for callback in _in_memory_loggers:
            if isinstance(callback, OpikLogger):
                return callback
    elif logging_integration == "langfuse":
        for callback in _in_memory_loggers:
            if isinstance(callback, LangfusePromptManagement):
                return callback
    elif logging_integration == "otel":
        from litellm.integrations.opentelemetry import OpenTelemetry

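As a usage note for the factory change above, a short sketch of registering the integration, assuming Langfuse credentials are set in the environment (placeholder values shown); with the string "langfuse" in litellm.callbacks, _init_custom_logger_compatible_class creates a single LangfusePromptManagement instance and returns that same instance on later lookups:

import os
import litellm

os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."  # placeholder credential
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."  # placeholder credential

litellm.callbacks = ["langfuse"]  # resolved to LangfusePromptManagement at dispatch time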
2 changes: 1 addition & 1 deletion litellm/proxy/utils.py
@@ -508,14 +508,14 @@ async def pre_call_hook

        try:
            for callback in litellm.callbacks:
                _callback = None
                if isinstance(callback, str):
                    _callback = litellm.litellm_core_utils.litellm_logging.get_custom_logger_compatible_class(
                        callback
                    )
                else:
                    _callback = callback  # type: ignore

                if _callback is not None and isinstance(_callback, CustomGuardrail):
                    from litellm.types.guardrails import GuardrailEventHooks

@@ -33,6 +33,9 @@
from litellm.integrations.opentelemetry import OpenTelemetry
from litellm.integrations.mlflow import MlflowLogger
from litellm.integrations.argilla import ArgillaLogger
from litellm.integrations.langfuse.langfuse_prompt_management import (
    LangfusePromptManagement,
)
from litellm.proxy.hooks.dynamic_rate_limiter import _PROXY_DynamicRateLimitHandler
from unittest.mock import patch

@@ -61,6 +64,7 @@
    "arize": OpenTelemetry,
    "langtrace": OpenTelemetry,
    "mlflow": MlflowLogger,
    "langfuse": LangfusePromptManagement,
}

expected_env_vars = {
