Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating langfuse integration #345

Merged
merged 22 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ignore_missing_imports = True
# We don't want to ignore all missing imports as we want to catch those in our own code
# But for certain libraries they don't have a stub file, so we only enforce import checking for our own libraries.
# Another alternative would be to list out every single dependency that does not have a stub.
[mypy-prediction_market_agent.*]
[mypy-prediction_market_agent_tooling.*]
ignore_missing_imports = False
[mypy-scripts.*]
ignore_missing_imports = False
Expand Down
548 changes: 337 additions & 211 deletions poetry.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions prediction_market_agent_tooling/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class APIKeys(BaseSettings):
GOOGLE_SEARCH_ENGINE_ID: t.Optional[SecretStr] = None

LANGFUSE_SECRET_KEY: t.Optional[SecretStr] = None
LANGFUSE_PUBLIC_KEY: t.Optional[SecretStr] = None
LANGFUSE_PUBLIC_KEY: t.Optional[str] = None
LANGFUSE_HOST: t.Optional[str] = None
LANGFUSE_DEPLOYMENT_VERSION: t.Optional[str] = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new env variable that is populated with the git commit SHA in PMA's Docker build pipeline; it will show us, in the UI, which version of PMA was used.


TAVILY_API_KEY: t.Optional[SecretStr] = None

Expand Down Expand Up @@ -127,7 +128,7 @@ def langfuse_secret_key(self) -> SecretStr:
)

@property
def langfuse_public_key(self) -> SecretStr:
def langfuse_public_key(self) -> str:
return check_not_none(
self.LANGFUSE_PUBLIC_KEY, "LANGFUSE_PUBLIC_KEY missing in the environment."
)
Expand All @@ -138,6 +139,14 @@ def langfuse_host(self) -> str:
self.LANGFUSE_HOST, "LANGFUSE_HOST missing in the environment."
)

@property
def default_enable_langfuse(self) -> bool:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used as the default argument in DeployableAgent init method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and self.LANGFUSE_DEPLOYMENT_VERSION is not None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not necessary for langfuse to work, it can be None

return (
self.LANGFUSE_SECRET_KEY is not None
and self.LANGFUSE_PUBLIC_KEY is not None
and self.LANGFUSE_HOST is not None
)

@property
def tavily_api_key(self) -> SecretStr:
return check_not_none(
Expand Down
253 changes: 199 additions & 54 deletions prediction_market_agent_tooling/deploy/agent.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import getpass
import inspect
import os
import tempfile
import time
import typing as t
from datetime import datetime, timedelta
from enum import Enum
from functools import cached_property

from pydantic import BaseModel, BeforeValidator
from pydantic import BaseModel, BeforeValidator, computed_field
from typing_extensions import Annotated

from prediction_market_agent_tooling.config import APIKeys
Expand Down Expand Up @@ -39,16 +42,15 @@
redeem_from_all_user_positions,
withdraw_wxdai_to_xdai_to_keep_balance,
)
from prediction_market_agent_tooling.monitor.langfuse.langfuse_wrapper import (
LangfuseWrapper,
)
from prediction_market_agent_tooling.monitor.monitor_app import (
MARKET_TYPE_TO_DEPLOYED_AGENT,
)
from prediction_market_agent_tooling.tools.is_predictable import is_predictable_binary
from prediction_market_agent_tooling.tools.langfuse_ import langfuse_context, observe
from prediction_market_agent_tooling.tools.utils import DatetimeWithTimezone, utcnow

MAX_AVAILABLE_MARKETS = 20
TRADER_TAG = "trader"


def to_boolean_outcome(value: str | bool) -> bool:
Expand All @@ -71,6 +73,21 @@ def to_boolean_outcome(value: str | bool) -> bool:
raise ValueError(f"Expected a boolean or a string, but got {value}")


def initialize_langfuse(enable_langfuse: bool) -> None:
    """
    Configure the shared `langfuse_context` for this process.

    When `enable_langfuse` is true, the public key, secret key and host are read
    from `APIKeys` and passed to the context. Otherwise the context is only told
    that it is disabled and none of the key properties are read.
    """
    # Once disabled, Langfuse just ignores all the calls, so traced code needs no if-else around it.
    keys = APIKeys()

    if not enable_langfuse:
        langfuse_context.configure(enabled=enable_langfuse)
        return

    langfuse_context.configure(
        public_key=keys.langfuse_public_key,
        secret_key=keys.langfuse_secret_key.get_secret_value(),
        host=keys.langfuse_host,
        enabled=enable_langfuse,
    )


Decision = Annotated[bool, BeforeValidator(to_boolean_outcome)]


Expand All @@ -89,11 +106,66 @@ def p_no(self) -> Probability:
return Probability(1 - self.p_yes)


class ProcessedMarket(BaseModel):
    """Result of processing a single market: the answer produced and the bet amount computed for it."""

    answer: Answer
    amount: BetAmount


class AnsweredEnum(str, Enum):
    """String-valued markers saying whether processing a market produced a result; used as Langfuse trace tags."""

    ANSWERED = "answered"
    NOT_ANSWERED = "not_answered"


class DeployableAgent:
def __init__(self) -> None:
self.langfuse_wrapper = LangfuseWrapper(agent_name=self.__class__.__name__)
def __init__(
    self,
    enable_langfuse: bool | None = None,
) -> None:
    """
    Record the start time, set up Langfuse and call `load`.

    Args:
        enable_langfuse: Whether Langfuse tracing should be enabled. When left as
            `None`, falls back to `APIKeys().default_enable_langfuse`, looked up
            at construction time.
    """
    # Resolve the default here instead of in the signature: a default written as
    # `APIKeys().default_enable_langfuse` is evaluated only once, when the `def` is executed,
    # so any environment change made after import would be silently ignored.
    if enable_langfuse is None:
        enable_langfuse = APIKeys().default_enable_langfuse

    self.start_time = utcnow()
    self.enable_langfuse = enable_langfuse
    self.initialize_langfuse()
    self.load()

def initialize_langfuse(self) -> None:
    """Configure Langfuse for this agent by passing `self.enable_langfuse` to the `initialize_langfuse` function."""
    initialize_langfuse(self.enable_langfuse)

def langfuse_update_current_trace(
self,
name: str | None = None,
input: t.Any | None = None,
output: t.Any | None = None,
user_id: str | None = None,
session_id: str | None = None,
version: str | None = None,
release: str | None = None,
metadata: t.Any | None = None,
tags: list[str] | None = None,
public: bool | None = None,
) -> None:
"""
Provide some useful default arguments when updating the current trace in our agents.
"""
langfuse_context.update_current_trace(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a wrapper around update_current_trace, see the default arguments below. The reason is to always include these in our PMAT-based agents.

name=name,
input=input,
output=output,
user_id=user_id or getpass.getuser(),
session_id=session_id
or self.session_id, # All traces within a single run execution will be grouped under a single session.
version=version
or APIKeys().LANGFUSE_DEPLOYMENT_VERSION, # Optionally, mark the current deployment with version (e.g. add git commit hash during docker building).
release=release,
metadata=metadata,
tags=tags,
public=public,
)

@computed_field  # type: ignore[prop-decorator] # Mypy issue: https://github.com/python/mypy/issues/14461
@cached_property
def session_id(self) -> str:
    """Return the session identifier, built as "<agent class name> - <start time>" and cached after first access."""
    # Each agent should be a unique class, so the class name tells agents apart.
    agent_name = self.__class__.__name__
    started_at = self.start_time.strftime("%Y-%m-%d %H:%M:%S")
    return f"{agent_name} - {started_at}"

def __init_subclass__(cls, **kwargs: t.Any) -> None:
if "DeployableAgent" not in str(
cls.__init__
Expand Down Expand Up @@ -209,12 +281,50 @@ class DeployableTraderAgent(DeployableAgent):
min_required_balance_to_operate: xDai | None = xdai_type(1)
min_balance_to_keep_in_native_currency: xDai | None = xdai_type(0.1)

def __init__(self, place_bet: bool = True) -> None:
super().__init__()
def __init__(
    self,
    enable_langfuse: bool | None = None,
    place_bet: bool = True,
) -> None:
    """
    Initialise the trader agent.

    Args:
        enable_langfuse: Whether Langfuse tracing should be enabled. When left as
            `None`, falls back to `APIKeys().default_enable_langfuse`, looked up
            at construction time.
        place_bet: Stored as `self.place_bet`; `process_market` places the bet only when it is true.
    """
    # Resolve the default per call: a default written as `APIKeys().default_enable_langfuse`
    # in the signature is evaluated only once, when the `def` is executed, and would ignore
    # environment changes made after import. A concrete bool is always passed to the base class.
    if enable_langfuse is None:
        enable_langfuse = APIKeys().default_enable_langfuse

    super().__init__(enable_langfuse=enable_langfuse)
    self.place_bet = place_bet

def have_bet_on_market_since(self, market: AgentMarket, since: timedelta) -> bool:
return have_bet_on_market_since(keys=APIKeys(), market=market, since=since)
def initialize_langfuse(self) -> None:
super().initialize_langfuse()
# Auto-observe all the methods where it makes sense, so that subclasses don't need to do it manually.
self.have_bet_on_market_since = observe()(self.have_bet_on_market_since) # type: ignore[method-assign]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you do @observe() in some places, but observe()(self.some_class_method) here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if something is just a function:

@observe()
def research(...) -> ...

We can use the decorator, and no harm is done. Anyone who uses research will use the observed variant.

But in our agent:

class Agent:
   @observe()
   def xyz(...) -> <default strategy>

class ProphetAgent(Agent):
   def xyz(...) -> <some more fancy strategy> <<<--- This would remove the decorator, so all deployable agents would have to manually @observe their methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to #345 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah makes sense, thanks

self.verify_market = observe()(self.verify_market) # type: ignore[method-assign]
self.answer_binary_market = observe()(self.answer_binary_market) # type: ignore[method-assign]
self.calculate_bet_amount = observe()(self.calculate_bet_amount) # type: ignore[method-assign]
self.process_market = observe()(self.process_market) # type: ignore[method-assign]

def update_langfuse_trace_by_market(
    self, market_type: MarketType, market: AgentMarket
) -> None:
    """
    Attach details of the given market to the current Langfuse trace as metadata.

    The metadata carries the agent's class name and the market's id, question and
    outcomes. `market_type` is accepted but not used by this implementation.
    """
    # The UI allows filtering by these fields.
    market_metadata = {
        "agent_class": self.__class__.__name__,
        "market_id": market.id,
        "market_question": market.question,
        "market_outcomes": market.outcomes,
    }
    self.langfuse_update_current_trace(metadata=market_metadata)

def update_langfuse_trace_by_processed_market(
    self, market_type: MarketType, processed_market: ProcessedMarket | None
) -> None:
    """
    Tag the current Langfuse trace with the outcome of processing a market.

    The tags are, in order: the trader tag, an answered / not-answered marker
    (answered when `processed_market` is not `None`), and the market type's value.
    """
    if processed_market is None:
        answered_tag = AnsweredEnum.NOT_ANSWERED
    else:
        answered_tag = AnsweredEnum.ANSWERED

    trace_tags = [TRADER_TAG, answered_tag, market_type.value]
    self.langfuse_update_current_trace(tags=trace_tags)

def check_min_required_balance_to_operate(self, market_type: MarketType) -> None:
api_keys = APIKeys()
Expand All @@ -229,35 +339,26 @@ def check_min_required_balance_to_operate(self, market_type: MarketType) -> None
f"for agent with address {api_keys.public_key} is not met."
)

def pick_markets(
self, market_type: MarketType, markets: t.Sequence[AgentMarket]
) -> t.Sequence[AgentMarket]:
def have_bet_on_market_since(self, market: AgentMarket, since: timedelta) -> bool:
    """Forward `market` and `since` to the `have_bet_on_market_since` helper, using a freshly built `APIKeys`, and return its result."""
    return have_bet_on_market_since(keys=APIKeys(), market=market, since=since)

def verify_market(self, market_type: MarketType, market: AgentMarket) -> bool:
"""
Subclasses can implement their own logic instead of this one, or on top of this one.
By default, it picks only the first {n_markets_per_run} markets where user didn't bet recently and it's a reasonable question.
By default, it allows only markets where user didn't bet recently and it's a reasonable question.
"""
picked: list[AgentMarket] = []

for market in markets:
if len(picked) >= self.bet_on_n_markets_per_run:
break

if self.have_bet_on_market_since(market, since=timedelta(hours=24)):
continue

# Do as a last check, as it uses paid OpenAI API.
if not is_predictable_binary(market.question):
continue
if self.have_bet_on_market_since(market, since=timedelta(hours=24)):
return False

# Manifold allows to bet only on markets with probability between 1 and 99.
if market_type == MarketType.MANIFOLD and not (
1 < market.current_p_yes < 99
):
continue
# Manifold allows to bet only on markets with probability between 1 and 99.
if market_type == MarketType.MANIFOLD and not (1 < market.current_p_yes < 99):
return False

picked.append(market)
# Do as a last check, as it uses paid OpenAI API.
if not is_predictable_binary(market.question):
return False

return picked
return True
Comment on lines +358 to +361
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplify the return statement.

The return statement can be simplified by directly returning the condition.

- if not is_predictable_binary(market.question):
-     return False
- return True
+ return is_predictable_binary(market.question)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if not is_predictable_binary(market.question):
return False
return picked
return True
return is_predictable_binary(market.question)
Tools
Ruff

358-361: Return the condition is_predictable_binary(market.question) directly

Replace with return is_predictable_binary(market.question)

(SIM103)


def answer_binary_market(self, market: AgentMarket) -> Answer | None:
"""
Expand Down Expand Up @@ -285,7 +386,55 @@ def get_markets(
)
return available_markets

def before(self, market_type: MarketType) -> None:
def before_process_market(
    self, market_type: MarketType, market: AgentMarket
) -> None:
    """Hook called at the start of `process_market`; records the market's details on the current Langfuse trace."""
    self.update_langfuse_trace_by_market(market_type, market)

def process_market(
    self, market_type: MarketType, market: AgentMarket, verify_market: bool = True
) -> ProcessedMarket | None:
    """
    Process a single market: verify it, answer it, compute the bet amount and optionally place the bet.

    Args:
        market_type: Type of the market, forwarded to the hooks and trace updates.
        market: The market to process.
        verify_market: When true, `self.verify_market` is consulted first and a
            rejected market is skipped.

    Returns:
        A `ProcessedMarket` with the answer and bet amount, or `None` when the
        market was rejected by verification or no answer was produced.
    """
    self.before_process_market(market_type, market)

    if verify_market:
        meets_criteria = self.verify_market(market_type, market)
        if not meets_criteria:
            logger.info(f"Market '{market.question}' doesn't meet the criteria.")
            self.update_langfuse_trace_by_processed_market(market_type, None)
            return None

    answer = self.answer_binary_market(market)
    if answer is None:
        logger.info(f"No answer for market '{market.question}'.")
        self.update_langfuse_trace_by_processed_market(market_type, None)
        return None

    # The amount is computed even when no bet is placed, as it is part of the returned result.
    amount = self.calculate_bet_amount(answer, market)

    if self.place_bet:
        logger.info(
            f"Placing bet on {market} with result {answer} and amount {amount}"
        )
        market.place_bet(amount=amount, outcome=answer.decision)

    self.after_process_market(market_type, market)

    result = ProcessedMarket(answer=answer, amount=amount)
    self.update_langfuse_trace_by_processed_market(market_type, result)
    return result

def after_process_market(
    self, market_type: MarketType, market: AgentMarket
) -> None:
    """Hook called by `process_market` after the optional bet placement; does nothing by default."""
    pass

def before_process_markets(self, market_type: MarketType) -> None:
"""
Executes actions that occur before bets are placed.
"""
Expand All @@ -302,33 +451,29 @@ def before(self, market_type: MarketType) -> None:
withdraw_multiplier=2,
)

def process_bets(self, market_type: MarketType) -> None:
def process_markets(self, market_type: MarketType) -> None:
"""
Processes bets placed by agents on a given market.
"""
available_markets = self.get_markets(market_type)
markets = self.pick_markets(market_type, available_markets)
for market in markets:
processed = 0

for market in available_markets:
# We need to check it again before each market bet, as the balance might have changed.
self.check_min_required_balance_to_operate(market_type)
result = self.answer_binary_market(market)
if result is None:
logger.info(f"Skipping market {market} as no answer was provided")
continue
if self.place_bet:
amount = self.calculate_bet_amount(result, market)
logger.info(
f"Placing bet on {market} with result {result} and amount {amount}"
)
market.place_bet(
amount=amount,
outcome=result.decision,
)

def after(self, market_type: MarketType) -> None:
processed_market = self.process_market(market_type, market)

if processed_market is not None:
processed += 1

if processed == self.bet_on_n_markets_per_run:
break

def after_process_markets(self, market_type: MarketType) -> None:
    """Hook called by `run` once `process_markets` has finished; does nothing by default."""
    pass

def run(self, market_type: MarketType) -> None:
self.before(market_type)
self.process_bets(market_type)
self.after(market_type)
self.before_process_markets(market_type)
self.process_markets(market_type)
self.after_process_markets(market_type)
Loading
Loading