diff --git a/.github/scripts/cpu_monitor.sh b/.github/scripts/cpu_monitor.sh new file mode 100755 index 00000000000..563fb952273 --- /dev/null +++ b/.github/scripts/cpu_monitor.sh @@ -0,0 +1,136 @@ +#!/bin/bash + +# Configuration +THRESHOLD=80 +MEASUREMENTS_FILE="/tmp/cpu_measurements.txt" +MONITOR_INTERVAL=5 # seconds +PROCESS_NAME="cdk-erigon" +DETAILED_LOG="/tmp/cpu_detailed.log" + +# Function to get CPU usage for all matching processes +get_process_cpu() { + # Clear previous detailed log + > "$DETAILED_LOG" + + # Get PIDs of cdk-erigon processes + pids=$(pgrep -f "[c]dk-erigon") + + if [ -n "$pids" ]; then + # Use top in batch mode for each PID to get current CPU usage + for pid in $pids; do + # Get process command + if [[ "$OSTYPE" == "darwin"* ]]; then + cmd=$(ps -p $pid -o command=) + cpu=$(top -l 1 -pid $pid | tail -1 | awk '{print $3}') + else + cmd=$(ps -p $pid -o cmd=) + cpu=$(top -b -n 1 -p $pid | tail -1 | awk '{print $9}') + fi + # Get current CPU usage + echo "$pid $cpu $cmd" >> "$DETAILED_LOG" + done + fi + + # Sum total CPU usage + total_cpu=$(awk '{sum += $2} END {printf "%.1f", sum}' "$DETAILED_LOG") + + # Return 0 if no process found + if [ -z "$total_cpu" ]; then + echo "0.0" + else + echo "$total_cpu" + fi +} + +# Function to show current process details +show_process_details() { + if [ -s "$DETAILED_LOG" ]; then + echo "Individual process details:" + printf "%-10s %-8s %-s\n" "PID" "CPU%" "Command" + echo "----------------------------------------" + while read -r line; do + pid=$(echo "$line" | awk '{print $1}') + cpu=$(echo "$line" | awk '{print $2}') + cmd=$(echo "$line" | cut -d' ' -f3-) + printf "%-10s %-8.1f %-s\n" "$pid" "$cpu" "$cmd" + done < "$DETAILED_LOG" + echo "----------------------------------------" + else + echo "No $PROCESS_NAME processes found" + fi +} + +# Function to analyze CPU measurements +analyze_cpu() { + if [ -f "$MEASUREMENTS_FILE" ]; then + # Calculate statistics + avg_cpu=$(awk '{ sum += $1 } END { print sum/NR }' "$MEASUREMENTS_FILE") + avg_cpu_rounded=$(printf "%.1f" "$avg_cpu") + max_cpu=$(awk 'BEGIN{max=0} {if($1>max) max=$1} END{print max}' "$MEASUREMENTS_FILE") + measurement_count=$(wc -l < "$MEASUREMENTS_FILE") + + echo "" + echo "=== CPU Usage Analysis for all $PROCESS_NAME processes ===" + echo "Number of measurements: $measurement_count" + echo "Average Combined CPU Usage: $avg_cpu_rounded%" + echo "Peak Combined CPU Usage: $max_cpu%" + echo "Threshold: $THRESHOLD%" + + # Get final process details for the report + echo "" + echo "Final process state:" + show_process_details + + # Compare with threshold + if [ "$(echo "$avg_cpu > $THRESHOLD" | bc -l)" -eq 1 ]; then + echo "" + echo "ERROR: Average CPU usage ($avg_cpu_rounded%) exceeded threshold of $THRESHOLD%" + cleanup_and_exit 1 + else + echo "" + echo "SUCCESS: CPU usage ($avg_cpu_rounded%) is within threshold of $THRESHOLD%" + cleanup_and_exit 0 + fi + else + echo "ERROR: No CPU measurements found at $MEASUREMENTS_FILE" + cleanup_and_exit 1 + fi +} + +# Function to clean up and exit +cleanup_and_exit() { + exit_code=$1 + rm -f "$DETAILED_LOG" + exit $exit_code +} + +# Function to handle interruption +handle_interrupt() { + echo "" + echo "Monitoring interrupted. Analyzing collected data..." 
+ analyze_cpu +} + +# Set up trap for various signals +trap handle_interrupt TERM INT + +# Clear measurements file +> "$MEASUREMENTS_FILE" +> "$DETAILED_LOG" + +echo "Starting CPU monitoring for all '$PROCESS_NAME' processes" +echo "Storing measurements in $MEASUREMENTS_FILE" +echo "Monitoring interval: ${MONITOR_INTERVAL}s" +echo "Press Ctrl+C to stop monitoring and see analysis" +echo "" + +# Start monitoring loop +while true; do + # Get CPU usage for all matching processes + cpu_usage=$(get_process_cpu) + echo "$cpu_usage" >> "$MEASUREMENTS_FILE" + echo "$(date '+%Y-%m-%d %H:%M:%S') - Combined CPU Usage: $cpu_usage%" + show_process_details + echo "" + sleep "$MONITOR_INTERVAL" +done \ No newline at end of file diff --git a/.github/workflows/ci_zkevm.yml b/.github/workflows/ci_zkevm.yml index c2ea48397fe..f906bf93847 100644 --- a/.github/workflows/ci_zkevm.yml +++ b/.github/workflows/ci_zkevm.yml @@ -116,6 +116,23 @@ jobs: run: | kurtosis run --enclave cdk-v1 --image-download always . '{"args": {"data_availability_mode": "${{ matrix.da-mode }}", "cdk_erigon_node_image": "cdk-erigon:local"}}' + - name: Run process with CPU monitoring + working-directory: ./cdk-erigon + run: | + # Start monitoring in background + bash ./.github/scripts/cpu_monitor.sh & + monitor_pid=$! + + # Wait for 30 seconds + sleep 30 + + # Stop monitoring and get analysis + kill -TERM $monitor_pid + wait $monitor_pid || { + echo "CPU usage exceeded threshold!" + exit 1 + } + - name: Monitor verified batches working-directory: ./kurtosis-cdk shell: bash diff --git a/Makefile b/Makefile index 7db93f0e97a..69ed8bb762a 100644 --- a/Makefile +++ b/Makefile @@ -194,6 +194,9 @@ lint: @./erigon-lib/tools/golangci_lint.sh @./erigon-lib/tools/mod_tidy_check.sh +cpu_monitor: + @.github/scripts/cpu_monitor.sh + ## clean: cleans the go cache, build dir, libmdbx db dir clean: go clean -cache diff --git a/README.md b/README.md index 4e58a99cadb..4fc265aecdd 100644 --- a/README.md +++ b/README.md @@ -208,6 +208,15 @@ Useful config entries: - `zkevm.sync-limit`: This will ensure the network only syncs to a given block height. - `debug.timers`: This will enable debug timers in the logs to help with performance tuning. Recording timings of witness generation, etc. at INFO level. +Metrics and pprof configuration flags: + +- `metrics:` Enables or disables the metrics collection. Set to true to enable. +- `metrics.addr`: The address on which the metrics server will listen. Default is "0.0.0.0". +- `metrics.port`: The port on which the metrics server will listen. Default is 6060. +- `pprof`: Enables or disables the pprof profiling. Set to true to enable. +- `pprof.addr`: The address on which the pprof server will listen. Default is "0.0.0.0". +- `pprof.port`: The port on which the pprof server will listen. Default is 6061. 
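Aside on the metrics/pprof flags documented above: they boil down to a standard Go pprof/metrics HTTP listener on the configured address and port. Erigon wires this internally, so the following standalone sketch is only an illustration of what ends up listening on the documented defaults, not the node's actual startup code:

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
)

func main() {
	// Roughly what pprof=true, pprof.addr=0.0.0.0, pprof.port=6061 amounts to:
	// profiles become reachable under http://<host>:6061/debug/pprof/.
	log.Println("pprof listening on 0.0.0.0:6061")
	log.Fatal(http.ListenAndServe("0.0.0.0:6061", nil))
}
```

With such an endpoint exposed, `go tool pprof http://localhost:6061/debug/pprof/profile` captures a CPU profile, which complements the threshold check performed by the `cpu_monitor.sh` script added in this PR.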
+ *** diff --git a/cmd/rpcdaemon/health/check_time.go b/cmd/rpcdaemon/health/check_time.go index ffdfde24bde..43ea4af63cb 100644 --- a/cmd/rpcdaemon/health/check_time.go +++ b/cmd/rpcdaemon/health/check_time.go @@ -5,6 +5,8 @@ import ( "fmt" "net/http" + "github.com/ledgerwatch/erigon-lib/common/hexutil" + "github.com/ledgerwatch/erigon/rpc" ) @@ -20,13 +22,13 @@ func checkTime( if err != nil { return err } - timestamp := 0 + timestamp := uint64(0) if ts, ok := i["timestamp"]; ok { - if cs, ok := ts.(uint64); ok { - timestamp = int(cs) + if cs, ok := ts.(hexutil.Uint64); ok { + timestamp = cs.Uint64() } } - if timestamp < seconds { + if timestamp < uint64(seconds) { return fmt.Errorf("%w: got ts: %d, need: %d", errTimestampTooOld, timestamp, seconds) } diff --git a/cmd/rpcdaemon/health/health_test.go b/cmd/rpcdaemon/health/health_test.go index 419c7b9912b..079bedb3165 100644 --- a/cmd/rpcdaemon/health/health_test.go +++ b/cmd/rpcdaemon/health/health_test.go @@ -245,7 +245,7 @@ func TestProcessHealthcheckIfNeeded_HeadersTests(t *testing.T) { netApiResponse: hexutil.Uint(1), netApiError: nil, ethApiBlockResult: map[string]interface{}{ - "timestamp": uint64(time.Now().Add(-10 * time.Second).Unix()), + "timestamp": hexutil.Uint64(time.Now().Add(-10 * time.Second).Unix()), }, ethApiBlockError: nil, ethApiSyncingResult: false, @@ -264,7 +264,7 @@ func TestProcessHealthcheckIfNeeded_HeadersTests(t *testing.T) { netApiResponse: hexutil.Uint(1), netApiError: nil, ethApiBlockResult: map[string]interface{}{ - "timestamp": uint64(time.Now().Add(-1 * time.Hour).Unix()), + "timestamp": hexutil.Uint64(time.Now().Add(-1 * time.Hour).Unix()), }, ethApiBlockError: nil, ethApiSyncingResult: false, @@ -283,7 +283,7 @@ func TestProcessHealthcheckIfNeeded_HeadersTests(t *testing.T) { netApiResponse: hexutil.Uint(1), netApiError: nil, ethApiBlockResult: map[string]interface{}{ - "timestamp": uint64(time.Now().Add(1 * time.Hour).Unix()), + "timestamp": hexutil.Uint64(time.Now().Add(1 * time.Hour).Unix()), }, ethApiBlockError: nil, ethApiSyncingResult: false, @@ -319,7 +319,7 @@ func TestProcessHealthcheckIfNeeded_HeadersTests(t *testing.T) { netApiResponse: hexutil.Uint(10), netApiError: nil, ethApiBlockResult: map[string]interface{}{ - "timestamp": uint64(time.Now().Add(1 * time.Second).Unix()), + "timestamp": hexutil.Uint64(time.Now().Add(1 * time.Second).Unix()), }, ethApiBlockError: nil, ethApiSyncingResult: false, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 6b74caaeaa7..3dbef7a3ac8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -758,6 +758,11 @@ var ( Usage: "Seal the batch immediately when detecting a counter overflow", Value: false, } + MockWitnessGeneration = cli.BoolFlag{ + Name: "zkevm.mock-witness-generation", + Usage: "Mock the witness generation", + Value: false, + } ACLPrintHistory = cli.IntFlag{ Name: "acl.print-history", Usage: "Number of entries to print from the ACL history on node start up", diff --git a/docs/endpoints/endpoints.md b/docs/endpoints/endpoints.md index 20f27be1a59..e8756c070a7 100644 --- a/docs/endpoints/endpoints.md +++ b/docs/endpoints/endpoints.md @@ -200,6 +200,8 @@ If the endpoint is not in the list below, it means this specific endpoint is not - zkevm_getL2BlockInfoTree - zkevm_getLatestGlobalExitRoot - zkevm_getProverInput +- zkevm_getRollupAddress +- zkevm_getRollupManagerAddress - zkevm_getVersionHistory - zkevm_getWitness - zkevm_isBlockConsolidated diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index 
6c557ff62be..c55a19429b0 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -96,6 +96,7 @@ type Zk struct { InfoTreeUpdateInterval time.Duration BadBatches []uint64 SealBatchImmediatelyOnOverflow bool + MockWitnessGeneration bool } var DefaultZkConfig = Zk{ diff --git a/test/Makefile b/test/Makefile index 1b2edad7fe5..ef1d0936beb 100644 --- a/test/Makefile +++ b/test/Makefile @@ -83,10 +83,11 @@ all: ## Runs a full node $(RUN_DOCKER_BRIDGE_COIN_KAFKA) sleep 3 $(RUN_DOCKER_DATA_AVAILABILITY) + $(RUN_DOCKER_APPROVE) # app services #sleep 3 - #$(RUN_DOCKER_STATELESS_EXECUTOR) + $(RUN_DOCKER_STATELESS_EXECUTOR) $(RUN_DOCKER_SEQ) $(RUN_DOCKER_PROVER) sleep 10 @@ -97,7 +98,7 @@ all: ## Runs a full node $(RUN_DOCKER_RPC) # bridge services - sleep 3 + sleep 30 $(RUN_DOCKER_BRIDGE_SERVICE) sleep 3 $(RUN_DOCKER_BRIDGE_UI) diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 035461d1083..af2cd7ad053 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -328,4 +328,5 @@ var DefaultFlags = []cli.Flag{ &utils.ACLPrintHistory, &utils.InfoTreeUpdateInterval, &utils.SealBatchImmediatelyOnOverflow, + &utils.MockWitnessGeneration, } diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 43d551137d9..2bc6b31fb9b 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -210,6 +210,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { ACLPrintHistory: ctx.Int(utils.ACLPrintHistory.Name), InfoTreeUpdateInterval: ctx.Duration(utils.InfoTreeUpdateInterval.Name), SealBatchImmediatelyOnOverflow: ctx.Bool(utils.SealBatchImmediatelyOnOverflow.Name), + MockWitnessGeneration: ctx.Bool(utils.MockWitnessGeneration.Name), } // For X Layer diff --git a/turbo/jsonrpc/bor_helper.go b/turbo/jsonrpc/bor_helper.go index db0ad4ea60b..b9e826c00b7 100644 --- a/turbo/jsonrpc/bor_helper.go +++ b/turbo/jsonrpc/bor_helper.go @@ -58,7 +58,7 @@ func getHeaderByNumber(ctx context.Context, number rpc.BlockNumber, api *BorImpl return block.Header(), nil } - blockNum, _, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) + blockNum, _, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/debug_api.go b/turbo/jsonrpc/debug_api.go index 095dc8c0ec1..1f65b7d47c4 100644 --- a/turbo/jsonrpc/debug_api.go +++ b/turbo/jsonrpc/debug_api.go @@ -375,7 +375,8 @@ func (api *PrivateDebugAPIImpl) GetRawHeader(ctx context.Context, blockNrOrHash return nil, err } defer tx.Rollback() - n, h, _, err := rpchelper.GetBlockNumber(blockNrOrHash, tx, api.filters) + + n, h, _, err := rpchelper.GetBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { return nil, err } @@ -395,7 +396,7 @@ func (api *PrivateDebugAPIImpl) GetRawBlock(ctx context.Context, blockNrOrHash r return nil, err } defer tx.Rollback() - n, h, _, err := rpchelper.GetBlockNumber(blockNrOrHash, tx, api.filters) + n, h, _, err := rpchelper.GetBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/erigon_block.go b/turbo/jsonrpc/erigon_block.go index f6ef01ef1cb..32a401224ef 100644 --- a/turbo/jsonrpc/erigon_block.go +++ b/turbo/jsonrpc/erigon_block.go @@ -43,7 +43,7 @@ func (api *ErigonImpl) GetHeaderByNumber(ctx context.Context, blockNumber rpc.Bl } defer tx.Rollback() - blockNum, _, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(blockNumber), 
tx, api.filters) + blockNum, _, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(blockNumber), tx, api.filters) if err != nil { return nil, err } @@ -213,7 +213,7 @@ func (api *ErigonImpl) GetBalanceChangesInBlock(ctx context.Context, blockNrOrHa return nil, err } - blockNumber, _, _, err := rpchelper.GetBlockNumber(blockNrOrHash, tx, api.filters) + blockNumber, _, _, err := rpchelper.GetBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/erigon_receipts.go b/turbo/jsonrpc/erigon_receipts.go index a738e19b509..17c6cb79b91 100644 --- a/turbo/jsonrpc/erigon_receipts.go +++ b/turbo/jsonrpc/erigon_receipts.go @@ -407,7 +407,7 @@ func (api *ErigonImpl) GetBlockReceiptsByBlockHash(ctx context.Context, cannonic } } - blockNum, _, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithHash(cannonicalBlockHash, true), tx, api.filters) + blockNum, _, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithHash(cannonicalBlockHash, true), tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/eth_api.go b/turbo/jsonrpc/eth_api.go index f80b2e18e61..b868ea24501 100644 --- a/turbo/jsonrpc/eth_api.go +++ b/turbo/jsonrpc/eth_api.go @@ -288,7 +288,7 @@ func (api *BaseAPI) pendingBlock() *types.Block { } func (api *BaseAPI) blockByRPCNumber(ctx context.Context, number rpc.BlockNumber, tx kv.Tx) (*types.Block, error) { - n, h, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) + n, h, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) if err != nil { return nil, err } @@ -299,7 +299,7 @@ func (api *BaseAPI) blockByRPCNumber(ctx context.Context, number rpc.BlockNumber } func (api *BaseAPI) headerByRPCNumber(ctx context.Context, number rpc.BlockNumber, tx kv.Tx) (*types.Header, error) { - n, h, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) + n, h, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) if err != nil { return nil, err } @@ -320,7 +320,7 @@ func (api *BaseAPI) checkPruneHistory(tx kv.Tx, block uint64) error { return nil } if p.History.Enabled() { - latest, _, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber), tx, api.filters) + latest, _, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber), tx, api.filters) if err != nil { return err } diff --git a/turbo/jsonrpc/eth_block.go b/turbo/jsonrpc/eth_block.go index d0155d6c854..2c66239b379 100644 --- a/turbo/jsonrpc/eth_block.go +++ b/turbo/jsonrpc/eth_block.go @@ -77,7 +77,7 @@ func (api *APIImpl) deprecated_CallBundle(ctx context.Context, txHashes []common } defer func(start time.Time) { log.Trace("Executing EVM call finished", "runtime", time.Since(start)) }(time.Now()) - stateBlockNumber, hash, latest, err := rpchelper.GetBlockNumber(stateBlockNumberOrHash, tx, api.filters) + stateBlockNumber, hash, latest, err := rpchelper.GetBlockNumber_zkevm(stateBlockNumberOrHash, tx, api.filters) if err != nil { return nil, err } @@ -341,7 +341,7 @@ func (api *APIImpl) GetBlockTransactionCountByNumber(ctx context.Context, blockN return &n, nil } - blockNum, blockHash, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(blockNr), tx, api.filters) + blockNum, blockHash, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(blockNr), tx, api.filters) 
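The jsonrpc changes in this and the following files are the same mechanical swap: block-number resolution goes through `rpchelper.GetBlockNumber_zkevm` (or `GetCanonicalBlockNumber_zkevm`), whose implementation appears in `turbo/rpchelper/helper_zkevm.go` further down. A caller-side sketch of the shared pattern; only the rpchelper call shape is taken from the diff, the handler name and surrounding plumbing are illustrative:

```go
// Hypothetical handler: everything except the rpchelper call is illustrative.
func (api *APIImpl) resolveBlock(ctx context.Context, number rpc.BlockNumber) (uint64, libcommon.Hash, error) {
	tx, err := api.db.BeginRo(ctx)
	if err != nil {
		return 0, libcommon.Hash{}, err
	}
	defer tx.Rollback()

	// Resolved height, canonical hash, and whether that height is the node's
	// highest finished (zk) block - the same tuple the real handlers consume.
	blockNum, hash, isLatest, err := rpchelper.GetBlockNumber_zkevm(
		rpc.BlockNumberOrHashWithNumber(number), tx, api.filters)
	if err != nil {
		return 0, libcommon.Hash{}, err
	}
	_ = isLatest

	return blockNum, hash, nil
}
```

Per the helper's implementation below, the zk variant resolves "latest" against the Finish stage (Execution when running as a sequencer), which is why every read path is switched over.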
if err != nil { return nil, err } @@ -388,7 +388,7 @@ func (api *APIImpl) GetBlockTransactionCountByHash(ctx context.Context, blockHas } defer tx.Rollback() - blockNum, _, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHash{BlockHash: &blockHash}, tx, nil) + blockNum, _, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHash{BlockHash: &blockHash}, tx, nil) if err != nil { // (Compatibility) Every other node just return `null` for when the block does not exist. log.Debug("eth_getBlockTransactionCountByHash GetBlockNumber failed", "err", err) diff --git a/turbo/jsonrpc/eth_block_zkevm.go b/turbo/jsonrpc/eth_block_zkevm.go index d0a7d87c77f..6f82477f685 100644 --- a/turbo/jsonrpc/eth_block_zkevm.go +++ b/turbo/jsonrpc/eth_block_zkevm.go @@ -75,7 +75,7 @@ func (api *APIImpl) CallBundle(ctx context.Context, txHashes []common.Hash, stat } defer func(start time.Time) { log.Trace("Executing EVM call finished", "runtime", time.Since(start)) }(time.Now()) - stateBlockNumber, hash, latest, err := rpchelper.GetBlockNumber(stateBlockNumberOrHash, tx, api.filters) + stateBlockNumber, hash, latest, err := rpchelper.GetBlockNumber_zkevm(stateBlockNumberOrHash, tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/eth_call.go b/turbo/jsonrpc/eth_call.go index 690bb746fc3..188ff13331c 100644 --- a/turbo/jsonrpc/eth_call.go +++ b/turbo/jsonrpc/eth_call.go @@ -56,7 +56,7 @@ func (api *APIImpl) Call(ctx context.Context, args ethapi2.CallArgs, blockNrOrHa args.Gas = (*hexutil.Uint64)(&api.GasCap) } - blockNumber, hash, _, err := rpchelper.GetCanonicalBlockNumber(blockNrOrHash, tx, api.filters) // DoCall cannot be executed on non-canonical blocks + blockNumber, hash, _, err := rpchelper.GetCanonicalBlockNumber_zkevm(blockNrOrHash, tx, api.filters) // DoCall cannot be executed on non-canonical blocks if err != nil { return nil, err } @@ -92,7 +92,7 @@ func (api *APIImpl) Call(ctx context.Context, args ethapi2.CallArgs, blockNrOrHa // headerByNumberOrHash - intent to read recent headers only, tries from the lru cache before reading from the db func headerByNumberOrHash(ctx context.Context, tx kv.Tx, blockNrOrHash rpc.BlockNumberOrHash, api *APIImpl) (*types.Header, error) { - _, bNrOrHashHash, _, err := rpchelper.GetCanonicalBlockNumber(blockNrOrHash, tx, api.filters) + _, bNrOrHashHash, _, err := rpchelper.GetCanonicalBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { return nil, err } @@ -101,7 +101,7 @@ func headerByNumberOrHash(ctx context.Context, tx kv.Tx, blockNrOrHash rpc.Block return block.Header(), nil } - blockNum, _, _, err := rpchelper.GetBlockNumber(blockNrOrHash, tx, api.filters) + blockNum, _, _, err := rpchelper.GetBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { return nil, err } @@ -227,7 +227,7 @@ func (api *APIImpl) EstimateGas(ctx context.Context, argsOrNil *ethapi2.CallArgs } engine := api.engine() - latestCanBlockNumber, latestCanHash, isLatest, err := rpchelper.GetCanonicalBlockNumber(bNrOrHash, dbtx, api.filters) // DoCall cannot be executed on non-canonical blocks + latestCanBlockNumber, latestCanHash, isLatest, err := rpchelper.GetCanonicalBlockNumber_zkevm(bNrOrHash, dbtx, api.filters) // DoCall cannot be executed on non-canonical blocks if err != nil { return 0, err } @@ -329,7 +329,7 @@ func (api *APIImpl) GetProof(ctx context.Context, address libcommon.Address, sto return nil, fmt.Errorf("not supported by Erigon3") } - blockNr, _, _, err := rpchelper.GetBlockNumber(blockNrOrHash, tx, api.filters) + blockNr, _, _, 
err := rpchelper.GetBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { return nil, err } @@ -444,7 +444,7 @@ func (api *APIImpl) CreateAccessList(ctx context.Context, args ethapi2.CallArgs, } engine := api.engine() - blockNumber, hash, latest, err := rpchelper.GetCanonicalBlockNumber(bNrOrHash, tx, api.filters) // DoCall cannot be executed on non-canonical blocks + blockNumber, hash, latest, err := rpchelper.GetCanonicalBlockNumber_zkevm(bNrOrHash, tx, api.filters) // DoCall cannot be executed on non-canonical blocks if err != nil { return nil, err } diff --git a/turbo/jsonrpc/eth_callMany.go b/turbo/jsonrpc/eth_callMany.go index 5aa0d59bf9a..f1748f53a83 100644 --- a/turbo/jsonrpc/eth_callMany.go +++ b/turbo/jsonrpc/eth_callMany.go @@ -106,7 +106,7 @@ func (api *APIImpl) CallMany_deprecated(ctx context.Context, bundles []Bundle, s defer func(start time.Time) { log.Trace("Executing EVM callMany finished", "runtime", time.Since(start)) }(time.Now()) - blockNum, hash, _, err := rpchelper.GetBlockNumber(simulateContext.BlockNumber, tx, api.filters) + blockNum, hash, _, err := rpchelper.GetBlockNumber_zkevm(simulateContext.BlockNumber, tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/eth_callMany_zkevm.go b/turbo/jsonrpc/eth_callMany_zkevm.go index d7b1c03f925..0c90a63e5f2 100644 --- a/turbo/jsonrpc/eth_callMany_zkevm.go +++ b/turbo/jsonrpc/eth_callMany_zkevm.go @@ -62,7 +62,7 @@ func (api *APIImpl) CallMany(ctx context.Context, bundles []Bundle, simulateCont defer func(start time.Time) { log.Trace("Executing EVM callMany finished", "runtime", time.Since(start)) }(time.Now()) - blockNum, hash, _, err := rpchelper.GetBlockNumber(simulateContext.BlockNumber, tx, api.filters) + blockNum, hash, _, err := rpchelper.GetBlockNumber_zkevm(simulateContext.BlockNumber, tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/eth_receipts.go b/turbo/jsonrpc/eth_receipts.go index e95acd6dcab..1dd8b622468 100644 --- a/turbo/jsonrpc/eth_receipts.go +++ b/turbo/jsonrpc/eth_receipts.go @@ -118,7 +118,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) (t end = header.Number.Uint64() } else { // Convert the RPC block numbers into internal representations - latest, _, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber), tx, nil) + latest, _, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber), tx, nil) if err != nil { return nil, err } @@ -691,7 +691,7 @@ func (api *APIImpl) GetBlockReceipts(ctx context.Context, number rpc.BlockNumber } defer tx.Rollback() - blockNum, blockHash, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(*number.BlockNumber), tx, api.filters) + blockNum, blockHash, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(*number.BlockNumber), tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/eth_txs.go b/turbo/jsonrpc/eth_txs.go index 530fe2352a3..7a4473ab3ae 100644 --- a/turbo/jsonrpc/eth_txs.go +++ b/turbo/jsonrpc/eth_txs.go @@ -236,7 +236,7 @@ func (api *APIImpl) GetTransactionByBlockNumberAndIndex_deprecated(ctx context.C } // https://infura.io/docs/ethereum/json-rpc/eth-getTransactionByBlockNumberAndIndex - blockNum, hash, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(blockNr), tx, api.filters) + blockNum, hash, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(blockNr), tx, api.filters) if err != nil { 
return nil, err } diff --git a/turbo/jsonrpc/eth_txs_zkevm.go b/turbo/jsonrpc/eth_txs_zkevm.go index ac4cb12de00..1fc0fc26788 100644 --- a/turbo/jsonrpc/eth_txs_zkevm.go +++ b/turbo/jsonrpc/eth_txs_zkevm.go @@ -197,7 +197,7 @@ func (api *APIImpl) GetTransactionByBlockNumberAndIndex(ctx context.Context, blo } // https://infura.io/docs/ethereum/json-rpc/eth-getTransactionByBlockNumberAndIndex - blockNum, _, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(blockNr), tx, api.filters) + blockNum, _, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(blockNr), tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/eth_uncles.go b/turbo/jsonrpc/eth_uncles.go index f0fdeb646e2..66c3a7bcbf0 100644 --- a/turbo/jsonrpc/eth_uncles.go +++ b/turbo/jsonrpc/eth_uncles.go @@ -32,7 +32,7 @@ func (api *APIImpl) GetUncleByBlockNumberAndIndex(ctx context.Context, number rp } defer tx.Rollback() - blockNum, hash, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) + blockNum, hash, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) if err != nil { return nil, err } @@ -102,7 +102,7 @@ func (api *APIImpl) GetUncleCountByBlockNumber(ctx context.Context, number rpc.B } defer tx.Rollback() - blockNum, blockHash, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) + blockNum, blockHash, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) if err != nil { return &n, err } diff --git a/turbo/jsonrpc/graphql_api.go b/turbo/jsonrpc/graphql_api.go index 44eff638c60..e5b4034b30f 100644 --- a/turbo/jsonrpc/graphql_api.go +++ b/turbo/jsonrpc/graphql_api.go @@ -101,7 +101,7 @@ func (api *GraphQLAPIImpl) getBlockWithSenders(ctx context.Context, number rpc.B return api.pendingBlock(), nil, nil } - blockHeight, blockHash, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) + blockHeight, blockHash, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) if err != nil { return nil, nil, err } diff --git a/turbo/jsonrpc/otterscan_api.go b/turbo/jsonrpc/otterscan_api.go index 9b925903547..54b07f102bb 100644 --- a/turbo/jsonrpc/otterscan_api.go +++ b/turbo/jsonrpc/otterscan_api.go @@ -569,7 +569,7 @@ func (api *OtterscanAPIImpl) getBlockWithSenders(ctx context.Context, number rpc return api.pendingBlock(), nil, nil } - n, hash, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) + n, hash, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters) if err != nil { return nil, nil, err } diff --git a/turbo/jsonrpc/otterscan_has_code.go b/turbo/jsonrpc/otterscan_has_code.go index af442e8d000..8f9bfd1fe55 100644 --- a/turbo/jsonrpc/otterscan_has_code.go +++ b/turbo/jsonrpc/otterscan_has_code.go @@ -17,7 +17,7 @@ func (api *OtterscanAPIImpl) HasCode(ctx context.Context, address common.Address } defer tx.Rollback() - blockNumber, _, _, err := rpchelper.GetBlockNumber(blockNrOrHash, tx, api.filters) + blockNumber, _, _, err := rpchelper.GetBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { return false, err } diff --git a/turbo/jsonrpc/overlay_api.go b/turbo/jsonrpc/overlay_api.go index 0b6949f5b87..856452959c4 100644 --- a/turbo/jsonrpc/overlay_api.go +++ b/turbo/jsonrpc/overlay_api.go @@ -420,7 +420,7 @@ func (api 
*OverlayAPIImpl) replayBlock(ctx context.Context, blockNum uint64, sta overrideBlockHash = make(map[uint64]common.Hash) blockNumber := rpc.BlockNumber(blockNum) - blockNum, hash, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHash{BlockNumber: &blockNumber}, tx, api.filters) + blockNum, hash, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHash{BlockNumber: &blockNumber}, tx, api.filters) if err != nil { return nil, err } @@ -580,7 +580,7 @@ func getBeginEnd(ctx context.Context, tx kv.Tx, api *OverlayAPIImpl, crit filter end = num } else { // Convert the RPC block numbers into internal representations - latest, _, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(rpc.LatestExecutedBlockNumber), tx, nil) + latest, _, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(rpc.LatestExecutedBlockNumber), tx, nil) if err != nil { return 0, 0, err } @@ -592,7 +592,7 @@ func getBeginEnd(ctx context.Context, tx kv.Tx, api *OverlayAPIImpl, crit filter begin = uint64(fromBlock) } else { blockNum := rpc.BlockNumber(fromBlock) - begin, _, _, err = rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(blockNum), tx, api.filters) + begin, _, _, err = rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(blockNum), tx, api.filters) if err != nil { return 0, 0, err } @@ -606,7 +606,7 @@ func getBeginEnd(ctx context.Context, tx kv.Tx, api *OverlayAPIImpl, crit filter end = uint64(toBlock) } else { blockNum := rpc.BlockNumber(toBlock) - end, _, _, err = rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(blockNum), tx, api.filters) + end, _, _, err = rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(blockNum), tx, api.filters) if err != nil { return 0, 0, err } diff --git a/turbo/jsonrpc/trace_adhoc.go b/turbo/jsonrpc/trace_adhoc.go index 99f7d259932..224127f4429 100644 --- a/turbo/jsonrpc/trace_adhoc.go +++ b/turbo/jsonrpc/trace_adhoc.go @@ -848,7 +848,7 @@ func (api *TraceAPIImpl) ReplayBlockTransactions(ctx context.Context, blockNrOrH return nil, err } - blockNumber, blockHash, _, err := rpchelper.GetBlockNumber(blockNrOrHash, tx, api.filters) + blockNumber, blockHash, _, err := rpchelper.GetBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { return nil, err } @@ -923,7 +923,7 @@ func (api *TraceAPIImpl) Call(ctx context.Context, args TraceCallParam, traceTyp blockNrOrHash = &rpc.BlockNumberOrHash{BlockNumber: &num} } - blockNumber, hash, _, err := rpchelper.GetBlockNumber(*blockNrOrHash, tx, api.filters) + blockNumber, hash, _, err := rpchelper.GetBlockNumber_zkevm(*blockNrOrHash, tx, api.filters) if err != nil { return nil, err } @@ -1095,7 +1095,7 @@ func (api *TraceAPIImpl) CallMany(ctx context.Context, calls json.RawMessage, pa var num = rpc.LatestBlockNumber parentNrOrHash = &rpc.BlockNumberOrHash{BlockNumber: &num} } - blockNumber, hash, _, err := rpchelper.GetBlockNumber(*parentNrOrHash, dbtx, api.filters) + blockNumber, hash, _, err := rpchelper.GetBlockNumber_zkevm(*parentNrOrHash, dbtx, api.filters) if err != nil { return nil, err } @@ -1141,7 +1141,7 @@ func (api *TraceAPIImpl) doCallMany(ctx context.Context, dbtx kv.Tx, msgs []type var num = rpc.LatestBlockNumber parentNrOrHash = &rpc.BlockNumberOrHash{BlockNumber: &num} } - blockNumber, hash, _, err := rpchelper.GetBlockNumber(*parentNrOrHash, dbtx, api.filters) + blockNumber, hash, _, err := rpchelper.GetBlockNumber_zkevm(*parentNrOrHash, dbtx, api.filters) if err != nil { return nil, nil, err } diff --git a/turbo/jsonrpc/trace_filtering.go 
b/turbo/jsonrpc/trace_filtering.go index 66dd279e23a..2cf36b3768d 100644 --- a/turbo/jsonrpc/trace_filtering.go +++ b/turbo/jsonrpc/trace_filtering.go @@ -174,7 +174,7 @@ func (api *TraceAPIImpl) Block(ctx context.Context, blockNr rpc.BlockNumber, gas return nil, err } defer tx.Rollback() - blockNum, hash, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(blockNr), tx, api.filters) + blockNum, hash, _, err := rpchelper.GetBlockNumber_zkevm(rpc.BlockNumberOrHashWithNumber(blockNr), tx, api.filters) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/tracing.go b/turbo/jsonrpc/tracing.go index 4372a0692b7..31f1efb9d87 100644 --- a/turbo/jsonrpc/tracing.go +++ b/turbo/jsonrpc/tracing.go @@ -62,7 +62,7 @@ func (api *PrivateDebugAPIImpl) traceBlock_deprecated(ctx context.Context, block return fmt.Errorf("invalid arguments; neither block nor hash specified") } - blockNumber, hash, _, err := rpchelper.GetCanonicalBlockNumber(blockNrOrHash, tx, api.filters) + blockNumber, hash, _, err := rpchelper.GetCanonicalBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { stream.WriteNil() return err @@ -307,7 +307,7 @@ func (api *PrivateDebugAPIImpl) TraceCall(ctx context.Context, args ethapi.CallA } engine := api.engine() - blockNumber, hash, isLatest, err := rpchelper.GetBlockNumber(blockNrOrHash, dbtx, api.filters) + blockNumber, hash, isLatest, err := rpchelper.GetBlockNumber_zkevm(blockNrOrHash, dbtx, api.filters) if err != nil { return fmt.Errorf("get block number: %v", err) } @@ -405,7 +405,7 @@ func (api *PrivateDebugAPIImpl) TraceCallMany_deprecated(ctx context.Context, bu defer func(start time.Time) { log.Trace("Tracing CallMany finished", "runtime", time.Since(start)) }(time.Now()) - blockNum, hash, _, err := rpchelper.GetBlockNumber(simulateContext.BlockNumber, tx, api.filters) + blockNum, hash, _, err := rpchelper.GetBlockNumber_zkevm(simulateContext.BlockNumber, tx, api.filters) if err != nil { stream.WriteNil() return err diff --git a/turbo/jsonrpc/tracing_zkevm.go b/turbo/jsonrpc/tracing_zkevm.go index 6ea4719f881..145e64265de 100644 --- a/turbo/jsonrpc/tracing_zkevm.go +++ b/turbo/jsonrpc/tracing_zkevm.go @@ -209,7 +209,7 @@ func (api *PrivateDebugAPIImpl) TraceCallMany(ctx context.Context, bundles []Bun defer func(start time.Time) { log.Trace("Tracing CallMany finished", "runtime", time.Since(start)) }(time.Now()) - blockNum, hash, _, err := rpchelper.GetBlockNumber(simulateContext.BlockNumber, tx, api.filters) + blockNum, hash, _, err := rpchelper.GetBlockNumber_zkevm(simulateContext.BlockNumber, tx, api.filters) if err != nil { stream.WriteNil() return err diff --git a/turbo/jsonrpc/zkevm_api.go b/turbo/jsonrpc/zkevm_api.go index 14dd0b77c83..5645c49c89a 100644 --- a/turbo/jsonrpc/zkevm_api.go +++ b/turbo/jsonrpc/zkevm_api.go @@ -77,6 +77,8 @@ type ZkEvmAPI interface { GetForkById(ctx context.Context, forkId hexutil.Uint64) (res json.RawMessage, err error) GetForkIdByBatchNumber(ctx context.Context, batchNumber rpc.BlockNumber) (hexutil.Uint64, error) GetForks(ctx context.Context) (res json.RawMessage, err error) + GetRollupAddress(ctx context.Context) (res json.RawMessage, err error) + GetRollupManagerAddress(ctx context.Context) (res json.RawMessage, err error) } const getBatchWitness = "getBatchWitness" @@ -551,10 +553,13 @@ func (api *ZkEvmAPIImpl) GetBatchByNumber(ctx context.Context, rpcBatchNumber rp batch.Timestamp = types.ArgUint64(block.Time()) } - // if we don't have a datastream available to verify that a batch is actually - // closed 
then we fall back to existing behaviour of checking if the next batch - // has any blocks in it - if api.datastreamServer != nil { + /* + if node is a sequencer it won't have the required data stored in the db, so use the datastream + server to figure out if the batch is closed, otherwise fall back. This ensures good performance + for RPC nodes in daisy chain node which do have a datastream (previous check was testing for + presence of datastream server). + */ + if sequencer.IsSequencer() { highestClosed, err := api.datastreamServer.GetHighestClosedBatchNoCache() if err != nil { return nil, err @@ -573,7 +578,20 @@ func (api *ZkEvmAPIImpl) GetBatchByNumber(ctx context.Context, rpcBatchNumber rp return nil, err } - batch.Closed = batchNo <= latestClosedbatchNum + if batchNo <= latestClosedbatchNum { + // simple check if we have a closed batch entry higher than or equal to the one requested + batch.Closed = true + } else { + // we might be missing a batch end along the way so lets double check if we have a block + // from the next batch or not + _, foundHigher, err := hermezDb.GetLowestBlockInBatch(batchNo + 1) + if err != nil { + return nil, err + } + if foundHigher { + batch.Closed = true + } + } } // verification - if we can't find one, maybe this batch was verified along with a higher batch number @@ -718,7 +736,25 @@ func (api *ZkEvmAPIImpl) getAccInputHash(ctx context.Context, db SequenceReader, } if prevSequence == nil || batchSequence == nil { - return nil, fmt.Errorf("failed to get sequence data for batch %d", batchNum) + var missing string + if prevSequence == nil && batchSequence == nil { + missing = "previous and current batch sequences" + } else if prevSequence == nil { + missing = "previous batch sequence" + } else { + missing = "current batch sequence" + } + return nil, fmt.Errorf("failed to get %s for batch %d", missing, batchNum) + } + + // if we are asking for the injected batch or genesis return 0x0..0 + if (batchNum == 0 || batchNum == 1) && prevSequence.BatchNo == 0 { + return &common.Hash{}, nil + } + + // if prev is 0, set to 1 (injected batch) + if prevSequence.BatchNo == 0 { + prevSequence.BatchNo = 1 } // get batch range for sequence @@ -764,6 +800,7 @@ func (api *ZkEvmAPIImpl) getAccInputHash(ctx context.Context, db SequenceReader, // calculate acc input hash for i := 0; i < int(batchNum-prevSequenceBatch); i++ { accInputHash = accInputHashCalcFn(prevSequenceAccinputHash, i) + prevSequenceAccinputHash = *accInputHash } return @@ -1009,12 +1046,12 @@ func (api *ZkEvmAPIImpl) getBlockRangeWitness(ctx context.Context, db kv.RoDB, s return nil, fmt.Errorf("not supported by Erigon3") } - blockNr, _, _, err := rpchelper.GetCanonicalBlockNumber(startBlockNrOrHash, tx, api.ethApi.filters) // DoCall cannot be executed on non-canonical blocks + blockNr, _, _, err := rpchelper.GetCanonicalBlockNumber_zkevm(startBlockNrOrHash, tx, api.ethApi.filters) // DoCall cannot be executed on non-canonical blocks if err != nil { return nil, err } - endBlockNr, _, _, err := rpchelper.GetCanonicalBlockNumber(endBlockNrOrHash, tx, api.ethApi.filters) // DoCall cannot be executed on non-canonical blocks + endBlockNr, _, _, err := rpchelper.GetCanonicalBlockNumber_zkevm(endBlockNrOrHash, tx, api.ethApi.filters) // DoCall cannot be executed on non-canonical blocks if err != nil { return nil, err @@ -1608,7 +1645,7 @@ func (zkapi *ZkEvmAPIImpl) GetProof(ctx context.Context, address common.Address, return nil, fmt.Errorf("not supported by Erigon3") } - blockNr, _, _, err := 
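Stepping back to the `getAccInputHash` hunk above: the added `prevSequenceAccinputHash = *accInputHash` line makes each loop iteration feed the previous result forward, which is what an accumulator-style input hash requires; previously every iteration recomputed from the same starting hash. A toy illustration of the chaining, using a stand-in hash function in place of `accInputHashCalcFn` (which in reality folds in the batch data as well):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// stand-in for accInputHashCalcFn: next accumulator from the previous one plus an index
func calc(prev [32]byte, i int) [32]byte {
	return sha256.Sum256(append(prev[:], byte(i)))
}

func main() {
	var prev [32]byte

	// unchained shape: every iteration starts from the original prev,
	// so only the last batch influences the result
	unchained := prev
	for i := 0; i < 3; i++ {
		unchained = calc(prev, i)
	}

	// chained shape (mirrors the added line): feed each result forward
	chained := prev
	for i := 0; i < 3; i++ {
		chained = calc(chained, i) // prevSequenceAccinputHash = *accInputHash
	}

	fmt.Printf("unchained: %x\nchained:   %x\n", unchained[:4], chained[:4])
}
```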
rpchelper.GetBlockNumber(blockNrOrHash, tx, api.filters) + blockNr, _, _, err := rpchelper.GetBlockNumber_zkevm(blockNrOrHash, tx, api.filters) if err != nil { return nil, err } @@ -1846,3 +1883,25 @@ func (api *ZkEvmAPIImpl) GetForks(ctx context.Context) (res json.RawMessage, err return forksJson, err } + +func (api *ZkEvmAPIImpl) GetRollupAddress(ctx context.Context) (res json.RawMessage, err error) { + rollupAddress := api.config.AddressZkevm + + rollupAddressJson, err := json.Marshal(rollupAddress) + if err != nil { + return nil, err + } + + return rollupAddressJson, err +} + +func (api *ZkEvmAPIImpl) GetRollupManagerAddress(ctx context.Context) (res json.RawMessage, err error) { + rollupManagerAddress := api.config.AddressRollup + + rollupManagerAddressJson, err := json.Marshal(rollupManagerAddress) + if err != nil { + return nil, err + } + + return rollupManagerAddressJson, err +} diff --git a/turbo/jsonrpc/zkevm_api_test.go b/turbo/jsonrpc/zkevm_api_test.go index 98d48ab2e24..73d215d8190 100644 --- a/turbo/jsonrpc/zkevm_api_test.go +++ b/turbo/jsonrpc/zkevm_api_test.go @@ -1448,3 +1448,41 @@ func TestGetForks(t *testing.T) { assert.Equal(forks[2].Version, "") assert.Equal(forks[2].BlockNumber, hexutil.Uint64(3000)) } + +func TestGetRollupAddress(t *testing.T) { + assert := assert.New(t) + + // Init new ZkConfig + cfgZk := ethconfig.DefaultZkConfig + assert.NotNil(cfgZk) + + // Check rollup address of default ZkConfig + assert.Equal(cfgZk.AddressZkevm, common.HexToAddress("0x0")) + + // Modify ZkConfig + cfgZk.AddressZkevm = common.HexToAddress("0x1") + assert.Equal(cfgZk.AddressZkevm, common.HexToAddress("0x1")) + cfgZk.AddressZkevm = common.HexToAddress("0x9f77a1fB020Bf0980b75828e3fbdAB13A1D7824A") + assert.Equal(cfgZk.AddressZkevm, common.HexToAddress("0x9f77a1fB020Bf0980b75828e3fbdAB13A1D7824A")) + cfgZk.AddressZkevm = common.HexToAddress("0x5F5221e63CC430C00E65cb9D85066f710650faa9") + assert.Equal(cfgZk.AddressZkevm, common.HexToAddress("0x5F5221e63CC430C00E65cb9D85066f710650faa9")) +} + +func TestGetRollupManagerAddress(t *testing.T) { + assert := assert.New(t) + + // Init new ZkConfig + cfgZk := ethconfig.DefaultZkConfig + assert.NotNil(cfgZk) + + // Check rollup manager address of default ZkConfig + assert.Equal(cfgZk.AddressRollup, common.HexToAddress("0x0")) + + // Modify ZkConfig + cfgZk.AddressRollup = common.HexToAddress("0x1") + assert.Equal(cfgZk.AddressRollup, common.HexToAddress("0x1")) + cfgZk.AddressRollup = common.HexToAddress("0x9f77a1fB020Bf0980b75828e3fbdAB13A1D7824A") + assert.Equal(cfgZk.AddressRollup, common.HexToAddress("0x9f77a1fB020Bf0980b75828e3fbdAB13A1D7824A")) + cfgZk.AddressRollup = common.HexToAddress("0x5F5221e63CC430C00E65cb9D85066f710650faa9") + assert.Equal(cfgZk.AddressRollup, common.HexToAddress("0x5F5221e63CC430C00E65cb9D85066f710650faa9")) +} diff --git a/turbo/jsonrpc/zkevm_counters.go b/turbo/jsonrpc/zkevm_counters.go index 7d28b26ad21..00106390d85 100644 --- a/turbo/jsonrpc/zkevm_counters.go +++ b/turbo/jsonrpc/zkevm_counters.go @@ -130,7 +130,7 @@ func (zkapi *ZkEvmAPIImpl) EstimateCounters(ctx context.Context, rpcTx *zkevmRPC } engine := api.engine() - latestCanBlockNumber, latestCanHash, isLatest, err := rpchelper.GetCanonicalBlockNumber(latestNumOrHash, dbtx, api.filters) // DoCall cannot be executed on non-canonical blocks + latestCanBlockNumber, latestCanHash, isLatest, err := rpchelper.GetCanonicalBlockNumber_zkevm(latestNumOrHash, dbtx, api.filters) // DoCall cannot be executed on non-canonical blocks if err != nil { return nil, 
err } diff --git a/turbo/rpchelper/helper_zkevm.go b/turbo/rpchelper/helper_zkevm.go index 754d42c1cbd..659c23df7de 100644 --- a/turbo/rpchelper/helper_zkevm.go +++ b/turbo/rpchelper/helper_zkevm.go @@ -1,12 +1,18 @@ package rpchelper import ( + "errors" "fmt" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + borfinality "github.com/ledgerwatch/erigon/polygon/bor/finality" + "github.com/ledgerwatch/erigon/polygon/bor/finality/whitelist" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/zk/hermez_db" + "github.com/ledgerwatch/erigon/zk/sequencer" ) func GetBatchNumber(rpcBatchNumber rpc.BlockNumber, tx kv.Tx, filters *Filters) (batchNumber uint64, latest bool, err error) { @@ -41,3 +47,96 @@ func GetBatchNumber(rpcBatchNumber rpc.BlockNumber, tx kv.Tx, filters *Filters) return batchNumber, latest, nil } + +func GetBlockNumber_zkevm(blockNrOrHash rpc.BlockNumberOrHash, tx kv.Tx, filters *Filters) (uint64, libcommon.Hash, bool, error) { + return _GetBlockNumber_zkevm(blockNrOrHash.RequireCanonical, blockNrOrHash, tx, filters) +} + +func GetCanonicalBlockNumber_zkevm(blockNrOrHash rpc.BlockNumberOrHash, tx kv.Tx, filters *Filters) (uint64, libcommon.Hash, bool, error) { + return _GetBlockNumber_zkevm(true, blockNrOrHash, tx, filters) +} + +func _GetBlockNumber_zkevm(requireCanonical bool, blockNrOrHash rpc.BlockNumberOrHash, tx kv.Tx, filters *Filters) (blockNumber uint64, hash libcommon.Hash, latest bool, err error) { + blockFinalizationType := stages.Finish + if sequencer.IsSequencer() { + blockFinalizationType = stages.Execution + } + + finishedBlockNumber, err := stages.GetStageProgress(tx, blockFinalizationType) + if err != nil { + return 0, libcommon.Hash{}, false, fmt.Errorf("getting finished block number: %w", err) + } + + var ok bool + hash, ok = blockNrOrHash.Hash() + if !ok { + number := *blockNrOrHash.BlockNumber + switch number { + case rpc.LatestBlockNumber: + if blockNumber, err = GetLatestFinishedBlockNumber(tx); err != nil { + return 0, libcommon.Hash{}, false, err + } + case rpc.EarliestBlockNumber: + blockNumber = 0 + case rpc.FinalizedBlockNumber: + if whitelist.GetWhitelistingService() != nil { + num := borfinality.GetFinalizedBlockNumber(tx) + if num == 0 { + // nolint + return 0, libcommon.Hash{}, false, errors.New("No finalized block") + } + + blockNum := borfinality.CurrentFinalizedBlock(tx, num).NumberU64() + blockHash := rawdb.ReadHeaderByNumber(tx, blockNum).Hash() + return blockNum, blockHash, false, nil + } + blockNumber, err = GetFinalizedBlockNumber(tx) + if err != nil { + return 0, libcommon.Hash{}, false, err + } + case rpc.SafeBlockNumber: + // [zkevm] safe not available, returns finilized instead + // blockNumber, err = GetSafeBlockNumber(tx) + blockNumber, err = GetFinalizedBlockNumber(tx) + if err != nil { + return 0, libcommon.Hash{}, false, err + } + case rpc.PendingBlockNumber: + pendingBlock := filters.LastPendingBlock() + if pendingBlock == nil { + blockNumber = finishedBlockNumber + } else { + return pendingBlock.NumberU64(), pendingBlock.Hash(), false, nil + } + case rpc.LatestExecutedBlockNumber: + blockNumber, err = stages.GetStageProgress(tx, stages.Execution) + if err != nil { + return 0, libcommon.Hash{}, false, fmt.Errorf("getting latest executed block number: %w", err) + } + default: + blockNumber = uint64(number.Int64()) + if blockNumber > finishedBlockNumber { + return 0, 
libcommon.Hash{}, false, fmt.Errorf("block with number %d not found", blockNumber) + } + } + hash, err = rawdb.ReadCanonicalHash(tx, blockNumber) + if err != nil { + return 0, libcommon.Hash{}, false, err + } + } else { + number := rawdb.ReadHeaderNumber(tx, hash) + if number == nil { + return 0, libcommon.Hash{}, false, fmt.Errorf("block %x not found", hash) + } + blockNumber = *number + + ch, err := rawdb.ReadCanonicalHash(tx, blockNumber) + if err != nil { + return 0, libcommon.Hash{}, false, err + } + if requireCanonical && ch != hash { + return 0, libcommon.Hash{}, false, nonCanonocalHashError{hash} + } + } + return blockNumber, hash, blockNumber == finishedBlockNumber, nil +} diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index e10471863a1..1c536b87095 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -13,6 +13,7 @@ import ( "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" "github.com/ledgerwatch/erigon/zk/datastream/types" "github.com/ledgerwatch/log/v3" + "sync" ) type StreamType uint64 @@ -49,7 +50,8 @@ type StreamClient struct { // atomic lastWrittenTime atomic.Int64 - streaming atomic.Bool + mtxStreaming *sync.Mutex + streaming bool progress atomic.Uint64 stopReadingToChannel atomic.Bool @@ -58,6 +60,11 @@ type StreamClient struct { // keeps track of the latest fork from the stream to assign to l2 blocks currentFork uint64 + + // used for testing, during normal execution lots of stop streaming commands are sent + // which makes sense for an active server listening for these things but in unit tests + // this makes behaviour very unpredictable and hard to test + allowStops bool } const ( @@ -83,6 +90,7 @@ func NewClient(ctx context.Context, server string, version int, checkTimeout tim streamType: StSequencer, entryChan: make(chan interface{}, 100000), currentFork: uint64(latestDownloadedForkId), + mtxStreaming: &sync.Mutex{}, } return c @@ -133,7 +141,9 @@ func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (fullBLock *types.Ful if errors.Is(err, types.ErrAlreadyStarted) { // if the client is already started, we can stop the client and try again - c.Stop() + if errStop := c.Stop(); errStop != nil { + log.Warn("failed to send stop command", "error", errStop) + } } else if !errors.Is(err, ErrSocket) { return nil, fmt.Errorf("getL2BlockByNumber: %w", err) } @@ -142,6 +152,7 @@ func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (fullBLock *types.Ful time.Sleep(1 * time.Second) connected = c.handleSocketError(err) count++ + err = nil } return fullBLock, nil @@ -182,6 +193,10 @@ func (c *StreamClient) getL2BlockByNumber(blockNum uint64) (l2Block *types.FullL return nil, fmt.Errorf("expected block number %d but got %d", blockNum, l2Block.L2BlockNumber) } + if err := c.Stop(); err != nil { + return nil, fmt.Errorf("Stop: %w", err) + } + return l2Block, nil } @@ -203,16 +218,25 @@ func (c *StreamClient) GetLatestL2Block() (l2Block *types.FullL2Block, err error return nil, ErrFailedAttempts } if connected { - if err := c.stopStreamingIfStarted(); err != nil { - return nil, fmt.Errorf("stopStreamingIfStarted: %w", err) + if err = c.stopStreamingIfStarted(); err != nil { + err = fmt.Errorf("stopStreamingIfStarted: %w", err) } - - if l2Block, err = c.getLatestL2Block(); err == nil { - break + if err == nil { + if l2Block, err = c.getLatestL2Block(); err == nil { + break + } + err = fmt.Errorf("getLatestL2Block: %w", err) } - if 
!errors.Is(err, ErrSocket) { - return nil, fmt.Errorf("getLatestL2Block: %w", err) + + if err != nil && !errors.Is(err, ErrSocket) { + return nil, err + } else if errors.Is(err, types.ErrAlreadyStarted) { + // if the client is already started, we can stop the client and try again + if errStop := c.Stop(); errStop != nil { + log.Warn("failed to send stop command", "error", errStop) + } } + err = nil } time.Sleep(1 * time.Second) @@ -222,17 +246,31 @@ func (c *StreamClient) GetLatestL2Block() (l2Block *types.FullL2Block, err error return l2Block, nil } +func (c *StreamClient) getStreaming() bool { + c.mtxStreaming.Lock() + defer c.mtxStreaming.Unlock() + return c.streaming +} + +func (c *StreamClient) setStreaming(val bool) { + c.mtxStreaming.Lock() + defer c.mtxStreaming.Unlock() + c.streaming = val +} + // don't check for errors here, we just need to empty the socket for next reads func (c *StreamClient) stopStreamingIfStarted() error { - if c.streaming.Load() { - c.sendStopCmd() - c.streaming.Store(false) + if c.getStreaming() { + if err := c.sendStopCmd(); err != nil { + return fmt.Errorf("sendStopCmd: %w", err) + } + c.setStreaming(false) } // empty the socket buffer for { - c.conn.SetReadDeadline(time.Now().Add(100)) - if _, err := c.readBuffer(100); err != nil { + c.conn.SetReadDeadline(time.Now().Add(1 * time.Millisecond)) + if _, err := readBuffer(c.conn, 1000 /* arbitrary number*/); err != nil { break } } @@ -271,6 +309,10 @@ func (c *StreamClient) getLatestL2Block() (l2Block *types.FullL2Block, err error return nil, errors.New("no block found") } + if err := c.Stop(); err != nil { + return nil, fmt.Errorf("Stop: %w", err) + } + return l2Block, nil } @@ -294,15 +336,15 @@ func (c *StreamClient) Start() error { return nil } -func (c *StreamClient) Stop() { - if c.conn == nil { - return +func (c *StreamClient) Stop() error { + if c.conn == nil || !c.allowStops { + return nil } if err := c.sendStopCmd(); err != nil { - log.Warn(fmt.Sprintf("send stop command: %v", err)) + return fmt.Errorf("sendStopCmd: %w", err) } - // c.conn.Close() - // c.conn = nil + + return nil } // Command header: Get status @@ -467,7 +509,7 @@ func (c *StreamClient) handleSocketError(socketErr error) bool { // reads entries to the end of the stream // at end will wait for new entries to arrive func (c *StreamClient) readAllEntriesToChannel() (err error) { - c.streaming.Store(true) + c.setStreaming(true) c.stopReadingToChannel.Store(false) var bookmark *types.BookmarkProto @@ -502,6 +544,8 @@ func (c *StreamClient) initiateDownloadBookmark(bookmark []byte) (*types.ResultE return nil, fmt.Errorf("sendBookmarkCmd: %w", err) } + c.setStreaming(true) + re, err := c.afterStartCommand() if err != nil { return re, fmt.Errorf("afterStartCommand: %w", err) @@ -945,3 +989,11 @@ func (c *StreamClient) resetReadTimeout() error { return nil } + +// PrepUnwind handles the state of the client prior to searching to the +// common ancestor block +func (c *StreamClient) PrepUnwind() { + // this is to ensure that the later call to stop streaming if streaming + // is activated. 
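Aside on the `Stop()` signature change above: since it now returns an error instead of logging internally, callers own the decision of how severe a failed stop is. A hedged sketch of the calling pattern the diff adopts (it mirrors the `stopFn` change in `zk/stages/stage_batches.go` further down; the package and import paths follow the repo layout shown in this diff and are otherwise assumptions):

```go
package teardown

import (
	client "github.com/ledgerwatch/erigon/zk/datastream/client"
	"github.com/ledgerwatch/log/v3"
)

// shutdownClient shows the new caller-side contract: Stop() reports failures,
// and the caller decides severity (here: warn and continue during teardown).
func shutdownClient(c *client.StreamClient) {
	if err := c.Stop(); err != nil {
		log.Warn("Failed to stop datastream client", "err", err)
	}
}
```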
+ c.setStreaming(true) +} diff --git a/zk/datastream/client/stream_client_test.go b/zk/datastream/client/stream_client_test.go index f8078889e6b..db0f80e088a 100644 --- a/zk/datastream/client/stream_client_test.go +++ b/zk/datastream/client/stream_client_test.go @@ -50,7 +50,7 @@ func TestStreamClientReadHeaderEntry(t *testing.T) { } for _, testCase := range testCases { - c := NewClient(context.Background(), "", 0, 2*time.Second, 0) + c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) server, conn := net.Pipe() defer server.Close() defer c.Stop() @@ -118,7 +118,7 @@ func TestStreamClientReadResultEntry(t *testing.T) { } for _, testCase := range testCases { - c := NewClient(context.Background(), "", 0, 2*time.Second, 0) + c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) server, conn := net.Pipe() defer server.Close() defer c.Stop() @@ -191,7 +191,7 @@ func TestStreamClientReadFileEntry(t *testing.T) { }, } for _, testCase := range testCases { - c := NewClient(context.Background(), "", 0, 2*time.Second, 0) + c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) server, conn := net.Pipe() defer c.Stop() defer server.Close() @@ -215,7 +215,7 @@ func TestStreamClientReadFileEntry(t *testing.T) { } func TestStreamClientReadParsedProto(t *testing.T) { - c := NewClient(context.Background(), "", 0, 2*time.Second, 0) + c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) serverConn, clientConn := net.Pipe() c.conn = clientConn c.checkTimeout = 1 * time.Second @@ -287,9 +287,10 @@ func TestStreamClientGetLatestL2Block(t *testing.T) { clientConn.Close() }() - c := NewClient(context.Background(), "", 0, 2*time.Second, 0) + c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) c.conn = clientConn c.checkTimeout = 1 * time.Second + c.allowStops = false expectedL2Block, _ := createL2BlockAndTransactions(t, 5, 0) l2BlockProto := &types.L2BlockProto{L2Block: expectedL2Block} l2BlockRaw, err := l2BlockProto.Marshal() @@ -400,11 +401,12 @@ func TestStreamClientGetL2BlockByNumber(t *testing.T) { clientConn.Close() }() - c := NewClient(context.Background(), "", 0, 2*time.Second, 0) + c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) c.header = &types.HeaderEntry{ TotalEntries: 4, } c.conn = clientConn + c.allowStops = false c.checkTimeout = 1 * time.Second bookmark := types.NewBookmarkProto(blockNum, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK) bookmarkRaw, err := bookmark.Marshal() @@ -487,7 +489,6 @@ func TestStreamClientGetL2BlockByNumber(t *testing.T) { return } } - } go createServerResponses(t, serverConn, bookmarkRaw, l2BlockRaw, l2TxsRaw, l2BlockEndRaw, errCh) diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index ed2b8291fa4..0fbb448c16b 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -66,7 +66,8 @@ type DatastreamClient interface { GetLatestL2Block() (*types.FullL2Block, error) GetProgressAtomic() *atomic.Uint64 Start() error - Stop() + Stop() error + PrepUnwind() } type DatastreamReadRunner interface { @@ -208,7 +209,7 @@ func SpawnStageBatches( log.Info(fmt.Sprintf("[%s] Waiting for at least one new block in datastream", logPrefix), "datastreamBlock", highestDSL2Block.L2BlockNumber, "last processed block", stageProgressBlockNo) newBlockCheckStartTIme = time.Now() } - time.Sleep(1 * time.Second) + time.Sleep(50 * time.Millisecond) } log.Debug(fmt.Sprintf("[%s] Highest block in db and datastream", logPrefix), "datastreamBlock", 
highestDSL2Block.L2BlockNumber, "dbBlock", stageProgressBlockNo) @@ -630,6 +631,7 @@ func rollback( tx kv.RwTx, u stagedsync.Unwinder, ) (uint64, error) { + dsQueryClient.PrepUnwind() ancestorBlockNum, ancestorBlockHash, err := findCommonAncestor(eriDb, hermezDb, dsQueryClient, latestDSBlockNum) if err != nil { return 0, err @@ -746,7 +748,9 @@ func newStreamClient(ctx context.Context, cfg BatchesCfg, latestForkId uint64) ( return nil, nil, fmt.Errorf("dsClient.Start: %w", err) } stopFn = func() { - dsClient.Stop() + if err := dsClient.Stop(); err != nil { + log.Warn("Failed to stop datastream client", "err", err) + } } } else { dsClient = cfg.dsClient diff --git a/zk/stages/stage_l1_sequencer_sync.go b/zk/stages/stage_l1_sequencer_sync.go index ee2e12f83ca..cac85804941 100644 --- a/zk/stages/stage_l1_sequencer_sync.go +++ b/zk/stages/stage_l1_sequencer_sync.go @@ -64,7 +64,6 @@ func SpawnL1SequencerSyncStage( } if progress == 0 { progress = cfg.zkCfg.L1FirstBlock - 1 - } // if the flag is set - wait for that block to be finalized on L1 before continuing @@ -200,7 +199,7 @@ Loop: const ( injectedBatchLogTransactionStartByte = 128 - injectedBatchLastGerStartByte = 31 + injectedBatchLastGerStartByte = 32 injectedBatchLastGerEndByte = 64 injectedBatchSequencerStartByte = 76 injectedBatchSequencerEndByte = 96 diff --git a/zk/stages/stage_l1_sequencer_sync_test.go b/zk/stages/stage_l1_sequencer_sync_test.go new file mode 100644 index 00000000000..5dc1f836dbb --- /dev/null +++ b/zk/stages/stage_l1_sequencer_sync_test.go @@ -0,0 +1,290 @@ +package stages + +import ( + "context" + "math/big" + "testing" + "time" + + ethereum "github.com/ledgerwatch/erigon" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv/memdb" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/commands/mocks" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/rpc" + "github.com/ledgerwatch/erigon/smt/pkg/db" + "github.com/ledgerwatch/erigon/zk/contracts" + "github.com/ledgerwatch/erigon/zk/hermez_db" + "github.com/ledgerwatch/erigon/zk/syncer" + "github.com/ledgerwatch/log/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestSpawnL1SequencerSyncStage(t *testing.T) { + // arrange + ctx, db1 := context.Background(), memdb.NewTestDB(t) + tx := memdb.BeginRw(t, db1) + err := hermez_db.CreateHermezBuckets(tx) + require.NoError(t, err) + err = db.CreateEriDbBuckets(tx) + require.NoError(t, err) + + hDB := hermez_db.NewHermezDb(tx) + err = hDB.WriteBlockBatch(0, 0) + require.NoError(t, err) + err = stages.SaveStageProgress(tx, stages.L1SequencerSync, 0) + require.NoError(t, err) + + s := &stagedsync.StageState{ID: stages.L1SequencerSync, BlockNumber: 0} + u := &stagedsync.Sync{} + + // mocks + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + EthermanMock := mocks.NewMockIEtherman(mockCtrl) + + l1ContractAddresses := []common.Address{ + common.HexToAddress("0x1"), + common.HexToAddress("0x2"), + common.HexToAddress("0x3"), + } + l1ContractTopics := [][]common.Hash{ + []common.Hash{common.HexToHash("0x1")}, + []common.Hash{common.HexToHash("0x2")}, + []common.Hash{common.HexToHash("0x3")}, + } + + l1FirstBlock := big.NewInt(20) + + finalizedBlockParentHash := common.HexToHash("0x123456789") + finalizedBlockTime := uint64(time.Now().Unix()) + 
finalizedBlockNumber := big.NewInt(21) + finalizedBlockHeader := &types.Header{ParentHash: finalizedBlockParentHash, Number: finalizedBlockNumber, Time: finalizedBlockTime} + finalizedBlock := types.NewBlockWithHeader(finalizedBlockHeader) + + latestBlockParentHash := finalizedBlock.Hash() + latestBlockTime := uint64(time.Now().Unix()) + latestBlockNumber := big.NewInt(22) + latestBlockHeader := &types.Header{ParentHash: latestBlockParentHash, Number: latestBlockNumber, Time: latestBlockTime} + latestBlock := types.NewBlockWithHeader(latestBlockHeader) + + EthermanMock.EXPECT().HeaderByNumber(gomock.Any(), finalizedBlockNumber).Return(finalizedBlockHeader, nil).AnyTimes() + EthermanMock.EXPECT().BlockByNumber(gomock.Any(), big.NewInt(rpc.FinalizedBlockNumber.Int64())).Return(finalizedBlock, nil).AnyTimes() + EthermanMock.EXPECT().HeaderByNumber(gomock.Any(), latestBlockNumber).Return(latestBlockHeader, nil).AnyTimes() + EthermanMock.EXPECT().BlockByNumber(gomock.Any(), nil).Return(latestBlock, nil).AnyTimes() + + filterQuery := ethereum.FilterQuery{ + FromBlock: l1FirstBlock, + ToBlock: latestBlockNumber, + Addresses: l1ContractAddresses, + Topics: l1ContractTopics, + } + + type testCase struct { + name string + getLog func(hDB *hermez_db.HermezDb) (types.Log, error) + assert func(t *testing.T, hDB *hermez_db.HermezDb) + } + + const ( + forkIdBytesStartPosition = 64 + forkIdBytesEndPosition = 96 + rollupDataSize = 100 + + injectedBatchLogTransactionStartByte = 128 + injectedBatchLastGerStartByte = 32 + injectedBatchLastGerEndByte = 64 + injectedBatchSequencerStartByte = 76 + injectedBatchSequencerEndByte = 96 + ) + + testCases := []testCase{ + { + name: "InitialSequenceBatchesTopic", + getLog: func(hDB *hermez_db.HermezDb) (types.Log, error) { + ger := common.HexToHash("0x111111111") + sequencer := common.HexToAddress("0x222222222") + batchL2Data := common.HexToHash("0x333333333") + + initialSequenceBatchesData := make([]byte, 200) + copy(initialSequenceBatchesData[injectedBatchLastGerStartByte:injectedBatchLastGerEndByte], ger.Bytes()) + copy(initialSequenceBatchesData[injectedBatchSequencerStartByte:injectedBatchSequencerEndByte], sequencer.Bytes()) + copy(initialSequenceBatchesData[injectedBatchLogTransactionStartByte:], batchL2Data.Bytes()) + return types.Log{ + BlockNumber: latestBlockNumber.Uint64(), + Address: l1ContractAddresses[0], + Topics: []common.Hash{contracts.InitialSequenceBatchesTopic}, + Data: initialSequenceBatchesData, + }, nil + }, + assert: func(t *testing.T, hDB *hermez_db.HermezDb) { + ger := common.HexToHash("0x111111111") + sequencer := common.HexToAddress("0x222222222") + batchL2Data := common.HexToHash("0x333333333") + + l1InjectedBatch, err := hDB.GetL1InjectedBatch(0) + require.NoError(t, err) + + assert.Equal(t, l1InjectedBatch.L1BlockNumber, latestBlock.NumberU64()) + assert.Equal(t, l1InjectedBatch.Timestamp, latestBlock.Time()) + assert.Equal(t, l1InjectedBatch.L1BlockHash, latestBlock.Hash()) + assert.Equal(t, l1InjectedBatch.L1ParentHash, latestBlock.ParentHash()) + assert.Equal(t, l1InjectedBatch.LastGlobalExitRoot.String(), ger.String()) + assert.Equal(t, l1InjectedBatch.Sequencer.String(), sequencer.String()) + assert.ElementsMatch(t, l1InjectedBatch.Transaction, batchL2Data.Bytes()) + }, + }, + { + name: "AddNewRollupType", + getLog: func(hDB *hermez_db.HermezDb) (types.Log, error) { + rollupType := uint64(1) + rollupTypeHash := common.BytesToHash(big.NewInt(0).SetUint64(rollupType).Bytes()) + rollupData := make([]byte, rollupDataSize) + rollupForkId 
:= uint64(111) + rollupForkIdHash := common.BytesToHash(big.NewInt(0).SetUint64(rollupForkId).Bytes()) + copy(rollupData[forkIdBytesStartPosition:forkIdBytesEndPosition], rollupForkIdHash.Bytes()) + return types.Log{ + BlockNumber: latestBlockNumber.Uint64(), + Address: l1ContractAddresses[0], + Topics: []common.Hash{contracts.AddNewRollupTypeTopic, rollupTypeHash}, + Data: rollupData, + }, nil + }, + assert: func(t *testing.T, hDB *hermez_db.HermezDb) { + forkID, err := hDB.GetForkFromRollupType(uint64(1)) + require.NoError(t, err) + + assert.Equal(t, forkID, uint64(111)) + }, + }, + { + name: "AddNewRollupTypeTopicBanana", + getLog: func(hDB *hermez_db.HermezDb) (types.Log, error) { + rollupType := uint64(2) + rollupTypeHash := common.BytesToHash(big.NewInt(0).SetUint64(rollupType).Bytes()) + rollupData := make([]byte, rollupDataSize) + rollupForkId := uint64(222) + rollupForkIdHash := common.BytesToHash(big.NewInt(0).SetUint64(rollupForkId).Bytes()) + copy(rollupData[forkIdBytesStartPosition:forkIdBytesEndPosition], rollupForkIdHash.Bytes()) + return types.Log{ + BlockNumber: latestBlockNumber.Uint64(), + Address: l1ContractAddresses[0], + Topics: []common.Hash{contracts.AddNewRollupTypeTopicBanana, rollupTypeHash}, + Data: rollupData, + }, nil + }, + assert: func(t *testing.T, hDB *hermez_db.HermezDb) { + forkID, err := hDB.GetForkFromRollupType(uint64(2)) + require.NoError(t, err) + + assert.Equal(t, forkID, uint64(222)) + }, + }, + { + name: "CreateNewRollupTopic", + getLog: func(hDB *hermez_db.HermezDb) (types.Log, error) { + rollupID := uint64(99999) + rollupIDHash := common.BytesToHash(big.NewInt(0).SetUint64(rollupID).Bytes()) + rollupType := uint64(33) + rollupForkID := uint64(333) + if funcErr := hDB.WriteRollupType(rollupType, rollupForkID); funcErr != nil { + return types.Log{}, funcErr + } + newRollupDataCreation := common.BytesToHash(big.NewInt(0).SetUint64(rollupType).Bytes()).Bytes() + + return types.Log{ + BlockNumber: latestBlockNumber.Uint64(), + Address: l1ContractAddresses[0], + Topics: []common.Hash{contracts.CreateNewRollupTopic, rollupIDHash}, + Data: newRollupDataCreation, + }, nil + }, + assert: func(t *testing.T, hDB *hermez_db.HermezDb) { + forks, batches, err := hDB.GetAllForkHistory() + for i := 0; i < len(forks); i++ { + if forks[i] == uint64(333) { + assert.Equal(t, batches[i], uint64(0)) + break + } + } + require.NoError(t, err) + }, + }, + { + name: "UpdateRollupTopic", + getLog: func(hDB *hermez_db.HermezDb) (types.Log, error) { + rollupID := uint64(99999) + rollupIDHash := common.BytesToHash(big.NewInt(0).SetUint64(rollupID).Bytes()) + rollupType := uint64(44) + rollupTypeHash := common.BytesToHash(big.NewInt(0).SetUint64(rollupType).Bytes()) + rollupForkID := uint64(444) + if funcErr := hDB.WriteRollupType(rollupType, rollupForkID); funcErr != nil { + return types.Log{}, funcErr + } + latestVerified := uint64(4444) + latestVerifiedHash := common.BytesToHash(big.NewInt(0).SetUint64(latestVerified).Bytes()) + updateRollupData := rollupTypeHash.Bytes() + updateRollupData = append(updateRollupData, latestVerifiedHash.Bytes()...) 
+ + return types.Log{ + BlockNumber: latestBlockNumber.Uint64(), + Address: l1ContractAddresses[0], + Topics: []common.Hash{contracts.UpdateRollupTopic, rollupIDHash}, + Data: updateRollupData, + }, nil + }, + assert: func(t *testing.T, hDB *hermez_db.HermezDb) { + forks, batches, err := hDB.GetAllForkHistory() + for i := 0; i < len(forks); i++ { + if forks[i] == uint64(444) { + assert.Equal(t, batches[i], uint64(4444)) + break + } + } + require.NoError(t, err) + }, + }, + } + + filteredLogs := []types.Log{} + for _, tc := range testCases { + ll, err := tc.getLog(hDB) + require.NoError(t, err) + filteredLogs = append(filteredLogs, ll) + } + + EthermanMock.EXPECT().FilterLogs(gomock.Any(), filterQuery).Return(filteredLogs, nil).AnyTimes() + + l1Syncer := syncer.NewL1Syncer(ctx, []syncer.IEtherman{EthermanMock}, l1ContractAddresses, l1ContractTopics, 10, 0, "latest") + // updater := l1infotree.NewUpdater(&ethconfig.Zk{}, l1Syncer) + zkCfg := &ethconfig.Zk{ + L1RollupId: uint64(99999), + L1FirstBlock: l1FirstBlock.Uint64(), + L1FinalizedBlockRequirement: uint64(21), + } + cfg := StageL1SequencerSyncCfg(db1, zkCfg, l1Syncer) + + // act + err = SpawnL1SequencerSyncStage(s, u, tx, cfg, ctx, log.New()) + require.NoError(t, err) + + // assert + for _, tc := range testCases { + tc.assert(t, hDB) + } +} + +func TestUnwindL1SequencerSyncStage(t *testing.T) { + err := UnwindL1SequencerSyncStage(nil, nil, L1SequencerSyncCfg{}, context.Background()) + assert.Nil(t, err) +} + +func TestPruneL1SequencerSyncStage(t *testing.T) { + err := PruneL1SequencerSyncStage(nil, nil, L1SequencerSyncCfg{}, context.Background()) + assert.Nil(t, err) +} diff --git a/zk/stages/test_utils.go b/zk/stages/test_utils.go index f24557522b4..221ccc1734b 100644 --- a/zk/stages/test_utils.go +++ b/zk/stages/test_utils.go @@ -100,6 +100,11 @@ func (c *TestDatastreamClient) Start() error { return nil } -func (c *TestDatastreamClient) Stop() { +func (c *TestDatastreamClient) Stop() error { c.isStarted = false + return nil +} + +func (c *TestDatastreamClient) PrepUnwind() { + // do nothing } diff --git a/zk/tests/unwinds/unwind.sh b/zk/tests/unwinds/unwind.sh index 84b5f436180..b48f1c15c55 100755 --- a/zk/tests/unwinds/unwind.sh +++ b/zk/tests/unwinds/unwind.sh @@ -15,8 +15,8 @@ dataPath="./datadir" firstStop=11204 stopBlock=11315 unwindBatch=70 -firstTimeout=150s -secondTimeout=150s +firstTimeout=300s +secondTimeout=300s rm -rf "$dataPath/rpc-datadir" rm -rf "$dataPath/phase1-dump1" @@ -27,10 +27,10 @@ rm -rf "$dataPath/phase1-diffs" rm -rf "$dataPath/phase2-diffs" # run datastream server -timeout 600s go run ./zk/debug_tools/datastream-host --file="$(pwd)/zk/tests/unwinds/datastream/hermez-dynamic-integration8-datastream/data-stream.bin" & +go run ./zk/debug_tools/datastream-host --file="$(pwd)/zk/tests/unwinds/datastream/hermez-dynamic-integration8-datastream/data-stream.bin" & # in order to start the datastream server -sleep 5 +sleep 10 # run erigon for a while to sync to the unwind point to capture the dump timeout $firstTimeout ./build/bin/cdk-erigon \ diff --git a/zk/witness/witness.go b/zk/witness/witness.go index 2350fd250fc..1c1bb3acf7c 100644 --- a/zk/witness/witness.go +++ b/zk/witness/witness.go @@ -191,6 +191,12 @@ func (g *Generator) generateWitness(tx kv.Tx, ctx context.Context, batchNum uint log.Info("Generating witness timing", "batch", batchNum, "blockFrom", blocks[0].NumberU64(), "blockTo", blocks[len(blocks)-1].NumberU64(), "taken", diff) }() + areExecutorUrlsEmpty := len(g.zkConfig.ExecutorUrls) == 0 ||
g.zkConfig.ExecutorUrls[0] == "" + shouldGenerateMockWitness := g.zkConfig.MockWitnessGeneration && areExecutorUrlsEmpty + if shouldGenerateMockWitness { + return g.generateMockWitness(batchNum, blocks, debug) + } + endBlock := blocks[len(blocks)-1].NumberU64() startBlock := blocks[0].NumberU64() @@ -324,7 +330,6 @@ func (g *Generator) generateWitness(tx kv.Tx, ctx context.Context, batchNum uint chainReader := stagedsync.NewChainReaderImpl(g.chainCfg, tx, nil, log.New()) _, err = core.ExecuteBlockEphemerallyZk(g.chainCfg, &vmConfig, getHashFn, engine, block, tds, trieStateWriter, chainReader, nil, hermezDb, &prevStateRoot) - if err != nil { return nil, err } @@ -362,3 +367,21 @@ func getWitnessBytes(witness *trie.Witness, debug bool) ([]byte, error) { } return buf.Bytes(), nil } + +func (g *Generator) generateMockWitness(batchNum uint64, blocks []*eritypes.Block, debug bool) ([]byte, error) { + mockWitness := []byte("mockWitness") + startBlockNumber := blocks[0].NumberU64() + endBlockNumber := blocks[len(blocks)-1].NumberU64() + + if debug { + log.Info( + "Generated mock witness", + "witness", mockWitness, + "batch", batchNum, + "startBlockNumber", startBlockNumber, + "endBlockNumber", endBlockNumber, + ) + } + + return mockWitness, nil +}