From f4ad4695ad2644f9a1b6a3a654405cac1496d3f2 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 18 Jan 2024 19:13:35 +0800 Subject: [PATCH] init monitor in _init --- src/agentscope/_init.py | 9 +++- src/agentscope/configs/model_config.py | 7 +++ src/agentscope/constants.py | 2 + src/agentscope/file_manager.py | 10 ++++ src/agentscope/models/openai_model.py | 24 ++++++++-- src/agentscope/utils/monitor.py | 64 +++++++++++++++++++------- tests/monitor_test.py | 27 +++++++---- 7 files changed, 112 insertions(+), 31 deletions(-) diff --git a/src/agentscope/_init.py b/src/agentscope/_init.py index e96f523a6..7e2296563 100644 --- a/src/agentscope/_init.py +++ b/src/agentscope/_init.py @@ -10,10 +10,12 @@ from ._runtime import Runtime from .file_manager import file_manager from .utils.logging_utils import LOG_LEVEL, setup_logger +from .utils.monitor import MonitorFactory from .models import read_model_configs +from .constants import _DEFAULT_DIR +from .constants import _DEFAULT_LOG_LEVEL + -_DEFAULT_DIR = "./runs" -_DEFAULT_LOG_LEVEL = "INFO" _INIT_SETTINGS = {} @@ -85,6 +87,9 @@ def init( dir_log = str(file_manager.dir_log) if save_log else None setup_logger(dir_log, logger_level) + # Set monitor + _ = MonitorFactory.get_monitor(db_path=file_manager.file_db) + # Load config and init agent by configs if agent_configs is not None: if isinstance(agent_configs, str): diff --git a/src/agentscope/configs/model_config.py b/src/agentscope/configs/model_config.py index 774be726a..452a4d2c7 100644 --- a/src/agentscope/configs/model_config.py +++ b/src/agentscope/configs/model_config.py @@ -3,6 +3,7 @@ from typing import Any from ..constants import _DEFAULT_MAX_RETRIES from ..constants import _DEFAULT_MESSAGES_KEY +from ..constants import _DEFAULT_API_BUDGET class CfgBase(dict): @@ -57,6 +58,9 @@ class OpenAICfg(CfgBase): """The arguments used in openai api generation, e.g. `temperature`, `seed`.""" + budget: float = _DEFAULT_API_BUDGET + """The total budget using this model. Set to `None` means no limit.""" + class PostApiCfg(CfgBase): """The config for Post API. The final request post will be @@ -113,3 +117,6 @@ class PostApiCfg(CfgBase): """The key of the prompt messages in `requests.post()`, e.g. `request.post(json={${messages_key}: messages, **json_args})`. For huggingface and modelscope inference API, the key is `inputs`""" + + budget: float = _DEFAULT_API_BUDGET + """The total budget using this model. Set to `None` means no limit.""" diff --git a/src/agentscope/constants.py b/src/agentscope/constants.py index 54f52688f..938c12290 100644 --- a/src/agentscope/constants.py +++ b/src/agentscope/constants.py @@ -16,10 +16,12 @@ _DEFAULT_SUBDIR_FILE = "file" _DEFAULT_SUBDIR_INVOKE = "invoke" _DEFAULT_IMAGE_NAME = "image_{}_{}.png" +_DEFAULT_SQLITE_DB_FILE = "agentscope.db" # for model wrapper _DEFAULT_MAX_RETRIES = 3 _DEFAULT_MESSAGES_KEY = "inputs" _DEFAULT_RETRY_INTERVAL = 1 +_DEFAULT_API_BUDGET = None # for execute python _DEFAULT_PYPI_MIRROR = "http://mirrors.aliyun.com/pypi/simple/" _DEFAULT_TRUSTED_HOST = "mirrors.aliyun.com" diff --git a/src/agentscope/file_manager.py b/src/agentscope/file_manager.py index db73cda40..d53f39fbf 100644 --- a/src/agentscope/file_manager.py +++ b/src/agentscope/file_manager.py @@ -14,6 +14,7 @@ _DEFAULT_SUBDIR_CODE, _DEFAULT_SUBDIR_FILE, _DEFAULT_SUBDIR_INVOKE, + _DEFAULT_SQLITE_DB_FILE, _DEFAULT_IMAGE_NAME, ) @@ -48,6 +49,10 @@ def _get_and_create_subdir(self, subdir: str) -> str: os.makedirs(path) return path + def _get_file_path(self, file_name: str) -> str: + """Get the path of the file.""" + return os.path.join(self.dir, Runtime.runtime_id, file_name) + @property def dir_log(self) -> str: """The directory for saving logs.""" @@ -69,6 +74,11 @@ def dir_invoke(self) -> str: """The directory for saving api invocations.""" return self._get_and_create_subdir(_DEFAULT_SUBDIR_INVOKE) + @property + def file_db(self) -> str: + """The path to the sqlite db file.""" + return self._get_file_path(_DEFAULT_SQLITE_DB_FILE) + def init(self, save_dir: str, save_api_invoke: bool = False) -> None: """Set the directory for saving files.""" self.dir = save_dir diff --git a/src/agentscope/models/openai_model.py b/src/agentscope/models/openai_model.py index abe1f6bd8..3eed6c846 100644 --- a/src/agentscope/models/openai_model.py +++ b/src/agentscope/models/openai_model.py @@ -12,7 +12,8 @@ except ImportError: openai = None -from ..utils import MonitorFactory +from ..utils.monitor import MonitorFactory +from ..utils.monitor import full_name from ..utils import QuotaExceededError from ..utils.token_utils import get_openai_max_length @@ -28,6 +29,7 @@ def __init__( organization: str = None, client_args: dict = None, generate_args: dict = None, + budget: float = None, ) -> None: """Initialize the openai client. @@ -49,6 +51,9 @@ def __init__( generate_args (`dict`, default `None`): The extra keyword arguments used in openai api generation, e.g. `temperature`, `seed`. + budget (`float`, default `None`): + The total budget using this model. Set to `None` means no + limit. """ super().__init__(name) @@ -77,8 +82,18 @@ def __init__( # Set monitor accordingly self.monitor = None + self.budget = budget + self._register_budget() self._register_default_metrics() + def _register_budget(self) -> None: + self.monitor = MonitorFactory.get_monitor() + self.monitor.register_budget( + model_name=self.model_name, + value=self.budget, + prefix=self.model_name, + ) + def _register_default_metrics(self) -> None: """Register metrics to the monitor.""" raise NotImplementedError( @@ -95,7 +110,7 @@ def _metric(self, metric_name: str) -> str: Returns: `str`: Metric name of this wrapper. """ - return f"{self.__class__.__name__}.{self.model_name}.{metric_name}" + return full_name(name=metric_name, prefix=self.model_name) class OpenAIChatWrapper(OpenAIWrapper): @@ -193,7 +208,10 @@ def __call__( # step5: update monitor accordingly try: - self.monitor.update(**response.usage.model_dump()) + self.monitor.update( + response.usage.model_dump(), + prefix=self.model_name, + ) except QuotaExceededError as e: # TODO: optimize quota exceeded error handling process logger.error(e.message) diff --git a/src/agentscope/utils/monitor.py b/src/agentscope/utils/monitor.py index 7781290f5..1f6a68429 100644 --- a/src/agentscope/utils/monitor.py +++ b/src/agentscope/utils/monitor.py @@ -64,10 +64,10 @@ def add(self, metric_name: str, value: float) -> bool: `bool`: whether the operation success. """ - def update(self, **kwargs: Any) -> None: + def update(self, values: dict, prefix: Optional[str] = None) -> None: """Update multiple metrics at once.""" - for k, v in kwargs.items(): - self.add(k, v) + for k, v in values: + self.add(full_name(prefix=prefix, name=k), v) @abstractmethod def clear(self, metric_name: str) -> bool: @@ -200,15 +200,30 @@ def register_budget( model_name (`str`): model that requires budget. value (`float`): the budget value. prefix (`Optional[str]`, default `None`): used to distinguish - multiple budget registrations for the same model. For multiple - registrations with the same `model_name` and `prefix`, only the - first time will take effect. + multiple budget registrations. For multiple registrations with + the same `prefix`, only the first time will take effect. Returns: `bool`: whether the operation success. """ +def full_name(name: str, prefix: Optional[str] = None) -> str: + """get the full name of a metric. + + Args: + metric_name (`str`): name of a metric. + prefix (` Optional[str]`, default `None`): metric prefix. + + Returns: + `str`: the full name of the metric + """ + if prefix is None: + return name + else: + return f"{prefix}.{name}" + + class QuotaExceededError(Exception): """An Exception used to indicate that a certain metric exceeds quota""" @@ -625,10 +640,17 @@ def exists(self, metric_name: str) -> bool: with sqlite_cursor(self.db_path) as cursor: return self._exists(cursor, metric_name) - def update(self, **kwargs: Any) -> None: + def update(self, values: dict, prefix: Optional[str] = None) -> None: with sqlite_transaction(self.db_path) as cursor: - for metric_name, value in kwargs.items(): - self._add(cursor, metric_name, value) + for metric_name, value in values.items(): + self._add( + cursor, + full_name( + name=metric_name, + prefix=prefix, + ), + value, + ) def _create_update_cost_trigger( self, @@ -661,7 +683,10 @@ def register_budget( logger.info(f"set budget {value} to {model_name}") pricing = get_pricing() if model_name in pricing: - budget_metric_name = f"{prefix}.{model_name}.cost" + budget_metric_name = full_name( + name="cost", + prefix=prefix, + ) ok = self.register( metric_name=budget_metric_name, metric_unit="dollor", @@ -670,7 +695,10 @@ def register_budget( if not ok: return False for metric_name, unit_price in pricing[model_name].items(): - token_metric_name = f"{prefix}.{model_name}.{metric_name}" + token_metric_name = full_name( + name=metric_name, + prefix=prefix, + ) self.register( metric_name=token_metric_name, metric_unit="token", @@ -721,24 +749,28 @@ class MonitorFactory: from agentscope.utils import MonitorFactory monitor = MonitorFactory.get_monitor() - """ _instance = None @classmethod - def get_monitor(cls, impl_type: Optional[str] = None) -> MonitorBase: + def get_monitor( + cls, + impl_type: Optional[str] = None, + **kwargs: Any, + ) -> MonitorBase: """Get the monitor instance. Returns: `MonitorBase`: the monitor instance. """ if cls._instance is None: - # todo: init a specific monitor implementation by input args - if impl_type is None or impl_type.lower() == "dict": + if impl_type is None or impl_type.lower() == "sqlite": + cls._instance = SqliteMonitor(**kwargs) + elif impl_type.lower() == "dict": cls._instance = DictMonitor() else: raise NotImplementedError( "Monitor with type [{type}] is not implemented.", ) - return cls._instance + return cls._instance # type: ignore [return-value] diff --git a/tests/monitor_test.py b/tests/monitor_test.py index 342d32e5c..916c6d2c3 100644 --- a/tests/monitor_test.py +++ b/tests/monitor_test.py @@ -16,8 +16,8 @@ class MonitorFactoryTest(unittest.TestCase): def test_get_monitor(self) -> None: """Test get monitor method of MonitorFactory.""" - monitor1 = MonitorFactory.get_monitor() - monitor2 = MonitorFactory.get_monitor() + monitor1 = MonitorFactory.get_monitor("dict") + monitor2 = MonitorFactory.get_monitor("dict") self.assertEqual(monitor1, monitor2) self.assertTrue( monitor1.register("token_num", metric_unit="token", quota=200), @@ -185,7 +185,7 @@ def test_register_budget(self) -> None: self.monitor.register_budget( model_name="gpt-4", value=5, - prefix="agent_A", + prefix="agent_A.gpt-4", ), ) # register an existing model with different prefix is ok @@ -193,21 +193,22 @@ def test_register_budget(self) -> None: self.monitor.register_budget( model_name="gpt-4", value=15, - prefix="agent_B", + prefix="agent_B.gpt-4", ), ) gpt_4_3d = { - "agent_A.gpt-4.prompt_tokens": 50000, - "agent_A.gpt-4.completion_tokens": 25000, - "agent_A.gpt-4.total_tokens": 750000, + "prompt_tokens": 50000, + "completion_tokens": 25000, + "total_tokens": 750000, } # agentA uses 3 dollors - self.monitor.update(**gpt_4_3d) + self.monitor.update(gpt_4_3d, prefix="agent_A.gpt-4") # agentA uses another 3 dollors and exceeds quota self.assertRaises( QuotaExceededError, self.monitor.update, - **gpt_4_3d, + gpt_4_3d, + "agent_A.gpt-4", ) self.assertLess( self.monitor.get_value( # type: ignore [arg-type] @@ -220,6 +221,12 @@ def test_register_budget(self) -> None: self.monitor.register_budget( model_name="gpt-4", value=5, - prefix="agent_A", + prefix="agent_A.gpt-4", ), ) + self.assertEqual( + self.monitor.get_value( # type: ignore [arg-type] + "agent_A.gpt-4.cost", + ), + 3, + )