Skip to content

Commit

Permalink
refactor pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
pan-x-c committed Jan 16, 2024
1 parent ce4f390 commit 86eb21d
Show file tree
Hide file tree
Showing 28 changed files with 154 additions and 119 deletions.
19 changes: 19 additions & 0 deletions docs/sphinx_doc/source/agentscope.service.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,22 @@ Service package
agentscope.service.text_processing
agentscope.service.web_search


service\_status module
--------------------------------

.. automodule:: agentscope.service.service_status
:noindex:
:members:
:undoc-members:
:show-inheritance:


service\_response module
--------------------------------

.. automodule:: agentscope.service.service_response
:noindex:
:members:
:undoc-members:
:show-inheritance:
2 changes: 1 addition & 1 deletion src/agentscope/agents/dialog_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ..message import Msg
from .agent import AgentBase
from ..prompt import PromptEngine
from ..constants import PromptType
from ..prompt import PromptType


class DialogAgent(AgentBase):
Expand Down
2 changes: 1 addition & 1 deletion src/agentscope/agents/dict_dialog_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..message import Msg
from .agent import AgentBase
from ..prompt import PromptEngine
from ..constants import PromptType
from ..prompt import PromptType


class DictDialogAgent(AgentBase):
Expand Down
11 changes: 7 additions & 4 deletions src/agentscope/agents/rpc_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,6 @@ def setup_rcp_agent_server(
f"Stopping rpc server [{servicer_class.__name__}] at port [{port}]",
)
server.stop(0)
stop_event.set()
logger.info(
f"rpc server [{servicer_class.__name__}] at port [{port}] stopped "
"successfully",
Expand Down Expand Up @@ -541,9 +540,13 @@ def shutdown(self) -> None:
if self.server is not None:
if self.stop_event is not None:
self.stop_event.set()
self.stop_event.clear()
self.stop_event.wait()
self.stop_event = None
self.server.terminate()
self.server.join()
self.server.join(timeout=5)
if self.server.is_alive():
self.server.kill()
logger.info(
f"Rpc server [{self.agent_class.__name__}] at port"
f" [{self.port}] is killed.",
)
self.server = None
2 changes: 1 addition & 1 deletion src/agentscope/agents/rpc_dialog_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from agentscope.message import Msg
from agentscope.agents.rpc_agent import RpcAgentBase
from agentscope.prompt import PromptEngine
from agentscope.constants import PromptType
from agentscope.prompt import PromptType


class RpcDialogAgent(RpcAgentBase):
Expand Down
14 changes: 0 additions & 14 deletions src/agentscope/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,6 @@ class ResponseFormat(IntEnum):
JSON = 1


class ServiceExecStatus(IntEnum):
"""Enum for service execution status."""

SUCCESS = 1
ERROR = -1


class PromptType(IntEnum):
"""Enum for prompt types."""

STRING = 0
LIST = 1


class ShrinkPolicy(IntEnum):
"""Enum for shrink strategies when the prompt is too long."""

Expand Down
64 changes: 35 additions & 29 deletions src/agentscope/pipelines/functional.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-
""" Functional counterpart for Pipeline """

from typing import Callable, Sequence, Optional
from typing import Callable, Sequence, Optional, Union
from typing import Any
from typing import Mapping
from ..agents.operator import Operator

Operators = Union[Operator, Sequence[Operator]]


def placeholder(x: dict = None) -> dict:
r"""A placeholder that do nothing.
Expand Down Expand Up @@ -40,10 +41,19 @@ def sequentialpipeline(
return msg


def _operators(operators: Operators, x: Optional[dict] = None) -> dict:
"""Syntactic sugar for executing a single operator or a sequence of
operators."""
if isinstance(operators, Sequence):
return sequentialpipeline(operators, x)
else:
return operators(x)


def ifelsepipeline(
condition_func: Callable,
if_body_operator: Operator,
else_body_operator: Operator = placeholder,
if_body_operators: Operators,
else_body_operators: Operators = placeholder,
x: Optional[dict] = None,
) -> dict:
"""Functional version of IfElsePipeline.
Expand All @@ -52,10 +62,10 @@ def ifelsepipeline(
condition_func (`Callable`):
A function that determines whether to exeucte `if_body_operator`
or `else_body_operator` based on x.
if_body_operator (`Operator`):
An operator executed when `condition_func` returns True.
else_body_operator (`Operator`, defaults to `placeholder`):
An operator executed when condition_func returns False,
if_body_operator (`Operators`):
Operators executed when `condition_func` returns True.
else_body_operator (`Operators`, defaults to `placeholder`):
Operators executed when condition_func returns False,
does nothing and just return the input by default.
x (`Optional[dict]`, defaults to `None`):
The input dictionary.
Expand All @@ -64,15 +74,15 @@ def ifelsepipeline(
`dict`: the output dictionary.
"""
if condition_func(x):
return if_body_operator(x)
return _operators(if_body_operators, x)
else:
return else_body_operator(x)
return _operators(else_body_operators, x)


def switchpipeline(
condition_func: Callable[[Any], Any],
case_operators: Mapping[Any, Operator],
default_operator: Operator = placeholder,
case_operators: Mapping[Any, Operators],
default_operators: Operators = placeholder,
x: Optional[dict] = None,
) -> dict:
"""Functional version of SwitchPipeline.
Expand All @@ -85,8 +95,8 @@ def switchpipeline(
case_operators (`Mapping[Any, Operator]`):
A dictionary containing multiple operators and their
corresponding trigger conditions.
default_operator (`Operator`, defaults to `placeholder`):
An operator that is executed when the actual condition do not
default_operators (`Operators`, defaults to `placeholder`):
Operators that are executed when the actual condition do not
meet any of the case_operators, does nothing and just return the
input by default.
x (`Optional[dict]`, defaults to `None`):
Expand All @@ -97,22 +107,22 @@ def switchpipeline(
"""
target_case = condition_func(x)
if target_case in case_operators:
return case_operators[target_case](x)
return _operators(case_operators[target_case], x)
else:
return default_operator(x)
return _operators(default_operators, x)


def forlooppipeline(
loop_body_operator: Operator,
loop_body_operators: Operators,
max_loop: int,
break_func: Callable[[dict], bool] = lambda _: False,
x: Optional[dict] = None,
) -> dict:
"""Functional version of ForLoopPipeline.
Args:
loop_body_operator (`Operator`):
An operator executed as the body of the loop.
loop_body_operators (`Operators`):
Operators executed as the body of the loop.
max_loop (`int`):
maximum number of loop executions.
break_func (`Callable[[dict], bool]`):
Expand All @@ -125,26 +135,24 @@ def forlooppipeline(
Returns:
`dict`: The output dictionary.
"""
if x is None:
x = {}
for _ in range(max_loop):
# loop body
x = loop_body_operator(x)
x = _operators(loop_body_operators, x)
# check condition
if break_func(x):
break
return x
return x # type: ignore [return-value]


def whilelooppipeline(
loop_body_operator: Operator,
loop_body_operators: Operators,
condition_func: Callable[[int, Any], bool] = lambda _, __: False,
x: Optional[dict] = None,
) -> dict:
"""Functional version of WhileLoopPipeline.
Args:
loop_body_operator (`Operator`): An operator executed as the body of
loop_body_operators (`Operators`): Operators executed as the body of
the loop.
condition_func (`Callable[[int, Any], bool]`, optional): A function
that determines whether to continue executing the loop body based
Expand All @@ -156,12 +164,10 @@ def whilelooppipeline(
Returns:
`dict`: the output dictionary.
"""
if x is None:
x = {}
i = 0
while condition_func(i, x):
# loop body
x = loop_body_operator(x)
x = _operators(loop_body_operators, x)
# check condition
i += 1
return x
return x # type: ignore [return-value]
Loading

0 comments on commit 86eb21d

Please sign in to comment.