Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Allow passing different configurators to pitaya #389

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewBuilderWithConfigs(
serverType string,
serverMode ServerMode,
serverMetadata map[string]string,
conf *config.Config,
conf config.Config,
) *Builder {
pitayaConfig := config.NewPitayaConfig(conf)
return NewBuilder(
Expand Down
2 changes: 1 addition & 1 deletion cluster/info_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestInfoRetrieverRegion(t *testing.T) {

c := viper.New()
c.Set("pitaya.cluster.info.region", "us")
conf := config.NewConfig(c)
conf := config.NewConfig(*c)

infoRetriever := NewInfoRetriever(*&config.NewPitayaConfig(conf).Cluster.Info)

Expand Down
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func NewDefaultPitayaConfig() *PitayaConfig {
}

// NewPitayaConfig returns a config instance with values extracted from default config paths
func NewPitayaConfig(config *Config) *PitayaConfig {
func NewPitayaConfig(config Config) *PitayaConfig {
conf := NewDefaultPitayaConfig()
if err := config.UnmarshalKey("pitaya", &conf); err != nil {
panic(err)
Expand Down Expand Up @@ -339,7 +339,7 @@ func NewDefaultCustomMetricsSpec() *models.CustomMetricsSpec {
}

// NewCustomMetricsSpec returns a *CustomMetricsSpec by reading config key (DEPRECATED)
func NewCustomMetricsSpec(config *Config) *models.CustomMetricsSpec {
func NewCustomMetricsSpec(config Config) *models.CustomMetricsSpec {
spec := &models.CustomMetricsSpec{}

if err := config.UnmarshalKey("pitaya.metrics.custom", &spec); err != nil {
Expand Down Expand Up @@ -537,7 +537,7 @@ func newDefaultEtcdGroupServiceConfig() *EtcdGroupServiceConfig {
}

// NewEtcdGroupServiceConfig reads from config to build ETCD configuration
func newEtcdGroupServiceConfig(config *Config) *EtcdGroupServiceConfig {
func newEtcdGroupServiceConfig(config Config) *EtcdGroupServiceConfig {
conf := newDefaultEtcdGroupServiceConfig()
if err := config.UnmarshalKey("pitaya.groups.etcd", &conf); err != nil {
panic(err)
Expand Down
26 changes: 13 additions & 13 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,22 @@ func TestNewConfig(t *testing.T) {
cfg.SetDefault("pitaya.no.default", "custom")

tables := []struct {
in []*viper.Viper
in []viper.Viper
key string
val interface{}
}{
{[]*viper.Viper{}, "pitaya.buffer.agent.messages", 100},
{[]*viper.Viper{cfg}, "pitaya.buffer.agent.messages", 20},
{[]*viper.Viper{}, "pitaya.no.default", nil},
{[]*viper.Viper{cfg}, "pitaya.no.default", "custom"},
{[]*viper.Viper{}, "pitaya.concurrency.handler.dispatch", 25},
{[]*viper.Viper{cfg}, "pitaya.concurrency.handler.dispatch", 23},
{[]*viper.Viper{}, "pitaya.heartbeat.interval", "123s"},
{[]*viper.Viper{cfg}, "pitaya.heartbeat.interval", "123s"},
{[]*viper.Viper{}, "pitaya.concurrency.test", "42"},
{[]*viper.Viper{cfg}, "pitaya.concurrency.test", "42"},
{[]*viper.Viper{}, "pitaya.buffer.test", "14"},
{[]*viper.Viper{cfg}, "pitaya.buffer.test", "14"},
{[]viper.Viper{}, "pitaya.buffer.agent.messages", 100},
{[]viper.Viper{*cfg}, "pitaya.buffer.agent.messages", 20},
{[]viper.Viper{}, "pitaya.no.default", nil},
{[]viper.Viper{*cfg}, "pitaya.no.default", "custom"},
{[]viper.Viper{}, "pitaya.concurrency.handler.dispatch", 25},
{[]viper.Viper{*cfg}, "pitaya.concurrency.handler.dispatch", 23},
{[]viper.Viper{}, "pitaya.heartbeat.interval", "123s"},
{[]viper.Viper{*cfg}, "pitaya.heartbeat.interval", "123s"},
{[]viper.Viper{}, "pitaya.concurrency.test", "42"},
{[]viper.Viper{*cfg}, "pitaya.concurrency.test", "42"},
{[]viper.Viper{}, "pitaya.buffer.test", "14"},
{[]viper.Viper{*cfg}, "pitaya.buffer.test", "14"},
}

for _, table := range tables {
Expand Down
104 changes: 104 additions & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package config

func fillDefaultValues(c Config) {
pitayaConfig := NewDefaultPitayaConfig()

defaultsMap := map[string]interface{}{
"pitaya.serializertype": pitayaConfig.SerializerType,
"pitaya.buffer.agent.messages": pitayaConfig.Buffer.Agent.Messages,
// the max buffer size that nats will accept, if this buffer overflows, messages will begin to be dropped
"pitaya.buffer.handler.localprocess": pitayaConfig.Buffer.Handler.LocalProcess,
"pitaya.buffer.handler.remoteprocess": pitayaConfig.Buffer.Handler.RemoteProcess,
"pitaya.cluster.info.region": pitayaConfig.Cluster.Info.Region,
"pitaya.cluster.rpc.client.grpc.dialtimeout": pitayaConfig.Cluster.RPC.Client.Grpc.DialTimeout,
"pitaya.cluster.rpc.client.grpc.requesttimeout": pitayaConfig.Cluster.RPC.Client.Grpc.RequestTimeout,
"pitaya.cluster.rpc.client.grpc.lazyconnection": pitayaConfig.Cluster.RPC.Client.Grpc.LazyConnection,
"pitaya.cluster.rpc.client.nats.connect": pitayaConfig.Cluster.RPC.Client.Nats.Connect,
"pitaya.cluster.rpc.client.nats.connectiontimeout": pitayaConfig.Cluster.RPC.Client.Nats.ConnectionTimeout,
"pitaya.cluster.rpc.client.nats.maxreconnectionretries": pitayaConfig.Cluster.RPC.Client.Nats.MaxReconnectionRetries,
"pitaya.cluster.rpc.client.nats.requesttimeout": pitayaConfig.Cluster.RPC.Client.Nats.RequestTimeout,
"pitaya.cluster.rpc.server.grpc.port": pitayaConfig.Cluster.RPC.Server.Grpc.Port,
"pitaya.cluster.rpc.server.nats.connect": pitayaConfig.Cluster.RPC.Server.Nats.Connect,
"pitaya.cluster.rpc.server.nats.connectiontimeout": pitayaConfig.Cluster.RPC.Server.Nats.ConnectionTimeout,
"pitaya.cluster.rpc.server.nats.maxreconnectionretries": pitayaConfig.Cluster.RPC.Server.Nats.MaxReconnectionRetries,
"pitaya.cluster.rpc.server.nats.services": pitayaConfig.Cluster.RPC.Server.Nats.Services,
"pitaya.cluster.rpc.server.nats.buffer.messages": pitayaConfig.Cluster.RPC.Server.Nats.Buffer.Messages,
"pitaya.cluster.rpc.server.nats.buffer.push": pitayaConfig.Cluster.RPC.Server.Nats.Buffer.Push,
"pitaya.cluster.sd.etcd.dialtimeout": pitayaConfig.Cluster.SD.Etcd.DialTimeout,
"pitaya.cluster.sd.etcd.endpoints": pitayaConfig.Cluster.SD.Etcd.Endpoints,
"pitaya.cluster.sd.etcd.prefix": pitayaConfig.Cluster.SD.Etcd.Prefix,
"pitaya.cluster.sd.etcd.grantlease.maxretries": pitayaConfig.Cluster.SD.Etcd.GrantLease.MaxRetries,
"pitaya.cluster.sd.etcd.grantlease.retryinterval": pitayaConfig.Cluster.SD.Etcd.GrantLease.RetryInterval,
"pitaya.cluster.sd.etcd.grantlease.timeout": pitayaConfig.Cluster.SD.Etcd.GrantLease.Timeout,
"pitaya.cluster.sd.etcd.heartbeat.log": pitayaConfig.Cluster.SD.Etcd.Heartbeat.Log,
"pitaya.cluster.sd.etcd.heartbeat.ttl": pitayaConfig.Cluster.SD.Etcd.Heartbeat.TTL,
"pitaya.cluster.sd.etcd.revoke.timeout": pitayaConfig.Cluster.SD.Etcd.Revoke.Timeout,
"pitaya.cluster.sd.etcd.syncservers.interval": pitayaConfig.Cluster.SD.Etcd.SyncServers.Interval,
"pitaya.cluster.sd.etcd.syncservers.parallelism": pitayaConfig.Cluster.SD.Etcd.SyncServers.Parallelism,
"pitaya.cluster.sd.etcd.shutdown.delay": pitayaConfig.Cluster.SD.Etcd.Shutdown.Delay,
"pitaya.cluster.sd.etcd.servertypeblacklist": pitayaConfig.Cluster.SD.Etcd.ServerTypesBlacklist,
// the sum of this config among all the frontend servers should always be less than
// the sum of pitaya.buffer.cluster.rpc.server.nats.messages, for covering the worst case scenario
// a single backend server should have the config pitaya.buffer.cluster.rpc.server.nats.messages bigger
// than the sum of the config pitaya.concurrency.handler.dispatch among all frontend servers
"pitaya.acceptor.proxyprotocol": pitayaConfig.Acceptor.ProxyProtocol,
"pitaya.concurrency.handler.dispatch": pitayaConfig.Concurrency.Handler.Dispatch,
"pitaya.defaultpipelines.structvalidation.enabled": pitayaConfig.DefaultPipelines.StructValidation.Enabled,
"pitaya.groups.etcd.dialtimeout": pitayaConfig.Groups.Etcd.DialTimeout,
"pitaya.groups.etcd.endpoints": pitayaConfig.Groups.Etcd.Endpoints,
"pitaya.groups.etcd.prefix": pitayaConfig.Groups.Etcd.Prefix,
"pitaya.groups.etcd.transactiontimeout": pitayaConfig.Groups.Etcd.TransactionTimeout,
"pitaya.groups.memory.tickduration": pitayaConfig.Groups.Memory.TickDuration,
"pitaya.handler.messages.compression": pitayaConfig.Handler.Messages.Compression,
"pitaya.heartbeat.interval": pitayaConfig.Heartbeat.Interval,
"pitaya.metrics.additionalLabels": pitayaConfig.Metrics.AdditionalLabels,
"pitaya.metrics.constLabels": pitayaConfig.Metrics.ConstLabels,
"pitaya.metrics.custom": pitayaConfig.Metrics.Custom,
"pitaya.metrics.period": pitayaConfig.Metrics.Period,
"pitaya.metrics.prometheus.enabled": pitayaConfig.Metrics.Prometheus.Enabled,
"pitaya.metrics.prometheus.port": pitayaConfig.Metrics.Prometheus.Port,
"pitaya.metrics.statsd.enabled": pitayaConfig.Metrics.Statsd.Enabled,
"pitaya.metrics.statsd.host": pitayaConfig.Metrics.Statsd.Host,
"pitaya.metrics.statsd.prefix": pitayaConfig.Metrics.Statsd.Prefix,
"pitaya.metrics.statsd.rate": pitayaConfig.Metrics.Statsd.Rate,
"pitaya.modules.bindingstorage.etcd.dialtimeout": pitayaConfig.Modules.BindingStorage.Etcd.DialTimeout,
"pitaya.modules.bindingstorage.etcd.endpoints": pitayaConfig.Modules.BindingStorage.Etcd.Endpoints,
"pitaya.modules.bindingstorage.etcd.leasettl": pitayaConfig.Modules.BindingStorage.Etcd.LeaseTTL,
"pitaya.modules.bindingstorage.etcd.prefix": pitayaConfig.Modules.BindingStorage.Etcd.Prefix,
"pitaya.conn.ratelimiting.limit": pitayaConfig.Conn.RateLimiting.Limit,
"pitaya.conn.ratelimiting.interval": pitayaConfig.Conn.RateLimiting.Interval,
"pitaya.conn.ratelimiting.forcedisable": pitayaConfig.Conn.RateLimiting.ForceDisable,
"pitaya.session.unique": pitayaConfig.Session.Unique,
"pitaya.session.drain.enabled": pitayaConfig.Session.Drain.Enabled,
"pitaya.session.drain.timeout": pitayaConfig.Session.Drain.Timeout,
"pitaya.session.drain.period": pitayaConfig.Session.Drain.Period,
"pitaya.worker.concurrency": pitayaConfig.Worker.Concurrency,
"pitaya.worker.redis.pool": pitayaConfig.Worker.Redis.Pool,
"pitaya.worker.redis.url": pitayaConfig.Worker.Redis.ServerURL,
"pitaya.worker.retry.enabled": pitayaConfig.Worker.Retry.Enabled,
"pitaya.worker.retry.exponential": pitayaConfig.Worker.Retry.Exponential,
"pitaya.worker.retry.max": pitayaConfig.Worker.Retry.Max,
"pitaya.worker.retry.maxDelay": pitayaConfig.Worker.Retry.MaxDelay,
"pitaya.worker.retry.maxRandom": pitayaConfig.Worker.Retry.MaxRandom,
"pitaya.worker.retry.minDelay": pitayaConfig.Worker.Retry.MinDelay,
}

// Fix multi-source merging for viper
viper, ok := c.(*viperImpl)

for param := range defaultsMap {
val := c.Get(param)
if val == nil {
if ok {
viper.SetDefault(param, defaultsMap[param])
}
} else {
if ok {
viper.SetDefault(param, val)
}
c.Set(param, val)
}

}

}
41 changes: 41 additions & 0 deletions config/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package config

import "time"

type Config interface {
// Get returns the configuration path as an interface. Default: nil
Get(string) interface{}

// GetString returns the configuration path as a string. Default: ""
GetString(string) string

// GetStringSlice returns the value associated with the key as a slice of strings.
GetStringSlice(string) []string

// GetStringMapString return the configuration path as a map of strings.
// Default: map[string]string
GetStringMapString(string) map[string]string

// GetStringMap return the configuration path as a map of string and interfaces.
// Default: map[string]interface{}
GetStringMap(key string) map[string]interface{}

// GetInt returns the configuration path as an int. Default: 0
GetInt(string) int

// GetFloat64 returns the configuration path as a float64. Default: 0.0
GetFloat64(string) float64

// GetBool returns the configuration path as a boolean. Default: false
GetBool(string) bool

// GetDuration returns a time.Duration of the config. Default: 0
GetDuration(string) time.Duration

// Set sets the value at the given key
Set(string, interface{})

// UnmarshalKey can be used to scan values in the config instance and unmarshal
// into a struct.
UnmarshalKey(string, interface{}) error
}
Loading
Loading