Skip to content

Commit

Permalink
da: add cli flags for da config
Browse files Browse the repository at this point in the history
da: add get timeout to da client
  • Loading branch information
tuxcanfly committed Jan 16, 2024
1 parent 761d07a commit 9c6a1db
Show file tree
Hide file tree
Showing 15 changed files with 162 additions and 53 deletions.
6 changes: 6 additions & 0 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
celestia "github.com/ethereum-optimism/optimism/op-celestia"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
Expand Down Expand Up @@ -63,6 +64,7 @@ type CLIConfig struct {
PprofConfig oppprof.CLIConfig
CompressorConfig compressor.CLIConfig
RPC oprpc.CLIConfig
DaConfig celestia.CLIConfig
}

func (c *CLIConfig) Check() error {
Expand Down Expand Up @@ -100,6 +102,9 @@ func (c *CLIConfig) Check() error {
if err := c.RPC.Check(); err != nil {
return err
}
if err := c.DaConfig.Check(); err != nil {
return err
}
return nil
}

Expand All @@ -125,5 +130,6 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
DaConfig: celestia.ReadCLIConfig(ctx),
}
}
19 changes: 4 additions & 15 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"math/big"
_ "net/http/pprof"
"os"
"sync"
"time"

Expand All @@ -17,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-batcher/metrics"
celestia "github.com/ethereum-optimism/optimism/op-celestia"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/dial"
Expand Down Expand Up @@ -48,6 +48,7 @@ type DriverSetup struct {
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfig
DAClient *celestia.DAClient
}

// BatchSubmitter encapsulates a service responsible for submitting L2 tx
Expand All @@ -62,8 +63,6 @@ type BatchSubmitter struct {
killCtx context.Context
cancelKillCtx context.CancelFunc

daClient *rollup.DAClient

mutex sync.Mutex
running bool

Expand Down Expand Up @@ -101,16 +100,6 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
l.wg.Add(1)
go l.loop()

daRpc := os.Getenv("OP_BATCHER_DA_RPC")
if daRpc == "" {
daRpc = "localhost:26650"
}
daClient, err := rollup.NewDAClient(daRpc)
if err != nil {
return err
}
l.daClient = daClient

l.Log.Info("Batch Submitter started")
return nil
}
Expand Down Expand Up @@ -375,11 +364,11 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txDat
data := txdata.Bytes()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Duration(l.RollupConfig.BlockTime)*time.Second)
ids, _, err := l.daClient.Client.Submit(ctx, [][]byte{data}, -1)
ids, _, err := l.DAClient.Client.Submit(ctx, [][]byte{data}, -1)
cancel()
if err == nil && len(ids) == 1 {
l.Log.Info("celestia: blob successfully submitted", "id", hex.EncodeToString(ids[0]))
data = append([]byte{derive.DerivationVersionCelestia}, ids[0]...)
data = append([]byte{celestia.DerivationVersionCelestia}, ids[0]...)
} else {
l.Log.Info("celestia: blob submission failed; falling back to eth", "err", err)
}
Expand Down
15 changes: 15 additions & 0 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
celestia "github.com/ethereum-optimism/optimism/op-celestia"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/dial"
Expand Down Expand Up @@ -66,6 +67,7 @@ type BatcherService struct {
stopped atomic.Bool

NotSubmittingOnStart bool
DAClient *celestia.DAClient
}

// BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig.
Expand Down Expand Up @@ -109,6 +111,9 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPProf(cfg); err != nil {
return fmt.Errorf("failed to start pprof server: %w", err)
}
if err := bs.initDA(cfg); err != nil {
return fmt.Errorf("failed to start da server: %w", err)
}
bs.initDriver()
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
Expand Down Expand Up @@ -242,6 +247,7 @@ func (bs *BatcherService) initDriver() {
L1Client: bs.L1Client,
EndpointProvider: bs.EndpointProvider,
ChannelConfig: bs.ChannelConfig,
DAClient: bs.DAClient,
})
}

Expand All @@ -265,6 +271,15 @@ func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error {
return nil
}

func (bs *BatcherService) initDA(cfg *CLIConfig) error {
client, err := celestia.NewDAClient(cfg.DaConfig.DaRpc)
if err != nil {
return err
}
bs.DAClient = client
return nil
}

// Start runs once upon start of the batcher lifecycle,
// and starts batch-submission work if the batcher is configured to start submit data on startup.
func (bs *BatcherService) Start(_ context.Context) error {
Expand Down
2 changes: 2 additions & 0 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/urfave/cli/v2"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
celestia "github.com/ethereum-optimism/optimism/op-celestia"
opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
Expand Down Expand Up @@ -109,6 +110,7 @@ func init() {
optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, txmgr.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, compressor.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, celestia.CLIFlags(EnvVarPrefix)...)

Flags = append(requiredFlags, optionalFlags...)
}
Expand Down
73 changes: 73 additions & 0 deletions op-celestia/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package celestia

import (
"fmt"
"net/url"

"github.com/urfave/cli/v2"

opservice "github.com/ethereum-optimism/optimism/op-service"
)

const (
DaRpcFlagName = "da.rpc"
)

var (
defaultDaRpc = "localhost:26650"
)

func CLIFlags(envPrefix string) []cli.Flag {
return []cli.Flag{
&cli.StringFlag{
Name: DaRpcFlagName,
Usage: "dial address of data availability grpc client",
Value: defaultDaRpc,
EnvVars: opservice.PrefixEnvVar(envPrefix, "DA_RPC"),
},
}
}

type Config struct {
DaRpc string
}

func (c Config) Check() error {
if c.DaRpc == "" {
c.DaRpc = defaultDaRpc
}

if _, err := url.Parse(c.DaRpc); err != nil {
return fmt.Errorf("invalid da rpc: %w", err)
}

return nil
}

type CLIConfig struct {
DaRpc string
}

func (c CLIConfig) Check() error {
if c.DaRpc == "" {
c.DaRpc = defaultDaRpc
}

if _, err := url.Parse(c.DaRpc); err != nil {
return fmt.Errorf("invalid da rpc: %w", err)
}

return nil
}

func NewCLIConfig() CLIConfig {
return CLIConfig{
DaRpc: defaultDaRpc,
}
}

func ReadCLIConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
DaRpc: ctx.String(DaRpcFlagName),
}
}
10 changes: 10 additions & 0 deletions op-celestia/da.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package celestia

// DerivationVersionCelestia is a byte marker for celestia references submitted
// to the batch inbox address as calldata.
// Mnemonic 0xce = celestia
// version 0xce references are encoded as:
// [8]byte block height ++ [32]byte commitment
// in little-endian encoding.
// see: https://github.com/rollkit/celestia-da/blob/1f2df375fd2fcc59e425a50f7eb950daa5382ef0/celestia.go#L141-L160
const DerivationVersionCelestia = 0xce
10 changes: 7 additions & 3 deletions op-node/rollup/da_client.go → op-celestia/da_client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package rollup
package celestia

import (
"time"

"github.com/rollkit/go-da/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type DAClient struct {
Client *proxy.Client
Client *proxy.Client
GetTimeout time.Duration
}

func NewDAClient(rpc string) (*DAClient, error) {
Expand All @@ -17,6 +20,7 @@ func NewDAClient(rpc string) (*DAClient, error) {
return nil, err
}
return &DAClient{
Client: client,
Client: client,
GetTimeout: time.Minute,
}, nil
}
2 changes: 2 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/urfave/cli/v2"

celestia "github.com/ethereum-optimism/optimism/op-celestia"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
openum "github.com/ethereum-optimism/optimism/op-service/enum"
opflags "github.com/ethereum-optimism/optimism/op-service/flags"
Expand Down Expand Up @@ -319,6 +320,7 @@ func init() {
optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, DeprecatedFlags...)
optionalFlags = append(optionalFlags, opflags.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, celestia.CLIFlags(EnvVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...)
}

Expand Down
6 changes: 6 additions & 0 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"time"

celestia "github.com/ethereum-optimism/optimism/op-celestia"
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
Expand Down Expand Up @@ -62,6 +63,8 @@ type Config struct {

// [OPTIONAL] The reth DB path to read receipts from
RethDBPath string

DaConfig celestia.Config
}

type RPCConfig struct {
Expand Down Expand Up @@ -141,5 +144,8 @@ func (cfg *Config) Check() error {
if !(cfg.RollupHalt == "" || cfg.RollupHalt == "major" || cfg.RollupHalt == "minor" || cfg.RollupHalt == "patch") {
return fmt.Errorf("invalid rollup halting option: %q", cfg.RollupHalt)
}
if err := cfg.DaConfig.Check(); err != nil {
return fmt.Errorf("da config error: %w", err)
}
return nil
}
7 changes: 7 additions & 0 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL1(ctx, cfg); err != nil {
return fmt.Errorf("failed to init L1: %w", err)
}
if err := n.initDA(ctx, cfg); err != nil {
return fmt.Errorf("failed to init da: %w", err)
}
if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return fmt.Errorf("failed to init L2: %w", err)
}
Expand Down Expand Up @@ -288,6 +291,10 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
return nil
}

func (n *OpNode) initDA(ctx context.Context, cfg *Config) error {
return driver.SetDAClient(cfg.DaConfig)
}

func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup)
if err != nil {
Expand Down
16 changes: 13 additions & 3 deletions op-node/rollup/derive/calldata_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,27 @@ import (
"errors"
"fmt"
"io"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"

celestia "github.com/ethereum-optimism/optimism/op-celestia"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

var daClient *celestia.DAClient

func SetDAClient(c *celestia.DAClient) error {
if daClient != nil {
return errors.New("da client already configured")
}
daClient = c
return nil
}

type DataIter interface {
Next(ctx context.Context) (eth.Data, error)
}
Expand Down Expand Up @@ -148,9 +158,9 @@ func DataFromEVMTransactions(dsCfg DataSourceConfig, batcherAddr common.Address,
out = append(out, data)
default:
switch data[0] {
case DerivationVersionCelestia:
case celestia.DerivationVersionCelestia:
log.Info("celestia: blob request", "id", hex.EncodeToString(tx.Data()))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Duration(config.BlockTime)*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), daClient.GetTimeout)
blobs, err := daClient.Client.Get(ctx, [][]byte{data[1:]})
cancel()
if err != nil {
Expand Down
Loading

0 comments on commit 9c6a1db

Please sign in to comment.