Skip to content

Commit

Permalink
removing object
Browse files Browse the repository at this point in the history
  • Loading branch information
orman committed Nov 20, 2023
1 parent 39fb079 commit 1811fe0
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 46 deletions.
16 changes: 16 additions & 0 deletions fireblocks_sdk/api_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,16 @@ def __init__(
def to_dict(self):
return convert_class_to_dict(self.__dict__)

class ContractInitializationPhase(str, Enum):
ON_DEPLOYMENT = "ON_DEPLOYMENT"
POST_DEPLOYMENT = "POST_DEPLOYMENT"

class ContractTemplateType(str, Enum):
FUNGIBLE_TOKEN = "FUNGIBLE_TOKEN"
NON_FUNGIBLE_TOKEN = "NON_FUNGIBLE_TOKEN"
NON_TOKEN = "NON_TOKEN"
UUPS_PROXY = "UUPS_PROXY"

class ContractUploadRequest:
def __init__(
self,
Expand All @@ -506,20 +516,26 @@ def __init__(
longDescription: str,
bytecode: str,
sourcecode: str,
initializationPhase: ContractInitializationPhase,
abi: Optional[List[AbiFunction]] = None,
compilerOutputMetadata: Optional[object] = None,
docs: Optional[object] = None,
attributes: Optional[Dict[str, str]] = None,
type: Optional[ContractTemplateType] = None,
inputFieldsMetadata: Optional[str] = None,
):
self.name = name
self.description = description
self.longDescription = longDescription
self.bytecode = bytecode
self.sourcecode = sourcecode
self.initializationPhase = initializationPhase
self.abi = abi
self.compilerOutputMetadata = compilerOutputMetadata
self.docs = docs
self.attributes = attributes
self.type = type
self.inputFieldsMetadata = inputFieldsMetadata

def to_dict(self):
return convert_class_to_dict(self.__dict__)
Expand Down
108 changes: 62 additions & 46 deletions fireblocks_sdk/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import urllib
from importlib.metadata import version
from operator import attrgetter
from typing import Any, Dict, Optional, List, Union
from typing import Any, Dict, Optional, List

import requests

Expand Down Expand Up @@ -33,7 +33,11 @@
NFTsWalletTypeValues,
NFTOwnershipStatusUpdatedPayload,
CreateTokenRequest,
PagedExchangeAccountRequestFilters, StakeRequestDto, UnstakeRequestDto, WithdrawRequestDto,
ContractUploadRequest,
ContractDeployRequest,
ReadCallFunction,
WriteCallFunction,
PagedExchangeAccountRequestFilters,
)
from .sdk_token_provider import SdkTokenProvider

Expand Down Expand Up @@ -114,42 +118,75 @@ def unlink_token(self, asset_id: str):

def get_staking_chains(self):
"""Get all staking chains."""
return self._get_request("/v1/staking/chains")
return self._get_request(f"/v1/staking/chains")

def get_staking_chain_info(self, chain_descriptor: str):
"""Get chain info."""
return self._get_request(f"/v1/staking/chains/{chain_descriptor}/chainInfo")

def get_staking_positions_summary(self):
def get_staking_positions_summary(self, by_vault: bool = None):
"""Get staking positions summary."""
return self._get_request(f"/v1/staking/positions/summary")
return self._get_request(f"/v1/staking/positions/summary",
query_params={"byVault": "true"} if by_vault else None)

def get_staking_positions_summary_by_vault(self):
"""Get staking positions summary by vault."""
return self._get_request("/v1/staking/positions/summary/vaults")

def execute_staking_action(self, chain_descriptor: str, action_id: str,
request_body: Union[StakeRequestDto, UnstakeRequestDto, WithdrawRequestDto]):
"""Execute staking action on a chain.
"""
return self._post_request(f"/v1/staking/chains/{chain_descriptor}/{action_id}", request_body.to_dict())
def execute_staking_action(self, chain_descriptor: str, action_id: str, request_body):
"""Execute staking action on a chain."""
return self._post_request(f"/v1/staking/chains/{chain_descriptor}/{action_id}", request_body)

def get_staking_positions(self, chain_descriptor: str = None):
"""Get all staking positions, optionally filtered by chain."""
return self._get_request("/v1/staking/positions",
return self._get_request(f"/v1/staking/positions",
query_params={"chainDescriptor": chain_descriptor} if chain_descriptor else None)

def get_staking_position(self, position_id: str):
"""Get a staking position by id."""
return self._get_request(f"/v1/staking/positions/{position_id}")

def get_staking_providers(self):
"""Get all staking providers."""
return self._get_request(f"/v1/staking/providers")
def get_staking_validators(self, chain_descriptor: str):
"""Get all staking validators, filtered by chain."""
return self._get_request(f"/v1/staking/validators/{chain_descriptor}")

def approve_staking_provider_terms_of_service(self, provider_id: str):
def approve_staking_provider(self, validator_provider_id: int):
"""Approve staking provider terms of service."""
return self._post_request(f"/v1/staking/providers/{provider_id}/approveTermsOfService")
return self._post_request(f"/v1/staking/providers/approveTermsOfService",
body={"validatorProviderId": validator_provider_id})

def get_contract_templates(self, limit: int = 100, offset: int = 0):
request_filter = {
"limit": limit,
"offset": offset
}
return self._get_request(f"/v1/contract-registry/contracts", query_params=request_filter)

def upload_contract_template(self, request: ContractUploadRequest):
return self._post_request(f"/v1/contract-registry/contracts", request)

def get_contract_template(self, contract_id: str):
return self._get_request(f"/v1/contract-registry/contracts/{contract_id}")

def get_contract_template_constructor(self, contract_id: str, with_docs: bool=False):
return self._get_request(f"/v1/contract-registry/contracts/{contract_id}/constructor?withDocs=${with_docs}`")

def delete_contract_template(self, contract_id: str):
return self._delete_request(f"/v1/contract-registry/contracts/{contract_id}")

def deploy_contract(self, contract_id: str, request: ContractDeployRequest):
return self._post_request(f"/v1/contract-registry/contracts/{contract_id}/deploy", request)

def get_contracts_by_filter(self, templateId: str, blockchainId: str = None):
return self._get_request(f"/v1/contract-service/contract?templateId={templateId}&blockchainId={blockchainId}")

def get_contract_by_address(self, blockchainId: str, contractAddress: str):
return self._get_request(f"/v1/contract-service/contract/{blockchainId}/{contractAddress}")

def get_contract_abi(self, blockchainId: str, contractAddress: str):
return self._get_request(f"/v1/contract-service/contract/{blockchainId}/{contractAddress}/abi")

def read_contract_call_function(self, blockchainId: str, contractAddress: str, payload: ReadCallFunction):
return self._post_request(f"/v1/contract-service/contract/{blockchainId}/{contractAddress}/function/read", payload)

def write_contract_call_function(self, blockchainId: str, contractAddress: str, payload: WriteCallFunction):
return self._post_request(f"/v1/contract-service/contract/{blockchainId}/{contractAddress}/function/write", payload)

def get_nft(self, id: str):
url = "/v1/nfts/tokens/" + id
Expand Down Expand Up @@ -223,8 +260,7 @@ def refresh_nft_ownership_by_vault(
return self._put_request(url, query_params=params)

def get_owned_nfts(self, blockchain_descriptor: str, vault_account_ids: List[str] = None, ids: List[str] = None,
collection_ids: List[str] = None, page_cursor: str = '', page_size: int = 100,
sort: List[GetOwnedNftsSortValues] = None,
collection_ids: List[str] = None, page_cursor: str = '', page_size: int = 100, sort: List[GetOwnedNftsSortValues] = None,
order: OrderValues = None, status: NFTOwnershipStatusValues = None, search: str = None,
ncw_account_ids: List[str] = None, ncw_id: str = None, wallet_type: NFTsWalletTypeValues = None):
"""
Expand Down Expand Up @@ -344,6 +380,7 @@ def list_owned_assets(self, search: str = None, status: NFTOwnershipStatusValues
params["sort"] = ",".join(sort)

if order:

params['order'] = order

return self._get_request(url, query_params=params)
Expand Down Expand Up @@ -742,6 +779,7 @@ def get_exchange_accounts_paged(self, paged_exchange_accounts_request_filters: P

params = {}


if limit is not None:
params['limit'] = limit

Expand Down Expand Up @@ -2042,27 +2080,6 @@ def get_max_bip44_index_used(self, vault_account_id, asset_id):

return self._get_request(url)

def get_paginated_addresses(self, vault_account_id, asset_id, limit=500, before=None, after=None):
"""Gets a paginated response of the addresses for a given vault account and asset
Args:
vault_account_id (str): The vault account Id
asset_id (str): the asset Id
limit(number, optional): limit of addresses per paging request
before (str, optional): curser for the previous paging
after (str, optional): curser for the next paging
"""
path = f"/v1/vault/accounts/{vault_account_id}/{asset_id}/addresses_paginated"
params = {}
if limit:
params["limit"] = limit
if before:
params["before"] = before
if after:
params["after"] = after
if params:
path = path + "?" + urllib.parse.urlencode(params)
return self._get_request(path)

def set_auto_fuel(self, vault_account_id, auto_fuel, idempotency_key=None):
"""Sets autoFuel to true/false for a vault account
Expand Down Expand Up @@ -2156,8 +2173,7 @@ def create_user_group(self, group_name: str, member_ids: Optional[List[str]] = N

return self._post_request(url, body)

def update_user_group(self, id: str, group_name: Optional[str] = None, member_ids: Optional[List[str]] = None) -> \
Dict[str, Any]:
def update_user_group(self, id: str, group_name: Optional[str] = None, member_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Updates a Users Group
@param id: The ID of the Users Group
Expand Down Expand Up @@ -2749,7 +2765,7 @@ def _put_request(self, path, body=None, query_params=None):
headers=headers,
data=json.dumps(body),
timeout=self.timeout,
)
)
return handle_response(response)

def _patch_request(self, path, body=None):
Expand Down

0 comments on commit 1811fe0

Please sign in to comment.