Skip to content

Commit

Permalink
[Feature] Introducing CRUD Operations (#59)
Browse files Browse the repository at this point in the history
Dune recently added endpoints and documentation for their Queries Editing Support via the API. This PR introduces new functionality

    Create: create_query
    Read (Get): get_query
    Update update_query
    Delete (Archive) archive_query

as well as make_private
  • Loading branch information
bh2smith authored Aug 7, 2023
1 parent 914b0d4 commit 395f826
Show file tree
Hide file tree
Showing 12 changed files with 345 additions and 50 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import os

from dune_client.types import QueryParameter
from dune_client.client import DuneClient
from dune_client.query import Query
from dune_client.query import QueryBase

query = Query(
query = QueryBase(
name="Sample Query",
query_id=1215383,
params=[
Expand Down
11 changes: 9 additions & 2 deletions dune_client/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,22 @@ class BaseDuneClient:
"""

BASE_URL = "https://api.dune.com"
API_PATH = "/api/v1"
DEFAULT_TIMEOUT = 10

def __init__(self, api_key: str, performance: str = "medium"):
def __init__(
self, api_key: str, client_version: str = "v1", performance: str = "medium"
):
self.token = api_key
self.client_version = client_version
self.performance = performance
self.logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(message)s")

@property
def api_version(self) -> str:
"""Returns client version string"""
return f"/api/{self.client_version}"

def default_headers(self) -> Dict[str, str]:
"""Return default headers containing Dune Api token"""
return {"x-dune-api-key": self.token}
163 changes: 153 additions & 10 deletions dune_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
ExecutionState,
)

from dune_client.query import Query
from dune_client.query import QueryBase, DuneQuery
from dune_client.types import QueryParameter


class DuneClient(DuneInterface, BaseDuneClient):
Expand All @@ -34,6 +35,7 @@ class DuneClient(DuneInterface, BaseDuneClient):
"""

def _handle_response(self, response: Response) -> Any:
"""Generic response handler utilized by all Dune API routes"""
try:
# Some responses can be decoded and converted to DuneErrors
response_json = response.json()
Expand All @@ -45,14 +47,15 @@ def _handle_response(self, response: Response) -> Any:
raise ValueError("Unreachable since previous line raises") from err

def _route_url(self, route: str) -> str:
return f"{self.BASE_URL}{self.API_PATH}{route}"
return f"{self.BASE_URL}{self.api_version}{route}"

def _get(
self,
route: str,
params: Optional[Any] = None,
raw: bool = False,
) -> Any:
"""Generic interface for the GET method of a Dune API request"""
url = self._route_url(route)
self.logger.debug(f"GET received input url={url}")
response = requests.get(
Expand All @@ -65,7 +68,8 @@ def _get(
return response
return self._handle_response(response)

def _post(self, route: str, params: Any) -> Any:
def _post(self, route: str, params: Optional[Any] = None) -> Any:
"""Generic interface for the POST method of a Dune API request"""
url = self._route_url(route)
self.logger.debug(f"POST received input url={url}, params={params}")
response = requests.post(
Expand All @@ -76,8 +80,21 @@ def _post(self, route: str, params: Any) -> Any:
)
return self._handle_response(response)

def _patch(self, route: str, params: Any) -> Any:
"""Generic interface for the PATCH method of a Dune API request"""
url = self._route_url(route)
self.logger.debug(f"PATCH received input url={url}, params={params}")
response = requests.request(
method="PATCH",
url=url,
json=params,
headers={"x-dune-api-key": self.token},
timeout=self.DEFAULT_TIMEOUT,
)
return self._handle_response(response)

def execute(
self, query: Query, performance: Optional[str] = None
self, query: QueryBase, performance: Optional[str] = None
) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
params = query.request_format()
Expand Down Expand Up @@ -126,15 +143,15 @@ def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(response.content))

def get_latest_result(self, query: Union[Query, str, int]) -> ResultsResponse:
def get_latest_result(self, query: Union[QueryBase, str, int]) -> ResultsResponse:
"""
GET the latest results for a query_id without having to execute the query again.
:param query: :class:`Query` object OR query id as string | int
https://dune.com/docs/api/api-reference/latest_results/
"""
if isinstance(query, Query):
if isinstance(query, QueryBase):
params = {
f"params.{p.key}": p.to_dict()["value"] for p in query.parameters()
}
Expand Down Expand Up @@ -167,7 +184,7 @@ def cancel_execution(self, job_id: str) -> bool:

def _refresh(
self,
query: Query,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> str:
Expand All @@ -191,7 +208,10 @@ def _refresh(
return job_id

def refresh(
self, query: Query, ping_frequency: int = 5, performance: Optional[str] = None
self,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> ResultsResponse:
"""
Executes a Dune `query`, waits until execution completes,
Expand All @@ -204,7 +224,10 @@ def refresh(
return self.get_result(job_id)

def refresh_csv(
self, query: Query, ping_frequency: int = 5, performance: Optional[str] = None
self,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> ExecutionResultCSV:
"""
Executes a Dune query, waits till execution completes,
Expand All @@ -217,7 +240,7 @@ def refresh_csv(
return self.get_result_csv(job_id)

def refresh_into_dataframe(
self, query: Query, performance: Optional[str] = None
self, query: QueryBase, performance: Optional[str] = None
) -> Any:
"""
Execute a Dune Query, waits till execution completes,
Expand All @@ -233,3 +256,123 @@ def refresh_into_dataframe(
) from exc
data = self.refresh_csv(query, performance=performance).data
return pandas.read_csv(data)

# CRUD Operations: https://dune.com/docs/api/api-reference/edit-queries/
def create_query(
self,
name: str,
query_sql: str,
params: Optional[list[QueryParameter]] = None,
is_private: bool = False,
) -> DuneQuery:
"""
Creates Dune Query by ID
https://dune.com/docs/api/api-reference/edit-queries/create-query/
"""
parameters = {
"name": name,
"query_sql": query_sql,
"private": is_private,
}
if params is not None:
parameters["parameters"] = [p.to_dict() for p in params]
response_json = self._post(route="/query/", params=parameters)
try:
query_id = int(response_json["query_id"])
# Note that this requires an extra request.
return self.get_query(query_id)
except KeyError as err:
raise DuneError(response_json, "create_query Response", err) from err

def get_query(self, query_id: int) -> DuneQuery:
"""
Retrieves Dune Query by ID
https://dune.com/docs/api/api-reference/edit-queries/get-query/
"""
response_json = self._get(route=f"/query/{query_id}")
return DuneQuery.from_dict(response_json)

def update_query( # pylint: disable=too-many-arguments
self,
query_id: int,
name: Optional[str] = None,
query_sql: Optional[str] = None,
params: Optional[list[QueryParameter]] = None,
description: Optional[str] = None,
tags: Optional[list[str]] = None,
) -> int:
"""
Updates Dune Query by ID
https://dune.com/docs/api/api-reference/edit-queries/update-query
The request body should contain all fields that need to be updated.
Any omitted fields will be left untouched.
If the tags or parameters are provided as an empty array,
they will be deleted from the query.
"""
parameters: dict[str, Any] = {}
if name is not None:
parameters["name"] = name
if description is not None:
parameters["description"] = description
if tags is not None:
parameters["tags"] = tags
if query_sql is not None:
parameters["query_sql"] = query_sql
if params is not None:
parameters["parameters"] = [p.to_dict() for p in params]

if not bool(parameters):
# Nothing to change no need to make reqeust
self.logger.warning("called update_query with no proposed changes.")
return query_id

response_json = self._patch(
route=f"/query/{query_id}",
params=parameters,
)
try:
# No need to make a dataclass for this since it's just a boolean.
return int(response_json["query_id"])
except KeyError as err:
raise DuneError(response_json, "update_query Response", err) from err

def archive_query(self, query_id: int) -> bool:
"""
https://dune.com/docs/api/api-reference/edit-queries/archive-query
returns resulting value of Query.is_archived
"""
response_json = self._post(route=f"/query/{query_id}/archive")
try:
# No need to make a dataclass for this since it's just a boolean.
return self.get_query(int(response_json["query_id"])).meta.is_archived
except KeyError as err:
raise DuneError(response_json, "make_private Response", err) from err

def unarchive_query(self, query_id: int) -> bool:
"""
https://dune.com/docs/api/api-reference/edit-queries/archive-query
returns resulting value of Query.is_archived
"""
response_json = self._post(route=f"/query/{query_id}/unarchive")
try:
# No need to make a dataclass for this since it's just a boolean.
return self.get_query(int(response_json["query_id"])).meta.is_archived
except KeyError as err:
raise DuneError(response_json, "make_private Response", err) from err

def make_private(self, query_id: int) -> None:
"""
https://dune.com/docs/api/api-reference/edit-queries/private-query
returns resulting value of Query.is_private
"""
response_json = self._post(route=f"/query/{query_id}/private")
assert self.get_query(int(response_json["query_id"])).meta.is_private

def make_public(self, query_id: int) -> None:
"""
https://dune.com/docs/api/api-reference/edit-queries/private-query
returns resulting value of Query.is_private
"""
response_json = self._post(route=f"/query/{query_id}/unprivate")
assert not self.get_query(int(response_json["query_id"])).meta.is_private
26 changes: 17 additions & 9 deletions dune_client/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
ExecutionState,
)

from dune_client.query import Query
from dune_client.query import QueryBase


# pylint: disable=duplicate-code
Expand Down Expand Up @@ -88,7 +88,7 @@ async def _handle_response(self, response: ClientResponse) -> Any:
raise ValueError("Unreachable since previous line raises") from err

def _route_url(self, route: str) -> str:
return f"{self.API_PATH}{route}"
return f"{self.api_version}{route}"

async def _get(
self,
Expand Down Expand Up @@ -122,7 +122,7 @@ async def _post(self, route: str, params: Any) -> Any:
return await self._handle_response(response)

async def execute(
self, query: Query, performance: Optional[str] = None
self, query: QueryBase, performance: Optional[str] = None
) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
params = query.request_format()
Expand Down Expand Up @@ -171,15 +171,17 @@ async def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(await response.content.read(-1)))

async def get_latest_result(self, query: Union[Query, str, int]) -> ResultsResponse:
async def get_latest_result(
self, query: Union[QueryBase, str, int]
) -> ResultsResponse:
"""
GET the latest results for a query_id without having to execute the query again.
:param query: :class:`Query` object OR query id as string | int
https://dune.com/docs/api/api-reference/latest_results/
"""
if isinstance(query, Query):
if isinstance(query, QueryBase):
params = {
f"params.{p.key}": p.to_dict()["value"] for p in query.parameters()
}
Expand Down Expand Up @@ -212,7 +214,7 @@ async def cancel_execution(self, job_id: str) -> bool:

async def _refresh(
self,
query: Query,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> str:
Expand All @@ -236,7 +238,10 @@ async def _refresh(
return job_id

async def refresh(
self, query: Query, ping_frequency: int = 5, performance: Optional[str] = None
self,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> ResultsResponse:
"""
Executes a Dune `query`, waits until execution completes,
Expand All @@ -249,7 +254,10 @@ async def refresh(
return await self.get_result(job_id)

async def refresh_csv(
self, query: Query, ping_frequency: int = 5, performance: Optional[str] = None
self,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> ExecutionResultCSV:
"""
Executes a Dune query, waits till execution completes,
Expand All @@ -262,7 +270,7 @@ async def refresh_csv(
return await self.get_result_csv(job_id)

async def refresh_into_dataframe(
self, query: Query, performance: Optional[str] = None
self, query: QueryBase, performance: Optional[str] = None
) -> Any:
"""
Execute a Dune Query, waits till execution completes,
Expand Down
Loading

0 comments on commit 395f826

Please sign in to comment.