Skip to content
This repository has been archived by the owner on Apr 16, 2023. It is now read-only.

Filip/process_block_script #1

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions contracts/mocks/mock_Fact_Registry.cairo
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
%lang starknet

from starkware.cairo.common.cairo_builtins import HashBuiltin

@storage_var
func l1_headers_store_addr() -> (address: felt) {
}


@view
func get_l1_headers_store_addr{syscall_ptr: felt*, pedersen_ptr: HashBuiltin*, range_check_ptr}() -> (address : felt) {
return l1_headers_store_addr.read();
}

@external
func set_l1_headers_store_addr{syscall_ptr: felt*, pedersen_ptr: HashBuiltin*, range_check_ptr}(
address : felt
) {
return l1_headers_store_addr.write(address);
}
67 changes: 67 additions & 0 deletions tests/process_block/block_header.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from hexbytes.main import HexBytes
from rlp import Serializable, encode
from web3.types import BlockData
from rlp.sedes import (
BigEndianInt,
big_endian_int,
Binary,
binary,
)
from eth_utils import to_bytes
from web3 import Web3


address = Binary.fixed_length(20, allow_empty=True)
hash32 = Binary.fixed_length(32)
int256 = BigEndianInt(256)
trie_root = Binary.fixed_length(32, allow_empty=True)


class BlockHeader(Serializable):
fields = (
('parentHash', hash32),
('unclesHash', hash32),
('coinbase', address),
('stateRoot', trie_root),
('transactionsRoot', trie_root),
('receiptsRoot', trie_root),
('logsBloom', int256),
('difficulty', big_endian_int),
('number', big_endian_int),
('gasLimit', big_endian_int),
('gasUsed', big_endian_int),
('timestamp', big_endian_int),
('extraData', binary),
('mixHash', binary),
('nonce', Binary(8, allow_empty=True)),
('baseFeePerGas', big_endian_int)
)

def hash(self) -> HexBytes:
_rlp = encode(self)
return Web3.keccak(_rlp)

def raw_rlp(self) -> bytes:
return encode(self)


def build_block_header(block: BlockData) -> BlockHeader:
header = BlockHeader(
block["parentHash"],
block["sha3Uncles"],
HexBytes(block['miner']),
block["stateRoot"],
block['transactionsRoot'],
block["receiptsRoot"],
int.from_bytes(block["logsBloom"], 'big'),
block["difficulty"],
block["number"],
block["gasLimit"],
block["gasUsed"],
block["timestamp"],
block["extraData"],
block["mixHash"],
block["nonce"],
block["baseFeePerGas"]
)
return header
162 changes: 162 additions & 0 deletions tests/process_block/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from __future__ import annotations
from secrets import token_bytes
from eth_typing.encoding import HexStr
from typing import Callable, List, Tuple, Any, NamedTuple
from functools import reduce
from enum import Enum

from web3 import Web3

class IntsSequence(NamedTuple):
values: List[int]
length: int

class Encoding(Enum):
LITTLE: str = 'little'
BIG: str = 'big'

concat_arr: Callable[[List[str]], str] = lambda arr: reduce(lambda a, b: a + b, arr)

def bytes_to_int(word: bytes, encoding: Encoding = Encoding.BIG) -> int:
return int.from_bytes(word, encoding.value)

bytes_to_int_little: Callable[[bytes], int] = lambda word: int.from_bytes(word, "little")
bytes_to_int_big: Callable[[bytes], int] = lambda word: int.from_bytes(word, "big")

string_to_byte_little: Callable[[str], int] = lambda word: int.from_bytes(word.encode("UTF-8"), 'little')
string_to_byte_big: Callable[[str], int] = lambda word: int.from_bytes(word.encode("UTF-8"), 'big')

hex_string_little_to_int: Callable[[HexStr], int] = lambda word: bytes_to_int_little(bytes.fromhex(word))
hex_string_big_to_int: Callable[[HexStr], int] = lambda word: bytes_to_int_big(bytes.fromhex(word))

chunk_bytes_input: Callable[[bytes], List[bytes]] = lambda input: [input[i+0:i+8] for i in range(0, len(input), 8)]

print_bytes_array: Callable[[List[str]], str] = lambda arr: concat_arr(list(map(lambda a: a.hex()+'\n', arr)))
print_ints_array: Callable[[List[str]], str] = lambda arr: concat_arr(list(map(lambda a: hex(a)+'\n', arr)))

def hex_string_to_words64(hex_input: str, encoding: Encoding = Encoding.BIG) -> IntsSequence:
if len(hex_input) < 2:
raise Exception('Rlp string to short')
prefix = hex_input[0:2]
if prefix == '0x': hex_input = hex_input[2:]
if len(hex_input)%2==1: hex_input = '0'+hex_input
chunked = [hex_input[i+0:i+16] for i in range(0, len(hex_input), 16)]
return IntsSequence(list(map(hex_string_big_to_int if encoding == Encoding.BIG else hex_string_little_to_int, chunked)), int(len(hex_input)/2))

def hex_string_to_nibbles(hex_input: str, encoding: Encoding = Encoding.BIG) -> List[int]:
if len(hex_input) < 2:
raise Exception('Hex string to short')
prefix = hex_input[0:2]
if prefix == '0x': hex_input = hex_input[2:]

chunked = [hex_input[i+0:i+2] for i in range(0, len(hex_input), 2)]
return list(map(hex_string_big_to_int if encoding == Encoding.BIG else hex_string_little_to_int, chunked))

def ints_array_to_bytes(input: IntsSequence) -> bytes:
full_words, remainder = divmod(input.length, 8)

bytes_array = b''

for i in range(full_words):
bytes_array += input.values[i].to_bytes(8, "big")

if remainder > 0:
bytes_array += input.values[full_words].to_bytes(remainder, "big")

return bytes_array

random_bytes: Callable[[int], bytes] = lambda size: token_bytes(size)

def word64_to_bytes(word: int, word_len: int) -> List[int]:
res: List[int] = []
for i in range(word_len - 1, -1, -1):
(_, r) = divmod(word >> i * 8, 256)
res.append(r)
return res

def word64_to_bytes_recursive_rev(word: int, word_len: int, accumulator = []):
current_len = word_len - len(accumulator)
if len(accumulator) == word_len:
return accumulator
current_len = word_len - len(accumulator)
(_, r) = divmod(word >> (word_len - current_len) * 8, 256)
accumulator.append(r)
return word64_to_bytes_recursive_rev(word, word_len, accumulator)

def word64_to_nibbles_rec(word: int, nibbles_len: int, accumulator: List[int] = []) -> List[int]:
if not nibbles_len > 0:
print(f"nibbles_len: {nibbles_len}")
assert nibbles_len > 0
if nibbles_len == 1:
return accumulator + [word & 0xF]
return word64_to_nibbles_rec(word=(word >> 4), nibbles_len=nibbles_len-1, accumulator=accumulator) + [(word & 0xF)]

def word64_to_nibbles(input: IntsSequence) -> List[int]:
assert input.length <= 1
if input.length == 0: return []
return word64_to_nibbles_rec(input.values[0], input.length * 2)

def words64_to_nibbles(input: IntsSequence, skip_nibbles: int = 0) -> List[int]:
(_, remainder) = divmod(input.length * 2, 16)
acc = []
for i in range(0, len(input.values)):
word = input.values[i]
nibbles_len = 16
if i == len(input.values) - 1: # For the last word skip empty bits
nibbles_len = 16 if remainder == 0 else remainder
if i == 0 and skip_nibbles > 0: # If first word and some nibbles skipped
acc.extend(word64_to_nibbles_rec(word=word, nibbles_len=nibbles_len-skip_nibbles))
else:
acc.extend(word64_to_nibbles_rec(word=word, nibbles_len=nibbles_len))
return acc

def byte_to_nibbles(input_byte: int) -> Tuple[int, int]:
nibble_1 = input_byte & 0x0F
nibble_2 = ((input_byte & 0xF0) >> 4)
return (nibble_1, nibble_2)

def keccak_words64(input: IntsSequence) -> IntsSequence:
return hex_string_to_words64(Web3.keccak(ints_array_to_bytes(input)).hex())

def compare_lists(a: List[Any], b: List[Any]):
return reduce(lambda i, j : i and j, map(lambda m, k: m == k, a, b), True)

def split_uint256_to_uint(split_uint256: Tuple):
return split_uint256[0] + (split_uint256[1] << 128)

def uint_to_ints_array(uint) -> IntsSequence:
return hex_string_to_words64(hex(uint))


#https://github.com/starknet-edu/starknet-messaging-bridge/blob/main/utils.py
MAX_LEN_FELT = 31

def str_to_felt(text):
if len(text) > MAX_LEN_FELT:
raise Exception("Text length too long to convert to felt.")

return int.from_bytes(text.encode(), "big")


def felt_to_str(felt):
length = (felt.bit_length() + 7) // 8
return felt.to_bytes(length, byteorder="big").decode("utf-8")


def str_to_felt_array(text):
return [str_to_felt(text[i:i+MAX_LEN_FELT]) for i in range(0, len(text), MAX_LEN_FELT)]


def uint256_to_int(uint256):
return uint256[0] + uint256[1]*2**128


def uint256(val):
return (val & 2**128-1, (val & (2**256-2**128)) >> 128)


def hex_to_felt(val):
return int(val, 16)

def uint(a):
return(a, 0)
33 changes: 33 additions & 0 deletions tests/process_block/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio

from web3 import Web3
from web3.types import HexBytes, BlockData
from util_types import Data, BlockHeaderIndexes

from block_header import BlockHeader, build_block_header
from helpers import Encoding

BLOCK_NUMBER = 7837994


def get_block(block_number: int) -> BlockData:
alchemy_url = "https://eth-goerli.g.alchemy.com/v2/uXpxHR8fJBH3fjLJpulhY__jXbTGNjN7"

w3 = Web3(Web3.HTTPProvider(alchemy_url))

block = BlockData(w3.eth.get_block(block_number))
return block


async def process_block():
block: BlockData = get_block(BLOCK_NUMBER)
block_header: BlockHeader = build_block_header(block)
block_rlp = Data.from_bytes(block_header.raw_rlp()).to_ints()

print("option_set: ", 2**BlockHeaderIndexes.STATE_ROOT)
print("block_number: ", block['number'])
print("block_header_rlp_bytes_len: ", block_rlp.length)
print("block_header_rlp_len: ", len(block_rlp.values))
print("block_header_rlp: ", block_rlp.values)

asyncio.run(process_block())
110 changes: 110 additions & 0 deletions tests/process_block/util_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from __future__ import annotations
from typing import List, NamedTuple
from enum import Enum, IntEnum
from helpers import chunk_bytes_input, ints_array_to_bytes, concat_arr


class IntsSequence(NamedTuple):
values: List[int]
length: int


class Encoding(Enum):
LITTLE: str = 'little'
BIG: str = 'big'


class Data:
raw_bytes: bytes
odd_nibbles: bool

def __init__(self, value: bytes, odd_nibbles: bool = False):
self.raw_bytes = value
self.odd_nibbles = odd_nibbles

def to_bytes(self) -> bytes:
return self.raw_bytes

def to_ints(self, encoding: Encoding = Encoding.BIG) -> IntsSequence:
chunked = chunk_bytes_input(self.raw_bytes)
ints_array = list(
map(lambda chunk: int.from_bytes(chunk, encoding.value), chunked))
return IntsSequence(values=ints_array, length=len(self.raw_bytes))

def to_hex(self) -> str:
return "0x" + (self.raw_bytes.hex())

def to_nibbles(self) -> List[int]:
raw_bytes = list(self.raw_bytes)
output = []
for byte in raw_bytes:
output.append(byte >> 4)
output.append(byte % 2**4)

return output[1:] if self.odd_nibbles else output

def to_int(self) -> str:
return int(self.to_hex(), base=16)

@staticmethod
def from_ints(input: IntsSequence) -> Data:
raw_bytes = ints_array_to_bytes(input)
return Data(raw_bytes)

@staticmethod
def from_hex(input: str) -> Data:
prefixed = input[0:2] == '0x'
return Data(bytes.fromhex(input[2:] if prefixed else input))

@staticmethod
def from_int(input: int) -> Data:
if len(hex(input)[2:]) % 2 != 0:
return Data(bytes.fromhex("0"+hex(input)[2:]))
else:
return Data(bytes.fromhex(hex(input)[2:]))

@staticmethod
def from_bytes(input: bytes) -> Data:
return Data(input)

@staticmethod
def from_nibbles(raw_nibbles: List[int], encoding: Encoding = Encoding.BIG) -> Data:
single_bytes = []

odd_nibbles = len(raw_nibbles) % 2 != 0
nibbles = [0] + raw_nibbles if odd_nibbles else raw_nibbles

if len(nibbles) == 0:
return Data(b'')

chunked = [nibbles[i+0:i+2] for i in range(0, len(nibbles), 2)]
for chunk in chunked:
single_bytes.append(int.to_bytes(
(chunk[0] * 2**4) + chunk[1], 1, encoding.value))

return Data(bytes(concat_arr(single_bytes)), odd_nibbles)

def __str__(self) -> str:
return self.to_hex()

def __eq__(self, __o: Data) -> bool:
return __o.raw_bytes == self.raw_bytes


class BlockHeaderIndexes(IntEnum):
PARENT_HASH: int = 0
OMMERS_HASH: int = 1
BENEFICIARY: int = 2
STATE_ROOT: int = 3
TRANSACTION_ROOT: int = 4
RECEIPTS_ROOT: int = 5
LOGS_BLOOM: int = 6
DIFFICULTY: int = 7
BLOCK_NUMBER: int = 8
GAS_LIMIT: int = 9
GAS_USED: int = 10
TIMESTAMP: int = 11
EXTRA_DATA: int = 12
MIX_HASH: int = 13
NONCE: int = 14
BASE_FEE: int = 15
Loading