Skip to content

Commit

Permalink
Async client (#31)
Browse files Browse the repository at this point in the history
Add Async Client
  • Loading branch information
eliseygusev committed Nov 8, 2022
1 parent 855d787 commit 7271822
Show file tree
Hide file tree
Showing 10 changed files with 417 additions and 37 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
.env
__pycache__/
dist
*.egg-info
*.egg-info
_version.py
.idea/
30 changes: 30 additions & 0 deletions dune_client/base_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
""""
Basic Dune Client Class responsible for refreshing Dune Queries
Framework built on Dune's API Documentation
https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
"""
from __future__ import annotations

import logging.config
from typing import Dict


# pylint: disable=too-few-public-methods
class BaseDuneClient:
"""
A Base Client for Dune which sets up default values
and provides some convenient functions to use in other clients
"""

BASE_URL = "https://api.dune.com"
API_PATH = "/api/v1"
DEFAULT_TIMEOUT = 10

def __init__(self, api_key: str):
self.token = api_key
self.logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(message)s")

def default_headers(self) -> Dict[str, str]:
"""Return default headers containing Dune Api token"""
return {"x-dune-api-key": self.token}
61 changes: 31 additions & 30 deletions dune_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
"""
from __future__ import annotations

import logging.config
import time
from json import JSONDecodeError
from typing import Any

import requests
from requests import Response
from requests import Response, JSONDecodeError

from dune_client.base_client import BaseDuneClient
from dune_client.interface import DuneInterface
from dune_client.models import (
ExecutionResponse,
Expand All @@ -24,53 +23,55 @@

from dune_client.query import Query

log = logging.getLogger(__name__)
logging.basicConfig(
format="%(asctime)s %(levelname)s %(name)s %(message)s", level=logging.DEBUG
)

BASE_URL = "https://api.dune.com/api/v1"


class DuneClient(DuneInterface):
class DuneClient(DuneInterface, BaseDuneClient):
"""
An interface for Dune API with a few convenience methods
combining the use of endpoints (e.g. refresh)
"""

def __init__(self, api_key: str):
self.token = api_key

@staticmethod
def _handle_response(
self,
response: Response,
) -> Any:
try:
# Some responses can be decoded and converted to DuneErrors
response_json = response.json()
log.debug(f"received response {response_json}")
self.logger.debug(f"received response {response_json}")
return response_json
except JSONDecodeError as err:
# Others can't. Only raise HTTP error for not decodable errors
response.raise_for_status()
raise ValueError("Unreachable since previous line raises") from err

def _get(self, url: str) -> Any:
log.debug(f"GET received input url={url}")
response = requests.get(url, headers={"x-dune-api-key": self.token}, timeout=10)
def _route_url(self, route: str) -> str:
return f"{self.BASE_URL}{self.API_PATH}/{route}"

def _get(self, route: str) -> Any:
url = self._route_url(route)
self.logger.debug(f"GET received input url={url}")
response = requests.get(
url,
headers={"x-dune-api-key": self.token},
timeout=self.DEFAULT_TIMEOUT,
)
return self._handle_response(response)

def _post(self, url: str, params: Any) -> Any:
log.debug(f"POST received input url={url}, params={params}")
def _post(self, route: str, params: Any) -> Any:
url = self._route_url(route)
self.logger.debug(f"POST received input url={url}, params={params}")
response = requests.post(
url=url, json=params, headers={"x-dune-api-key": self.token}, timeout=10
url=url,
json=params,
headers={"x-dune-api-key": self.token},
timeout=self.DEFAULT_TIMEOUT,
)
return self._handle_response(response)

def execute(self, query: Query) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
response_json = self._post(
url=f"{BASE_URL}/query/{query.query_id}/execute",
route=f"query/{query.query_id}/execute",
params={
"query_parameters": {
p.key: p.to_dict()["value"] for p in query.parameters()
Expand All @@ -85,7 +86,7 @@ def execute(self, query: Query) -> ExecutionResponse:
def get_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(
url=f"{BASE_URL}/execution/{job_id}/status",
route=f"execution/{job_id}/status",
)
try:
return ExecutionStatusResponse.from_dict(response_json)
Expand All @@ -94,17 +95,15 @@ def get_status(self, job_id: str) -> ExecutionStatusResponse:

def get_result(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(url=f"{BASE_URL}/execution/{job_id}/results")
response_json = self._get(route=f"execution/{job_id}/results")
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

def cancel_execution(self, job_id: str) -> bool:
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
response_json = self._post(
url=f"{BASE_URL}/execution/{job_id}/cancel", params=None
)
response_json = self._post(route=f"execution/{job_id}/cancel", params=None)
try:
# No need to make a dataclass for this since it's just a boolean.
success: bool = response_json["success"]
Expand All @@ -121,12 +120,14 @@ def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsResponse:
job_id = self.execute(query).execution_id
status = self.get_status(job_id)
while status.state not in ExecutionState.terminal_states():
log.info(f"waiting for query execution {job_id} to complete: {status}")
self.logger.info(
f"waiting for query execution {job_id} to complete: {status}"
)
time.sleep(ping_frequency)
status = self.get_status(job_id)

full_response = self.get_result(job_id)
if status.state == ExecutionState.FAILED:
log.error(status)
self.logger.error(status)
raise Exception(f"{status}. Perhaps your query took too long to run!")
return full_response
155 changes: 155 additions & 0 deletions dune_client/client_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
""""
Async Dune Client Class responsible for refreshing Dune Queries
Framework built on Dune's API Documentation
https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
"""
import asyncio
from typing import Any

from aiohttp import (
ClientSession,
ClientResponse,
ContentTypeError,
TCPConnector,
ClientTimeout,
)

from dune_client.base_client import BaseDuneClient
from dune_client.models import (
ExecutionResponse,
DuneError,
ExecutionStatusResponse,
ResultsResponse,
ExecutionState,
)

from dune_client.query import Query


# pylint: disable=duplicate-code
class AsyncDuneClient(BaseDuneClient):
"""
An asynchronous interface for Dune API with a few convenience methods
combining the use of endpoints (e.g. refresh)
"""

_connection_limit = 3

def __init__(self, api_key: str, connection_limit: int = 3):
"""
api_key - Dune API key
connection_limit - number of parallel requests to execute.
For non-pro accounts Dune allows only up to 3 requests but that number can be increased.
"""
super().__init__(api_key=api_key)
self._connection_limit = connection_limit
self._session = self._create_session()

def _create_session(self) -> ClientSession:
conn = TCPConnector(limit=self._connection_limit)
return ClientSession(
connector=conn,
base_url=self.BASE_URL,
timeout=ClientTimeout(total=self.DEFAULT_TIMEOUT),
)

async def close_session(self) -> None:
"""Closes client session"""
await self._session.close()

async def __aenter__(self) -> None:
self._session = self._create_session()

async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.close_session()

async def _handle_response(
self,
response: ClientResponse,
) -> Any:
try:
# Some responses can be decoded and converted to DuneErrors
response_json = await response.json()
self.logger.debug(f"received response {response_json}")
return response_json
except ContentTypeError as err:
# Others can't. Only raise HTTP error for not decodable errors
response.raise_for_status()
raise ValueError("Unreachable since previous line raises") from err

async def _get(self, url: str) -> Any:
self.logger.debug(f"GET received input url={url}")
response = await self._session.get(
url=f"{self.API_PATH}{url}",
headers=self.default_headers(),
)
return await self._handle_response(response)

async def _post(self, url: str, params: Any) -> Any:
self.logger.debug(f"POST received input url={url}, params={params}")
response = await self._session.post(
url=f"{self.API_PATH}{url}",
json=params,
headers=self.default_headers(),
)
return await self._handle_response(response)

async def execute(self, query: Query) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
response_json = await self._post(
url=f"/query/{query.query_id}/execute",
params=query.request_format(),
)
try:
return ExecutionResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionResponse", err) from err

async def get_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = await self._get(
url=f"/execution/{job_id}/status",
)
try:
return ExecutionStatusResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err

async def get_result(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = await self._get(url=f"/execution/{job_id}/results")
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

async def cancel_execution(self, job_id: str) -> bool:
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
response_json = await self._post(url=f"/execution/{job_id}/cancel", params=None)
try:
# No need to make a dataclass for this since it's just a boolean.
success: bool = response_json["success"]
return success
except KeyError as err:
raise DuneError(response_json, "CancellationResponse", err) from err

async def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsResponse:
"""
Executes a Dune `query`, waits until execution completes,
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = (await self.execute(query)).execution_id
status = await self.get_status(job_id)
while status.state not in ExecutionState.terminal_states():
self.logger.info(
f"waiting for query execution {job_id} to complete: {status}"
)
await asyncio.sleep(ping_frequency)
status = await self.get_status(job_id)

full_response = await self.get_result(job_id)
if status.state == ExecutionState.FAILED:
self.logger.error(status)
raise Exception(f"{status}. Perhaps your query took too long to run!")
return full_response
5 changes: 3 additions & 2 deletions dune_client/interface.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
"""
Abstract class for a basic Dune Interface with refresh method used by Query Runner.
"""
from abc import ABC
import abc

from dune_client.models import ResultsResponse
from dune_client.query import Query


# pylint: disable=too-few-public-methods
class DuneInterface(ABC):
class DuneInterface(abc.ABC):
"""
User Facing Methods for a Dune Client
"""

@abc.abstractmethod
def refresh(self, query: Query) -> ResultsResponse:
"""
Executes a Dune query, waits till query execution completes,
Expand Down
8 changes: 7 additions & 1 deletion dune_client/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
import urllib.parse
from dataclasses import dataclass
from typing import Optional, List
from typing import Optional, List, Dict

from dune_client.types import QueryParameter

Expand Down Expand Up @@ -40,3 +40,9 @@ def __hash__(self) -> int:
Thus, it is unique for caching purposes
"""
return self.url().__hash__()

def request_format(self) -> Dict[str, Dict[str, str]]:
"""Transforms Query objects to params to pass in API"""
return {
"query_parameters": {p.key: p.to_dict()["value"] for p in self.parameters()}
}
4 changes: 2 additions & 2 deletions dune_client/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ def value_str(self) -> str:
return str(self.value.strftime("%Y-%m-%d %H:%M:%S"))
raise TypeError(f"Type {self.type} not recognized!")

def to_dict(self) -> dict[str, str | list[str]]:
def to_dict(self) -> dict[str, str]:
"""Converts QueryParameter into string json format accepted by Dune API"""
results: dict[str, str | list[str]] = {
results: dict[str, str] = {
"key": self.key,
"type": self.type.value,
"value": self.value_str(),
Expand Down
3 changes: 2 additions & 1 deletion requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ black>=22.8.0
pylint>=2.15.0
pytest>=7.1.3
python-dotenv>=0.21.0
mypy>=0.971
mypy>=0.971
aiounittest>=1.4.2
Loading

0 comments on commit 7271822

Please sign in to comment.