diff --git a/apis/v1alpha1/defaulting.go b/apis/v1alpha1/defaulting.go index 39f8556f..aee3ac03 100644 --- a/apis/v1alpha1/defaulting.go +++ b/apis/v1alpha1/defaulting.go @@ -39,7 +39,7 @@ var ( // The default replicas for frontend/meta/datanode. defaultFrontendReplicas int32 = 1 defaultMetaReplicas int32 = 1 - defaultDatanodeReplicas int32 = 3 + defaultDatanodeReplicas int32 = 1 defaultFlownodeReplicas int32 = 1 // The default storage settings for datanode. diff --git a/apis/v1alpha1/testdata/greptimedbcluster/test01/expect.yaml b/apis/v1alpha1/testdata/greptimedbcluster/test01/expect.yaml index 353ffac9..fbe356ef 100644 --- a/apis/v1alpha1/testdata/greptimedbcluster/test01/expect.yaml +++ b/apis/v1alpha1/testdata/greptimedbcluster/test01/expect.yaml @@ -73,7 +73,7 @@ spec: datanode: httpPort: 4000 rpcPort: 4001 - replicas: 3 + replicas: 1 storage: dataHome: /data/greptimedb mountPath: /data/greptimedb diff --git a/controllers/greptimedbcluster/controller.go b/controllers/greptimedbcluster/controller.go index c508e856..7b4e04f3 100644 --- a/controllers/greptimedbcluster/controller.go +++ b/controllers/greptimedbcluster/controller.go @@ -101,9 +101,10 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { // +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch // Reconcile is reconciliation loop for GreptimeDBCluster. -func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { +func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { klog.V(2).Infof("Reconciling GreptimeDBCluster: %s", req.NamespacedName) + var err error cluster := new(v1alpha1.GreptimeDBCluster) if err := r.Get(ctx, req.NamespacedName, cluster); err != nil { if k8serrors.IsNotFound(err) { @@ -129,24 +130,31 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct if err = r.addFinalizer(ctx, cluster); err != nil { r.Recorder.Event(cluster, corev1.EventTypeWarning, "AddFinalizerFailed", fmt.Sprintf("Add finalizer failed: %v", err)) - return + return ctrl.Result{}, err } if err = r.validate(ctx, cluster); err != nil { r.Recorder.Event(cluster, corev1.EventTypeWarning, "InvalidCluster", fmt.Sprintf("Invalid cluster: %v", err)) - return - } - - if err = cluster.SetDefaults(); err != nil { - r.Recorder.Event(cluster, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err)) - return + return ctrl.Result{}, err } // Means the cluster is just created. if len(cluster.Status.ClusterPhase) == 0 { klog.Infof("Start to create the cluster '%s/%s'", cluster.Namespace, cluster.Name) + + if err = cluster.SetDefaults(); err != nil { + r.Recorder.Event(cluster, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err)) + return ctrl.Result{}, err + } + + // Update the default values to the cluster spec. + if err = r.Update(ctx, cluster); err != nil { + r.Recorder.Event(cluster, corev1.EventTypeWarning, "UpdateClusterFailed", fmt.Sprintf("Update cluster failed: %v", err)) + return ctrl.Result{}, err + } + if err = r.updateClusterStatus(ctx, cluster, v1alpha1.PhaseStarting); err != nil { - return + return ctrl.Result{}, err } } diff --git a/controllers/greptimedbstandalone/controller.go b/controllers/greptimedbstandalone/controller.go index 8e92734d..37d79e55 100644 --- a/controllers/greptimedbstandalone/controller.go +++ b/controllers/greptimedbstandalone/controller.go @@ -82,9 +82,10 @@ func Setup(mgr ctrl.Manager, _ *options.Options) error { // +kubebuilder:rbac:groups=monitoring.coreos.com,resources=podmonitors,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch -func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { +func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { klog.V(2).Infof("Reconciling GreptimeDBStandalone: %s", req.NamespacedName) + var err error standalone := new(v1alpha1.GreptimeDBStandalone) if err := r.Get(ctx, req.NamespacedName, standalone); err != nil { if k8serrors.IsNotFound(err) { @@ -110,24 +111,31 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct if err = r.addFinalizer(ctx, standalone); err != nil { r.Recorder.Event(standalone, corev1.EventTypeWarning, "AddFinalizerFailed", fmt.Sprintf("Add finalizer failed: %v", err)) - return + return ctrl.Result{}, err } if err = r.validate(ctx, standalone); err != nil { r.Recorder.Event(standalone, corev1.EventTypeWarning, "InvalidStandalone", fmt.Sprintf("Invalid standalone: %v", err)) - return - } - - if err = standalone.SetDefaults(); err != nil { - r.Recorder.Event(standalone, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err)) - return + return ctrl.Result{}, err } // Means the standalone is just created. if len(standalone.Status.StandalonePhase) == 0 { klog.Infof("Start to create the standalone '%s/%s'", standalone.Namespace, standalone.Name) + + if err = standalone.SetDefaults(); err != nil { + r.Recorder.Event(standalone, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err)) + return ctrl.Result{}, err + } + + // Update the default values to the standalone spec. + if err = r.Update(ctx, standalone); err != nil { + r.Recorder.Event(standalone, corev1.EventTypeWarning, "UpdateStandaloneFailed", fmt.Sprintf("Update standalone failed: %v", err)) + return ctrl.Result{}, err + } + if err = r.setStandaloneStatus(ctx, standalone, v1alpha1.PhaseStarting); err != nil { - return + return ctrl.Result{}, err } } diff --git a/tests/e2e/testdata/resources/cluster/basic/cluster.yaml b/tests/e2e/testdata/resources/cluster/basic/cluster.yaml index e380aee8..a2d34028 100644 --- a/tests/e2e/testdata/resources/cluster/basic/cluster.yaml +++ b/tests/e2e/testdata/resources/cluster/basic/cluster.yaml @@ -35,4 +35,3 @@ spec: rpcPort: 4001 mysqlPort: 4002 postgreSQLPort: 4003 - diff --git a/tests/e2e/testdata/sql/cluster/flow_basic.sql b/tests/e2e/testdata/sql/cluster/flow_basic.sql index 6b06845e..1b532481 100644 --- a/tests/e2e/testdata/sql/cluster/flow_basic.sql +++ b/tests/e2e/testdata/sql/cluster/flow_basic.sql @@ -1,100 +1,62 @@ -CREATE TABLE numbers_input_basic ( - number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(number), - TIME INDEX(ts) +-- FIXME(zyy17): The test case will be replaced by the real sqlness test case. + +CREATE TABLE `ngx_access_log` ( + `client` STRING NULL, + `ua_platform` STRING NULL, + `referer` STRING NULL, + `method` STRING NULL, + `endpoint` STRING NULL, + `trace_id` STRING NULL FULLTEXT, + `protocol` STRING NULL, + `status` SMALLINT UNSIGNED NULL, + `size` DOUBLE NULL, + `agent` STRING NULL, + `access_time` TIMESTAMP(3) NOT NULL, + TIME INDEX (`access_time`) + ) +WITH( + append_mode = 'true' ); -CREATE FLOW test_numbers_basic -SINK TO out_num_cnt_basic -AS -SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); - --- TODO(discord9): confirm if it's necessary to flush flow here? --- because flush_flow result is at most 1 -select flush_flow('test_numbers_basic')<=1; - --- SQLNESS ARG restart=true -INSERT INTO numbers_input_basic +CREATE TABLE `ngx_statistics` ( + `status` SMALLINT UNSIGNED NULL, + `total_logs` BIGINT NULL, + `min_size` DOUBLE NULL, + `max_size` DOUBLE NULL, + `avg_size` DOUBLE NULL, + `high_size_count` DOUBLE NULL, + `time_window` TIMESTAMP time index, + `update_at` TIMESTAMP NULL, + PRIMARY KEY (`status`)); + +CREATE FLOW ngx_aggregation +SINK TO ngx_statistics +AS +SELECT + status, + count(client) AS total_logs, + min(size) as min_size, + max(size) as max_size, + avg(size) as avg_size, + sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM ngx_access_log +GROUP BY + status, + time_window; + +INSERT INTO ngx_access_log VALUES - (20, "2021-07-01 00:00:00.200"), - (22, "2021-07-01 00:00:00.600"); - -select flush_flow('test_numbers_basic')<=1; + ("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 1000, "agent", "2021-07-01 00:00:01.000"), + ("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:00:30.500"), + ("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 600, "agent", "2021-07-01 00:01:01.000"), + ("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 700, "agent", "2021-07-01 00:01:01.500"); -SELECT col_0, window_start, window_end FROM out_num_cnt_basic; +SELECT * FROM ngx_statistics; -select flush_flow('test_numbers_basic')<=1; - -INSERT INTO numbers_input_basic +INSERT INTO ngx_access_log VALUES - (23,"2021-07-01 00:00:01.000"), - (24,"2021-07-01 00:00:01.500"); - -select flush_flow('test_numbers_basic')<=1; - -SELECT col_0, window_start, window_end FROM out_num_cnt_basic; - -DROP FLOW test_numbers_basic; -DROP TABLE numbers_input_basic; -DROP TABLE out_num_cnt_basic; - --- test interprete interval - -CREATE TABLE numbers_input_basic ( - number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(number), - TIME INDEX(ts) -); -create table out_num_cnt_basic ( - number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); - -CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10; - -SHOW CREATE FLOW filter_numbers_basic; - -drop flow filter_numbers_basic; - -drop table out_num_cnt_basic; - -drop table numbers_input_basic; - -CREATE TABLE bytes_log ( - byte INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time - TIME INDEX(ts) -); - -CREATE TABLE approx_rate ( - rate FLOAT, - time_window TIMESTAMP, - update_at TIMESTAMP, - TIME INDEX(time_window) -); - -CREATE FLOW find_approx_rate -SINK TO approx_rate -AS -SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window; - -INSERT INTO bytes_log VALUES -(101, '2025-01-01 00:00:01'), -(300, '2025-01-01 00:00:29'); - -SELECT flush_flow('find_approx_rate')<=1; - -SELECT rate, time_window FROM approx_rate; - -INSERT INTO bytes_log VALUES -(450, '2025-01-01 00:00:32'), -(500, '2025-01-01 00:00:37'); - -SELECT flush_flow('find_approx_rate')<=1; - -SELECT rate, time_window FROM approx_rate; + ("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:01:01.000"), + ("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 800, "agent", "2021-07-01 00:01:01.500"); -DROP TABLE bytes_log; -DROP FLOW find_approx_rate; -DROP TABLE approx_rate; +SELECT * FROM ngx_statistics;