Skip to content

Commit

Permalink
Add support for new features on query results (#111)
Browse files Browse the repository at this point in the history
Add support for a few features that will soon become available when
retrieving query results:
  - specify the output columns
  - get a sample of the results
  - filter out some rows before retrieve the results
  - sort the results
  • Loading branch information
RichardKeo authored Mar 4, 2024
1 parent 3a2c8a8 commit 1f33260
Show file tree
Hide file tree
Showing 6 changed files with 433 additions and 70 deletions.
51 changes: 50 additions & 1 deletion dune_client/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import logging.config
import os
from json import JSONDecodeError
from typing import Dict, Optional, Any
from typing import Any, Dict, List, Optional, Union

from requests import Response, Session
from requests.adapters import HTTPAdapter, Retry
Expand Down Expand Up @@ -83,6 +83,55 @@ def default_headers(self) -> Dict[str, str]:
"User-Agent": f"dune-client/{client_version} (https://pypi.org/project/dune-client/)",
}

############
# Utilities:
############

def _build_parameters(
self,
params: Optional[Dict[str, Union[str, int]]] = None,
columns: Optional[List[str]] = None,
sample_count: Optional[int] = None,
filters: Optional[str] = None,
sort_by: Optional[List[str]] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> Dict[str, Union[str, int]]:
"""
Utility function that builds a dictionary of parameters to be used
when retrieving advanced results (filters, pagination, sorting, etc.).
This is shared between the sync and async client.
"""
# Ensure we don't specify parameters that are incompatible:
assert (
# We are not sampling
sample_count is None
# We are sampling and don't use filters or pagination
or (limit is None and offset is None and filters is None)
), "sampling cannot be combined with filters or pagination"

params = params or {}
if columns is not None and len(columns) > 0:
output = []
for column in columns:
# Escape all quotes and add quotes around it
col = '"' + column.replace('"', '\\"') + '"'
output.append(col)

params["columns"] = ",".join(output)
if sample_count is not None:
params["sample_count"] = sample_count
if filters is not None:
params["filters"] = filters
if sort_by is not None and len(sort_by) > 0:
params["sort_by"] = ",".join(sort_by)
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset

return params


class BaseRouter(BaseDuneClient):
"""Extending the Base Client with elementary api routing"""
Expand Down
68 changes: 41 additions & 27 deletions dune_client/api/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

from io import BytesIO
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from deprecated import deprecated

Expand Down Expand Up @@ -76,39 +76,34 @@ def get_execution_results(
job_id: str,
limit: Optional[int] = None,
offset: Optional[int] = None,
columns: Optional[List[str]] = None,
sample_count: Optional[int] = None,
filters: Optional[str] = None,
sort_by: Optional[List[str]] = None,
) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
params = {}
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
params = self._build_parameters(
columns=columns,
sample_count=sample_count,
filters=filters,
sort_by=sort_by,
limit=limit,
offset=offset,
)

route = f"/execution/{job_id}/results"
url = self._route_url(route)
return self._get_execution_results_by_url(url=url, params=params)

def _get_execution_results_by_url(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
) -> ResultsResponse:
"""
GET results from Dune API with a given URL. This is particularly useful for pagination.
"""
assert url.startswith(self.base_url)

response_json = self._get(url=url, params=params)
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

def get_execution_results_csv(
self,
job_id: str,
limit: Optional[int] = None,
offset: Optional[int] = None,
columns: Optional[List[str]] = None,
filters: Optional[str] = None,
sample_count: Optional[int] = None,
sort_by: Optional[List[str]] = None,
) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)
Expand All @@ -117,16 +112,35 @@ def get_execution_results_csv(
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
params = {}
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
params = self._build_parameters(
columns=columns,
sample_count=sample_count,
filters=filters,
sort_by=sort_by,
limit=limit,
offset=offset,
)

route = f"/execution/{job_id}/results/csv"
url = self._route_url(route)
return self._get_execution_results_csv_by_url(url=url, params=params)

def _get_execution_results_by_url(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
) -> ResultsResponse:
"""
GET results from Dune API with a given URL. This is particularly useful for pagination.
"""
assert url.startswith(self.base_url)

response_json = self._get(url=url, params=params)
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

def _get_execution_results_csv_by_url(
self,
url: str,
Expand Down
Loading

0 comments on commit 1f33260

Please sign in to comment.