From a8780ff58f16dfdca85d4825f86e4503ebf07e36 Mon Sep 17 00:00:00 2001 From: apurvabanka Date: Sun, 20 Oct 2024 20:24:00 -0400 Subject: [PATCH 1/9] .venv in gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 110fabc..ba12498 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ dist _version.py .idea/ venv/ +.venv/ tmp/ .vscode/ build/ From def7152a6a5cb72c3cb03cc15aca1951b253165e Mon Sep 17 00:00:00 2001 From: apurvabanka Date: Sun, 20 Oct 2024 20:24:33 -0400 Subject: [PATCH 2/9] removing too-many-positional-arguments --- .pylintrc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pylintrc b/.pylintrc index 0f486f9..7130db2 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,4 +1,4 @@ [MASTER] -disable=fixme,logging-fstring-interpolation,too-many-positional-arguments +disable=fixme,logging-fstring-interpolation [DESIGN] max-args=10 From 2afbf0ee6d1955cdb348149cd0f36f89a7170614 Mon Sep 17 00:00:00 2001 From: apurvabanka Date: Sun, 20 Oct 2024 20:27:44 -0400 Subject: [PATCH 3/9] changing function params --- dune_client/api/base.py | 36 +++--- dune_client/api/custom.py | 22 ++-- dune_client/api/execution.py | 44 +++++--- dune_client/api/extensions.py | 207 +++++++++++++++++++++------------- dune_client/api/query.py | 20 ++-- dune_client/client_async.py | 169 +++++++++++++++------------ 6 files changed, 299 insertions(+), 199 deletions(-) diff --git a/dune_client/api/base.py b/dune_client/api/base.py index 79abddf..41f3a28 100644 --- a/dune_client/api/base.py +++ b/dune_client/api/base.py @@ -89,21 +89,25 @@ def default_headers(self) -> Dict[str, str]: def _build_parameters( self, - params: Optional[Dict[str, Union[str, int]]] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, - limit: Optional[int] = None, - offset: Optional[int] = None, allow_partial_results: str = "true", + params: Optional[Dict[str, Any]] = None, ) -> Dict[str, Union[str, int]]: + #TODO: Change the function class for this function """ Utility function that builds a dictionary of parameters to be used when retrieving advanced results (filters, pagination, sorting, etc.). This is shared between the sync and async client. 
""" # Ensure we don't specify parameters that are incompatible: + if params is None: + params = {} + parameters = params.get("params", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) + limit = params.get("limit", None) + offset = params.get("offset", None) assert ( # We are not sampling sample_count is None @@ -111,22 +115,22 @@ def _build_parameters( or (limit is None and offset is None and filters is None) ), "sampling cannot be combined with filters or pagination" - params = params or {} - params["allow_partial_results"] = allow_partial_results + parameters = parameters or {} + parameters["allow_partial_results"] = allow_partial_results if columns is not None and len(columns) > 0: - params["columns"] = ",".join(columns) + parameters["columns"] = ",".join(columns) if sample_count is not None: - params["sample_count"] = sample_count + parameters["sample_count"] = sample_count if filters is not None: - params["filters"] = filters + parameters["filters"] = filters if sort_by is not None and len(sort_by) > 0: - params["sort_by"] = ",".join(sort_by) + parameters["sort_by"] = ",".join(sort_by) if limit is not None: - params["limit"] = limit + parameters["limit"] = limit if offset is not None: - params["offset"] = offset + parameters["offset"] = offset - return params + return parameters class BaseRouter(BaseDuneClient): diff --git a/dune_client/api/custom.py b/dune_client/api/custom.py index 3809036..89285c2 100644 --- a/dune_client/api/custom.py +++ b/dune_client/api/custom.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from typing import List, Optional +from typing import Dict, Optional, Any from dune_client.api.base import BaseRouter from dune_client.models import ( @@ -25,12 +25,7 @@ def get_custom_endpoint_result( self, handle: str, endpoint: str, - limit: Optional[int] = None, - offset: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ResultsResponse: """ Custom endpoints allow you to fetch and filter data from any @@ -48,7 +43,16 @@ def get_custom_endpoint_result( filters (str, optional): The filters to apply. sort_by (List[str], optional): The columns to sort by. 
""" - params = self._build_parameters( + if params is None: + params = {} + limit = params.get("limit", None) + offset = params.get("offset", None) + columns = params.get("columns", None) + sample_count = params.get("sample_counts", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) + + build_params = self._build_parameters( columns=columns, sample_count=sample_count, filters=filters, @@ -58,7 +62,7 @@ def get_custom_endpoint_result( ) response_json = self._get( route=f"/endpoints/{handle}/{endpoint}/results", - params=params, + params=build_params, ) try: return ResultsResponse.from_dict(response_json) diff --git a/dune_client/api/execution.py b/dune_client/api/execution.py index e15e6bd..fe143d5 100644 --- a/dune_client/api/execution.py +++ b/dune_client/api/execution.py @@ -7,7 +7,7 @@ """ from io import BytesIO -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional from deprecated import deprecated @@ -75,16 +75,20 @@ def get_execution_status(self, job_id: str) -> ExecutionStatusResponse: def get_execution_results( self, job_id: str, - limit: Optional[int] = None, - offset: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, - allow_partial_results: str = "true", + params: Optional[Dict[str, Any]] = None, ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" - params = self._build_parameters( + if params is None: + params = {} + limit = params.get("limit", None) + offset = params.get("offset", None) + columns = params.get("columns", None) + sample_count = params.get("sample_counts", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) + allow_partial_results = params.get("allow_partial_results", None) + + build_params = self._build_parameters( columns=columns, sample_count=sample_count, filters=filters, @@ -96,17 +100,12 @@ def get_execution_results( route = f"/execution/{job_id}/results" url = self._route_url(route) - return self._get_execution_results_by_url(url=url, params=params) + return self._get_execution_results_by_url(url=url, params=build_params) def get_execution_results_csv( self, job_id: str, - limit: Optional[int] = None, - offset: Optional[int] = None, - columns: Optional[List[str]] = None, - filters: Optional[str] = None, - sample_count: Optional[int] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ExecutionResultCSV: """ GET results in CSV format from Dune API for `job_id` (aka `execution_id`) @@ -115,7 +114,16 @@ def get_execution_results_csv( use this method for large results where you want lower CPU and memory overhead if you need metadata information use get_results() or get_status() """ - params = self._build_parameters( + if params is None: + params = {} + limit = params.get("limit", None) + offset = params.get("offset", None) + columns = params.get("columns", None) + sample_count = params.get("sample_counts", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) + + build_params = self._build_parameters( columns=columns, sample_count=sample_count, filters=filters, @@ -126,7 +134,7 @@ def get_execution_results_csv( route = f"/execution/{job_id}/results/csv" url = self._route_url(route) - return self._get_execution_results_csv_by_url(url=url, params=params) + return self._get_execution_results_csv_by_url(url=url, params=build_params) def 
_get_execution_results_by_url( self, url: str, params: Optional[Dict[str, Any]] = None diff --git a/dune_client/api/extensions.py b/dune_client/api/extensions.py index 44fda20..7e42f30 100644 --- a/dune_client/api/extensions.py +++ b/dune_client/api/extensions.py @@ -8,7 +8,7 @@ import time from io import BytesIO -from typing import Any, List, Optional, Union +from typing import Any, Optional, Union, Dict from deprecated import deprecated @@ -29,7 +29,8 @@ ExecutionResultCSV, ) from dune_client.query import QueryBase, parse_query_object_or_id -from dune_client.types import QueryParameter + +# from dune_client.types import QueryParameter from dune_client.util import age_in_hours # This is the expiry time on old query results. @@ -48,13 +49,8 @@ def run_query( self, query: QueryBase, ping_frequency: int = POLL_FREQUENCY_SECONDS, - performance: Optional[str] = None, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, allow_partial_results: str = "true", + params: Optional[Dict[str, Any]] = None, ) -> ResultsResponse: """ Executes a Dune `query`, waits until execution completes, @@ -62,6 +58,14 @@ def run_query( Sleeps `ping_frequency` seconds between each status request. """ # Ensure we don't specify parameters that are incompatible: + if params is None: + params = {} + performance = params.get("performance", None) + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) assert ( # We are not sampling sample_count is None @@ -79,12 +83,14 @@ def run_query( return self._fetch_entire_result( self.get_execution_results( job_id, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=limit, - allow_partial_results=allow_partial_results, + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "limit": limit, + "allow_partial_results": allow_partial_results, + }, ), ) @@ -92,12 +98,7 @@ def run_query_csv( self, query: QueryBase, ping_frequency: int = POLL_FREQUENCY_SECONDS, - performance: Optional[str] = None, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ExecutionResultCSV: """ Executes a Dune query, waits till execution completes, @@ -105,6 +106,14 @@ def run_query_csv( (use it load the data directly in pandas.from_csv() or similar frameworks) """ # Ensure we don't specify parameters that are incompatible: + if params is None: + params = {} + performance = params.get("performance", None) + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) assert ( # We are not sampling sample_count is None @@ -122,11 +131,13 @@ def run_query_csv( return self._fetch_entire_result_csv( self.get_execution_results_csv( job_id, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=limit, + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "limit": limit, + }, ), ) @@ -134,12 +145,7 @@ def 
run_query_dataframe( self, query: QueryBase, ping_frequency: int = POLL_FREQUENCY_SECONDS, - performance: Optional[str] = None, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> Any: """ Execute a Dune Query, waits till execution completes, @@ -147,6 +153,15 @@ def run_query_dataframe( This is a convenience method that uses run_query_csv() + pandas.read_csv() underneath """ + if params is None: + params = {} + performance = params.get("performance", None) + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) + try: import pandas # pylint: disable=import-outside-toplevel except ImportError as exc: @@ -156,12 +171,14 @@ def run_query_dataframe( data = self.run_query_csv( query, ping_frequency, - performance, - batch_size=batch_size, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, + params={ + "performance": performance, + "batch_size": batch_size, + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + }, ).data return pandas.read_csv(data) @@ -169,11 +186,7 @@ def get_latest_result( self, query: Union[QueryBase, str, int], max_age_hours: int = THREE_MONTHS_IN_HOURS, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ResultsResponse: """ GET the latest results for a query_id without re-executing the query @@ -184,6 +197,13 @@ def get_latest_result( https://docs.dune.com/api-reference/executions/endpoint/get-query-result """ # Ensure we don't specify parameters that are incompatible: + if params is None: + params = {} + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) assert ( # We are not sampling sample_count is None @@ -214,12 +234,16 @@ def get_latest_result( f"results (from {last_run}) older than {max_age_hours} hours, re-running query" ) results = self.run_query( - query if isinstance(query, QueryBase) else QueryBase(query_id), - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - batch_size=batch_size, + query=( + query if isinstance(query, QueryBase) else QueryBase(query_id) + ), + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "batch_size": batch_size, + }, ) else: # The results are fresh enough, retrieve the entire result @@ -227,11 +251,13 @@ def get_latest_result( results = self._fetch_entire_result( self.get_execution_results( metadata.execution_id, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=batch_size, + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "limit": batch_size, + }, ), ) return results @@ -241,11 +267,7 @@ def get_latest_result( def get_latest_result_dataframe( self, query: Union[QueryBase, str, int], - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] 
= None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> Any: """ GET the latest results for a query_id without re-executing the query @@ -254,6 +276,13 @@ def get_latest_result_dataframe( This is a convenience method that uses get_latest_result() + pandas.read_csv() underneath """ + if params is None: + params = {} + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) try: import pandas # pylint: disable=import-outside-toplevel except ImportError as exc: @@ -263,28 +292,34 @@ def get_latest_result_dataframe( results = self.download_csv( query, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - batch_size=batch_size, + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "batch_size": batch_size, + }, ) return pandas.read_csv(results.data) def download_csv( self, query: Union[QueryBase, str, int], - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ExecutionResultCSV: """ Almost like an alias for `get_latest_result` but for the csv endpoint. https://docs.dune.com/api-reference/executions/endpoint/get-query-result-csv """ # Ensure we don't specify parameters that are incompatible: + if params is None: + params = {} + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) + assert ( # We are not sampling sample_count is None @@ -327,12 +362,9 @@ def download_csv( def run_sql( self, query_sql: str, - params: Optional[list[QueryParameter]] = None, - is_private: bool = True, - archive_after: bool = True, - performance: Optional[str] = None, - ping_frequency: int = POLL_FREQUENCY_SECONDS, name: str = "API Query", + ping_frequency: int = POLL_FREQUENCY_SECONDS, + params: Optional[Dict[str, Any]] = None, ) -> ResultsResponse: """ Allows user to provide execute raw_sql via the CRUD interface @@ -340,10 +372,21 @@ def run_sql( - Query is by default made private and archived after execution. Requires Plus subscription! """ - query = self.create_query(name, query_sql, params, is_private) + if params is None: + params = None + + query_params = params.get("query_params", None) + is_private = params.get("is_private", None) + archive_after = params.get("archive_after", None) + performance = params.get("performance", None) + query = self.create_query(name, query_sql, query_params, is_private) try: results = self.run_query( - query=query.base, performance=performance, ping_frequency=ping_frequency + query=query.base, + ping_frequency=ping_frequency, + params={ + "performance": performance, + }, ) finally: if archive_after: @@ -365,7 +408,11 @@ def refresh( fetches and returns the results. Sleeps `ping_frequency` seconds between each status request. 
""" - return self.run_query(query, ping_frequency, performance) + return self.run_query( + query=query, + ping_frequency=ping_frequency, + params={"performance": performance}, + ) @deprecated(version="1.2.1", reason="Please use run_query_csv") def refresh_csv( @@ -379,7 +426,9 @@ def refresh_csv( fetches and the results in CSV format (use it load the data directly in pandas.from_csv() or similar frameworks) """ - return self.run_query_csv(query, ping_frequency, performance) + return self.run_query_csv( + query, ping_frequency, params={"performance": performance} + ) @deprecated(version="1.2.1", reason="Please use run_query_dataframe") def refresh_into_dataframe( @@ -394,7 +443,9 @@ def refresh_into_dataframe( This is a convenience method that uses refresh_csv underneath """ - return self.run_query_dataframe(query, ping_frequency, performance) + return self.run_query_dataframe( + query, ping_frequency, params={"performance": performance} + ) ################# # Private Methods diff --git a/dune_client/api/query.py b/dune_client/api/query.py index 56ddcc1..8792ccc 100644 --- a/dune_client/api/query.py +++ b/dune_client/api/query.py @@ -6,7 +6,7 @@ """ from __future__ import annotations -from typing import Optional, Any +from typing import Optional, Any, Dict from dune_client.api.base import BaseRouter from dune_client.models import DuneError @@ -57,11 +57,7 @@ def get_query(self, query_id: int) -> DuneQuery: def update_query( # pylint: disable=too-many-arguments self, query_id: int, - name: Optional[str] = None, - query_sql: Optional[str] = None, - params: Optional[list[QueryParameter]] = None, - description: Optional[str] = None, - tags: Optional[list[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> int: """ Updates Dune Query by ID @@ -72,6 +68,14 @@ def update_query( # pylint: disable=too-many-arguments If the tags or parameters are provided as an empty array, they will be deleted from the query. 
""" + if params is None: + params = {} + name = params.get("name", None) + query_sql = params.get("query_sql", None) + query_parms = params.get("parameters", None) + description = params.get("description", None) + tags = params.get("tags", None) + parameters: dict[str, Any] = {} if name is not None: parameters["name"] = name @@ -81,8 +85,8 @@ def update_query( # pylint: disable=too-many-arguments parameters["tags"] = tags if query_sql is not None: parameters["query_sql"] = query_sql - if params is not None: - parameters["parameters"] = [p.to_dict() for p in params] + if query_parms is not None: + parameters["parameters"] = [p.to_dict() for p in query_parms] if not bool(parameters): # Nothing to change no need to make reqeust diff --git a/dune_client/client_async.py b/dune_client/client_async.py index 39e24fa..0b0419c 100644 --- a/dune_client/client_async.py +++ b/dune_client/client_async.py @@ -9,7 +9,7 @@ import asyncio import ssl from io import BytesIO -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, Optional, Union import certifi from aiohttp import ( @@ -232,13 +232,16 @@ async def get_status(self, job_id: str) -> ExecutionStatusResponse: async def get_result( self, job_id: str, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" + if params is None: + params = {} + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) assert ( # We are not sampling sample_count is None @@ -251,11 +254,13 @@ async def get_result( results = await self._get_result_page( job_id, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=batch_size, + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "limit": batch_size, + }, ) while results.next_uri is not None: batch = await self._get_result_by_url(results.next_uri) @@ -266,11 +271,7 @@ async def get_result( async def get_result_csv( self, job_id: str, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ExecutionResultCSV: """ GET results in CSV format from Dune API for `job_id` (aka `execution_id`) @@ -279,6 +280,13 @@ async def get_result_csv( use this method for large results where you want lower CPU and memory overhead if you need metadata information use get_results() or get_status() """ + if params is None: + params = {} + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) assert ( # We are not sampling sample_count is None @@ -291,11 +299,13 @@ async def get_result_csv( results = await self._get_result_csv_page( job_id, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=batch_size, + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "limit": 
batch_size, + }, ) while results.next_uri is not None: batch = await self._get_result_csv_by_url(results.next_uri) @@ -357,18 +367,21 @@ async def refresh( self, query: QueryBase, ping_frequency: int = 5, - performance: Optional[str] = None, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ResultsResponse: """ Executes a Dune `query`, waits until execution completes, fetches and returns the results. Sleeps `ping_frequency` seconds between each status request. """ + if params is None: + params = {} + performance = params.get("performance", None) + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) assert ( # We are not sampling sample_count is None @@ -381,29 +394,34 @@ async def refresh( ) return await self.get_result( job_id, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - batch_size=batch_size, + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "batch_size": batch_size, + }, ) async def refresh_csv( self, query: QueryBase, ping_frequency: int = 5, - performance: Optional[str] = None, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ExecutionResultCSV: """ Executes a Dune query, waits till execution completes, fetches and the results in CSV format (use it load the data directly in pandas.from_csv() or similar frameworks) """ + if params is None: + params = {} + performance = params.get("performance", None) + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) assert ( # We are not sampling sample_count is None @@ -416,22 +434,19 @@ async def refresh_csv( ) return await self.get_result_csv( job_id, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - batch_size=batch_size, + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "batch_size": batch_size, + }, ) async def refresh_into_dataframe( self, query: QueryBase, - performance: Optional[str] = None, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> Any: """ Execute a Dune Query, waits till execution completes, @@ -439,6 +454,14 @@ async def refresh_into_dataframe( This is a convenience method that uses refresh_csv underneath """ + if params is None: + params = {} + performance = params.get("performance", None) + batch_size = params.get("batch_size", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) try: import pandas # pylint: disable=import-outside-toplevel except ImportError as exc: @@ -446,13 +469,15 @@ async def refresh_into_dataframe( "dependency failure, pandas 
is required but missing" ) from exc results = await self.refresh_csv( - query, - performance=performance, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - batch_size=batch_size, + query=query, + params={ + "performance": performance, + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "batch_size": batch_size, + }, ) return pandas.read_csv(results.data) @@ -463,15 +488,17 @@ async def refresh_into_dataframe( async def _get_result_page( self, job_id: str, - limit: Optional[int] = None, - offset: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ResultsResponse: """GET a page of results from Dune API for `job_id` (aka `execution_id`)""" - + if params is None: + params = {} + limit = params.get("limit", None) + offset = params.get("offset", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) if sample_count is None and limit is None and offset is None: limit = MAX_NUM_ROWS_PER_BATCH offset = 0 @@ -512,17 +539,19 @@ async def _get_result_by_url( async def _get_result_csv_page( self, job_id: str, - limit: Optional[int] = None, - offset: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[Dict[str, Any]] = None, ) -> ExecutionResultCSV: """ GET a page of results in CSV format from Dune API for `job_id` (aka `execution_id`) """ - + if params is None: + params = {} + limit = params.get("limit", None) + offset = params.get("offset", None) + columns = params.get("columns", None) + sample_count = params.get("sample_count", None) + filters = params.get("filters", None) + sort_by = params.get("sort_by", None) if sample_count is None and limit is None and offset is None: limit = MAX_NUM_ROWS_PER_BATCH offset = 0 From b07157dee75ce55cd18f934992e47f7b421f9e1c Mon Sep 17 00:00:00 2001 From: apurvabanka Date: Tue, 22 Oct 2024 02:47:10 -0400 Subject: [PATCH 4/9] Implemented classes for common params --- dune_client/api/base.py | 3 +- dune_client/api/custom.py | 14 +- dune_client/api/execution.py | 83 +++++------ dune_client/api/extensions.py | 51 +++---- dune_client/client_async.py | 265 +++++++++++++++++----------------- 5 files changed, 205 insertions(+), 211 deletions(-) diff --git a/dune_client/api/base.py b/dune_client/api/base.py index 41f3a28..fe30d47 100644 --- a/dune_client/api/base.py +++ b/dune_client/api/base.py @@ -9,7 +9,7 @@ import logging.config import os from json import JSONDecodeError -from typing import Any, Dict, List, Optional, Union, IO +from typing import Any, Dict, Optional, Union, IO from requests import Response, Session from requests.adapters import HTTPAdapter, Retry @@ -92,7 +92,6 @@ def _build_parameters( allow_partial_results: str = "true", params: Optional[Dict[str, Any]] = None, ) -> Dict[str, Union[str, int]]: - #TODO: Change the function class for this function """ Utility function that builds a dictionary of parameters to be used when retrieving advanced results (filters, pagination, sorting, etc.). 
diff --git a/dune_client/api/custom.py b/dune_client/api/custom.py index 89285c2..3b6cece 100644 --- a/dune_client/api/custom.py +++ b/dune_client/api/custom.py @@ -53,12 +53,14 @@ def get_custom_endpoint_result( sort_by = params.get("sort_by", None) build_params = self._build_parameters( - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=limit, - offset=offset, + params={ + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "limit": limit, + "offset": offset, + } ) response_json = self._get( route=f"/endpoints/{handle}/{endpoint}/results", diff --git a/dune_client/api/execution.py b/dune_client/api/execution.py index fe143d5..0d05fac 100644 --- a/dune_client/api/execution.py +++ b/dune_client/api/execution.py @@ -7,7 +7,7 @@ """ from io import BytesIO -from typing import Any, Dict, Optional +from typing import Any, Dict, List, NamedTuple, Optional from deprecated import deprecated @@ -27,6 +27,19 @@ from dune_client.query import QueryBase +class GetExecutionResultsParams(NamedTuple): + """ + Parameters for get execution result functions + """ + + limit: Optional[int] + columns: Optional[List[str]] + sample_count: Optional[int] + filters: Optional[str] + sort_by: Optional[List[str]] + offset: Optional[int] + + class ExecutionAPI(BaseRouter): """ Query execution and result fetching functions. @@ -75,37 +88,30 @@ def get_execution_status(self, job_id: str) -> ExecutionStatusResponse: def get_execution_results( self, job_id: str, - params: Optional[Dict[str, Any]] = None, + params: Optional[GetExecutionResultsParams] = None, + allow_partial_results: str = "true", ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" - if params is None: - params = {} - limit = params.get("limit", None) - offset = params.get("offset", None) - columns = params.get("columns", None) - sample_count = params.get("sample_counts", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) - allow_partial_results = params.get("allow_partial_results", None) - - build_params = self._build_parameters( - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=limit, - offset=offset, - allow_partial_results=allow_partial_results, - ) + build_params = None + if params is not None: + build_params = self._build_parameters( + allow_partial_results=allow_partial_results, + params={ + "columns": params.columns, + "sample_count": params.sample_count, + "filters": params.filters, + "sort_by": params.sort_by, + "limit": params.limit, + "offset": params.offset, + }, + ) route = f"/execution/{job_id}/results" url = self._route_url(route) return self._get_execution_results_by_url(url=url, params=build_params) def get_execution_results_csv( - self, - job_id: str, - params: Optional[Dict[str, Any]] = None, + self, job_id: str, params: Optional[GetExecutionResultsParams] = None ) -> ExecutionResultCSV: """ GET results in CSV format from Dune API for `job_id` (aka `execution_id`) @@ -114,23 +120,18 @@ def get_execution_results_csv( use this method for large results where you want lower CPU and memory overhead if you need metadata information use get_results() or get_status() """ - if params is None: - params = {} - limit = params.get("limit", None) - offset = params.get("offset", None) - columns = params.get("columns", None) - sample_count = params.get("sample_counts", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) - 
- build_params = self._build_parameters( - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=limit, - offset=offset, - ) + build_params = None + if params is not None: + build_params = self._build_parameters( + params={ + "columns": params.columns, + "sample_count": params.sample_count, + "filters": params.filters, + "sort_by": params.sort_by, + "limit": params.limit, + "offset": params.offset, + } + ) route = f"/execution/{job_id}/results/csv" url = self._route_url(route) diff --git a/dune_client/api/extensions.py b/dune_client/api/extensions.py index 7e42f30..1b8f28b 100644 --- a/dune_client/api/extensions.py +++ b/dune_client/api/extensions.py @@ -17,7 +17,7 @@ DUNE_CSV_NEXT_OFFSET_HEADER, MAX_NUM_ROWS_PER_BATCH, ) -from dune_client.api.execution import ExecutionAPI +from dune_client.api.execution import ExecutionAPI, GetExecutionResultsParams from dune_client.api.query import QueryAPI from dune_client.api.table import TableAPI from dune_client.api.custom import CustomEndpointAPI @@ -80,17 +80,14 @@ def run_query( # pylint: disable=duplicate-code job_id = self._refresh(query, ping_frequency, performance) + params = GetExecutionResultsParams( + limit, columns, sample_count, filters, sort_by, None + ) return self._fetch_entire_result( self.get_execution_results( job_id, - params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "limit": limit, - "allow_partial_results": allow_partial_results, - }, + allow_partial_results=allow_partial_results, + params=params, ), ) @@ -128,16 +125,13 @@ def run_query_csv( # pylint: disable=duplicate-code job_id = self._refresh(query, ping_frequency, performance) + params = GetExecutionResultsParams( + limit, columns, sample_count, filters, sort_by, None + ) return self._fetch_entire_result_csv( self.get_execution_results_csv( job_id, - params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "limit": limit, - }, + params=params, ), ) @@ -248,16 +242,13 @@ def get_latest_result( else: # The results are fresh enough, retrieve the entire result # pylint: disable=duplicate-code + params = GetExecutionResultsParams( + batch_size, columns, sample_count, filters, sort_by, None + ) results = self._fetch_entire_result( self.get_execution_results( metadata.execution_id, - params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "limit": batch_size, - }, + params=params, ), ) return results @@ -330,12 +321,14 @@ def download_csv( params, query_id = parse_query_object_or_id(query) params = self._build_parameters( - params=params, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=batch_size, + params={ + "params": params, + "columns": columns, + "sample_count": sample_count, + "filters": filters, + "sort_by": sort_by, + "limit": batch_size, + } ) if sample_count is None and batch_size is None: params["limit"] = MAX_NUM_ROWS_PER_BATCH diff --git a/dune_client/client_async.py b/dune_client/client_async.py index 0b0419c..8073dd6 100644 --- a/dune_client/client_async.py +++ b/dune_client/client_async.py @@ -9,7 +9,7 @@ import asyncio import ssl from io import BytesIO -from typing import Any, Callable, Dict, Optional, Union +from typing import Any, Callable, Dict, List, NamedTuple, Optional, Union import certifi from aiohttp import ( @@ -40,6 +40,44 @@ from dune_client.query import QueryBase, parse_query_object_or_id +class 
GetResultParams(NamedTuple): + """ + Parameters for get reult functions + """ + + batch_size: Optional[int] + columns: Optional[List[str]] + sample_count: Optional[int] + filters: Optional[str] + sort_by: Optional[List[str]] + + +class RefreshParams(NamedTuple): + """ + Parameters for refersh functions + """ + + performance: Optional[str] + batch_size: Optional[int] + columns: Optional[List[str]] + sample_count: Optional[int] + filters: Optional[str] + sort_by: Optional[List[str]] + + +class ResultPageParams(NamedTuple): + """ + Parameters for result page functions + """ + + limit: Optional[int] = (None,) + offset: Optional[int] = (None,) + columns: Optional[List[str]] = (None,) + sample_count: Optional[int] = (None,) + filters: Optional[str] = (None,) + sort_by: Optional[List[str]] = (None,) + + class RetryableError(Exception): """ Internal exception used to signal that the request should be retried @@ -232,35 +270,30 @@ async def get_status(self, job_id: str) -> ExecutionStatusResponse: async def get_result( self, job_id: str, - params: Optional[Dict[str, Any]] = None, + params: GetResultParams, ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" - if params is None: - params = {} - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (params.batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" - if sample_count is None and batch_size is None: - batch_size = MAX_NUM_ROWS_PER_BATCH + if params.sample_count is None and params.batch_size is None: + params.batch_size = MAX_NUM_ROWS_PER_BATCH + params = ResultPageParams( + params.batch_size, + None, + params.columns, + params.sample_count, + params.filters, + params.sort_by, + ) results = await self._get_result_page( job_id, - params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "limit": batch_size, - }, + params=params, ) while results.next_uri is not None: batch = await self._get_result_by_url(results.next_uri) @@ -271,7 +304,7 @@ async def get_result( async def get_result_csv( self, job_id: str, - params: Optional[Dict[str, Any]] = None, + params: GetResultParams, ) -> ExecutionResultCSV: """ GET results in CSV format from Dune API for `job_id` (aka `execution_id`) @@ -280,31 +313,24 @@ async def get_result_csv( use this method for large results where you want lower CPU and memory overhead if you need metadata information use get_results() or get_status() """ - if params is None: - params = {} - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (params.batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" - if sample_count is None and batch_size is None: - batch_size = MAX_NUM_ROWS_PER_BATCH + if params.sample_count is None and params.batch_size is None: 
+ params.batch_size = MAX_NUM_ROWS_PER_BATCH results = await self._get_result_csv_page( job_id, params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "limit": batch_size, + "columns": params.columns, + "sample_count": params.sample_count, + "filters": params.filters, + "sort_by": params.sort_by, + "limit": params.batch_size, }, ) while results.next_uri is not None: @@ -366,87 +392,73 @@ async def cancel_execution(self, job_id: str) -> bool: async def refresh( self, query: QueryBase, + params: RefreshParams, ping_frequency: int = 5, - params: Optional[Dict[str, Any]] = None, ) -> ResultsResponse: """ Executes a Dune `query`, waits until execution completes, fetches and returns the results. Sleeps `ping_frequency` seconds between each status request. """ - if params is None: - params = {} - performance = params.get("performance", None) - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (params.batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" job_id = await self._refresh( - query, ping_frequency=ping_frequency, performance=performance + query, ping_frequency=ping_frequency, performance=params.performance + ) + params = GetResultParams( + params.batch_size, + params.columns, + params.sample_count, + params.filters, + params.sort_by, ) return await self.get_result( job_id, - params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "batch_size": batch_size, - }, + params=params, ) async def refresh_csv( self, query: QueryBase, + params: RefreshParams, ping_frequency: int = 5, - params: Optional[Dict[str, Any]] = None, ) -> ExecutionResultCSV: """ Executes a Dune query, waits till execution completes, fetches and the results in CSV format (use it load the data directly in pandas.from_csv() or similar frameworks) """ - if params is None: - params = {} - performance = params.get("performance", None) - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (params.batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" job_id = await self._refresh( - query, ping_frequency=ping_frequency, performance=performance + query, ping_frequency=ping_frequency, performance=params.performance + ) + params = GetResultParams( + params.batch_size, + params.columns, + params.sample_count, + params.filters, + params.sort_by, ) return await self.get_result_csv( job_id, - params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "batch_size": batch_size, - }, + params=params, ) async def refresh_into_dataframe( self, query: QueryBase, - params: Optional[Dict[str, Any]] = None, + params: RefreshParams, ) -> Any: """ Execute a Dune Query, waits till execution completes, 
@@ -454,30 +466,23 @@ async def refresh_into_dataframe( This is a convenience method that uses refresh_csv underneath """ - if params is None: - params = {} - performance = params.get("performance", None) - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) try: import pandas # pylint: disable=import-outside-toplevel except ImportError as exc: raise ImportError( "dependency failure, pandas is required but missing" ) from exc + params = RefreshParams( + params.performance, + params.batch_size, + params.columns, + params.sample_count, + params.filters, + params.sort_by, + ) results = await self.refresh_csv( query=query, - params={ - "performance": performance, - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "batch_size": batch_size, - }, + params=params, ) return pandas.read_csv(results.data) @@ -488,32 +493,30 @@ async def refresh_into_dataframe( async def _get_result_page( self, job_id: str, - params: Optional[Dict[str, Any]] = None, + params: ResultPageParams, ) -> ResultsResponse: """GET a page of results from Dune API for `job_id` (aka `execution_id`)""" - if params is None: - params = {} - limit = params.get("limit", None) - offset = params.get("offset", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) - if sample_count is None and limit is None and offset is None: - limit = MAX_NUM_ROWS_PER_BATCH - offset = 0 - - params = self._build_parameters( - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=limit, - offset=offset, + if ( + params.sample_count is None + and params.limit is None + and params.offset is None + ): + params.limit = MAX_NUM_ROWS_PER_BATCH + params.offset = 0 + + build_params = self._build_parameters( + params={ + "columns": params.columns, + "sample_count": params.sample_count, + "filters": params.filters, + "sort_by": params.sort_by, + "limit": params.limit, + "offset": params.offset, + } ) response_json = await self._get( route=f"/execution/{job_id}/results", - params=params, + params=build_params, ) try: @@ -537,32 +540,28 @@ async def _get_result_by_url( raise DuneError(response_json, "ResultsResponse", err) from err async def _get_result_csv_page( - self, - job_id: str, - params: Optional[Dict[str, Any]] = None, + self, job_id: str, params: ResultPageParams ) -> ExecutionResultCSV: """ GET a page of results in CSV format from Dune API for `job_id` (aka `execution_id`) """ - if params is None: - params = {} - limit = params.get("limit", None) - offset = params.get("offset", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) - if sample_count is None and limit is None and offset is None: - limit = MAX_NUM_ROWS_PER_BATCH - offset = 0 + if ( + params.sample_count is None + and params.limit is None + and params.offset is None + ): + params.limit = MAX_NUM_ROWS_PER_BATCH + params.offset = 0 params = self._build_parameters( - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=limit, - offset=offset, + params={ + "columns": params.columns, + "sample_count": params.sample_count, + "filters": params.filters, + "sort_by": 
params.sort_by, + "limit": params.limit, + "offset": params.offset, + } ) route = f"/execution/{job_id}/results/csv" From 9d59e0867ad5bac554a1e0d6d9c2fc9d2a9d025d Mon Sep 17 00:00:00 2001 From: apurvabanka Date: Tue, 22 Oct 2024 14:15:10 -0400 Subject: [PATCH 5/9] test cases updated --- dune_client/api/execution.py | 12 +-- dune_client/api/query.py | 2 +- dune_client/api/table.py | 2 +- dune_client/client_async.py | 148 +++++++++++++++++++-------------- tests/e2e/test_async_client.py | 21 +++-- tests/e2e/test_client.py | 15 ++-- 6 files changed, 118 insertions(+), 82 deletions(-) diff --git a/dune_client/api/execution.py b/dune_client/api/execution.py index 0d05fac..2826efc 100644 --- a/dune_client/api/execution.py +++ b/dune_client/api/execution.py @@ -32,12 +32,12 @@ class GetExecutionResultsParams(NamedTuple): Parameters for get execution result functions """ - limit: Optional[int] - columns: Optional[List[str]] - sample_count: Optional[int] - filters: Optional[str] - sort_by: Optional[List[str]] - offset: Optional[int] + limit: Optional[int] = None + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None + offset: Optional[int] = None class ExecutionAPI(BaseRouter): diff --git a/dune_client/api/query.py b/dune_client/api/query.py index 8792ccc..6edb1fd 100644 --- a/dune_client/api/query.py +++ b/dune_client/api/query.py @@ -54,7 +54,7 @@ def get_query(self, query_id: int) -> DuneQuery: response_json = self._get(route=f"/query/{query_id}") return DuneQuery.from_dict(response_json) - def update_query( # pylint: disable=too-many-arguments + def update_query( self, query_id: int, params: Optional[Dict[str, Any]] = None, diff --git a/dune_client/api/table.py b/dune_client/api/table.py index 239d0fc..e23b217 100644 --- a/dune_client/api/table.py +++ b/dune_client/api/table.py @@ -52,7 +52,7 @@ def upload_csv( except KeyError as err: raise DuneError(response_json, "UploadCsvResponse", err) from err - def create_table( + def create_table( # pylint: disable=too-many-instance-attributes self, namespace: str, table_name: str, diff --git a/dune_client/client_async.py b/dune_client/client_async.py index 8073dd6..e1f0a98 100644 --- a/dune_client/client_async.py +++ b/dune_client/client_async.py @@ -45,24 +45,24 @@ class GetResultParams(NamedTuple): Parameters for get reult functions """ - batch_size: Optional[int] - columns: Optional[List[str]] - sample_count: Optional[int] - filters: Optional[str] - sort_by: Optional[List[str]] + batch_size: Optional[int] = None + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None class RefreshParams(NamedTuple): """ - Parameters for refersh functions + Parameters for refresh functions """ - performance: Optional[str] - batch_size: Optional[int] - columns: Optional[List[str]] - sample_count: Optional[int] - filters: Optional[str] - sort_by: Optional[List[str]] + performance: Optional[str] = None + batch_size: Optional[int] = None + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None class ResultPageParams(NamedTuple): @@ -70,12 +70,12 @@ class ResultPageParams(NamedTuple): Parameters for result page functions """ - limit: Optional[int] = (None,) - offset: Optional[int] = (None,) - columns: Optional[List[str]] = (None,) - sample_count: Optional[int] = (None,) - filters: Optional[str] = (None,) - 
sort_by: Optional[List[str]] = (None,) + limit: Optional[int] = None + offset: Optional[int] = None + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None class RetryableError(Exception): @@ -270,21 +270,26 @@ async def get_status(self, job_id: str) -> ExecutionStatusResponse: async def get_result( self, job_id: str, - params: GetResultParams, + params: Optional[GetResultParams] = None, ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" + if params is None: + params = GetResultParams() + + batch_size = params.batch_size + assert ( # We are not sampling params.sample_count is None # We are sampling and don't use filters or pagination - or (params.batch_size is None and params.filters is None) + or (batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" - if params.sample_count is None and params.batch_size is None: - params.batch_size = MAX_NUM_ROWS_PER_BATCH + if params.sample_count is None and batch_size is None: + batch_size = MAX_NUM_ROWS_PER_BATCH - params = ResultPageParams( - params.batch_size, + result_page_params = ResultPageParams( + batch_size, None, params.columns, params.sample_count, @@ -293,7 +298,7 @@ async def get_result( ) results = await self._get_result_page( job_id, - params=params, + params=result_page_params, ) while results.next_uri is not None: batch = await self._get_result_by_url(results.next_uri) @@ -304,7 +309,7 @@ async def get_result( async def get_result_csv( self, job_id: str, - params: GetResultParams, + params: Optional[GetResultParams] = None, ) -> ExecutionResultCSV: """ GET results in CSV format from Dune API for `job_id` (aka `execution_id`) @@ -313,25 +318,32 @@ async def get_result_csv( use this method for large results where you want lower CPU and memory overhead if you need metadata information use get_results() or get_status() """ + if params is None: + params = GetResultParams() + + batch_size = params.batch_size + assert ( # We are not sampling params.sample_count is None # We are sampling and don't use filters or pagination - or (params.batch_size is None and params.filters is None) + or (batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" - if params.sample_count is None and params.batch_size is None: - params.batch_size = MAX_NUM_ROWS_PER_BATCH + if params.sample_count is None and batch_size is None: + batch_size = MAX_NUM_ROWS_PER_BATCH + params = ResultPageParams( + batch_size, + None, + params.columns, + params.sample_count, + params.filters, + params.sort_by, + ) results = await self._get_result_csv_page( job_id, - params={ - "columns": params.columns, - "sample_count": params.sample_count, - "filters": params.filters, - "sort_by": params.sort_by, - "limit": params.batch_size, - }, + params=params, ) while results.next_uri is not None: batch = await self._get_result_csv_by_url(results.next_uri) @@ -392,7 +404,7 @@ async def cancel_execution(self, job_id: str) -> bool: async def refresh( self, query: QueryBase, - params: RefreshParams, + params: Optional[RefreshParams] = None, ping_frequency: int = 5, ) -> ResultsResponse: """ @@ -400,6 +412,10 @@ async def refresh( fetches and returns the results. Sleeps `ping_frequency` seconds between each status request. 
""" + + if params is None: + params = RefreshParams() + assert ( # We are not sampling params.sample_count is None @@ -425,7 +441,7 @@ async def refresh( async def refresh_csv( self, query: QueryBase, - params: RefreshParams, + params: Optional[RefreshParams] = None, ping_frequency: int = 5, ) -> ExecutionResultCSV: """ @@ -433,6 +449,9 @@ async def refresh_csv( fetches and the results in CSV format (use it load the data directly in pandas.from_csv() or similar frameworks) """ + if params is None: + params = RefreshParams() + assert ( # We are not sampling params.sample_count is None @@ -443,7 +462,7 @@ async def refresh_csv( job_id = await self._refresh( query, ping_frequency=ping_frequency, performance=params.performance ) - params = GetResultParams( + get_result_params = GetResultParams( params.batch_size, params.columns, params.sample_count, @@ -452,13 +471,13 @@ async def refresh_csv( ) return await self.get_result_csv( job_id, - params=params, + params=get_result_params, ) async def refresh_into_dataframe( self, query: QueryBase, - params: RefreshParams, + params: Optional[RefreshParams] = None, ) -> Any: """ Execute a Dune Query, waits till execution completes, @@ -466,6 +485,9 @@ async def refresh_into_dataframe( This is a convenience method that uses refresh_csv underneath """ + if params is None: + params = RefreshParams() + try: import pandas # pylint: disable=import-outside-toplevel except ImportError as exc: @@ -493,16 +515,18 @@ async def refresh_into_dataframe( async def _get_result_page( self, job_id: str, - params: ResultPageParams, + params: Optional[ResultPageParams] = None, ) -> ResultsResponse: """GET a page of results from Dune API for `job_id` (aka `execution_id`)""" - if ( - params.sample_count is None - and params.limit is None - and params.offset is None - ): - params.limit = MAX_NUM_ROWS_PER_BATCH - params.offset = 0 + if params is None: + params = ResultPageParams() + + limit = params.limit + offset = params.offset + + if params.sample_count is None and limit is None and offset is None: + limit = MAX_NUM_ROWS_PER_BATCH + offset = 0 build_params = self._build_parameters( params={ @@ -510,8 +534,8 @@ async def _get_result_page( "sample_count": params.sample_count, "filters": params.filters, "sort_by": params.sort_by, - "limit": params.limit, - "offset": params.offset, + "limit": limit, + "offset": offset, } ) response_json = await self._get( @@ -540,18 +564,20 @@ async def _get_result_by_url( raise DuneError(response_json, "ResultsResponse", err) from err async def _get_result_csv_page( - self, job_id: str, params: ResultPageParams + self, job_id: str, params: Optional[ResultPageParams] = None ) -> ExecutionResultCSV: """ GET a page of results in CSV format from Dune API for `job_id` (aka `execution_id`) """ - if ( - params.sample_count is None - and params.limit is None - and params.offset is None - ): - params.limit = MAX_NUM_ROWS_PER_BATCH - params.offset = 0 + if params is None: + params = ResultPageParams() + + limit = params.limit + offset = params.offset + + if params.sample_count is None and limit is None and offset is None: + limit = MAX_NUM_ROWS_PER_BATCH + offset = 0 params = self._build_parameters( params={ @@ -559,8 +585,8 @@ async def _get_result_csv_page( "sample_count": params.sample_count, "filters": params.filters, "sort_by": params.sort_by, - "limit": params.limit, - "offset": params.offset, + "limit": limit, + "offset": offset, } ) diff --git a/tests/e2e/test_async_client.py b/tests/e2e/test_async_client.py index 1d9b97f..c7833dc 100644 --- 
a/tests/e2e/test_async_client.py +++ b/tests/e2e/test_async_client.py @@ -5,7 +5,7 @@ import dotenv import pandas -from dune_client.client_async import AsyncDuneClient +from dune_client.client_async import AsyncDuneClient, RefreshParams from dune_client.query import QueryBase @@ -42,7 +42,10 @@ async def test_refresh_with_pagination(self): # Arrange async with AsyncDuneClient(self.valid_api_key) as cl: # Act - results = (await cl.refresh(self.multi_rows_query, batch_size=1)).get_rows() + params = RefreshParams(batch_size=1) + results = ( + await cl.refresh(self.multi_rows_query, params=params) + ).get_rows() # Assert self.assertEqual( @@ -60,8 +63,9 @@ async def test_refresh_with_filters(self): # Arrange async with AsyncDuneClient(self.valid_api_key) as cl: # Act + params = RefreshParams(filters="number < 3") results = ( - await cl.refresh(self.multi_rows_query, filters="number < 3") + await cl.refresh(self.multi_rows_query, params=params) ).get_rows() # Assert @@ -77,7 +81,8 @@ async def test_refresh_csv_with_pagination(self): # Arrange async with AsyncDuneClient(self.valid_api_key) as cl: # Act - result_csv = await cl.refresh_csv(self.multi_rows_query, batch_size=1) + params = RefreshParams(batch_size=1) + result_csv = await cl.refresh_csv(self.multi_rows_query, params=params) # Assert self.assertEqual( @@ -95,9 +100,8 @@ async def test_refresh_csv_with_filters(self): # Arrange async with AsyncDuneClient(self.valid_api_key) as cl: # Act - result_csv = await cl.refresh_csv( - self.multi_rows_query, filters="number < 3" - ) + params = RefreshParams(filters="number < 3") + result_csv = await cl.refresh_csv(self.multi_rows_query, params=params) # Assert self.assertEqual( @@ -111,7 +115,8 @@ async def test_refresh_csv_with_filters(self): @unittest.skip("Large performance tier doesn't currently work.") async def test_refresh_context_manager_performance_large(self): async with AsyncDuneClient(self.valid_api_key) as cl: - results = (await cl.refresh(self.query, performance="large")).get_rows() + params = RefreshParams(performance="large") + results = (await cl.refresh(self.query, params=params)).get_rows() self.assertGreater(len(results), 0) async def test_get_latest_result_with_query_object(self): diff --git a/tests/e2e/test_client.py b/tests/e2e/test_client.py index 799c0c1..b66c329 100644 --- a/tests/e2e/test_client.py +++ b/tests/e2e/test_client.py @@ -78,7 +78,8 @@ def test_run_query_paginated(self): dune = DuneClient(self.valid_api_key) # Act - results = dune.run_query(self.multi_rows_query, batch_size=1).get_rows() + params = {"batch_size": 1} + results = dune.run_query(self.multi_rows_query, params=params).get_rows() # Assert self.assertEqual( @@ -97,7 +98,8 @@ def test_run_query_with_filters(self): dune = DuneClient(self.valid_api_key) # Act - results = dune.run_query(self.multi_rows_query, filters="number < 3").get_rows() + params = {"filters": "number < 3"} + results = dune.run_query(self.multi_rows_query, params=params).get_rows() # Assert self.assertEqual( @@ -110,7 +112,8 @@ def test_run_query_with_filters(self): def test_run_query_performance_large(self): dune = DuneClient(self.valid_api_key) - results = dune.run_query(self.query, performance="large").get_rows() + params = {"performace": "large"} + results = dune.run_query(self.query, params=params).get_rows() self.assertGreater(len(results), 0) def test_run_query_dataframe(self): @@ -328,7 +331,8 @@ def test_download_csv_with_pagination(self): client.run_query(self.multi_rows_query) # Act - result_csv = 
client.download_csv(self.multi_rows_query.query_id, batch_size=1) + params = {"batch_size": 1} + result_csv = client.download_csv(self.multi_rows_query.query_id, params=params) # Assert self.assertEqual( @@ -348,9 +352,10 @@ def test_download_csv_with_filters(self): client.run_query(self.multi_rows_query) # Act + params = {"filters": "number < 3"} result_csv = client.download_csv( self.multi_rows_query.query_id, - filters="number < 3", + params=params, ) # Assert From c168c6029fba5a9bced6cc0b34b12a8ea9d85bea Mon Sep 17 00:00:00 2001 From: apurvabanka Date: Thu, 7 Nov 2024 02:01:29 -0500 Subject: [PATCH 6/9] requested changes --- .pylintrc | 1 + dune_client/api/custom.py | 37 +++-- dune_client/api/extensions.py | 258 ++++++++++++++++++---------------- dune_client/api/query.py | 40 +++--- tests/e2e/test_client.py | 11 +- 5 files changed, 181 insertions(+), 166 deletions(-) diff --git a/.pylintrc b/.pylintrc index 7130db2..cebc305 100644 --- a/.pylintrc +++ b/.pylintrc @@ -2,3 +2,4 @@ disable=fixme,logging-fstring-interpolation [DESIGN] max-args=10 +max-attributes=6 diff --git a/dune_client/api/custom.py b/dune_client/api/custom.py index 3b6cece..4b41f5c 100644 --- a/dune_client/api/custom.py +++ b/dune_client/api/custom.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from typing import Dict, Optional, Any +from typing import List, NamedTuple, Optional from dune_client.api.base import BaseRouter from dune_client.models import ( @@ -13,6 +13,19 @@ ) +class CustomAPIParams(NamedTuple): + """ + Params for Custom Endpoint API Function + """ + + limit: Optional[int] = None + offset: Optional[int] = None + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None + + # pylint: disable=duplicate-code class CustomEndpointAPI(BaseRouter): """ @@ -25,7 +38,7 @@ def get_custom_endpoint_result( self, handle: str, endpoint: str, - params: Optional[Dict[str, Any]] = None, + params: Optional[CustomAPIParams] = None, ) -> ResultsResponse: """ Custom endpoints allow you to fetch and filter data from any @@ -44,27 +57,11 @@ def get_custom_endpoint_result( sort_by (List[str], optional): The columns to sort by. """ if params is None: - params = {} - limit = params.get("limit", None) - offset = params.get("offset", None) - columns = params.get("columns", None) - sample_count = params.get("sample_counts", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) + params = CustomAPIParams() - build_params = self._build_parameters( - params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "limit": limit, - "offset": offset, - } - ) response_json = self._get( route=f"/endpoints/{handle}/{endpoint}/results", - params=build_params, + params=params.__dict__, ) try: return ResultsResponse.from_dict(response_json) diff --git a/dune_client/api/extensions.py b/dune_client/api/extensions.py index 1b8f28b..6d9ccdf 100644 --- a/dune_client/api/extensions.py +++ b/dune_client/api/extensions.py @@ -8,7 +8,7 @@ import time from io import BytesIO -from typing import Any, Optional, Union, Dict +from typing import Any, List, NamedTuple, Optional, Union from deprecated import deprecated @@ -31,6 +31,7 @@ from dune_client.query import QueryBase, parse_query_object_or_id # from dune_client.types import QueryParameter +from dune_client.types import QueryParameter from dune_client.util import age_in_hours # This is the expiry time on old query results. 
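With CustomAPIParams in place, get_custom_endpoint_result takes a single params tuple instead of six separate keyword arguments, which is what satisfies the too-many-positional-arguments check this series re-enables. A minimal sketch of the new calling convention, assuming the full series is applied; the API key, handle, and endpoint name are placeholders, not values from these patches (note that a later patch in the series swaps params.__dict__ for params._asdict(), which is what makes the call work at runtime, since NamedTuple instances have no __dict__):

    from dune_client.api.custom import CustomAPIParams
    from dune_client.client import DuneClient

    # Placeholder credentials and endpoint coordinates.
    client = DuneClient("<api-key>")
    params = CustomAPIParams(limit=10)
    results = client.get_custom_endpoint_result("my-team", "my-endpoint", params=params)
    print(results.get_rows())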
@@ -39,6 +40,34 @@ POLL_FREQUENCY_SECONDS = 1 +class RunQueryParams(NamedTuple): + "Params for run query function" + performance: Optional[str] = None + batch_size: Optional[int] = None + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None + + +class GetLatestResultParams(NamedTuple): + "Params for get latest functions" + batch_size: Optional[int] = None + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None + + +class RunSQLParams(NamedTuple): + "Params for Run SQL function" + query_params: Optional[list[QueryParameter]] = None + is_private: bool = True + archive_after: bool = True + performance: Optional[str] = None + ping_frequency: int = POLL_FREQUENCY_SECONDS + + class ExtendedAPI(ExecutionAPI, QueryAPI, TableAPI, CustomEndpointAPI): """ Provides higher level helper methods for faster @@ -50,7 +79,7 @@ def run_query( query: QueryBase, ping_frequency: int = POLL_FREQUENCY_SECONDS, allow_partial_results: str = "true", - params: Optional[Dict[str, Any]] = None, + params: Optional[RunQueryParams] = None, ) -> ResultsResponse: """ Executes a Dune `query`, waits until execution completes, @@ -59,29 +88,29 @@ def run_query( """ # Ensure we don't specify parameters that are incompatible: if params is None: - params = {} - performance = params.get("performance", None) - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) + params = RunQueryParams() + assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (params.batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" - if sample_count is not None: + if params.sample_count is not None: limit = None else: - limit = batch_size or MAX_NUM_ROWS_PER_BATCH + limit = params.batch_size or MAX_NUM_ROWS_PER_BATCH # pylint: disable=duplicate-code - job_id = self._refresh(query, ping_frequency, performance) + job_id = self._refresh(query, ping_frequency, params.performance) params = GetExecutionResultsParams( - limit, columns, sample_count, filters, sort_by, None + limit, + params.columns, + params.sample_count, + params.filters, + params.sort_by, + None, ) return self._fetch_entire_result( self.get_execution_results( @@ -95,7 +124,7 @@ def run_query_csv( self, query: QueryBase, ping_frequency: int = POLL_FREQUENCY_SECONDS, - params: Optional[Dict[str, Any]] = None, + params: Optional[RunQueryParams] = None, ) -> ExecutionResultCSV: """ Executes a Dune query, waits till execution completes, @@ -104,29 +133,28 @@ def run_query_csv( """ # Ensure we don't specify parameters that are incompatible: if params is None: - params = {} - performance = params.get("performance", None) - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) + params = RunQueryParams() assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (params.batch_size is None and 
params.filters is None) ), "sampling cannot be combined with filters or pagination" - if sample_count is not None: + if params.sample_count is not None: limit = None else: - limit = batch_size or MAX_NUM_ROWS_PER_BATCH + limit = params.batch_size or MAX_NUM_ROWS_PER_BATCH # pylint: disable=duplicate-code - job_id = self._refresh(query, ping_frequency, performance) + job_id = self._refresh(query, ping_frequency, params.performance) params = GetExecutionResultsParams( - limit, columns, sample_count, filters, sort_by, None + limit, + params.columns, + params.sample_count, + params.filters, + params.sort_by, + None, ) return self._fetch_entire_result_csv( self.get_execution_results_csv( @@ -139,7 +167,7 @@ def run_query_dataframe( self, query: QueryBase, ping_frequency: int = POLL_FREQUENCY_SECONDS, - params: Optional[Dict[str, Any]] = None, + params: Optional[RunQueryParams] = None, ) -> Any: """ Execute a Dune Query, waits till execution completes, @@ -148,13 +176,7 @@ def run_query_dataframe( This is a convenience method that uses run_query_csv() + pandas.read_csv() underneath """ if params is None: - params = {} - performance = params.get("performance", None) - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) + params = RunQueryParams() try: import pandas # pylint: disable=import-outside-toplevel @@ -162,17 +184,18 @@ def run_query_dataframe( raise ImportError( "dependency failure, pandas is required but missing" ) from exc + params = RunQueryParams( + params.performance, + params.batch_size, + params.columns, + params.sample_count, + params.filters, + params.sort_by, + ) data = self.run_query_csv( query, ping_frequency, - params={ - "performance": performance, - "batch_size": batch_size, - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - }, + params=params, ).data return pandas.read_csv(data) @@ -180,7 +203,7 @@ def get_latest_result( self, query: Union[QueryBase, str, int], max_age_hours: int = THREE_MONTHS_IN_HOURS, - params: Optional[Dict[str, Any]] = None, + params: Optional[GetLatestResultParams] = None, ) -> ResultsResponse: """ GET the latest results for a query_id without re-executing the query @@ -192,32 +215,29 @@ def get_latest_result( """ # Ensure we don't specify parameters that are incompatible: if params is None: - params = {} - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) + params = GetLatestResultParams() + + batch_size = params.batch_size assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" - params, query_id = parse_query_object_or_id(query) + get_params, query_id = parse_query_object_or_id(query) # Only fetch 1 row to get metadata first to determine if the result is fresh enough - if params is None: - params = {} - params["limit"] = 1 + if get_params is None: + get_params = {} + get_params["limit"] = 1 response_json = self._get( route=f"/query/{query_id}/results", - params=params, + params=get_params, ) try: - if sample_count is None and batch_size is 
None: + if params.sample_count is None and batch_size is None: batch_size = MAX_NUM_ROWS_PER_BATCH metadata = ResultsResponse.from_dict(response_json) last_run = metadata.times.execution_ended_at @@ -227,23 +247,30 @@ def get_latest_result( logging.info( f"results (from {last_run}) older than {max_age_hours} hours, re-running query" ) + params = RunQueryParams( + None, + batch_size, + params.columns, + params.sample_count, + params.filters, + params.sort_by, + ) results = self.run_query( query=( query if isinstance(query, QueryBase) else QueryBase(query_id) ), - params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "batch_size": batch_size, - }, + params=params, ) else: # The results are fresh enough, retrieve the entire result # pylint: disable=duplicate-code params = GetExecutionResultsParams( - batch_size, columns, sample_count, filters, sort_by, None + batch_size, + params.columns, + params.sample_count, + params.filters, + params.sort_by, + None, ) results = self._fetch_entire_result( self.get_execution_results( @@ -258,7 +285,7 @@ def get_latest_result( def get_latest_result_dataframe( self, query: Union[QueryBase, str, int], - params: Optional[Dict[str, Any]] = None, + params: Optional[GetLatestResultParams] = None, ) -> Any: """ GET the latest results for a query_id without re-executing the query @@ -268,35 +295,30 @@ def get_latest_result_dataframe( This is a convenience method that uses get_latest_result() + pandas.read_csv() underneath """ if params is None: - params = {} - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) + params = GetLatestResultParams() try: import pandas # pylint: disable=import-outside-toplevel except ImportError as exc: raise ImportError( "dependency failure, pandas is required but missing" ) from exc - + params = GetLatestResultParams( + params.batch_size, + params.columns, + params.sample_count, + params.filters, + params.sort_by, + ) results = self.download_csv( query, - params={ - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "batch_size": batch_size, - }, + params=params, ) return pandas.read_csv(results.data) def download_csv( self, query: Union[QueryBase, str, int], - params: Optional[Dict[str, Any]] = None, + params: Optional[GetLatestResultParams] = None, ) -> ExecutionResultCSV: """ Almost like an alias for `get_latest_result` but for the csv endpoint. 
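As reworked above, get_latest_result first probes with limit=1 to read the execution metadata, compares execution_ended_at against max_age_hours, and only re-executes the query when the cached result is stale. A minimal sketch of calling it through the new GetLatestResultParams tuple, assuming the patched client; the API key and query id are placeholders:

    from dune_client.api.extensions import GetLatestResultParams
    from dune_client.client import DuneClient

    client = DuneClient("<api-key>")  # placeholder key
    QUERY_ID = 1234567                # placeholder query id

    # Reuse cached rows if they are under a day old; otherwise re-run the query.
    params = GetLatestResultParams(batch_size=1000, filters="number < 3")
    results = client.get_latest_result(QUERY_ID, max_age_hours=24, params=params)
    print(len(results.get_rows()))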
@@ -304,37 +326,32 @@ def download_csv( """ # Ensure we don't specify parameters that are incompatible: if params is None: - params = {} - batch_size = params.get("batch_size", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) + params = GetLatestResultParams() assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (params.batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" - params, query_id = parse_query_object_or_id(query) + get_params, query_id = parse_query_object_or_id(query) - params = self._build_parameters( + get_params = self._build_parameters( params={ - "params": params, - "columns": columns, - "sample_count": sample_count, - "filters": filters, - "sort_by": sort_by, - "limit": batch_size, + "params": get_params, + "columns": params.columns, + "sample_count": params.sample_count, + "filters": params.filters, + "sort_by": params.sort_by, + "limit": params.batch_size, } ) - if sample_count is None and batch_size is None: - params["limit"] = MAX_NUM_ROWS_PER_BATCH + if params.sample_count is None and params.batch_size is None: + get_params["limit"] = MAX_NUM_ROWS_PER_BATCH response = self._get( - route=f"/query/{query_id}/results/csv", params=params, raw=True + route=f"/query/{query_id}/results/csv", params=get_params, raw=True ) response.raise_for_status() @@ -356,8 +373,7 @@ def run_sql( self, query_sql: str, name: str = "API Query", - ping_frequency: int = POLL_FREQUENCY_SECONDS, - params: Optional[Dict[str, Any]] = None, + params: Optional[RunSQLParams] = None, ) -> ResultsResponse: """ Allows user to provide execute raw_sql via the CRUD interface @@ -366,23 +382,20 @@ def run_sql( Requires Plus subscription! """ if params is None: - params = None + params = RunSQLParams() - query_params = params.get("query_params", None) - is_private = params.get("is_private", None) - archive_after = params.get("archive_after", None) - performance = params.get("performance", None) - query = self.create_query(name, query_sql, query_params, is_private) + query = self.create_query( + name, query_sql, params.query_params, params.is_private + ) + run_query_params = RunQueryParams(params.performance) try: results = self.run_query( query=query.base, - ping_frequency=ping_frequency, - params={ - "performance": performance, - }, + ping_frequency=params.ping_frequency, + params=run_query_params, ) finally: - if archive_after: + if params.archive_after: self.archive_query(query.base.query_id) return results @@ -401,10 +414,11 @@ def refresh( fetches and returns the results. Sleeps `ping_frequency` seconds between each status request. 
""" + params = RunQueryParams(performance) return self.run_query( query=query, ping_frequency=ping_frequency, - params={"performance": performance}, + params=params, ) @deprecated(version="1.2.1", reason="Please use run_query_csv") @@ -419,9 +433,8 @@ def refresh_csv( fetches and the results in CSV format (use it load the data directly in pandas.from_csv() or similar frameworks) """ - return self.run_query_csv( - query, ping_frequency, params={"performance": performance} - ) + params = RunQueryParams(performance) + return self.run_query_csv(query, ping_frequency, params=params) @deprecated(version="1.2.1", reason="Please use run_query_dataframe") def refresh_into_dataframe( @@ -436,9 +449,8 @@ def refresh_into_dataframe( This is a convenience method that uses refresh_csv underneath """ - return self.run_query_dataframe( - query, ping_frequency, params={"performance": performance} - ) + params = RunQueryParams(performance=performance) + return self.run_query_dataframe(query, ping_frequency, params=params) ################# # Private Methods diff --git a/dune_client/api/query.py b/dune_client/api/query.py index 6edb1fd..a83aeeb 100644 --- a/dune_client/api/query.py +++ b/dune_client/api/query.py @@ -6,7 +6,7 @@ """ from __future__ import annotations -from typing import Optional, Any, Dict +from typing import NamedTuple, Optional, Any from dune_client.api.base import BaseRouter from dune_client.models import DuneError @@ -14,6 +14,15 @@ from dune_client.types import QueryParameter +class UpdateQueryParams(NamedTuple): + "Params for Update Query function" + name: Optional[str] = None + query_sql: Optional[str] = None + params: Optional[list[QueryParameter]] = None + description: Optional[str] = None + tags: Optional[list[str]] = None + + class QueryAPI(BaseRouter): """ Implementation of Query API (aka CRUD) Operations - Plus subscription only @@ -57,7 +66,7 @@ def get_query(self, query_id: int) -> DuneQuery: def update_query( self, query_id: int, - params: Optional[Dict[str, Any]] = None, + params: Optional[UpdateQueryParams] = None, ) -> int: """ Updates Dune Query by ID @@ -69,24 +78,19 @@ def update_query( they will be deleted from the query. 
""" if params is None: - params = {} - name = params.get("name", None) - query_sql = params.get("query_sql", None) - query_parms = params.get("parameters", None) - description = params.get("description", None) - tags = params.get("tags", None) + params = UpdateQueryParams() parameters: dict[str, Any] = {} - if name is not None: - parameters["name"] = name - if description is not None: - parameters["description"] = description - if tags is not None: - parameters["tags"] = tags - if query_sql is not None: - parameters["query_sql"] = query_sql - if query_parms is not None: - parameters["parameters"] = [p.to_dict() for p in query_parms] + if params.name is not None: + parameters["name"] = params.name + if params.description is not None: + parameters["description"] = params.description + if params.tags is not None: + parameters["tags"] = params.tags + if params.query_sql is not None: + parameters["query_sql"] = params.query_sql + if params.query_parms is not None: + parameters["parameters"] = [p.to_dict() for p in params.query_parms] if not bool(parameters): # Nothing to change no need to make reqeust diff --git a/tests/e2e/test_client.py b/tests/e2e/test_client.py index b66c329..99d2342 100644 --- a/tests/e2e/test_client.py +++ b/tests/e2e/test_client.py @@ -6,6 +6,7 @@ import dotenv import pandas +from dune_client.api.extensions import GetLatestResultParams, RunQueryParams from dune_client.models import ( ExecutionState, ExecutionResponse, @@ -78,7 +79,7 @@ def test_run_query_paginated(self): dune = DuneClient(self.valid_api_key) # Act - params = {"batch_size": 1} + params = RunQueryParams(batch_size=1) results = dune.run_query(self.multi_rows_query, params=params).get_rows() # Assert @@ -98,7 +99,7 @@ def test_run_query_with_filters(self): dune = DuneClient(self.valid_api_key) # Act - params = {"filters": "number < 3"} + params = RunQueryParams(filters="number < 3") results = dune.run_query(self.multi_rows_query, params=params).get_rows() # Assert @@ -112,7 +113,7 @@ def test_run_query_with_filters(self): def test_run_query_performance_large(self): dune = DuneClient(self.valid_api_key) - params = {"performace": "large"} + params = RunQueryParams(performance="large") results = dune.run_query(self.query, params=params).get_rows() self.assertGreater(len(results), 0) @@ -331,7 +332,7 @@ def test_download_csv_with_pagination(self): client.run_query(self.multi_rows_query) # Act - params = {"batch_size": 1} + params = GetLatestResultParams(batch_size=1) result_csv = client.download_csv(self.multi_rows_query.query_id, params=params) # Assert @@ -352,7 +353,7 @@ def test_download_csv_with_filters(self): client.run_query(self.multi_rows_query) # Act - params = {"filters": "number < 3"} + params = GetLatestResultParams(filters="number < 3") result_csv = client.download_csv( self.multi_rows_query.query_id, params=params, From a919f61238957d43693c604a8095b0cf3fd956c9 Mon Sep 17 00:00:00 2001 From: apurvabanka Date: Thu, 7 Nov 2024 02:47:08 -0500 Subject: [PATCH 7/9] removing build params --- dune_client/api/base.py | 48 ------------------ dune_client/api/execution.py | 31 ++---------- dune_client/api/extensions.py | 90 +++++++++++++++------------------ dune_client/client_async.py | 93 +++++++++++++++-------------------- 4 files changed, 83 insertions(+), 179 deletions(-) diff --git a/dune_client/api/base.py b/dune_client/api/base.py index fe30d47..bbefbb3 100644 --- a/dune_client/api/base.py +++ b/dune_client/api/base.py @@ -83,54 +83,6 @@ def default_headers(self) -> Dict[str, str]: "User-Agent": 
f"dune-client/{client_version} (https://pypi.org/project/dune-client/)", } - ############ - # Utilities: - ############ - - def _build_parameters( - self, - allow_partial_results: str = "true", - params: Optional[Dict[str, Any]] = None, - ) -> Dict[str, Union[str, int]]: - """ - Utility function that builds a dictionary of parameters to be used - when retrieving advanced results (filters, pagination, sorting, etc.). - This is shared between the sync and async client. - """ - # Ensure we don't specify parameters that are incompatible: - if params is None: - params = {} - parameters = params.get("params", None) - columns = params.get("columns", None) - sample_count = params.get("sample_count", None) - filters = params.get("filters", None) - sort_by = params.get("sort_by", None) - limit = params.get("limit", None) - offset = params.get("offset", None) - assert ( - # We are not sampling - sample_count is None - # We are sampling and don't use filters or pagination - or (limit is None and offset is None and filters is None) - ), "sampling cannot be combined with filters or pagination" - - parameters = parameters or {} - parameters["allow_partial_results"] = allow_partial_results - if columns is not None and len(columns) > 0: - parameters["columns"] = ",".join(columns) - if sample_count is not None: - parameters["sample_count"] = sample_count - if filters is not None: - parameters["filters"] = filters - if sort_by is not None and len(sort_by) > 0: - parameters["sort_by"] = ",".join(sort_by) - if limit is not None: - parameters["limit"] = limit - if offset is not None: - parameters["offset"] = offset - - return parameters - class BaseRouter(BaseDuneClient): """Extending the Base Client with elementary api routing""" diff --git a/dune_client/api/execution.py b/dune_client/api/execution.py index 2826efc..9154d42 100644 --- a/dune_client/api/execution.py +++ b/dune_client/api/execution.py @@ -38,6 +38,7 @@ class GetExecutionResultsParams(NamedTuple): filters: Optional[str] = None sort_by: Optional[List[str]] = None offset: Optional[int] = None + allow_partial_results: str = "true" class ExecutionAPI(BaseRouter): @@ -89,26 +90,12 @@ def get_execution_results( self, job_id: str, params: Optional[GetExecutionResultsParams] = None, - allow_partial_results: str = "true", ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" - build_params = None - if params is not None: - build_params = self._build_parameters( - allow_partial_results=allow_partial_results, - params={ - "columns": params.columns, - "sample_count": params.sample_count, - "filters": params.filters, - "sort_by": params.sort_by, - "limit": params.limit, - "offset": params.offset, - }, - ) route = f"/execution/{job_id}/results" url = self._route_url(route) - return self._get_execution_results_by_url(url=url, params=build_params) + return self._get_execution_results_by_url(url=url, params=params._asdict()) def get_execution_results_csv( self, job_id: str, params: Optional[GetExecutionResultsParams] = None @@ -120,22 +107,10 @@ def get_execution_results_csv( use this method for large results where you want lower CPU and memory overhead if you need metadata information use get_results() or get_status() """ - build_params = None - if params is not None: - build_params = self._build_parameters( - params={ - "columns": params.columns, - "sample_count": params.sample_count, - "filters": params.filters, - "sort_by": params.sort_by, - "limit": params.limit, - "offset": params.offset, - } - ) route = 
f"/execution/{job_id}/results/csv" url = self._route_url(route) - return self._get_execution_results_csv_by_url(url=url, params=build_params) + return self._get_execution_results_csv_by_url(url=url, params=params._asdict()) def _get_execution_results_by_url( self, url: str, params: Optional[Dict[str, Any]] = None diff --git a/dune_client/api/extensions.py b/dune_client/api/extensions.py index 6d9ccdf..f448b98 100644 --- a/dune_client/api/extensions.py +++ b/dune_client/api/extensions.py @@ -105,17 +105,16 @@ def run_query( # pylint: disable=duplicate-code job_id = self._refresh(query, ping_frequency, params.performance) params = GetExecutionResultsParams( - limit, - params.columns, - params.sample_count, - params.filters, - params.sort_by, - None, + limti=limit, + compile=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, + allow_partial_results=allow_partial_results, ) return self._fetch_entire_result( self.get_execution_results( job_id, - allow_partial_results=allow_partial_results, params=params, ), ) @@ -149,12 +148,11 @@ def run_query_csv( # pylint: disable=duplicate-code job_id = self._refresh(query, ping_frequency, params.performance) params = GetExecutionResultsParams( - limit, - params.columns, - params.sample_count, - params.filters, - params.sort_by, - None, + limit=limit, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) return self._fetch_entire_result_csv( self.get_execution_results_csv( @@ -185,12 +183,12 @@ def run_query_dataframe( "dependency failure, pandas is required but missing" ) from exc params = RunQueryParams( - params.performance, - params.batch_size, - params.columns, - params.sample_count, - params.filters, - params.sort_by, + performance=params.performance, + batch_size=params.batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) data = self.run_query_csv( query, @@ -248,12 +246,11 @@ def get_latest_result( f"results (from {last_run}) older than {max_age_hours} hours, re-running query" ) params = RunQueryParams( - None, - batch_size, - params.columns, - params.sample_count, - params.filters, - params.sort_by, + batch_size=batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) results = self.run_query( query=( @@ -265,12 +262,11 @@ def get_latest_result( # The results are fresh enough, retrieve the entire result # pylint: disable=duplicate-code params = GetExecutionResultsParams( - batch_size, - params.columns, - params.sample_count, - params.filters, - params.sort_by, - None, + batch_size=batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) results = self._fetch_entire_result( self.get_execution_results( @@ -303,11 +299,11 @@ def get_latest_result_dataframe( "dependency failure, pandas is required but missing" ) from exc params = GetLatestResultParams( - params.batch_size, - params.columns, - params.sample_count, - params.filters, - params.sort_by, + batch_size=params.batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) results = self.download_csv( query, @@ -337,21 +333,15 @@ def download_csv( get_params, query_id = parse_query_object_or_id(query) - get_params = self._build_parameters( - params={ - "params": get_params, - "columns": params.columns, - 
"sample_count": params.sample_count, - "filters": params.filters, - "sort_by": params.sort_by, - "limit": params.batch_size, - } - ) + params = params._asdict() + + params["params"] = get_params + if params.sample_count is None and params.batch_size is None: get_params["limit"] = MAX_NUM_ROWS_PER_BATCH response = self._get( - route=f"/query/{query_id}/results/csv", params=get_params, raw=True + route=f"/query/{query_id}/results/csv", params=params, raw=True ) response.raise_for_status() @@ -387,7 +377,7 @@ def run_sql( query = self.create_query( name, query_sql, params.query_params, params.is_private ) - run_query_params = RunQueryParams(params.performance) + run_query_params = RunQueryParams(performance=params.performance) try: results = self.run_query( query=query.base, @@ -414,7 +404,7 @@ def refresh( fetches and returns the results. Sleeps `ping_frequency` seconds between each status request. """ - params = RunQueryParams(performance) + params = RunQueryParams(performance=performance) return self.run_query( query=query, ping_frequency=ping_frequency, @@ -433,7 +423,7 @@ def refresh_csv( fetches and the results in CSV format (use it load the data directly in pandas.from_csv() or similar frameworks) """ - params = RunQueryParams(performance) + params = RunQueryParams(performance=performance) return self.run_query_csv(query, ping_frequency, params=params) @deprecated(version="1.2.1", reason="Please use run_query_dataframe") diff --git a/dune_client/client_async.py b/dune_client/client_async.py index e1f0a98..ad1f1e4 100644 --- a/dune_client/client_async.py +++ b/dune_client/client_async.py @@ -45,11 +45,11 @@ class GetResultParams(NamedTuple): Parameters for get reult functions """ - batch_size: Optional[int] = None columns: Optional[List[str]] = None sample_count: Optional[int] = None filters: Optional[str] = None sort_by: Optional[List[str]] = None + batch_size: Optional[int] = None class RefreshParams(NamedTuple): @@ -59,10 +59,10 @@ class RefreshParams(NamedTuple): performance: Optional[str] = None batch_size: Optional[int] = None - columns: Optional[List[str]] = None sample_count: Optional[int] = None filters: Optional[str] = None sort_by: Optional[List[str]] = None + columns: Optional[List[str]] = None class ResultPageParams(NamedTuple): @@ -70,12 +70,12 @@ class ResultPageParams(NamedTuple): Parameters for result page functions """ - limit: Optional[int] = None - offset: Optional[int] = None columns: Optional[List[str]] = None sample_count: Optional[int] = None filters: Optional[str] = None sort_by: Optional[List[str]] = None + limit: Optional[int] = None + offset: Optional[int] = None class RetryableError(Exception): @@ -289,12 +289,11 @@ async def get_result( batch_size = MAX_NUM_ROWS_PER_BATCH result_page_params = ResultPageParams( - batch_size, - None, - params.columns, - params.sample_count, - params.filters, - params.sort_by, + batch_size=batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) results = await self._get_result_page( job_id, @@ -334,12 +333,11 @@ async def get_result_csv( batch_size = MAX_NUM_ROWS_PER_BATCH params = ResultPageParams( - batch_size, - None, - params.columns, - params.sample_count, - params.filters, - params.sort_by, + batch_size=batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) results = await self._get_result_csv_page( job_id, @@ -427,11 +425,11 @@ async def refresh( query, 
ping_frequency=ping_frequency, performance=params.performance ) params = GetResultParams( - params.batch_size, - params.columns, - params.sample_count, - params.filters, - params.sort_by, + batch_size=params.batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) return await self.get_result( job_id, @@ -463,11 +461,11 @@ async def refresh_csv( query, ping_frequency=ping_frequency, performance=params.performance ) get_result_params = GetResultParams( - params.batch_size, - params.columns, - params.sample_count, - params.filters, - params.sort_by, + batch_size=params.batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) return await self.get_result_csv( job_id, @@ -495,12 +493,12 @@ async def refresh_into_dataframe( "dependency failure, pandas is required but missing" ) from exc params = RefreshParams( - params.performance, - params.batch_size, - params.columns, - params.sample_count, - params.filters, - params.sort_by, + performance=params.performance, + batch_size=params.batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) results = await self.refresh_csv( query=query, @@ -528,19 +526,14 @@ async def _get_result_page( limit = MAX_NUM_ROWS_PER_BATCH offset = 0 - build_params = self._build_parameters( - params={ - "columns": params.columns, - "sample_count": params.sample_count, - "filters": params.filters, - "sort_by": params.sort_by, - "limit": limit, - "offset": offset, - } - ) + params = params._asdict() + + params["limit"] = limit + params["offset"] = offset + response_json = await self._get( route=f"/execution/{job_id}/results", - params=build_params, + params=params, ) try: @@ -579,16 +572,10 @@ async def _get_result_csv_page( limit = MAX_NUM_ROWS_PER_BATCH offset = 0 - params = self._build_parameters( - params={ - "columns": params.columns, - "sample_count": params.sample_count, - "filters": params.filters, - "sort_by": params.sort_by, - "limit": limit, - "offset": offset, - } - ) + params = params._asdict() + + params["limit"] = limit + params["offset"] = offset route = f"/execution/{job_id}/results/csv" response = await self._get(route=route, params=params, raw=True) From a88310f831a30058133919a042f2e75da2adcb5a Mon Sep 17 00:00:00 2001 From: apurvabanka Date: Thu, 7 Nov 2024 03:36:52 -0500 Subject: [PATCH 8/9] too-many-positional-arguments --- .pylintrc | 2 +- dune_client/api/base.py | 4 ++-- dune_client/api/custom.py | 2 +- dune_client/api/extensions.py | 1 - dune_client/api/table.py | 2 +- dune_client/models.py | 2 +- 6 files changed, 6 insertions(+), 7 deletions(-) diff --git a/.pylintrc b/.pylintrc index cebc305..739330d 100644 --- a/.pylintrc +++ b/.pylintrc @@ -2,4 +2,4 @@ disable=fixme,logging-fstring-interpolation [DESIGN] max-args=10 -max-attributes=6 +max-attributes=7 diff --git a/dune_client/api/base.py b/dune_client/api/base.py index bbefbb3..cdc4aa2 100644 --- a/dune_client/api/base.py +++ b/dune_client/api/base.py @@ -9,7 +9,7 @@ import logging.config import os from json import JSONDecodeError -from typing import Any, Dict, Optional, Union, IO +from typing import Any, Dict, Optional, IO from requests import Response, Session from requests.adapters import HTTPAdapter, Retry @@ -30,7 +30,7 @@ class BaseDuneClient: and provides some convenient functions to use in other clients """ - def __init__( # pylint: disable=too-many-arguments + def __init__( # 
pylint: disable=too-many-arguments, too-many-positional-arguments self, api_key: str, base_url: str = "https://api.dune.com", diff --git a/dune_client/api/custom.py b/dune_client/api/custom.py index 4b41f5c..b99add2 100644 --- a/dune_client/api/custom.py +++ b/dune_client/api/custom.py @@ -61,7 +61,7 @@ def get_custom_endpoint_result( response_json = self._get( route=f"/endpoints/{handle}/{endpoint}/results", - params=params.__dict__, + params=params._asdict(), ) try: return ResultsResponse.from_dict(response_json) diff --git a/dune_client/api/extensions.py b/dune_client/api/extensions.py index f448b98..98e9df1 100644 --- a/dune_client/api/extensions.py +++ b/dune_client/api/extensions.py @@ -30,7 +30,6 @@ ) from dune_client.query import QueryBase, parse_query_object_or_id -# from dune_client.types import QueryParameter from dune_client.types import QueryParameter from dune_client.util import age_in_hours diff --git a/dune_client/api/table.py b/dune_client/api/table.py index e23b217..f5c66f9 100644 --- a/dune_client/api/table.py +++ b/dune_client/api/table.py @@ -52,7 +52,7 @@ def upload_csv( except KeyError as err: raise DuneError(response_json, "UploadCsvResponse", err) from err - def create_table( # pylint: disable=too-many-instance-attributes + def create_table( # pylint: disable=too-many-instance-attributes, too-many-positional-arguments self, namespace: str, table_name: str, diff --git a/dune_client/models.py b/dune_client/models.py index b975edb..044be72 100644 --- a/dune_client/models.py +++ b/dune_client/models.py @@ -297,7 +297,7 @@ def __add__(self, other: ExecutionResult) -> ExecutionResult: @dataclass -class ResultsResponse: +class ResultsResponse: # pylint: disable=too-many-instance-attributes """ Representation of Response from Dune's [Get] Query Results endpoint """ From fd32e143c4154a1e6b3d1a26c145d6f74f602992 Mon Sep 17 00:00:00 2001 From: apurvabanka Date: Thu, 7 Nov 2024 03:52:10 -0500 Subject: [PATCH 9/9] typo --- dune_client/api/execution.py | 7 +++++++ dune_client/api/extensions.py | 4 ++-- dune_client/api/query.py | 6 +++--- dune_client/client_async.py | 1 + 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/dune_client/api/execution.py b/dune_client/api/execution.py index 9154d42..825d091 100644 --- a/dune_client/api/execution.py +++ b/dune_client/api/execution.py @@ -34,6 +34,7 @@ class GetExecutionResultsParams(NamedTuple): limit: Optional[int] = None columns: Optional[List[str]] = None + batch_size: Optional[int] = None sample_count: Optional[int] = None filters: Optional[str] = None sort_by: Optional[List[str]] = None @@ -93,6 +94,9 @@ def get_execution_results( ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" + if params is None: + params = GetExecutionResultsParams() + route = f"/execution/{job_id}/results" url = self._route_url(route) return self._get_execution_results_by_url(url=url, params=params._asdict()) @@ -108,6 +112,9 @@ def get_execution_results_csv( if you need metadata information use get_results() or get_status() """ + if params is None: + params = GetExecutionResultsParams() + route = f"/execution/{job_id}/results/csv" url = self._route_url(route) return self._get_execution_results_csv_by_url(url=url, params=params._asdict()) diff --git a/dune_client/api/extensions.py b/dune_client/api/extensions.py index 98e9df1..e1c7107 100644 --- a/dune_client/api/extensions.py +++ b/dune_client/api/extensions.py @@ -104,8 +104,8 @@ def run_query( # pylint: disable=duplicate-code job_id = self._refresh(query, 
ping_frequency, params.performance) params = GetExecutionResultsParams( - limti=limit, - compile=params.columns, + limit=limit, + columns=params.columns, sample_count=params.sample_count, filters=params.filters, sort_by=params.sort_by, diff --git a/dune_client/api/query.py b/dune_client/api/query.py index a83aeeb..8737b7d 100644 --- a/dune_client/api/query.py +++ b/dune_client/api/query.py @@ -18,7 +18,7 @@ class UpdateQueryParams(NamedTuple): "Params for Update Query function" name: Optional[str] = None query_sql: Optional[str] = None - params: Optional[list[QueryParameter]] = None + query_params: Optional[list[QueryParameter]] = None description: Optional[str] = None tags: Optional[list[str]] = None @@ -89,8 +89,8 @@ def update_query( parameters["tags"] = params.tags if params.query_sql is not None: parameters["query_sql"] = params.query_sql - if params.query_parms is not None: - parameters["parameters"] = [p.to_dict() for p in params.query_parms] + if params.query_params is not None: + parameters["parameters"] = [p.to_dict() for p in params.query_params] if not bool(parameters): # Nothing to change no need to make reqeust diff --git a/dune_client/client_async.py b/dune_client/client_async.py index ad1f1e4..e1ccb40 100644 --- a/dune_client/client_async.py +++ b/dune_client/client_async.py @@ -76,6 +76,7 @@ class ResultPageParams(NamedTuple): sort_by: Optional[List[str]] = None limit: Optional[int] = None offset: Optional[int] = None + batch_size: Optional[int] = None class RetryableError(Exception):
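After this final patch, every client surface shares the same shape: build a small NamedTuple of options and pass it as params. A minimal end-to-end sketch for the async client with the full series applied; the API key and query id are placeholders:

    import asyncio

    from dune_client.client_async import AsyncDuneClient, RefreshParams
    from dune_client.query import QueryBase


    async def main() -> None:
        # Placeholder API key and query id.
        async with AsyncDuneClient("<api-key>") as client:
            # Page through results 500 rows at a time on the medium engine.
            params = RefreshParams(performance="medium", batch_size=500)
            results = await client.refresh(QueryBase(1234567), params=params)
            print(len(results.get_rows()))


    asyncio.run(main())

The synchronous CRUD surface follows suit, e.g. client.update_query(query_id, params=UpdateQueryParams(name="Renamed query")) after the query_params rename in this last patch.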