Skip to content

Commit

Permalink
Have EventListener flush batch
Browse files Browse the repository at this point in the history
  • Loading branch information
collindutter committed Oct 10, 2024
1 parent 766d365 commit 9e8d650
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 20 deletions.
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## [0.33.0] - 2024-10-09
### Added

- `BaseEventListener.flush_events()` to flush events from an Event Listener.

### Changed

- **BREAKING**: `BaseEventListener.publish_event` `flush` argument. Use `BaseEventListener.flush_events()` instead.

### Fixed

- Structures not flushing events when not listening for `FinishStructureRunEvent`.

## \[0.33.0\] - 2024-10-09

## Added

Expand Down
7 changes: 5 additions & 2 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ DataframeLoader().load(df)
```

#### After

```python
# Convert the dataframe to csv bytes and parse it
CsvLoader().parse(bytes(df.to_csv(line_terminator='\r\n', index=False), encoding='utf-8'))
Expand All @@ -25,12 +26,14 @@ CsvLoader().parse(bytes(df.to_csv(line_terminator='\r\n', index=False), encoding
### `TextLoader`, `PdfLoader`, `ImageLoader`, and `AudioLoader` now take a `str | PathLike` instead of `bytes`.

#### Before

```python
PdfLoader().load(Path("attention.pdf").read_bytes())
PdfLoader().load_collection([Path("attention.pdf").read_bytes(), Path("CoT.pdf").read_bytes()])
```

#### After

```python
PdfLoader().load("attention.pdf")
PdfLoader().load_collection([Path("attention.pdf"), "CoT.pdf"])
Expand All @@ -47,7 +50,7 @@ You can now pass the file path directly to the Loader.
PdfLoader().load(load_file("attention.pdf").read_bytes())
PdfLoader().load_collection(list(load_files(["attention.pdf", "CoT.pdf"]).values()))
```

```python
PdfLoader().load("attention.pdf")
PdfLoader().load_collection(["attention.pdf", "CoT.pdf"])
Expand All @@ -69,6 +72,7 @@ vector_store.upsert_text_artifacts(
```

#### After

```python
artifact = PdfLoader().load("attention.pdf")
chunks = Chunker().chunk(artifact)
Expand All @@ -79,7 +83,6 @@ vector_store.upsert_text_artifacts(
)
```


### Removed `torch` extra from `transformers` dependency

The `torch` extra has been removed from the `transformers` dependency. If you require `torch`, install it separately.
Expand Down
20 changes: 14 additions & 6 deletions griptape/drivers/event_listener/base_event_listener_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,35 @@ class BaseEventListenerDriver(FuturesExecutorMixin, ABC):
def batch(self) -> list[dict]:
return self._batch

def publish_event(self, event: BaseEvent | dict, *, flush: bool = False) -> None:
self.futures_executor.submit(self._safe_try_publish_event, event, flush=flush)
def publish_event(self, event: BaseEvent | dict) -> None:
self.futures_executor.submit(self._safe_try_publish_event, event)

def flush_events(self) -> None:
if self.batch:
with self.thread_lock:
self._flush_events()

@abstractmethod
def try_publish_event_payload(self, event_payload: dict) -> None: ...

@abstractmethod
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None: ...

def _safe_try_publish_event(self, event: BaseEvent | dict, *, flush: bool) -> None:
def _safe_try_publish_event(self, event: BaseEvent | dict) -> None:
try:
event_payload = event if isinstance(event, dict) else event.to_dict()

if self.batched:
with self.thread_lock:
self._batch.append(event_payload)
if len(self.batch) >= self.batch_size or flush:
self.try_publish_event_payload_batch(self.batch)
self._batch = []
if len(self.batch) >= self.batch_size:
self._flush_events()

Check warning on line 52 in griptape/drivers/event_listener/base_event_listener_driver.py

View check run for this annotation

Codecov / codecov/patch

griptape/drivers/event_listener/base_event_listener_driver.py#L52

Added line #L52 was not covered by tests
return
else:
self.try_publish_event_payload(event_payload)
except Exception as e:
logger.error(e)

def _flush_events(self) -> None:
self.try_publish_event_payload_batch(self.batch)
self._batch = []
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def validate_run_id(self, _: Attribute, structure_run_id: str) -> None:
"structure_run_id must be set either in the constructor or as an environment variable (GT_CLOUD_STRUCTURE_RUN_ID).",
)

def publish_event(self, event: BaseEvent | dict, *, flush: bool = False) -> None:
def publish_event(self, event: BaseEvent | dict) -> None:
from griptape.observability.observability import Observability

event_payload = event.to_dict() if isinstance(event, BaseEvent) else event
Expand All @@ -51,7 +51,7 @@ def publish_event(self, event: BaseEvent | dict, *, flush: bool = False) -> None
if span_id is not None:
event_payload["span_id"] = span_id

super().publish_event(event_payload, flush=flush)
super().publish_event(event_payload)

def try_publish_event_payload(self, event_payload: dict) -> None:
self._post_event(self._get_event_request(event_payload))
Expand Down
7 changes: 5 additions & 2 deletions griptape/events/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def publish_event(self, event: BaseEvent, *, flush: bool = False) -> None:
event_payload = self.handler(event)
if self.driver is not None:
if event_payload is not None and isinstance(event_payload, dict):
self.driver.publish_event(event_payload, flush=flush)
self.driver.publish_event(event_payload)
else:
self.driver.publish_event(event, flush=flush)
self.driver.publish_event(event)

if self.driver is not None and flush:
self.driver.flush_events()

Check warning on line 47 in griptape/events/event_listener.py

View check run for this annotation

Codecov / codecov/patch

griptape/events/event_listener.py#L47

Added line #L47 was not covered by tests
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ def test__safe_try_publish_event(self):
driver = MockEventListenerDriver(batched=False)

for _ in range(4):
driver._safe_try_publish_event(MockEvent().to_dict(), flush=False)
driver._safe_try_publish_event(MockEvent().to_dict())
assert len(driver.batch) == 0

def test__safe_try_publish_event_batch(self):
driver = MockEventListenerDriver(batched=True)

for _ in range(0, 3):
driver._safe_try_publish_event(MockEvent().to_dict(), flush=False)
driver._safe_try_publish_event(MockEvent().to_dict())
assert len(driver.batch) == 3

def test__safe_try_publish_event_batch_flush(self):
driver = MockEventListenerDriver(batched=True)

for _ in range(0, 3):
driver._safe_try_publish_event(MockEvent().to_dict(), flush=True)
driver._safe_try_publish_event(MockEvent().to_dict())
driver.flush_events()
assert len(driver.batch) == 0
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def test_init(self, driver):

def test_publish_event_without_span_id(self, mock_post, driver):
event = MockEvent()
driver.publish_event(event, flush=True)
driver.publish_event(event)
driver.flush_events()

mock_post.assert_called_with(
url="https://cloud123.griptape.ai/api/structure-runs/bar baz/events",
Expand All @@ -59,7 +60,8 @@ def test_publish_event_with_span_id(self, mock_post, driver):
observability_driver.get_span_id.return_value = "test"

with Observability(observability_driver=observability_driver):
driver.publish_event(event, flush=True)
driver.publish_event(event)
driver.flush_events()

mock_post.assert_called_with(
url="https://cloud123.griptape.ai/api/structure-runs/bar baz/events",
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/events/test_event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def event_handler(_: BaseEvent) -> None:
event_listener = EventListener(event_handler, driver=mock_event_listener_driver, event_types=[MockEvent])
event_listener.publish_event(mock_event)

mock_event_listener_driver.publish_event.assert_called_once_with(mock_event, flush=False)
mock_event_listener_driver.publish_event.assert_called_once_with(mock_event)

def test_publish_transformed_event(self):
mock_event_listener_driver = Mock()
Expand All @@ -134,7 +134,7 @@ def event_handler(event: BaseEvent):
event_listener = EventListener(event_handler, driver=mock_event_listener_driver, event_types=[MockEvent])
event_listener.publish_event(mock_event)

mock_event_listener_driver.publish_event.assert_called_once_with({"event": mock_event.to_dict()}, flush=False)
mock_event_listener_driver.publish_event.assert_called_once_with({"event": mock_event.to_dict()})

def test_context_manager(self):
e1 = EventListener()
Expand Down

0 comments on commit 9e8d650

Please sign in to comment.