diff --git a/apis/v1alpha1/common.go b/apis/v1alpha1/common.go index 378da76c..b0cd954e 100644 --- a/apis/v1alpha1/common.go +++ b/apis/v1alpha1/common.go @@ -304,6 +304,18 @@ type StorageSpec struct { DataHome string `json:"dataHome,omitempty"` } +// RemoteWalProvider defines the remote wal provider for the cluster. +type RemoteWalProvider struct { + // +optional + KafkaRemoteWal *KafkaRemoteWal `json:"kafka,omitempty"` +} + +// KafkaRemoteWal is the specification for remote WAL that uses Kafka. +type KafkaRemoteWal struct { + // +optional + BrokerEndpoints []string `json:"brokerEndpoints,omitempty"` +} + type ServiceSpec struct { // type determines how the Service is exposed. // +optional diff --git a/apis/v1alpha1/defaulting.go b/apis/v1alpha1/defaulting.go index 7766041f..08ae6e1e 100644 --- a/apis/v1alpha1/defaulting.go +++ b/apis/v1alpha1/defaulting.go @@ -87,14 +87,13 @@ func (in *GreptimeDBCluster) SetDefaults() error { }, }, }, - Initializer: &InitializerSpec{Image: defaultInitializer}, - HTTPServicePort: int32(defaultHTTPServicePort), - GRPCServicePort: int32(defaultGRPCServicePort), - MySQLServicePort: int32(defaultMySQLServicePort), - PostgresServicePort: int32(defaultPostgresServicePort), - PrometheusServicePort: int32(defaultPrometheusServicePort), - OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), - Version: defaultVersion, + Initializer: &InitializerSpec{Image: defaultInitializer}, + HTTPServicePort: int32(defaultHTTPServicePort), + GRPCServicePort: int32(defaultGRPCServicePort), + MySQLServicePort: int32(defaultMySQLServicePort), + PostgresServicePort: int32(defaultPostgresServicePort), + OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), + Version: defaultVersion, } if in.Spec.Version == "" && @@ -206,13 +205,12 @@ func (in *GreptimeDBStandalone) SetDefaults() error { }, }, }, - HTTPServicePort: int32(defaultHTTPServicePort), - GRPCServicePort: int32(defaultGRPCServicePort), - MySQLServicePort: int32(defaultMySQLServicePort), - PostgresServicePort: int32(defaultPostgresServicePort), - PrometheusServicePort: int32(defaultPrometheusServicePort), - OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), - Version: defaultVersion, + HTTPServicePort: int32(defaultHTTPServicePort), + GRPCServicePort: int32(defaultGRPCServicePort), + MySQLServicePort: int32(defaultMySQLServicePort), + PostgresServicePort: int32(defaultPostgresServicePort), + OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), + Version: defaultVersion, LocalStorage: &StorageSpec{ Name: defaultStandaloneStorageName, StorageSize: defaultDataNodeStorageSize, diff --git a/apis/v1alpha1/defaulting_test.go b/apis/v1alpha1/defaulting_test.go index b3dae7a0..44865765 100644 --- a/apis/v1alpha1/defaulting_test.go +++ b/apis/v1alpha1/defaulting_test.go @@ -177,13 +177,12 @@ func TestSetDefaults(t *testing.T) { DataHome: defaultDataNodeStorageMountPath, }, }, - HTTPServicePort: int32(defaultHTTPServicePort), - GRPCServicePort: int32(defaultGRPCServicePort), - MySQLServicePort: int32(defaultMySQLServicePort), - PostgresServicePort: int32(defaultPostgresServicePort), - PrometheusServicePort: int32(defaultPrometheusServicePort), - OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), - Version: "latest", + HTTPServicePort: int32(defaultHTTPServicePort), + GRPCServicePort: int32(defaultGRPCServicePort), + MySQLServicePort: int32(defaultMySQLServicePort), + PostgresServicePort: int32(defaultPostgresServicePort), + OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), + Version: "latest", }, }, }, @@ -381,13 +380,12 @@ func TestSetDefaults(t *testing.T) { }, }, - HTTPServicePort: int32(defaultHTTPServicePort), - GRPCServicePort: int32(defaultGRPCServicePort), - MySQLServicePort: int32(defaultMySQLServicePort), - PostgresServicePort: int32(defaultPostgresServicePort), - PrometheusServicePort: int32(defaultPrometheusServicePort), - OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), - Version: "latest", + HTTPServicePort: int32(defaultHTTPServicePort), + GRPCServicePort: int32(defaultGRPCServicePort), + MySQLServicePort: int32(defaultMySQLServicePort), + PostgresServicePort: int32(defaultPostgresServicePort), + OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), + Version: "latest", }, }, }, @@ -551,13 +549,12 @@ func TestSetDefaults(t *testing.T) { }, }, - HTTPServicePort: int32(defaultHTTPServicePort), - GRPCServicePort: int32(defaultGRPCServicePort), - MySQLServicePort: int32(defaultMySQLServicePort), - PostgresServicePort: int32(defaultPostgresServicePort), - PrometheusServicePort: int32(defaultPrometheusServicePort), - OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), - Version: "latest", + HTTPServicePort: int32(defaultHTTPServicePort), + GRPCServicePort: int32(defaultGRPCServicePort), + MySQLServicePort: int32(defaultMySQLServicePort), + PostgresServicePort: int32(defaultPostgresServicePort), + OpenTSDBServicePort: int32(defaultOpenTSDBServicePort), + Version: "latest", }, }, }, diff --git a/apis/v1alpha1/greptimedbcluster_types.go b/apis/v1alpha1/greptimedbcluster_types.go index 6d9b7894..755fa729 100644 --- a/apis/v1alpha1/greptimedbcluster_types.go +++ b/apis/v1alpha1/greptimedbcluster_types.go @@ -120,9 +120,6 @@ type GreptimeDBClusterSpec struct { // +optional OpenTSDBServicePort int32 `json:"openTSDBServicePort,omitempty"` - // +optional - PrometheusServicePort int32 `json:"prometheusServicePort,omitempty"` - // +optional EnableInfluxDBProtocol bool `json:"enableInfluxDBProtocol,omitempty"` @@ -139,6 +136,9 @@ type GreptimeDBClusterSpec struct { // +optional ObjectStorageProvider *ObjectStorageProvider `json:"objectStorage,omitempty"` + // +optional + RemoteWalProvider *RemoteWalProvider `json:"remoteWal,omitempty"` + // More cluster settings can be added here. } diff --git a/apis/v1alpha1/greptimedbstandalone_types.go b/apis/v1alpha1/greptimedbstandalone_types.go index 3d686dc7..1c87d60b 100644 --- a/apis/v1alpha1/greptimedbstandalone_types.go +++ b/apis/v1alpha1/greptimedbstandalone_types.go @@ -46,9 +46,6 @@ type GreptimeDBStandaloneSpec struct { // +optional OpenTSDBServicePort int32 `json:"openTSDBServicePort,omitempty"` - // +optional - PrometheusServicePort int32 `json:"prometheusServicePort,omitempty"` - // +optional EnableInfluxDBProtocol bool `json:"enableInfluxDBProtocol,omitempty"` @@ -68,6 +65,9 @@ type GreptimeDBStandaloneSpec struct { // +optional LocalStorage *StorageSpec `json:"localStorage,omitempty"` + // +optional + RemoteWalProvider *RemoteWalProvider `json:"remoteWal,omitempty"` + // +optional Config string `json:"config,omitempty"` } diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 2c23dd92..a8c81aae 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -232,6 +232,11 @@ func (in *GreptimeDBClusterSpec) DeepCopyInto(out *GreptimeDBClusterSpec) { *out = new(ObjectStorageProvider) (*in).DeepCopyInto(*out) } + if in.RemoteWalProvider != nil { + in, out := &in.RemoteWalProvider, &out.RemoteWalProvider + *out = new(RemoteWalProvider) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GreptimeDBClusterSpec. @@ -371,6 +376,11 @@ func (in *GreptimeDBStandaloneSpec) DeepCopyInto(out *GreptimeDBStandaloneSpec) *out = new(StorageSpec) (*in).DeepCopyInto(*out) } + if in.RemoteWalProvider != nil { + in, out := &in.RemoteWalProvider, &out.RemoteWalProvider + *out = new(RemoteWalProvider) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GreptimeDBStandaloneSpec. @@ -425,6 +435,26 @@ func (in *InitializerSpec) DeepCopy() *InitializerSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaRemoteWal) DeepCopyInto(out *KafkaRemoteWal) { + *out = *in + if in.BrokerEndpoints != nil { + in, out := &in.BrokerEndpoints, &out.BrokerEndpoints + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaRemoteWal. +func (in *KafkaRemoteWal) DeepCopy() *KafkaRemoteWal { + if in == nil { + return nil + } + out := new(KafkaRemoteWal) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MainContainerSpec) DeepCopyInto(out *MainContainerSpec) { *out = *in @@ -622,6 +652,26 @@ func (in *PrometheusMonitorSpec) DeepCopy() *PrometheusMonitorSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RemoteWalProvider) DeepCopyInto(out *RemoteWalProvider) { + *out = *in + if in.KafkaRemoteWal != nil { + in, out := &in.KafkaRemoteWal, &out.KafkaRemoteWal + *out = new(KafkaRemoteWal) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteWalProvider. +func (in *RemoteWalProvider) DeepCopy() *RemoteWalProvider { + if in == nil { + return nil + } + out := new(RemoteWalProvider) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *S3StorageProvider) DeepCopyInto(out *S3StorageProvider) { *out = *in diff --git a/config/crd/resources/greptime.io_greptimedbclusters.yaml b/config/crd/resources/greptime.io_greptimedbclusters.yaml index b4de3f25..5379a7e7 100644 --- a/config/crd/resources/greptime.io_greptimedbclusters.yaml +++ b/config/crd/resources/greptime.io_greptimedbclusters.yaml @@ -10743,9 +10743,16 @@ spec: type: string type: object type: object - prometheusServicePort: - format: int32 - type: integer + remoteWal: + properties: + kafka: + properties: + brokerEndpoints: + items: + type: string + type: array + type: object + type: object version: type: string type: object diff --git a/config/crd/resources/greptime.io_greptimedbstandalones.yaml b/config/crd/resources/greptime.io_greptimedbstandalones.yaml index 78bd1497..35f9cc28 100644 --- a/config/crd/resources/greptime.io_greptimedbstandalones.yaml +++ b/config/crd/resources/greptime.io_greptimedbstandalones.yaml @@ -2765,9 +2765,16 @@ spec: type: string type: object type: object - prometheusServicePort: - format: int32 - type: integer + remoteWal: + properties: + kafka: + properties: + brokerEndpoints: + items: + type: string + type: array + type: object + type: object service: properties: annotations: diff --git a/config/samples/kafka-remote-wal/cluster.yaml b/config/samples/kafka-remote-wal/cluster.yaml new file mode 100644 index 00000000..df641594 --- /dev/null +++ b/config/samples/kafka-remote-wal/cluster.yaml @@ -0,0 +1,21 @@ +apiVersion: greptime.io/v1alpha1 +kind: GreptimeDBCluster +metadata: + name: my-cluster + namespace: default +spec: + base: + main: + image: greptime/greptimedb:latest + frontend: + replicas: 1 + meta: + replicas: 1 + etcdEndpoints: + - "etcd.default:2379" + datanode: + replicas: 3 + remoteWal: + kafka: + brokerEndpoints: + - "my-cluster-kafka-bootstrap.kafka:9092" diff --git a/manifests/greptimedb-operator-crd.yaml b/manifests/greptimedb-operator-crd.yaml index 180ed44b..a99ad752 100644 --- a/manifests/greptimedb-operator-crd.yaml +++ b/manifests/greptimedb-operator-crd.yaml @@ -10742,9 +10742,16 @@ spec: type: string type: object type: object - prometheusServicePort: - format: int32 - type: integer + remoteWal: + properties: + kafka: + properties: + brokerEndpoints: + items: + type: string + type: array + type: object + type: object version: type: string type: object @@ -13592,9 +13599,16 @@ spec: type: string type: object type: object - prometheusServicePort: - format: int32 - type: integer + remoteWal: + properties: + kafka: + properties: + brokerEndpoints: + items: + type: string + type: array + type: object + type: object service: properties: annotations: diff --git a/manifests/greptimedb-operator-deployment.yaml b/manifests/greptimedb-operator-deployment.yaml index 67483f5d..97ffba4f 100644 --- a/manifests/greptimedb-operator-deployment.yaml +++ b/manifests/greptimedb-operator-deployment.yaml @@ -10749,9 +10749,16 @@ spec: type: string type: object type: object - prometheusServicePort: - format: int32 - type: integer + remoteWal: + properties: + kafka: + properties: + brokerEndpoints: + items: + type: string + type: array + type: object + type: object version: type: string type: object @@ -13599,9 +13606,16 @@ spec: type: string type: object type: object - prometheusServicePort: - format: int32 - type: integer + remoteWal: + properties: + kafka: + properties: + brokerEndpoints: + items: + type: string + type: array + type: object + type: object service: properties: annotations: diff --git a/pkg/dbconfig/common_config.go b/pkg/dbconfig/common_config.go deleted file mode 100644 index 9fc08db7..00000000 --- a/pkg/dbconfig/common_config.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package dbconfig - -type HTTPOptions struct { - Addr string `toml:"addr,omitempty"` - Timeout string `toml:"timeout,omitempty"` - DisableDashboard *bool `toml:"disable_dashboard,omitempty"` - BodyLimit string `toml:"body_limit,omitempty"` -} - -type HeartbeatOptions struct { - Interval string `toml:"interval,omitempty"` - RetryInterval string `toml:"retry_interval,omitempty"` -} - -type MetaClientOptions struct { - // Metasrv address list. - MetaSrvAddrs []string `toml:"metasrv_addrs,omitempty"` - - // Operation timeout in milliseconds. - Timeout string `toml:"timeout,omitempty"` - - // Connect server timeout in milliseconds. - ConnectTimeout string `toml:"connect_timeout,omitempty"` - - // DDL operation timeout. - DDLTimeout string `toml:"ddl_timeout,omitempty"` - - // `TCP_NODELAY` option for accepted connections. - TCPNoDelay *bool `toml:"tcp_nodelay,omitempty"` -} - -type LoggingOptions struct { - Dir string `toml:"dir,omitempty"` - Level string `toml:"level,omitempty"` - EnableJaegerTracing bool `toml:"enable_jaeger_tracing,omitempty"` -} - -type ProcedureConfig struct { - // Max retry times of procedure. - MaxRetryTimes int32 `toml:"max_retry_times,omitempty"` - - // Initial retry delay of procedures, increases exponentially. - RetryDelay string `toml:"retry_delay,omitempty"` -} - -type DatanodeClientOptions struct { - Timeout string `toml:"timeout,omitempty"` - ConnectTimeout string `toml:"connect_timeout,omitempty"` - TcpNoDelay bool `toml:"tcp_nodelay,omitempty"` -} diff --git a/pkg/dbconfig/datanode_config.go b/pkg/dbconfig/datanode_config.go index 86f7d739..3d66b9c5 100644 --- a/pkg/dbconfig/datanode_config.go +++ b/pkg/dbconfig/datanode_config.go @@ -38,8 +38,15 @@ type DatanodeConfig struct { StorageRegion *string `tomlmapping:"storage.region"` StorageEndpoint *string `tomlmapping:"storage.endpoint"` + // The wal file directory. WalDir *string `tomlmapping:"wal.dir"` + // The wal provider. + WalProvider *string `tomlmapping:"wal.provider"` + + // The kafka broker endpoints. + WalBrokerEndpoints []string `tomlmapping:"wal.broker_endpoints"` + // InputConfig is from config field of cluster spec. InputConfig string } @@ -93,6 +100,14 @@ func (c *DatanodeConfig) ConfigureByCluster(cluster *v1alpha1.GreptimeDBCluster) } } + if cluster.Spec.RemoteWalProvider != nil && cluster.Spec.RemoteWalProvider.KafkaRemoteWal != nil { + c.WalProvider = util.StringPtr("kafka") + c.WalBrokerEndpoints = cluster.Spec.RemoteWalProvider.KafkaRemoteWal.BrokerEndpoints + + // FIXME(zyy17): Unset the wal dir if the wal provider is kafka. It's a temporary solution. + c.WalDir = nil + } + return nil } diff --git a/pkg/dbconfig/dbconfig.go b/pkg/dbconfig/dbconfig.go index abd7f21e..3c5a641f 100644 --- a/pkg/dbconfig/dbconfig.go +++ b/pkg/dbconfig/dbconfig.go @@ -173,6 +173,10 @@ func setConfig(input *toml.Tree, config interface{}) (string, error) { return "", fmt.Errorf("tag '%s' with type '%s is not supported", tag, elem.Kind()) } } + case reflect.Slice: + if field.Len() > 0 { + input.Set(tag, field.Interface()) + } default: return "", fmt.Errorf("field %s is not a pointer", tag) } diff --git a/pkg/dbconfig/dbconfig_test.go b/pkg/dbconfig/dbconfig_test.go index 93ebe838..3480ce17 100644 --- a/pkg/dbconfig/dbconfig_test.go +++ b/pkg/dbconfig/dbconfig_test.go @@ -36,6 +36,14 @@ func TestFromClusterForDatanodeConfig(t *testing.T) { Bucket: "testbucket", }, }, + RemoteWalProvider: &v1alpha1.RemoteWalProvider{ + KafkaRemoteWal: &v1alpha1.KafkaRemoteWal{ + BrokerEndpoints: []string{ + "broker1:9092", + "broker2:9092", + }, + }, + }, }, } @@ -44,6 +52,10 @@ func TestFromClusterForDatanodeConfig(t *testing.T) { bucket = "testbucket" root = "testcluster" type = "S3" + +[wal] + broker_endpoints = ["broker1:9092", "broker2:9092"] + provider = "kafka" ` data, err := FromCluster(testCluster, v1alpha1.DatanodeComponentKind) diff --git a/pkg/dbconfig/metasrv_config.go b/pkg/dbconfig/metasrv_config.go index 20a73055..49e7b3d6 100644 --- a/pkg/dbconfig/metasrv_config.go +++ b/pkg/dbconfig/metasrv_config.go @@ -16,6 +16,7 @@ package dbconfig import ( "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1" + "github.com/GreptimeTeam/greptimedb-operator/pkg/util" ) var _ Config = &MetasrvConfig{} @@ -28,6 +29,12 @@ type MetasrvConfig struct { // If it's not empty, the metasrv will store all data with this key prefix. StoreKeyPrefix *string `tomlmapping:"store_key_prefix"` + // The wal provider. + WalProvider *string `tomlmapping:"wal.provider"` + + // The kafka broker endpoints. + WalBrokerEndpoints []string `tomlmapping:"wal.broker_endpoints"` + // InputConfig is from config field of cluster spec. InputConfig string } @@ -46,6 +53,11 @@ func (c *MetasrvConfig) ConfigureByCluster(cluster *v1alpha1.GreptimeDBCluster) return err } } + + if cluster.Spec.RemoteWalProvider != nil && cluster.Spec.RemoteWalProvider.KafkaRemoteWal != nil { + c.WalProvider = util.StringPtr("kafka") + c.WalBrokerEndpoints = cluster.Spec.RemoteWalProvider.KafkaRemoteWal.BrokerEndpoints + } } return nil diff --git a/tests/e2e/greptimedbcluster_test.go b/tests/e2e/greptimedbcluster_test.go index a805b0e0..53a6a9c6 100644 --- a/tests/e2e/greptimedbcluster_test.go +++ b/tests/e2e/greptimedbcluster_test.go @@ -42,10 +42,10 @@ const ( PRIMARY KEY(n), TIME INDEX (ts) ) - PARTITION BY RANGE COLUMNS (n) ( - PARTITION r0 VALUES LESS THAN (5), - PARTITION r1 VALUES LESS THAN (9), - PARTITION r2 VALUES LESS THAN (MAXVALUE), + PARTITION ON COLUMNS (n) ( + n <= 5, + n > 5 AND n <= 10, + n > 10 AND n <= 999, ) engine=mito`