Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds Web3SubscriptionsManager.get_subscription_data_nowait() #95

Merged
merged 10 commits into from
Jul 17, 2024
38 changes: 37 additions & 1 deletion silverback/subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import json
from enum import Enum
from typing import AsyncGenerator
from typing import AsyncGenerator, Union

from ape.logging import logger
from websockets import ConnectionClosedError
Expand Down Expand Up @@ -115,6 +115,9 @@ async def subscribe(self, type: SubscriptionType, **filter_params) -> str:
return sub_id

async def get_subscription_data(self, sub_id: str) -> AsyncGenerator[dict, None]:
"""Iterate items from the subscription queue. If nothing is in the
queue, await.
"""
while True:
if not (queue := self._subscriptions.get(sub_id)) or queue.empty():
async with self._ws_lock:
Expand All @@ -124,6 +127,39 @@ async def get_subscription_data(self, sub_id: str) -> AsyncGenerator[dict, None]
else:
yield await queue.get()

async def get_subscription_data_nowait(self, sub_id: str) -> AsyncGenerator[dict, None]:
"""Iterate items from the subscription queue. If nothing is in the
queue, return.
"""
while True:
if not (queue := self._subscriptions.get(sub_id)) or queue.empty():
async with self._ws_lock:
# Keep pulling until a message comes to process
# NOTE: Python <3.10 does not support `anext` function
await self.__anext__()
else:
try:
yield queue.get_nowait()
except asyncio.QueueEmpty:
pass

async def pop_subscription_data(self, sub_id: str) -> Union[dict, None]:
"""Remove and return a single item from the subscription queue."""

async with self._ws_lock:
# NOTE: Python <3.10 does not support `anext` function
mikeshultz marked this conversation as resolved.
Show resolved Hide resolved
await self.__anext__()

queue = self._subscriptions.get(sub_id)

if queue:
try:
return queue.get_nowait()
except asyncio.QueueEmpty:
pass

return None

async def unsubscribe(self, sub_id: str) -> bool:
if sub_id not in self._subscriptions:
raise ValueError(f"Unknown sub_id '{sub_id}'")
Expand Down
Loading