Skip to content

Commit

Permalink
init monitor in _init
Browse files Browse the repository at this point in the history
  • Loading branch information
pan-x-c committed Jan 18, 2024
1 parent 14cdebe commit f4ad469
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 31 deletions.
9 changes: 7 additions & 2 deletions src/agentscope/_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
from ._runtime import Runtime
from .file_manager import file_manager
from .utils.logging_utils import LOG_LEVEL, setup_logger
from .utils.monitor import MonitorFactory
from .models import read_model_configs
from .constants import _DEFAULT_DIR
from .constants import _DEFAULT_LOG_LEVEL


_DEFAULT_DIR = "./runs"
_DEFAULT_LOG_LEVEL = "INFO"
_INIT_SETTINGS = {}


Expand Down Expand Up @@ -85,6 +87,9 @@ def init(
dir_log = str(file_manager.dir_log) if save_log else None
setup_logger(dir_log, logger_level)

# Set monitor
_ = MonitorFactory.get_monitor(db_path=file_manager.file_db)

# Load config and init agent by configs
if agent_configs is not None:
if isinstance(agent_configs, str):
Expand Down
7 changes: 7 additions & 0 deletions src/agentscope/configs/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any
from ..constants import _DEFAULT_MAX_RETRIES
from ..constants import _DEFAULT_MESSAGES_KEY
from ..constants import _DEFAULT_API_BUDGET


class CfgBase(dict):
Expand Down Expand Up @@ -57,6 +58,9 @@ class OpenAICfg(CfgBase):
"""The arguments used in openai api generation, e.g. `temperature`,
`seed`."""

budget: float = _DEFAULT_API_BUDGET
"""The total budget using this model. Set to `None` means no limit."""


class PostApiCfg(CfgBase):
"""The config for Post API. The final request post will be
Expand Down Expand Up @@ -113,3 +117,6 @@ class PostApiCfg(CfgBase):
"""The key of the prompt messages in `requests.post()`,
e.g. `request.post(json={${messages_key}: messages, **json_args})`. For
huggingface and modelscope inference API, the key is `inputs`"""

budget: float = _DEFAULT_API_BUDGET
"""The total budget using this model. Set to `None` means no limit."""
2 changes: 2 additions & 0 deletions src/agentscope/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
_DEFAULT_SUBDIR_FILE = "file"
_DEFAULT_SUBDIR_INVOKE = "invoke"
_DEFAULT_IMAGE_NAME = "image_{}_{}.png"
_DEFAULT_SQLITE_DB_FILE = "agentscope.db"
# for model wrapper
_DEFAULT_MAX_RETRIES = 3
_DEFAULT_MESSAGES_KEY = "inputs"
_DEFAULT_RETRY_INTERVAL = 1
_DEFAULT_API_BUDGET = None
# for execute python
_DEFAULT_PYPI_MIRROR = "http://mirrors.aliyun.com/pypi/simple/"
_DEFAULT_TRUSTED_HOST = "mirrors.aliyun.com"
Expand Down
10 changes: 10 additions & 0 deletions src/agentscope/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
_DEFAULT_SUBDIR_CODE,
_DEFAULT_SUBDIR_FILE,
_DEFAULT_SUBDIR_INVOKE,
_DEFAULT_SQLITE_DB_FILE,
_DEFAULT_IMAGE_NAME,
)

Expand Down Expand Up @@ -48,6 +49,10 @@ def _get_and_create_subdir(self, subdir: str) -> str:
os.makedirs(path)
return path

def _get_file_path(self, file_name: str) -> str:
"""Get the path of the file."""
return os.path.join(self.dir, Runtime.runtime_id, file_name)

@property
def dir_log(self) -> str:
"""The directory for saving logs."""
Expand All @@ -69,6 +74,11 @@ def dir_invoke(self) -> str:
"""The directory for saving api invocations."""
return self._get_and_create_subdir(_DEFAULT_SUBDIR_INVOKE)

@property
def file_db(self) -> str:
"""The path to the sqlite db file."""
return self._get_file_path(_DEFAULT_SQLITE_DB_FILE)

def init(self, save_dir: str, save_api_invoke: bool = False) -> None:
"""Set the directory for saving files."""
self.dir = save_dir
Expand Down
24 changes: 21 additions & 3 deletions src/agentscope/models/openai_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
except ImportError:
openai = None

from ..utils import MonitorFactory
from ..utils.monitor import MonitorFactory
from ..utils.monitor import full_name
from ..utils import QuotaExceededError
from ..utils.token_utils import get_openai_max_length

Expand All @@ -28,6 +29,7 @@ def __init__(
organization: str = None,
client_args: dict = None,
generate_args: dict = None,
budget: float = None,
) -> None:
"""Initialize the openai client.
Expand All @@ -49,6 +51,9 @@ def __init__(
generate_args (`dict`, default `None`):
The extra keyword arguments used in openai api generation,
e.g. `temperature`, `seed`.
budget (`float`, default `None`):
The total budget using this model. Set to `None` means no
limit.
"""
super().__init__(name)

Expand Down Expand Up @@ -77,8 +82,18 @@ def __init__(

# Set monitor accordingly
self.monitor = None
self.budget = budget
self._register_budget()
self._register_default_metrics()

def _register_budget(self) -> None:
self.monitor = MonitorFactory.get_monitor()
self.monitor.register_budget(
model_name=self.model_name,
value=self.budget,
prefix=self.model_name,
)

def _register_default_metrics(self) -> None:
"""Register metrics to the monitor."""
raise NotImplementedError(
Expand All @@ -95,7 +110,7 @@ def _metric(self, metric_name: str) -> str:
Returns:
`str`: Metric name of this wrapper.
"""
return f"{self.__class__.__name__}.{self.model_name}.{metric_name}"
return full_name(name=metric_name, prefix=self.model_name)


class OpenAIChatWrapper(OpenAIWrapper):
Expand Down Expand Up @@ -193,7 +208,10 @@ def __call__(

# step5: update monitor accordingly
try:
self.monitor.update(**response.usage.model_dump())
self.monitor.update(
response.usage.model_dump(),
prefix=self.model_name,
)
except QuotaExceededError as e:
# TODO: optimize quota exceeded error handling process
logger.error(e.message)
Expand Down
64 changes: 48 additions & 16 deletions src/agentscope/utils/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ def add(self, metric_name: str, value: float) -> bool:
`bool`: whether the operation success.
"""

def update(self, **kwargs: Any) -> None:
def update(self, values: dict, prefix: Optional[str] = None) -> None:
"""Update multiple metrics at once."""
for k, v in kwargs.items():
self.add(k, v)
for k, v in values:
self.add(full_name(prefix=prefix, name=k), v)

@abstractmethod
def clear(self, metric_name: str) -> bool:
Expand Down Expand Up @@ -200,15 +200,30 @@ def register_budget(
model_name (`str`): model that requires budget.
value (`float`): the budget value.
prefix (`Optional[str]`, default `None`): used to distinguish
multiple budget registrations for the same model. For multiple
registrations with the same `model_name` and `prefix`, only the
first time will take effect.
multiple budget registrations. For multiple registrations with
the same `prefix`, only the first time will take effect.
Returns:
`bool`: whether the operation success.
"""


def full_name(name: str, prefix: Optional[str] = None) -> str:
"""get the full name of a metric.
Args:
metric_name (`str`): name of a metric.
prefix (` Optional[str]`, default `None`): metric prefix.
Returns:
`str`: the full name of the metric
"""
if prefix is None:
return name
else:
return f"{prefix}.{name}"


class QuotaExceededError(Exception):
"""An Exception used to indicate that a certain metric exceeds quota"""

Expand Down Expand Up @@ -625,10 +640,17 @@ def exists(self, metric_name: str) -> bool:
with sqlite_cursor(self.db_path) as cursor:
return self._exists(cursor, metric_name)

def update(self, **kwargs: Any) -> None:
def update(self, values: dict, prefix: Optional[str] = None) -> None:
with sqlite_transaction(self.db_path) as cursor:
for metric_name, value in kwargs.items():
self._add(cursor, metric_name, value)
for metric_name, value in values.items():
self._add(
cursor,
full_name(
name=metric_name,
prefix=prefix,
),
value,
)

def _create_update_cost_trigger(
self,
Expand Down Expand Up @@ -661,7 +683,10 @@ def register_budget(
logger.info(f"set budget {value} to {model_name}")
pricing = get_pricing()
if model_name in pricing:
budget_metric_name = f"{prefix}.{model_name}.cost"
budget_metric_name = full_name(
name="cost",
prefix=prefix,
)
ok = self.register(
metric_name=budget_metric_name,
metric_unit="dollor",
Expand All @@ -670,7 +695,10 @@ def register_budget(
if not ok:
return False
for metric_name, unit_price in pricing[model_name].items():
token_metric_name = f"{prefix}.{model_name}.{metric_name}"
token_metric_name = full_name(
name=metric_name,
prefix=prefix,
)
self.register(
metric_name=token_metric_name,
metric_unit="token",
Expand Down Expand Up @@ -721,24 +749,28 @@ class MonitorFactory:
from agentscope.utils import MonitorFactory
monitor = MonitorFactory.get_monitor()
"""

_instance = None

@classmethod
def get_monitor(cls, impl_type: Optional[str] = None) -> MonitorBase:
def get_monitor(
cls,
impl_type: Optional[str] = None,
**kwargs: Any,
) -> MonitorBase:
"""Get the monitor instance.
Returns:
`MonitorBase`: the monitor instance.
"""
if cls._instance is None:
# todo: init a specific monitor implementation by input args
if impl_type is None or impl_type.lower() == "dict":
if impl_type is None or impl_type.lower() == "sqlite":
cls._instance = SqliteMonitor(**kwargs)
elif impl_type.lower() == "dict":
cls._instance = DictMonitor()
else:
raise NotImplementedError(
"Monitor with type [{type}] is not implemented.",
)
return cls._instance
return cls._instance # type: ignore [return-value]
27 changes: 17 additions & 10 deletions tests/monitor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class MonitorFactoryTest(unittest.TestCase):

def test_get_monitor(self) -> None:
"""Test get monitor method of MonitorFactory."""
monitor1 = MonitorFactory.get_monitor()
monitor2 = MonitorFactory.get_monitor()
monitor1 = MonitorFactory.get_monitor("dict")
monitor2 = MonitorFactory.get_monitor("dict")
self.assertEqual(monitor1, monitor2)
self.assertTrue(
monitor1.register("token_num", metric_unit="token", quota=200),
Expand Down Expand Up @@ -185,29 +185,30 @@ def test_register_budget(self) -> None:
self.monitor.register_budget(
model_name="gpt-4",
value=5,
prefix="agent_A",
prefix="agent_A.gpt-4",
),
)
# register an existing model with different prefix is ok
self.assertTrue(
self.monitor.register_budget(
model_name="gpt-4",
value=15,
prefix="agent_B",
prefix="agent_B.gpt-4",
),
)
gpt_4_3d = {
"agent_A.gpt-4.prompt_tokens": 50000,
"agent_A.gpt-4.completion_tokens": 25000,
"agent_A.gpt-4.total_tokens": 750000,
"prompt_tokens": 50000,
"completion_tokens": 25000,
"total_tokens": 750000,
}
# agentA uses 3 dollors
self.monitor.update(**gpt_4_3d)
self.monitor.update(gpt_4_3d, prefix="agent_A.gpt-4")
# agentA uses another 3 dollors and exceeds quota
self.assertRaises(
QuotaExceededError,
self.monitor.update,
**gpt_4_3d,
gpt_4_3d,
"agent_A.gpt-4",
)
self.assertLess(
self.monitor.get_value( # type: ignore [arg-type]
Expand All @@ -220,6 +221,12 @@ def test_register_budget(self) -> None:
self.monitor.register_budget(
model_name="gpt-4",
value=5,
prefix="agent_A",
prefix="agent_A.gpt-4",
),
)
self.assertEqual(
self.monitor.get_value( # type: ignore [arg-type]
"agent_A.gpt-4.cost",
),
3,
)

0 comments on commit f4ad469

Please sign in to comment.