From ddab69260cf3a2a3cbcade4ff6c07a4d5b4a29ff Mon Sep 17 00:00:00 2001 From: Collin Dutter Date: Mon, 26 Aug 2024 10:54:17 -0700 Subject: [PATCH] Add thread lock when batch publishing events --- .../event_listener/base_event_listener_driver.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/griptape/drivers/event_listener/base_event_listener_driver.py b/griptape/drivers/event_listener/base_event_listener_driver.py index 0af57f0f38..75bdc9f75d 100644 --- a/griptape/drivers/event_listener/base_event_listener_driver.py +++ b/griptape/drivers/event_listener/base_event_listener_driver.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import threading from abc import ABC, abstractmethod from typing import TYPE_CHECKING @@ -18,6 +19,7 @@ class BaseEventListenerDriver(FuturesExecutorMixin, ABC): batched: bool = field(default=True, kw_only=True) batch_size: int = field(default=10, kw_only=True) + thread_lock: threading.Lock = field(default=Factory(lambda: threading.Lock())) _batch: list[dict] = field(default=Factory(list), kw_only=True) @@ -39,10 +41,11 @@ def _safe_try_publish_event(self, event: BaseEvent | dict, *, flush: bool) -> No event_payload = event if isinstance(event, dict) else event.to_dict() if self.batched: - self._batch.append(event_payload) - if len(self.batch) >= self.batch_size or flush: - self.try_publish_event_payload_batch(self.batch) - self._batch = [] + with self.thread_lock: + self._batch.append(event_payload) + if len(self.batch) >= self.batch_size or flush: + self.try_publish_event_payload_batch(self.batch) + self._batch = [] return else: self.try_publish_event_payload(event_payload)