Skip to content

Commit

Permalink
rtvi: bot-ready message needs to be sent manual
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed Nov 5, 2024
1 parent bd50201 commit fd04e62
Showing 1 changed file with 24 additions and 33 deletions.
57 changes: 24 additions & 33 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,15 +589,7 @@ def __init__(
self._registered_actions: Dict[str, RTVIAction] = {}
self._registered_services: Dict[str, RTVIService] = {}

# A task to process incoming action frames.
self._action_queue = asyncio.Queue()
self._action_task = self.get_event_loop().create_task(self._action_task_handler())

# A task to process incoming transport messages.
self._message_queue = asyncio.Queue()
self._message_task = self.get_event_loop().create_task(self._message_task_handler())

self._register_event_handler("on_bot_ready")
self._register_event_handler("on_client_ready")

def register_action(self, action: RTVIAction):
id = self._action_id(action.service, action.action)
Expand All @@ -606,18 +598,21 @@ def register_action(self, action: RTVIAction):
def register_service(self, service: RTVIService):
self._registered_services[service.name] = service

async def set_client_ready(self):
self._client_ready = True
await self._call_event_handler("on_client_ready")

async def set_bot_ready(self):
await self._update_config(self._config, False)
await self._send_bot_ready()

async def interrupt_bot(self):
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)

async def send_error(self, error: str):
message = RTVIError(data=RTVIErrorData(error=error, fatal=False))
await self._push_transport_message(message)

async def set_client_ready(self):
if not self._client_ready:
self._client_ready = True
await self._maybe_send_bot_ready()

async def handle_message(self, message: RTVIMessage):
await self._message_queue.put(message)

Expand Down Expand Up @@ -681,21 +676,24 @@ async def cleanup(self):
await self._pipeline.cleanup()

async def _start(self, frame: StartFrame):
self._pipeline_started = True
await self._maybe_send_bot_ready()
await self._create_tasks()

async def _stop(self, frame: EndFrame):
if self._action_task:
self._action_task.cancel()
await self._action_task
self._action_task = None

if self._message_task:
self._message_task.cancel()
await self._message_task
self._message_task = None
await self._cancel_tasks()

async def _cancel(self, frame: CancelFrame):
await self._cancel_tasks()

async def _create_tasks(self):
# A task to process incoming action frames.
self._action_queue = asyncio.Queue()
self._action_task = self.get_event_loop().create_task(self._action_task_handler())

# A task to process incoming transport messages.
self._message_queue = asyncio.Queue()
self._message_task = self.get_event_loop().create_task(self._message_task_handler())

async def _cancel_tasks(self):
if self._action_task:
self._action_task.cancel()
await self._action_task
Expand Down Expand Up @@ -769,9 +767,8 @@ async def _handle_message(self, message: RTVIMessage):
logger.warning(f"Exception processing message: {e}")

async def _handle_client_ready(self, request_id: str):
self._client_ready = True
self._client_ready_id = request_id
await self._maybe_send_bot_ready()
await self.set_client_ready()

async def _handle_describe_config(self, request_id: str):
services = list(self._registered_services.values())
Expand Down Expand Up @@ -841,12 +838,6 @@ async def _handle_action(self, request_id: str | None, data: RTVIActionRun):
message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result))
await self._push_transport_message(message)

async def _maybe_send_bot_ready(self):
if self._pipeline_started and self._client_ready:
await self._update_config(self._config, False)
await self._send_bot_ready()
await self._call_event_handler("on_bot_ready")

async def _send_bot_ready(self):
if not self._params.send_bot_ready:
return
Expand Down

0 comments on commit fd04e62

Please sign in to comment.