Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server/v2): init the indexer in server/v2 (partial backport #22218) #22324

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// SetListener sets the listener for the consensus module.
func (c *Consensus[T]) SetListener(l *appdata.Listener) {
c.listener = l
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
Expand Down
8 changes: 7 additions & 1 deletion server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cometbft
import (
cmtcfg "github.com/cometbft/cometbft/config"

"cosmossdk.io/schema/indexer"
"cosmossdk.io/server/v2/cometbft/mempool"
)

Expand All @@ -23,6 +24,10 @@ func DefaultAppTomlConfig() *AppTomlConfig {
Trace: false,
Standalone: false,
Mempool: mempool.DefaultConfig(),
Indexer: indexer.IndexingConfig{
Target: make(map[string]indexer.Config),
ChannelBufferSize: 1024,
},
}
}

Expand All @@ -37,7 +42,8 @@ type AppTomlConfig struct {
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`

// Sub configs
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
}

// CfgOption is a function that allows to overwrite the default server configuration.
Expand Down
15 changes: 15 additions & 0 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/indexer"
serverv2 "cosmossdk.io/server/v2"
cometlog "cosmossdk.io/server/v2/cometbft/log"
"cosmossdk.io/server/v2/cometbft/mempool"
Expand Down Expand Up @@ -131,6 +132,20 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg
return err
}
consensus.snapshotManager = snapshots.NewManager(snapshotStore, s.serverOptions.SnapshotOptions(cfg), sc, ss, nil, s.logger)

// initialize the indexer
if indexerCfg := s.config.AppTomlConfig.Indexer; len(indexerCfg.Target) > 0 {
listener, err := indexer.StartIndexing(indexer.IndexingOptions{
Config: indexerCfg,
Resolver: appI.SchemaDecoderResolver(),
Logger: s.logger.With(log.ModuleKey, "indexer"),
})
if err != nil {
return fmt.Errorf("failed to start indexing: %w", err)
}
consensus.listener = &listener.Listener
}

s.Consensus = consensus

return nil
Expand Down
10 changes: 9 additions & 1 deletion tools/confix/data/v2-app.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ standalone = false
# max-txs defines the maximum number of transactions that can be in the mempool. A value of 0 indicates an unbounded mempool, a negative value disables the app-side mempool.
max-txs = -1

# indexer defines the configuration for the SDK built-in indexer implementation.
[comet.indexer]
# Buffer size of the channels used for buffering data sent to indexer go routines.
channel_buffer_size = 1024

# Target is a map of named indexer targets to their configuration.
[comet.indexer.target]

[grpc]
# Enable defines if the gRPC server should be enabled.
enable = true
Expand Down Expand Up @@ -77,7 +85,7 @@ skip-fast-storage-upgrade = true
# Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus.
enable = true
# Address defines the metrics server address to bind to.
address = 'localhost:1338'
address = 'localhost:1318'
# Prefixed with keys to separate services.
service-name = ''
# Enable prefixing gauge values with hostname.
Expand Down
Loading