Skip to content

Commit

Permalink
Merge pull request #7 from banteg/feat/parity-trace
Browse files Browse the repository at this point in the history
  • Loading branch information
antazoey authored Jun 14, 2022
2 parents 71b07d7 + cb8ebfa commit eeb3515
Show file tree
Hide file tree
Showing 17 changed files with 2,679 additions and 59 deletions.
31 changes: 27 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ python3 setup.py install

## Quick Usage

### Geth Style Traces

If you are using a node that supports the `debug_traceTransaction` RPC, you can use `web3.py` to get trace frames:

```python
Expand All @@ -43,21 +45,42 @@ for item in struct_logs:
If you want to get the call-tree node, you can do:

```python
from evm_trace import CallType, get_calltree_from_trace
from evm_trace import CallType, get_calltree_from_geth_trace

root_node_kwargs = {
"gas_cost": 10000000,
"gas_limit": 10000000000,
"address": "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
"calldata": "0x00",
"value": 1000,
"call_type": CallType.MUTABLE,
"call_type": CallType.CALL,
}

# Where `trace` is a `TraceFrame` (see example above)
calltree = get_calltree_from_trace(trace, **root_node_kwargs)
calltree = get_calltree_from_geth_trace(trace, **root_node_kwargs)
```

### Parity Style Traces

If you are using a node that supports the `trace_transaction` RPC, you can use `web3.py` to get trace objects:

```python
from evm_trace import CallType, ParityTraceList

raw_trace_list = web3.manager.request_blocking("trace_transaction", [txn_hash])
trace_list = ParityTraceList.parse_obj(raw_trace_list)
```

And to make call-tree nodes, you can do:

```python
from evm_trace import get_calltree_from_parity_trace

tree = get_calltree_from_parity_trace(trace_list)
```

### Call Tree Node Customization

You can also customize the output by making your own display class:

```python
Expand All @@ -72,7 +95,7 @@ class CustomDisplay(DisplayableCallTreeNode):
return f"{call_type} call @ {address} gas_cost={cost}"


calltree = get_calltree_from_trace(trace, display_cls=CustomDisplay)
calltree = get_calltree_from_geth_trace(trace, display_cls=CustomDisplay)
```

## Development
Expand Down
13 changes: 11 additions & 2 deletions evm_trace/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
from .base import CallTreeNode, CallType, TraceFrame, get_calltree_from_trace
from .base import CallTreeNode, CallType, TraceFrame, get_calltree_from_geth_trace
from .parity import ParityTrace, ParityTraceList, get_calltree_from_parity_trace

__all__ = ["CallTreeNode", "CallType", "get_calltree_from_trace", "TraceFrame"]
__all__ = [
"CallTreeNode",
"CallType",
"get_calltree_from_geth_trace",
"get_calltree_from_parity_trace",
"ParityTrace",
"ParityTraceList",
"TraceFrame",
]
14 changes: 7 additions & 7 deletions evm_trace/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import math
from typing import Any, Dict, Iterable, Iterator, List, Type
from typing import Any, Dict, Iterable, Iterator, List, Optional, Type

from eth_utils import to_int
from hexbytes import HexBytes
Expand Down Expand Up @@ -40,8 +40,8 @@ class CallTreeNode(BaseModel):
address: Any
value: int = 0
depth: int = 0
gas_limit: int
gas_cost: int # calculated from call starting and return
gas_limit: Optional[int]
gas_cost: Optional[int] # calculated from call starting and return
calldata: Any = HexBytes(b"")
returndata: Any = HexBytes(b"")
calls: List["CallTreeNode"] = []
Expand All @@ -67,7 +67,7 @@ def __getitem__(self, index: int) -> "CallTreeNode":
return self.calls[index]


def get_calltree_from_trace(
def get_calltree_from_geth_trace(
trace: Iterator[TraceFrame], show_internal=False, **root_node_kwargs
) -> CallTreeNode:
"""
Expand Down Expand Up @@ -150,18 +150,18 @@ def _create_node_from_call(
# TODO: Validate gas values

if frame.op == "CALL":
child_node_kwargs["call_type"] = CallType.MUTABLE
child_node_kwargs["call_type"] = CallType.CALL
child_node_kwargs["value"] = int(frame.stack[-3].hex(), 16)
child_node_kwargs["calldata"] = _extract_memory(
offset=frame.stack[-4], size=frame.stack[-5], memory=frame.memory
)
elif frame.op == "DELEGATECALL":
child_node_kwargs["call_type"] = CallType.DELEGATE
child_node_kwargs["call_type"] = CallType.DELEGATECALL
child_node_kwargs["calldata"] = _extract_memory(
offset=frame.stack[-3], size=frame.stack[-4], memory=frame.memory
)
else:
child_node_kwargs["call_type"] = CallType.STATIC
child_node_kwargs["call_type"] = CallType.STATICCALL
child_node_kwargs["calldata"] = _extract_memory(
offset=frame.stack[-3], size=frame.stack[-4], memory=frame.memory
)
Expand Down
11 changes: 4 additions & 7 deletions evm_trace/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

from eth_utils import to_checksum_address

from evm_trace.enums import CallType

if TYPE_CHECKING:
from evm_trace.base import CallTreeNode

Expand All @@ -30,12 +28,11 @@ def depth(self) -> int:

@property
def title(self) -> str:
call_type = self.call.call_type.value.upper()
call_mnemonic = "CALL" if self.call.call_type == CallType.MUTABLE else f"{call_type}CALL"
address_hex_str = self.call.address.hex()
call_type = self.call.call_type.value
address_hex_str = self.call.address.hex() if self.call.address else None

try:
address = to_checksum_address(address_hex_str)
address = to_checksum_address(address_hex_str) if address_hex_str else None
except ImportError:
# Ignore checksumming if user does not have eth-hash backend installed.
address = address_hex_str
Expand All @@ -46,7 +43,7 @@ def title(self) -> str:
if self.call.calldata:
call_path = f"{call_path}.<{self.call.calldata[:4].hex()}>"

return f"{call_mnemonic}: {call_path} [{cost} gas]"
return f"{call_type}: {call_path} [{cost} gas]"

@classmethod
def make_tree(
Expand Down
10 changes: 6 additions & 4 deletions evm_trace/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

class CallType(Enum):
INTERNAL = "INTERNAL" # Non-opcode internal call
STATIC = "STATIC" # STATICCALL opcode
MUTABLE = "MUTABLE" # CALL opcode
DELEGATE = "DELEGATE" # DELEGATECALL opcode
SELFDESTRUCT = "SELFDESTRUCT" # SELFDESTRUCT opcode
CREATE = "CREATE"
CALL = "CALL"
DELEGATECALL = "DELEGATECALL"
STATICCALL = "STATICCALL"
CALLCODE = "CALLCODE"
SELFDESTRUCT = "SELFDESTRUCT"
171 changes: 171 additions & 0 deletions evm_trace/parity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from typing import List, Optional, Type, Union

from pydantic import BaseModel, Field, validator

from evm_trace.base import CallTreeNode, CallType
from evm_trace.display import DisplayableCallTreeNode


class CallAction(BaseModel):
gas: int
input: Optional[str] = None
receiver: Optional[str] = Field(alias="to", default=None)
sender: str = Field(alias="from")
value: int
# only used to recover the specific call type
call_type: str = Field(alias="callType", repr=False)

@validator("value", "gas", pre=True)
def convert_integer(cls, v):
return int(v, 16)


class CreateAction(BaseModel):
gas: int
init: str
value: int

@validator("value", "gas", pre=True)
def convert_integer(cls, v):
return int(v, 16)


class SelfDestructAction(BaseModel):
address: str
balance: int

@validator("balance", pre=True)
def convert_integer(cls, v):
return int(v, 16)


class ActionResult(BaseModel):
gas_used: str = Field(alias="gasUsed")

@validator("gas_used", pre=True)
def convert_integer(cls, v):
return int(v, 16)


class CallResult(ActionResult):
output: str


class CreateResult(ActionResult):
address: str
code: str


ParityTraceAction = Union[CreateAction, CallAction, SelfDestructAction]
ParityTraceResult = Union[CallResult, CreateResult]


class ParityTrace(BaseModel):
error: Optional[str] = None
action: ParityTraceAction
block_hash: str = Field(alias="blockHash")
call_type: CallType = Field(alias="type")
result: Optional[ParityTraceResult] = None
subtraces: int
trace_address: List[int] = Field(alias="traceAddress")
transaction_hash: str = Field(alias="transactionHash")

@validator("call_type", pre=True)
def convert_call_type(cls, v, values) -> CallType:
if isinstance(values["action"], CallAction):
v = values["action"].call_type
value = v.upper()
if value == "SUICIDE":
value = "SELFDESTRUCT"

return CallType(value)


class ParityTraceList(BaseModel):
__root__: List[ParityTrace]

# pydantic models with custom root don't have this by default
def __iter__(self):
return iter(self.__root__)

def __getitem__(self, item):
return self.__root__[item]


def get_calltree_from_parity_trace(
traces: ParityTraceList,
root: Optional[ParityTrace] = None,
display_cls: Type[DisplayableCallTreeNode] = DisplayableCallTreeNode,
) -> CallTreeNode:
"""
Create a :class:`~evm_trace.base.CallTreeNode` from output models using the Parity approach
(e.g. from the ``trace_transaction`` RPC).
Args:
traces (List[:class:~evm_trace.parity.ParityTraceList]): The list of parity trace nodes,
likely loaded from the response data from the ``trace_transaction`` RPC response.
root (:class:`~evm_trace.parity.ParityTrace`): The root parity trace node. Optional, uses
the first item by default.
display_cls (Type[DisplayableCallTreeNode]]: A custom class to use for representing
the call tree. Defaults to :class:`~evm_trace.display.DisplayableCallTreeNode`.
Returns:
:class:`~evm_trace.base.CallTreeNode`
"""
if root is None:
root = traces[0]

node_kwargs = dict(
call_type=root.call_type,
error=root.error is not None,
display_cls=display_cls,
)

if root.call_type == CallType.CREATE:
create_action: CreateAction = root.action # type: ignore
create_result: CreateResult = root.result # type: ignore

node_kwargs.update(
value=create_action.value,
gas_limit=create_action.gas,
)
if create_result:
node_kwargs.update(gas_used=create_result.gas_used, address=create_result.address)

elif root.call_type in (
CallType.CALL,
CallType.DELEGATECALL,
CallType.STATICCALL,
CallType.CALLCODE,
):
call_action: CallAction = root.action # type: ignore
call_result: CallResult = root.result # type: ignore

node_kwargs.update(
address=call_action.receiver,
value=call_action.value,
gas_limit=call_action.gas,
calldata=call_action.input,
)
# no result if the call has an error
if call_result:
node_kwargs.update(
gas_cost=call_result.gas_used,
returndata=call_result.output,
)

elif root.call_type == CallType.SELFDESTRUCT:
selfdestruct_action: SelfDestructAction = root.action # type: ignore
node_kwargs.update(
address=selfdestruct_action.address,
)

subtraces = [
sub
for sub in traces
if len(sub.trace_address) == len(root.trace_address) + 1
and sub.trace_address[:-1] == root.trace_address
]
node_kwargs["calls"] = [get_calltree_from_parity_trace(traces, root=sub) for sub in subtraces]
node = CallTreeNode.parse_obj(node_kwargs)
return node
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ exclude =
.eggs
docs
build
per-file-ignores =
# The traces have to be formatted this way for the tests.
tests/expected_traces.py: E501
Loading

0 comments on commit eeb3515

Please sign in to comment.