Refactor Part I: Deprecate & Rename Old Routes (#64)
* deprecate and rename old routes

* fix deprecation warnings
bh2smith authored Sep 7, 2023
1 parent d2d4d17 commit 36cb0d0
Showing 3 changed files with 143 additions and 74 deletions.
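
For quick orientation, the commit maps each old convenience method onto a new, more descriptive name while keeping the old name as a thin deprecated wrapper. A minimal migration sketch (the API key and query ID below are placeholders, not part of this commit):

from dune_client.client import DuneClient
from dune_client.query import QueryBase

client = DuneClient(api_key="<YOUR_API_KEY>")        # placeholder key
query = QueryBase(name="Example", query_id=1234567)  # placeholder query id

# Old name (deprecated in 1.2.1)        -> New name
# client.execute(query)                 -> client.execute_query(query)
# client.get_status(job_id)             -> client.get_execution_status(job_id)
# client.get_result(job_id)             -> client.get_execution_results(job_id)
# client.get_result_csv(job_id)         -> client.get_execution_results_csv(job_id)
# client.refresh(query)                 -> client.run_query(query)
# client.refresh_csv(query)             -> client.run_query_csv(query)
# client.refresh_into_dataframe(query)  -> client.run_query_dataframe(query)

results = client.run_query(query)  # executes, polls until done, fetches results
print(results.get_rows())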
171 changes: 119 additions & 52 deletions dune_client/client.py
@@ -10,10 +10,10 @@
from typing import Any, Optional, Union

import requests
from deprecated import deprecated
from requests import Response, JSONDecodeError

from dune_client.base_client import BaseDuneClient
from dune_client.interface import DuneInterface
from dune_client.models import (
ExecutionResponse,
ExecutionResultCSV,
@@ -23,12 +23,11 @@
ResultsResponse,
ExecutionState,
)

from dune_client.query import QueryBase, DuneQuery
from dune_client.types import QueryParameter


class DuneClient(DuneInterface, BaseDuneClient):
class DuneClient(BaseDuneClient): # pylint: disable=too-many-public-methods
"""
An interface for Dune API with a few convenience methods
combining the use of endpoints (e.g. refresh)
@@ -93,41 +92,24 @@ def _patch(self, route: str, params: Any) -> Any:
)
return self._handle_response(response)

@deprecated(version="1.2.1", reason="Please use execute_query")
def execute(
self, query: QueryBase, performance: Optional[str] = None
) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
params = query.request_format()
params["performance"] = performance or self.performance

self.logger.info(
f"executing {query.query_id} on {performance or self.performance} cluster"
)
response_json = self._post(
route=f"/query/{query.query_id}/execute",
params=params,
)
try:
return ExecutionResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionResponse", err) from err
return self.execute_query(query, performance)

@deprecated(version="1.2.1", reason="Please use get_execution_status")
def get_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/status")
try:
return ExecutionStatusResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err
return self.get_execution_status(job_id)

@deprecated(version="1.2.1", reason="Please use get_execution_results")
def get_result(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/results")
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err
return self.get_execution_results(job_id)

@deprecated(version="1.2.1", reason="Please use get_execution_results_csv")
def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)
@@ -136,12 +118,7 @@ def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
route = f"/execution/{job_id}/results/csv"
url = self._route_url(f"/execution/{job_id}/results/csv")
self.logger.debug(f"GET CSV received input url={url}")
response = self._get(route=route, raw=True)
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(response.content))
return self.get_execution_results_csv(job_id)

def get_latest_result(self, query: Union[QueryBase, str, int]) -> ResultsResponse:
"""
@@ -193,20 +170,21 @@ def _refresh(
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = self.execute(query=query, performance=performance).execution_id
status = self.get_status(job_id)
job_id = self.execute_query(query=query, performance=performance).execution_id
status = self.get_execution_status(job_id)
while status.state not in ExecutionState.terminal_states():
self.logger.info(
f"waiting for query execution {job_id} to complete: {status}"
)
time.sleep(ping_frequency)
status = self.get_status(job_id)
status = self.get_execution_status(job_id)
if status.state == ExecutionState.FAILED:
self.logger.error(status)
raise QueryFailed(f"{status}. Perhaps your query took too long to run!")

return job_id

@deprecated(version="1.2.1", reason="Please use run_query")
def refresh(
self,
query: QueryBase,
@@ -218,11 +196,9 @@ def refresh(
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return self.get_result(job_id)
return self.run_query(query, ping_frequency, performance)

@deprecated(version="1.2.1", reason="Please use run_query_csv")
def refresh_csv(
self,
query: QueryBase,
@@ -234,11 +210,9 @@ def refresh_csv(
fetches and returns the results in CSV format
(use it to load the data directly into pandas.read_csv() or similar frameworks)
"""
job_id = self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return self.get_result_csv(job_id)
return self.run_query_csv(query, ping_frequency, performance)

@deprecated(version="1.2.1", reason="Please use run_query_dataframe")
def refresh_into_dataframe(
self, query: QueryBase, performance: Optional[str] = None
) -> Any:
@@ -248,14 +222,7 @@ def refresh_into_dataframe(
This is a convenience method that uses refresh_csv underneath
"""
try:
import pandas # type: ignore # pylint: disable=import-outside-toplevel
except ImportError as exc:
raise ImportError(
"dependency failure, pandas is required but missing"
) from exc
data = self.refresh_csv(query, performance=performance).data
return pandas.read_csv(data)
return self.run_query_dataframe(query, performance)

# CRUD Operations: https://dune.com/docs/api/api-reference/edit-queries/
def create_query(
@@ -397,3 +364,103 @@ def upload_csv(self, table_name: str, data: str, description: str = "") -> bool:
return bool(response_json["success"])
except KeyError as err:
raise DuneError(response_json, "upload_csv response", err) from err

def execute_query(
self, query: QueryBase, performance: Optional[str] = None
) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
params = query.request_format()
params["performance"] = performance or self.performance

self.logger.info(
f"executing {query.query_id} on {performance or self.performance} cluster"
)
response_json = self._post(
route=f"/query/{query.query_id}/execute",
params=params,
)
try:
return ExecutionResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionResponse", err) from err

def get_execution_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/status")
try:
return ExecutionStatusResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err

def get_execution_results(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/results")
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

def get_execution_results_csv(self, job_id: str) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)
this API only returns the raw data in CSV format; it is faster and lighter weight
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
route = f"/execution/{job_id}/results/csv"
url = self._route_url(f"/execution/{job_id}/results/csv")
self.logger.debug(f"GET CSV received input url={url}")
response = self._get(route=route, raw=True)
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(response.content))

def run_query(
self,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> ResultsResponse:
"""
Executes a Dune `query`, waits until execution completes,
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return self.get_execution_results(job_id)

def run_query_csv(
self,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> ExecutionResultCSV:
"""
Executes a Dune query, waits until execution completes,
fetches and returns the results in CSV format
(use it to load the data directly into pandas.read_csv() or similar frameworks)
"""
job_id = self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return self.get_execution_results_csv(job_id)

def run_query_dataframe(
self, query: QueryBase, performance: Optional[str] = None
) -> Any:
"""
Executes a Dune query, waits until execution completes,
fetches and returns the result as a Pandas DataFrame.
This is a convenience method that uses run_query_csv underneath.
"""
try:
import pandas # type: ignore # pylint: disable=import-outside-toplevel
except ImportError as exc:
raise ImportError(
"dependency failure, pandas is required but missing"
) from exc
data = self.run_query_csv(query, performance=performance).data
return pandas.read_csv(data)
2 changes: 2 additions & 0 deletions requirements/prod.txt
@@ -5,3 +5,5 @@ types-requests>=2.31.0.2
python-dateutil>=2.8.2
requests>=2.31.0
ndjson>=0.3.1
Deprecated>=1.2.14
types-Deprecated==1.2.9.3
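
The new Deprecated dependency provides the @deprecated decorator applied to the old methods above (types-Deprecated supplies its mypy stubs). A standalone sketch of the behaviour, using an illustrative function name rather than the client itself:

import warnings

from deprecated import deprecated


@deprecated(version="1.2.1", reason="Please use run_query")
def refresh() -> str:
    """Old entry point that now simply forwards to the renamed method."""
    return "result"


# DeprecationWarning may be filtered out by default; surface it explicitly.
warnings.simplefilter("always", DeprecationWarning)
refresh()  # emits a DeprecationWarning mentioning the reason and the version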
44 changes: 22 additions & 22 deletions tests/e2e/test_client.py
@@ -38,28 +38,28 @@ def test_from_env_constructor(self):
except KeyError:
self.fail("DuneClient.from_env raised unexpectedly!")

def test_get_status(self):
def test_get_execution_status(self):
query = QueryBase(name="No Name", query_id=1276442, params=[])
dune = DuneClient(self.valid_api_key)
job_id = dune.execute(query).execution_id
status = dune.get_status(job_id)
job_id = dune.execute_query(query).execution_id
status = dune.get_execution_status(job_id)
self.assertTrue(
status.state in [ExecutionState.EXECUTING, ExecutionState.PENDING]
)

def test_refresh(self):
def test_run_query(self):
dune = DuneClient(self.valid_api_key)
results = dune.refresh(self.query).get_rows()
results = dune.run_query(self.query).get_rows()
self.assertGreater(len(results), 0)

def test_refresh_performance_large(self):
def test_run_query_performance_large(self):
dune = DuneClient(self.valid_api_key)
results = dune.refresh(self.query, performance="large").get_rows()
results = dune.run_query(self.query, performance="large").get_rows()
self.assertGreater(len(results), 0)

def test_refresh_into_dataframe(self):
def test_run_query_dataframe(self):
dune = DuneClient(self.valid_api_key)
pd = dune.refresh_into_dataframe(self.query)
pd = dune.run_query_dataframe(self.query)
self.assertGreater(len(pd), 0)

def test_parameters_recognized(self):
@@ -75,7 +75,7 @@ def test_parameters_recognized(self):
self.assertEqual(query.parameters(), new_params)

dune = DuneClient(self.valid_api_key)
results = dune.refresh(query)
results = dune.run_query(query)
self.assertEqual(
results.get_rows(),
[
@@ -90,14 +90,14 @@

def test_endpoints(self):
dune = DuneClient(self.valid_api_key)
execution_response = dune.execute(self.query)
execution_response = dune.execute_query(self.query)
self.assertIsInstance(execution_response, ExecutionResponse)
job_id = execution_response.execution_id
status = dune.get_status(job_id)
status = dune.get_execution_status(job_id)
self.assertIsInstance(status, ExecutionStatusResponse)
while dune.get_status(job_id).state != ExecutionState.COMPLETED:
while dune.get_execution_status(job_id).state != ExecutionState.COMPLETED:
time.sleep(1)
results = dune.get_result(job_id).result.rows
results = dune.get_execution_results(job_id).result.rows
self.assertGreater(len(results), 0)

def test_cancel_execution(self):
@@ -106,31 +106,31 @@ def test_cancel_execution(self):
name="Long Running Query",
query_id=1229120,
)
execution_response = dune.execute(query)
execution_response = dune.execute_query(query)
job_id = execution_response.execution_id
# POST Cancellation
success = dune.cancel_execution(job_id)
self.assertTrue(success)

results = dune.get_result(job_id)
results = dune.get_execution_results(job_id)
self.assertEqual(results.state, ExecutionState.CANCELLED)

def test_invalid_api_key_error(self):
dune = DuneClient(api_key="Invalid Key")
with self.assertRaises(DuneError) as err:
dune.execute(self.query)
dune.execute_query(self.query)
self.assertEqual(
str(err.exception),
"Can't build ExecutionResponse from {'error': 'invalid API Key'}",
)
with self.assertRaises(DuneError) as err:
dune.get_status("wonky job_id")
dune.get_execution_status("wonky job_id")
self.assertEqual(
str(err.exception),
"Can't build ExecutionStatusResponse from {'error': 'invalid API Key'}",
)
with self.assertRaises(DuneError) as err:
dune.get_result("wonky job_id")
dune.get_execution_results("wonky job_id")
self.assertEqual(
str(err.exception),
"Can't build ResultsResponse from {'error': 'invalid API Key'}",
@@ -142,7 +142,7 @@ def test_query_not_found_error(self):
query.query_id = 99999999 # Invalid Query Id.

with self.assertRaises(DuneError) as err:
dune.execute(query)
dune.execute_query(query)
self.assertEqual(
str(err.exception),
"Can't build ExecutionResponse from {'error': 'Query not found'}",
@@ -155,7 +155,7 @@ def test_internal_error(self):
query.query_id = 9999999999999

with self.assertRaises(DuneError) as err:
dune.execute(query)
dune.execute_query(query)
self.assertEqual(
str(err.exception),
"Can't build ExecutionResponse from {'error': 'An internal error occured'}",
@@ -164,7 +164,7 @@ def test_invalid_job_id_error(self):
def test_invalid_job_id_error(self):
dune = DuneClient(self.valid_api_key)
with self.assertRaises(DuneError) as err:
dune.get_status("Wonky Job ID")
dune.get_execution_status("Wonky Job ID")
self.assertEqual(
str(err.exception),
"Can't build ExecutionStatusResponse from "
