diff --git a/coordinator/changefeed/changefeed_db.go b/coordinator/changefeed/changefeed_db.go
index 7a2d4c18c..4498dd231 100644
--- a/coordinator/changefeed/changefeed_db.go
+++ b/coordinator/changefeed/changefeed_db.go
@@ -41,12 +41,16 @@ type ChangefeedDB struct {
 func NewChangefeedDB(version int64) *ChangefeedDB {
 	db := &ChangefeedDB{
+		// id is the unique id of the changefeed db. The prefix `coordinator` distinguishes
+		// it from other ReplicationDBs. The suffix is the version of the coordinator, which
+		// is useful to track the scheduling history.
 		id:                     fmt.Sprintf("coordinator-%d", version),
 		changefeeds:            make(map[common.ChangeFeedID]*Changefeed),
 		changefeedDisplayNames: make(map[common.ChangeFeedDisplayName]common.ChangeFeedID),
 		stopped:                make(map[common.ChangeFeedID]*Changefeed),
 	}
-	db.ReplicationDB = replica.NewReplicationDB[common.ChangeFeedID, *Changefeed](db.id, db.withRLock)
+	db.ReplicationDB = replica.NewReplicationDB[common.ChangeFeedID, *Changefeed](db.id,
+		db.withRLock, replica.NewEmptyChecker)
 	return db
 }
diff --git a/coordinator/changefeed/changefeed_db_backend.go b/coordinator/changefeed/changefeed_db_backend.go
index fea930847..2f92850a8 100644
--- a/coordinator/changefeed/changefeed_db_backend.go
+++ b/coordinator/changefeed/changefeed_db_backend.go
@@ -36,8 +36,8 @@ type Backend interface {
 	SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error
 	// ResumeChangefeed persists the resumed status to db for a changefeed
 	ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) error
-	// UpdateChangefeedCheckpointTs persists the checkpoints for changefeeds
-	UpdateChangefeedCheckpointTs(ctx context.Context, cps map[common.ChangeFeedID]uint64) error
+	// UpdateChangefeedCheckpointTs persists the checkpointTs for changefeeds
+	UpdateChangefeedCheckpointTs(ctx context.Context, checkpointTs map[common.ChangeFeedID]uint64) error
 }
 // ChangefeedMetaWrapper is a wrapper for the changefeed load from the DB
diff --git a/coordinator/controller.go b/coordinator/controller.go
index 1526ec8bd..20c36a1ed 100644
--- a/coordinator/controller.go
+++ b/coordinator/controller.go
@@ -40,27 +40,29 @@ import (
 	"go.uber.org/zap"
 )
-// Controller schedules and balance changefeeds
-// there are 3 main components in the controller, scheduler, ChangefeedDB and operator controller
+// Controller schedules and balances changefeeds. There are 4 main components:
+// 1. scheduler: generates operators for handling different scheduling tasks.
+// 2. operatorController: manages all operators and executes them periodically.
+// 3. changefeedDB: stores all changefeed info and status in memory.
+// 4. backend: the durable storage for changefeed metadata.
type Controller struct { - bootstrapped *atomic.Bool - version int64 - - nodeChanged *atomic.Bool + version int64 - cfScheduler *scheduler.Controller + scheduler *scheduler.Controller operatorController *operator.Controller changefeedDB *changefeed.ChangefeedDB - messageCenter messaging.MessageCenter - nodeManager *watcher.NodeManager - batchSize int + backend changefeed.Backend + bootstrapped *atomic.Bool bootstrapper *bootstrap.Bootstrapper[heartbeatpb.CoordinatorBootstrapResponse] + nodeChanged *atomic.Bool + nodeManager *watcher.NodeManager + stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler] taskScheduler threadpool.ThreadPool taskHandlers []*threadpool.TaskHandle - backend changefeed.Backend + messageCenter messaging.MessageCenter updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed stateChangedCh chan *ChangefeedStateChangeEvent @@ -92,9 +94,8 @@ func NewController( oc := operator.NewOperatorController(mc, selfNode, changefeedDB, backend, nodeManager, batchSize) c := &Controller{ version: version, - batchSize: batchSize, bootstrapped: atomic.NewBool(false), - cfScheduler: scheduler.NewController(map[string]scheduler.Scheduler{ + scheduler: scheduler.NewController(map[string]scheduler.Scheduler{ scheduler.BasicScheduler: scheduler.NewBasicScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, oc.NewAddMaintainerOperator), scheduler.BalanceScheduler: scheduler.NewBalanceScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, balanceInterval, oc.NewMoveMaintainerOperator), }), @@ -358,7 +359,7 @@ func (c *Controller) FinishBootstrap(workingMap map[common.ChangeFeedID]remoteMa } // start operator and scheduler - c.taskHandlers = append(c.taskHandlers, c.cfScheduler.Start(c.taskScheduler)...) + c.taskHandlers = append(c.taskHandlers, c.scheduler.Start(c.taskScheduler)...) 
operatorControllerHandle := c.taskScheduler.Submit(c.operatorController, time.Now()) c.taskHandlers = append(c.taskHandlers, operatorControllerHandle) c.bootstrapped.Store(true) diff --git a/go.mod b/go.mod index dd1facc59..37632316b 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/pingcap/tidb v1.1.0-beta.0.20241014034929-94b2ac04a0c4 github.com/pingcap/tidb/pkg/parser v0.0.0-20241014034929-94b2ac04a0c4 github.com/pingcap/tiflow v0.0.0-20241023094956-dd2d54ad4c19 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.4 github.com/r3labs/diff v1.1.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 @@ -255,7 +256,6 @@ require ( github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 // indirect github.com/pingcap/tipb v0.0.0-20241008083645-0bcddae67837 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index 459f9f66f..5b4fe697d 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -53,7 +53,7 @@ type Controller struct { tsoClient replica.TSOClient splitter *split.Splitter - spanReplicationEnabled bool + enableTableAcrossNodes bool startCheckpointTs uint64 ddlDispatcherID common.DispatcherID @@ -74,25 +74,29 @@ func NewController(changefeedID common.ChangeFeedID, ddlSpan *replica.SpanReplication, batchSize int, balanceInterval time.Duration) *Controller { mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) - replicaSetDB := replica.NewReplicaSetDB(changefeedID, ddlSpan) + enableTableAcrossNodes := false + var splitter *split.Splitter + if cfConfig != nil && cfConfig.Scheduler.EnableTableAcrossNodes { + enableTableAcrossNodes = true + splitter = split.NewSplitter(changefeedID, pdapi, regionCache, cfConfig.Scheduler) + } + replicaSetDB := replica.NewReplicaSetDB(changefeedID, ddlSpan, enableTableAcrossNodes) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) oc := operator.NewOperatorController(changefeedID, mc, replicaSetDB, nodeManager, batchSize) s := &Controller{ - startCheckpointTs: checkpointTs, - changefeedID: changefeedID, - bootstrapped: false, - ddlDispatcherID: ddlSpan.ID, - operatorController: oc, - messageCenter: mc, - replicationDB: replicaSetDB, - nodeManager: nodeManager, - taskScheduler: taskScheduler, - cfConfig: cfConfig, - tsoClient: tsoClient, - } - if cfConfig != nil && cfConfig.Scheduler.EnableTableAcrossNodes { - s.splitter = split.NewSplitter(changefeedID, pdapi, regionCache, cfConfig.Scheduler) - s.spanReplicationEnabled = true + startCheckpointTs: checkpointTs, + changefeedID: changefeedID, + bootstrapped: false, + ddlDispatcherID: ddlSpan.ID, + operatorController: oc, + messageCenter: mc, + replicationDB: replicaSetDB, + nodeManager: nodeManager, + taskScheduler: taskScheduler, + cfConfig: cfConfig, + tsoClient: tsoClient, + splitter: splitter, + enableTableAcrossNodes: enableTableAcrossNodes, } s.schedulerController = NewScheduleController(changefeedID, batchSize, oc, replicaSetDB, nodeManager, balanceInterval, s.splitter) return s @@ -130,10 +134,7 @@ func (c *Controller) HandleStatus(from node.ID, statusList 
[]*heartbeatpb.TableS zap.Stringer("node", nodeID)) continue } - stm.UpdateStatus(status) - if c.spanReplicationEnabled { - c.replicationDB.UpdateHotSpan(stm, status) - } + c.replicationDB.UpdateStatus(stm, status) } } @@ -169,7 +170,7 @@ func (c *Controller) AddNewTable(table commonEvent.Table, startTs uint64) { EndKey: span.EndKey, } tableSpans := []*heartbeatpb.TableSpan{tableSpan} - if c.spanReplicationEnabled { + if c.enableTableAcrossNodes { //split the whole table span base on the configuration, todo: background split table tableSpans = c.splitter.SplitSpans(context.Background(), tableSpan, len(c.nodeManager.GetAliveNodes()), 0) } @@ -272,7 +273,7 @@ func (c *Controller) FinishBootstrap( zap.String("changefeed", c.changefeedID.Name()), zap.Int64("tableID", table.TableID)) c.addWorkingSpans(tableMap) - if c.spanReplicationEnabled { + if c.enableTableAcrossNodes { holes := split.FindHoles(tableMap, tableSpan) // todo: split the hole c.addNewSpans(table.SchemaID, holes, c.startCheckpointTs) diff --git a/maintainer/maintainer_controller_test.go b/maintainer/maintainer_controller_test.go index a9aae357a..05c60690f 100644 --- a/maintainer/maintainer_controller_test.go +++ b/maintainer/maintainer_controller_test.go @@ -17,11 +17,14 @@ import ( "bytes" "context" "fmt" + "math/rand" "testing" "time" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/maintainer/operator" + pkgOpearator "github.com/pingcap/ticdc/pkg/scheduler/operator" + "github.com/pingcap/ticdc/maintainer/replica" "github.com/pingcap/ticdc/pkg/common" appcontext "github.com/pingcap/ticdc/pkg/common/context" @@ -643,7 +646,7 @@ func TestDynamicSplitTableBasic(t *testing.T) { for _, task := range replicas { for cnt := 0; cnt < replica.HotSpanScoreThreshold; cnt++ { - s.replicationDB.UpdateHotSpan(task, &heartbeatpb.TableSpanStatus{ + s.replicationDB.UpdateStatus(task, &heartbeatpb.TableSpanStatus{ ID: task.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, CheckpointTs: 10, @@ -671,6 +674,239 @@ func TestDynamicSplitTableBasic(t *testing.T) { require.Equal(t, 7, s.replicationDB.GetAbsentSize()) } +func TestDynamiSplitTableWhenScaleOut(t *testing.T) { + t.Skip("skip unimplemented test") +} + +func TestDynamicMergeAndSplitTable(t *testing.T) { + t.Skip("skip flaky test") + pdAPI := &mockPdAPI{ + regions: make(map[int64][]pdutil.RegionInfo), + } + nodeManager := setNodeManagerAndMessageCenter() + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + nodeManager.GetAliveNodes()["node2"] = &node.Info{ID: "node2"} + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test") + tsoClient := &replica.MockTsoClient{} + ddlSpan := replica.NewWorkingReplicaSet(cfID, tableTriggerEventDispatcherID, + tsoClient, heartbeatpb.DDLSpanSchemaID, + heartbeatpb.DDLSpan, &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1") + s := NewController(cfID, 1, + pdAPI, tsoClient, nil, nil, &config.ReplicaConfig{ + Scheduler: &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: true, + RegionThreshold: 0, + WriteKeyThreshold: 1, + }}, ddlSpan, 1000, 0) + s.taskScheduler = &mockThreadPool{} + + totalTables := 10 + victim := rand.Intn(totalTables) + 1 + for i := 1; i <= totalTables; i++ { + totalSpan := spanz.TableIDToComparableSpan(int64(i)) + partialSpans := []*heartbeatpb.TableSpan{ + {TableID: int64(i), StartKey: totalSpan.StartKey, EndKey: 
appendNew(totalSpan.StartKey, 'a')}, + {TableID: int64(i), StartKey: appendNew(totalSpan.StartKey, 'a'), EndKey: appendNew(totalSpan.StartKey, 'b')}, + {TableID: int64(i), StartKey: appendNew(totalSpan.StartKey, 'b'), EndKey: totalSpan.EndKey}, + } + if i == victim { + // victim has hole, should not merged + k := i % 3 + old := partialSpans + partialSpans = old[:k] + partialSpans = append(partialSpans, old[k+1:]...) + } + for idx, span := range partialSpans { + dispatcherID := common.NewDispatcherID() + spanReplica := replica.NewWorkingReplicaSet(cfID, dispatcherID, tsoClient, 1, span, &heartbeatpb.TableSpanStatus{ + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + EventSizePerSecond: replica.HotSpanWriteThreshold, + }, node.ID(fmt.Sprintf("node%d", idx%2+1))) + if idx == 0 { + spanReplica.GetStatus().EventSizePerSecond = replica.HotSpanWriteThreshold * 100 + } + s.replicationDB.AddReplicatingSpan(spanReplica) + } + + // new split regions + pdAPI.regions[1] = []pdutil.RegionInfo{ + pdutil.NewTestRegionInfo(1, totalSpan.StartKey, appendNew(totalSpan.StartKey, 'a'), uint64(1)), + pdutil.NewTestRegionInfo(2, appendNew(totalSpan.StartKey, 'a'), totalSpan.EndKey, uint64(1)), + } + } + replicas := s.replicationDB.GetReplicating() + require.Equal(t, totalTables*3-1, s.replicationDB.GetReplicatingSize()) + + scheduler := s.schedulerController.GetScheduler(scheduler.SplitScheduler) + scheduler.Execute() + require.Equal(t, 0, s.replicationDB.GetSchedulingSize()) + require.Equal(t, totalTables*3-1, s.operatorController.OperatorSize()) + finishedCnt := 0 + for _, task := range replicas { + op := s.operatorController.GetOperator(task.ID) + op.Schedule() + op.Check("node1", &heartbeatpb.TableSpanStatus{ + ID: op.ID().ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Stopped, + CheckpointTs: 10, + }) + if op.IsFinished() { + op.PostFinish() + finishedCnt++ + } + } + require.Less(t, finishedCnt, totalTables*3-1) + + //total 7 regions, + // table 1: split to 4 spans, will be inserted to absent + // table 2: split to 3 spans, will be inserted to absent + require.Equal(t, 7, s.replicationDB.GetAbsentSize()) +} + +func TestDynamicMergeTableBasic(t *testing.T) { + pdAPI := &mockPdAPI{ + regions: make(map[int64][]pdutil.RegionInfo), + } + nodeManager := setNodeManagerAndMessageCenter() + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + nodeManager.GetAliveNodes()["node2"] = &node.Info{ID: "node2"} + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test") + tsoClient := &replica.MockTsoClient{} + ddlSpan := replica.NewWorkingReplicaSet(cfID, tableTriggerEventDispatcherID, + tsoClient, heartbeatpb.DDLSpanSchemaID, + heartbeatpb.DDLSpan, &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1") + s := NewController(cfID, 1, + pdAPI, tsoClient, nil, nil, &config.ReplicaConfig{ + Scheduler: &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: true, + RegionThreshold: 0, + WriteKeyThreshold: 1, + }}, ddlSpan, 1000, 0) + s.taskScheduler = &mockThreadPool{} + + totalTables := 10 + victim := rand.Intn(totalTables) + 1 + var holeSpan *heartbeatpb.TableSpan + for i := 1; i <= totalTables; i++ { + totalSpan := spanz.TableIDToComparableSpan(int64(i)) + partialSpans := []*heartbeatpb.TableSpan{ + {TableID: int64(i), StartKey: totalSpan.StartKey, EndKey: appendNew(totalSpan.StartKey, 'a')}, + 
{TableID: int64(i), StartKey: appendNew(totalSpan.StartKey, 'a'), EndKey: appendNew(totalSpan.StartKey, 'b')}, + {TableID: int64(i), StartKey: appendNew(totalSpan.StartKey, 'b'), EndKey: totalSpan.EndKey}, + } + if i == victim { + // victim has hole, should not merged + k := i % 3 + old := partialSpans + holeSpan = old[k] + partialSpans = old[:k] + partialSpans = append(partialSpans, old[k+1:]...) + } + for idx, span := range partialSpans { + dispatcherID := common.NewDispatcherID() + spanReplica := replica.NewWorkingReplicaSet(cfID, dispatcherID, tsoClient, 1, span, &heartbeatpb.TableSpanStatus{ + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + EventSizePerSecond: 0, + }, node.ID(fmt.Sprintf("node%d", idx%2+1))) + s.replicationDB.AddReplicatingSpan(spanReplica) + } + } + + expected := (totalTables - 1) * 3 + victimExpected := 2 + replicas := s.replicationDB.GetReplicating() + require.Equal(t, expected+victimExpected, s.replicationDB.GetReplicatingSize()) + + scheduler := s.schedulerController.GetScheduler(scheduler.SplitScheduler) + for i := 0; i < replica.DefaultScoreThreshold; i++ { + scheduler.Execute() + } + scheduler.Execute() // dummy execute does not take effect + require.Equal(t, victimExpected, s.replicationDB.GetReplicatingSize()) + require.Equal(t, expected, s.replicationDB.GetSchedulingSize()) + require.Equal(t, expected, s.operatorController.OperatorSize()) + + primarys := make(map[int64]pkgOpearator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) + for _, task := range replicas { + op := s.operatorController.GetOperator(task.ID) + if op == nil { + require.Equal(t, int64(victim), task.Span.GetTableID()) + continue + } + op.Schedule() + op.Check(task.GetNodeID(), &heartbeatpb.TableSpanStatus{ + ID: op.ID().ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Stopped, + CheckpointTs: 10, + }) + if op.IsFinished() { + op.PostFinish() + } else { + primarys[task.Span.GetTableID()] = op + } + } + for _, op := range primarys { + finished := op.IsFinished() + require.True(t, finished) + op.PostFinish() + } + + require.Equal(t, totalTables-1, s.replicationDB.GetAbsentSize()) + + // merge the hole + dispatcherID := common.NewDispatcherID() + // the holeSpan is on node0, which is offlined + spanReplica := replica.NewWorkingReplicaSet(cfID, dispatcherID, tsoClient, 1, holeSpan, &heartbeatpb.TableSpanStatus{ + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + EventSizePerSecond: 0, + }, node.ID(fmt.Sprintf("node%d", 0))) + s.replicationDB.AddReplicatingSpan(spanReplica) + replicas = s.replicationDB.GetReplicating() + require.Equal(t, 3, len(replicas)) + for i := 0; i < replica.DefaultScoreThreshold; i++ { + scheduler.Execute() + } + require.Equal(t, 0, s.replicationDB.GetReplicatingSize()) + require.Equal(t, 30, s.operatorController.OperatorSize()) + primarys = make(map[int64]pkgOpearator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) + for _, task := range replicas { + op := s.operatorController.GetOperator(task.ID) + op.Schedule() + op.Check(task.GetNodeID(), &heartbeatpb.TableSpanStatus{ + ID: op.ID().ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Stopped, + CheckpointTs: 10, + }) + if op.IsFinished() { + op.PostFinish() + } else { + primarys[task.Span.GetTableID()] = op + } + } + for _, op := range primarys { + finished := op.IsFinished() + require.True(t, finished) + op.PostFinish() + } + require.Equal(t, totalTables, 
s.replicationDB.GetAbsentSize()) +} + func appendNew(origin []byte, c byte) []byte { nb := bytes.Clone(origin) return append(nb, c) diff --git a/maintainer/operator/operator_controller.go b/maintainer/operator/operator_controller.go index 90627d36e..8cea2261e 100644 --- a/maintainer/operator/operator_controller.go +++ b/maintainer/operator/operator_controller.go @@ -15,6 +15,7 @@ package operator import ( "container/heap" + "math/rand" "sync" "time" @@ -79,7 +80,6 @@ func (oc *Controller) Execute() time.Time { // is the lock necessary? oc.lock.RLock() - // maybe check the the node state when scheduling the operator msg := r.Schedule() oc.lock.RUnlock() @@ -302,3 +302,53 @@ func (oc *Controller) NewSplitOperator( ) operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus] { return NewSplitDispatcherOperator(oc.replicationDB, replicaSet, originNode, splitSpans) } + +// AddMergeSplitOperator adds a merge split operator to the controller. +// 1. Merge Operator: len(affectedReplicaSets) > 1, len(splitSpans) == 1 +// 2. Split Operator: len(affectedReplicaSets) == 1, len(splitSpans) > 1 +// 3. MergeAndSplit Operator: len(affectedReplicaSets) > 1, len(splitSpans) > 1 +func (oc *Controller) AddMergeSplitOperator( + affectedReplicaSets []*replica.SpanReplication, + splitSpans []*heartbeatpb.TableSpan, +) bool { + oc.lock.Lock() + defer oc.lock.Unlock() + // TODO: check if there are some intersection between `ret.Replications` and `spans`. + // Ignore the intersection spans to prevent meaningless split operation. + for _, replicaSet := range affectedReplicaSets { + if _, ok := oc.operators[replicaSet.ID]; ok { + log.Info("add operator failed, operator already exists", + zap.String("changefeed", oc.changefeedID.Name()), + zap.String("dispatcherID", replicaSet.ID.String()), + ) + return false + } + span := oc.replicationDB.GetTaskByID(replicaSet.ID) + if span == nil { + log.Warn("add operator failed, span not found", + zap.String("changefeed", oc.changefeedID.Name()), + zap.String("dispatcherID", replicaSet.ID.String())) + return false + } + } + randomIdx := rand.Intn(len(affectedReplicaSets)) + primaryID := affectedReplicaSets[randomIdx].ID + primaryOp := NewMergeSplitDispatcherOperator(oc.replicationDB, primaryID, affectedReplicaSets[randomIdx], affectedReplicaSets, splitSpans, nil) + for _, replicaSet := range affectedReplicaSets { + var op *MergeSplitDispatcherOperator + if replicaSet.ID == primaryID { + op = primaryOp + } else { + op = NewMergeSplitDispatcherOperator(oc.replicationDB, primaryID, replicaSet, nil, nil, primaryOp.onFinished) + } + oc.pushOperator(op) + } + log.Info("add merge split operator", + zap.String("changefeed", oc.changefeedID.Name()), + zap.String("primary", primaryID.String()), + zap.Int64("tableID", splitSpans[0].TableID), + zap.Int("oldSpans", len(affectedReplicaSets)), + zap.Int("newSpans", len(splitSpans)), + ) + return true +} diff --git a/maintainer/operator/operator_merge_split.go b/maintainer/operator/operator_merge_split.go new file mode 100644 index 000000000..18fcfa321 --- /dev/null +++ b/maintainer/operator/operator_merge_split.go @@ -0,0 +1,190 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package operator
+
+import (
+	"encoding/hex"
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/heartbeatpb"
+	"github.com/pingcap/ticdc/maintainer/replica"
+	"github.com/pingcap/ticdc/pkg/common"
+	"github.com/pingcap/ticdc/pkg/messaging"
+	"github.com/pingcap/ticdc/pkg/node"
+	"go.uber.org/zap"
+)
+
+// MergeSplitDispatcherOperator is an operator that removes a table span from a dispatcher
+// and then adds some new spans to the replication db.
+type MergeSplitDispatcherOperator struct {
+	db               *replica.ReplicationDB
+	originNode       node.ID
+	originReplicaSet *replica.SpanReplication
+
+	checkpointTs uint64
+	removed      bool
+	finished     bool
+	onFinished   func()
+	lck          sync.Mutex // protects the previous fields
+
+	// For the primary operator, totalRemoved tends to be increased by the operators of the other affectedReplicaSets
+	totalRemoved        atomic.Int64
+	primary             common.DispatcherID
+	affectedReplicaSets []*replica.SpanReplication
+	splitSpans          []*heartbeatpb.TableSpan
+	splitSpanInfo       string
+}
+
+// NewMergeSplitDispatcherOperator creates a new MergeSplitDispatcherOperator
+func NewMergeSplitDispatcherOperator(
+	db *replica.ReplicationDB,
+	primary common.DispatcherID,
+	originReplicaSet *replica.SpanReplication,
+	affectedReplicaSets []*replica.SpanReplication,
+	splitSpans []*heartbeatpb.TableSpan,
+	onFinished func(),
+) *MergeSplitDispatcherOperator {
+	spansInfo := ""
+	for _, span := range splitSpans {
+		spansInfo += fmt.Sprintf("[%s,%s]",
+			hex.EncodeToString(span.StartKey), hex.EncodeToString(span.EndKey))
+	}
+	op := &MergeSplitDispatcherOperator{
+		db:                  db,
+		originNode:          originReplicaSet.GetNodeID(),
+		originReplicaSet:    originReplicaSet,
+		checkpointTs:        originReplicaSet.GetStatus().GetCheckpointTs(),
+		primary:             primary,
+		affectedReplicaSets: affectedReplicaSets,
+		totalRemoved:        atomic.Int64{},
+		splitSpans:          splitSpans,
+		splitSpanInfo:       spansInfo,
+		onFinished:          onFinished,
+	}
+	if op.isPrimary() {
+		op.onFinished = func() {
+			op.totalRemoved.Add(1)
+		}
+	}
+	return op
+}
+
+func (m *MergeSplitDispatcherOperator) Start() {
+	m.lck.Lock()
+	defer m.lck.Unlock()
+
+	m.db.MarkSpanScheduling(m.originReplicaSet)
+}
+
+func (m *MergeSplitDispatcherOperator) OnNodeRemove(n node.ID) {
+	m.lck.Lock()
+	defer m.lck.Unlock()
+
+	if n == m.originNode {
+		log.Info("origin node is removed",
+			zap.String("replicaSet", m.originReplicaSet.ID.String()))
+		m.markFinished()
+	}
+}
+
+func (m *MergeSplitDispatcherOperator) isPrimary() bool {
+	return m.originReplicaSet.ID == m.primary
+}
+
+func (m *MergeSplitDispatcherOperator) markFinished() {
+	if !m.finished {
+		m.finished = true
+		m.onFinished()
+	}
+}
+
+func (m *MergeSplitDispatcherOperator) AffectedNodes() []node.ID {
+	return []node.ID{m.originNode}
+}
+
+func (m *MergeSplitDispatcherOperator) ID() common.DispatcherID {
+	return m.originReplicaSet.ID
+}
+
+func (m *MergeSplitDispatcherOperator) IsFinished() bool {
+	m.lck.Lock()
+	defer m.lck.Unlock()
+
+	if m.removed {
+		return true
+	}
+
+	if m.originReplicaSet.ID == m.primary {
+		// the primary operator waits for all affected replica sets to be removed, since it
+		// is responsible for replacing them with new spans.
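+		// Each operator (primary and secondary) invokes onFinished exactly once when its origin
+		// span has been removed; a secondary's onFinished points at the primary's counter, so
+		// totalRemoved only reaches len(affectedReplicaSets) after every origin span is gone.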
+ return m.finished && int(m.totalRemoved.Load()) == len(m.affectedReplicaSets) + } + return m.finished +} + +func (m *MergeSplitDispatcherOperator) Check(from node.ID, status *heartbeatpb.TableSpanStatus) { + m.lck.Lock() + defer m.lck.Unlock() + + if from == m.originNode && status.ComponentStatus != heartbeatpb.ComponentState_Working { + if status.CheckpointTs > m.checkpointTs { + m.checkpointTs = status.CheckpointTs + } + m.originReplicaSet.UpdateStatus(status) + log.Info("replica set removed from origin node", + zap.Uint64("checkpointTs", m.checkpointTs), + zap.String("replicaSet", m.originReplicaSet.ID.String())) + m.markFinished() + } +} + +func (m *MergeSplitDispatcherOperator) Schedule() *messaging.TargetMessage { + return m.originReplicaSet.NewRemoveDispatcherMessage(m.originNode) +} + +// OnTaskRemoved is called when the task is removed by ddl +func (m *MergeSplitDispatcherOperator) OnTaskRemoved() { + m.lck.Lock() + defer m.lck.Unlock() + + log.Info("task removed", zap.String("replicaSet", m.originReplicaSet.ID.String())) + m.removed = true +} + +func (m *MergeSplitDispatcherOperator) PostFinish() { + m.lck.Lock() + defer m.lck.Unlock() + + if m.originReplicaSet.ID == m.primary { + log.Info("merge-split dispatcher operator finished[primary]", zap.String("id", m.originReplicaSet.ID.String())) + m.db.ReplaceReplicaSet(m.affectedReplicaSets, m.splitSpans, m.checkpointTs) + return + } + log.Info("merge-split dispatcher operator finished[secondary]", zap.String("id", m.originReplicaSet.ID.String())) +} + +func (m *MergeSplitDispatcherOperator) String() string { + if m.originReplicaSet.ID == m.primary { + return fmt.Sprintf("merge-split dispatcher operator[primary]: %s, totalAffected: %d, finished: %d, splitSpans:%s", + m.originReplicaSet.ID, len(m.affectedReplicaSets), m.totalRemoved.Load(), m.splitSpanInfo) + } + return fmt.Sprintf("merge-split dispatcher operator[secondary]: %s, primary: %s", m.originReplicaSet.ID, m.primary) +} + +func (m *MergeSplitDispatcherOperator) Type() string { + return "merge-split" +} diff --git a/maintainer/operator/operator_split.go b/maintainer/operator/operator_split.go index 15f38ce62..95db3de5c 100644 --- a/maintainer/operator/operator_split.go +++ b/maintainer/operator/operator_split.go @@ -128,7 +128,7 @@ func (m *SplitDispatcherOperator) PostFinish() { defer m.lck.Unlock() log.Info("split dispatcher operator finished", zap.String("id", m.replicaSet.ID.String())) - m.db.ReplaceReplicaSet(m.replicaSet, m.splitSpans, m.checkpointTs) + m.db.ReplaceReplicaSet([]*replica.SpanReplication{m.replicaSet}, m.splitSpans, m.checkpointTs) } func (m *SplitDispatcherOperator) String() string { diff --git a/maintainer/replica/checker.go b/maintainer/replica/checker.go new file mode 100644 index 000000000..d1c48a676 --- /dev/null +++ b/maintainer/replica/checker.go @@ -0,0 +1,373 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
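+
+// The checkers below plug into the scheduler's generic ReplicationDB through getNewGroupChecker.
+// Roughly, a checker satisfies replica.GroupChecker[common.DispatcherID, *SpanReplication]; the
+// shape sketched here is only inferred from the implementations in this file (see
+// pkg/scheduler/replica for the authoritative definition). A hypothetical do-nothing checker
+// would look like:
+//
+//	type noopChecker struct{}
+//
+//	func (c *noopChecker) Name() string                             { return "noop checker" }
+//	func (c *noopChecker) AddReplica(r *SpanReplication)            {}
+//	func (c *noopChecker) RemoveReplica(r *SpanReplication)         {}
+//	func (c *noopChecker) UpdateStatus(r *SpanReplication)          {}
+//	func (c *noopChecker) Check(batch int) replica.GroupCheckResult { return nil }
+//	func (c *noopChecker) Stat() string                             { return "" }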
+ +package replica + +import ( + "fmt" + "math" + "strconv" + "strings" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/scheduler/replica" + "github.com/pingcap/ticdc/server/watcher" + "go.uber.org/zap" +) + +type OpType int + +const ( + OpSplit OpType = iota // Split one span to multiple subspans + OpMerge // merge multiple spans to one span + OpMergeAndSplit // remove old spans and split to multiple subspans +) + +const ( + HotSpanWriteThreshold = 1024 * 1024 // 1MB per second + HotSpanScoreThreshold = 3 // TODO: bump to 10 befroe release + DefaultScoreThreshold = 3 + + defaultHardImbalanceThreshold = float64(1.35) // used to trigger the rebalance + clearTimeout = 300 // seconds +) + +var MinSpanNumberCoefficient = 2 + +type CheckResult struct { + OpType OpType + Replications []*SpanReplication +} + +func (c CheckResult) String() string { + opStr := "" + switch c.OpType { + case OpSplit: + opStr = "split" + case OpMerge: + opStr = "merge" + case OpMergeAndSplit: + opStr = "merge and split" + default: + panic("unknown op type") + } + return fmt.Sprintf("OpType: %s, ReplicationSize: %d", opStr, len(c.Replications)) +} + +func getNewGroupChecker( + cfID common.ChangeFeedID, enableTableAcrossNodes bool, +) func(replica.GroupID) replica.GroupChecker[common.DispatcherID, *SpanReplication] { + if !enableTableAcrossNodes { + return replica.NewEmptyChecker[common.DispatcherID, *SpanReplication] + } + return func(groupID replica.GroupID) replica.GroupChecker[common.DispatcherID, *SpanReplication] { + groupType := replica.GetGroupType(groupID) + switch groupType { + case replica.GroupDefault: + return newHotSpanChecker(cfID) + case replica.GroupTable: + return newImbalanceChecker(cfID) + } + log.Panic("unknown group type", zap.String("changefeed", cfID.Name()), zap.Int8("groupType", int8(groupType))) + return nil + } +} + +type hotSpanChecker struct { + changefeedID common.ChangeFeedID + hotTasks map[common.DispatcherID]*hotSpanStatus + writeThreshold float32 + scoreThreshold int +} + +func newHotSpanChecker(cfID common.ChangeFeedID) *hotSpanChecker { + return &hotSpanChecker{ + changefeedID: cfID, + hotTasks: make(map[common.DispatcherID]*hotSpanStatus), + writeThreshold: HotSpanWriteThreshold, + scoreThreshold: HotSpanScoreThreshold, + } +} + +func (s *hotSpanChecker) Name() string { + return "hot span checker" +} + +func (s *hotSpanChecker) AddReplica(replica *SpanReplication) { + // only track the hot span dynamically + return +} + +func (s *hotSpanChecker) RemoveReplica(span *SpanReplication) { + delete(s.hotTasks, span.ID) +} + +func (s *hotSpanChecker) UpdateStatus(span *SpanReplication) { + status := span.GetStatus() + if status.ComponentStatus != heartbeatpb.ComponentState_Working { + if _, ok := s.hotTasks[span.ID]; ok { + delete(s.hotTasks, span.ID) + log.Debug("remove unworking hot span", zap.String("changefeed", s.changefeedID.Name()), zap.String("span", span.ID.String())) + } + return + } + + if status.EventSizePerSecond < s.writeThreshold { + if hotSpan, ok := s.hotTasks[span.ID]; ok { + hotSpan.score-- + if hotSpan.score == 0 { + delete(s.hotTasks, span.ID) + } + } + return + } + + hotSpan, ok := s.hotTasks[span.ID] + if !ok { + // add the new hot span + hotSpan = &hotSpanStatus{ + SpanReplication: span, + score: 0, + } + s.hotTasks[span.ID] = hotSpan + } + hotSpan.score++ + 
hotSpan.lastUpdateTime = time.Now() +} + +func (s *hotSpanChecker) Check(batchSize int) replica.GroupCheckResult { + cache := make([]CheckResult, 0) + + for _, hotSpan := range s.hotTasks { + if time.Since(hotSpan.lastUpdateTime) > clearTimeout*time.Second { + // should not happen + log.Panic("remove hot span since it is outdated", + zap.String("changefeed", s.changefeedID.Name()), zap.String("span", hotSpan.ID.String())) + // s.RemoveReplica(hotSpan.SpanReplication) + } else if hotSpan.score >= s.scoreThreshold { + cache = append(cache, CheckResult{ + OpType: OpSplit, + Replications: []*SpanReplication{hotSpan.SpanReplication}, + }) + if len(cache) >= batchSize { + break + } + } + } + return cache +} + +func (s *hotSpanChecker) Stat() string { + var res strings.Builder + cnts := make([]int, s.scoreThreshold+1) + for _, hotSpan := range s.hotTasks { + score := min(s.scoreThreshold, hotSpan.score) + cnts[score]++ + } + for i, cnt := range cnts { + if cnt == 0 { + continue + } + res.WriteString("score ") + res.WriteString(strconv.Itoa(i)) + res.WriteString("->") + res.WriteString(strconv.Itoa(cnt)) + res.WriteString(";") + if i < len(cnts)-1 { + res.WriteString(" ") + } + } + if res.Len() == 0 { + return "No hot spans" + } + return res.String() +} + +type hotSpanStatus struct { + *SpanReplication + HintMaxSpanNum uint64 + // score add 1 when the eventSizePerSecond is larger than writeThreshold*imbalanceCoefficient + score int + lastUpdateTime time.Time +} + +type rebalanceChecker struct { + changefeedID common.ChangeFeedID + allTasks map[common.DispatcherID]*hotSpanStatus + nodeManager *watcher.NodeManager + + // fast check, rebalance immediately when both the total load and imbalance ratio is high + hardWriteThreshold float32 + hardImbalanceThreshold float64 + // disable rebalance if every span load is lower than the softWriteThreshold or + // total span is larger than x + + // slow check, rebalance only if the imbalance condition has lasted for a period of time + softWriteThreshold float32 + softImbalanceThreshold float64 + + // score measures the duration of the condition + softRebalanceScore int // add 1 when the load is not balanced + softRebalanceScoreThreshold int + softMergeScore int // add 1 when the total load is lowwer than the softWriteThreshold + softMergeScoreThreshold int +} + +func newImbalanceChecker(cfID common.ChangeFeedID) *rebalanceChecker { + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + return &rebalanceChecker{ + changefeedID: cfID, + allTasks: make(map[common.DispatcherID]*hotSpanStatus), + nodeManager: nodeManager, + hardWriteThreshold: 10 * HotSpanWriteThreshold, + hardImbalanceThreshold: defaultHardImbalanceThreshold, + + softWriteThreshold: 3 * HotSpanWriteThreshold, + softImbalanceThreshold: 1.2, // 2 * defaultHardImbalanceThreshold, + softRebalanceScoreThreshold: DefaultScoreThreshold, + softMergeScoreThreshold: DefaultScoreThreshold, + } +} + +func (s *rebalanceChecker) Name() string { + return "table rebalance checker" +} + +func (s *rebalanceChecker) AddReplica(replica *SpanReplication) { + if _, ok := s.allTasks[replica.ID]; ok { + log.Panic("add duplicated replica", zap.String("changefeed", s.changefeedID.Name()), + zap.String("replica", replica.ID.String())) + } + s.allTasks[replica.ID] = &hotSpanStatus{ + SpanReplication: replica, + } +} + +func (s *rebalanceChecker) RemoveReplica(replica *SpanReplication) { + delete(s.allTasks, replica.ID) +} + +func (s *rebalanceChecker) UpdateStatus(replica *SpanReplication) { + 
if _, ok := s.allTasks[replica.ID]; !ok { + log.Panic("update unexist replica", zap.String("changefeed", s.changefeedID.Name()), + zap.String("replica", replica.ID.String())) + } +} + +func (s *rebalanceChecker) Check(_ int) replica.GroupCheckResult { + nodeLoads := make(map[node.ID]float64) + replications := []*SpanReplication{} + totalEventSizePerSecond := float32(0) + for _, span := range s.allTasks { + status := span.GetStatus() + nodeID := span.GetNodeID() + if status.ComponentStatus != heartbeatpb.ComponentState_Working || nodeID == "" { + log.Warn("skip rebalance since the span is not working", + zap.String("changefeed", s.changefeedID.Name()), zap.String("span", span.ID.String())) + return nil + } + totalEventSizePerSecond += status.EventSizePerSecond + nodeLoads[span.GetNodeID()] += float64(status.EventSizePerSecond) + replications = append(replications, span.SpanReplication) + } + + // check merge + if totalEventSizePerSecond < s.softWriteThreshold { + s.softRebalanceScore = 0 + s.softMergeScore++ + if s.softMergeScore >= s.softMergeScoreThreshold { + s.softMergeScore = 0 + return []CheckResult{ + { + OpType: OpMerge, + Replications: replications, + }, + } + } + return nil + } + s.softMergeScore = 0 + + return s.checkRebalance(nodeLoads, replications) +} + +func (s *rebalanceChecker) checkRebalance( + nodeLoads map[node.ID]float64, replications []*SpanReplication, +) []CheckResult { + ret := []CheckResult{ + { + OpType: OpMergeAndSplit, + Replications: replications, + }, + } + // case 1: too much nodes, need split more spans + allNodes := s.nodeManager.GetAliveNodes() + if len(s.allTasks) < len(allNodes)*MinSpanNumberCoefficient { + return ret + } + if len(nodeLoads) != len(allNodes) { + // wait for tasks balanced across all nodes + log.Warn("skip rebalance since tasks are not balanced", zap.String("changefeed", s.changefeedID.Name()), + zap.Int("nodesWithTasks", len(nodeLoads)), zap.Int("allNodes", len(allNodes))) + return nil + } + + maxLoad, minLoad := float64(0.0), math.MaxFloat64 + for _, load := range nodeLoads { + maxLoad = math.Max(maxLoad, load) + minLoad = math.Min(minLoad, load) + } + minLoad = math.Max(minLoad, float64(s.softWriteThreshold)) + + // case 2: check hard rebalance + if maxLoad-minLoad >= float64(s.hardWriteThreshold) && maxLoad/minLoad > s.hardImbalanceThreshold { + s.softRebalanceScore = 0 + return ret + } + + // case 3: check soft rebalance + if maxLoad/minLoad >= s.softImbalanceThreshold { + s.softRebalanceScore++ + } else { + s.softRebalanceScore = max(s.softRebalanceScore-1, 0) + } + if s.softRebalanceScore >= s.softRebalanceScoreThreshold { + s.softRebalanceScore = 0 + return ret + } + + // default case: no need to rebalance + return nil +} + +func (s *rebalanceChecker) Stat() string { + res := strings.Builder{} + res.WriteString(fmt.Sprintf("total tasks: %d; hard: [writeThreshold: %f, imbalanceThreshold: %f];", + len(s.allTasks), s.hardWriteThreshold, s.hardImbalanceThreshold)) + res.WriteString(fmt.Sprintf("soft: [writeThreshold: %f, imbalanceThreshold: %f, rebalanceScoreThreshold: %d, mergeScoreThreshold: %d];", + s.softWriteThreshold, s.softImbalanceThreshold, s.softRebalanceScoreThreshold, s.softMergeScoreThreshold)) + res.WriteString(fmt.Sprintf("softScore: [rebalance: %d, merge: %d]", s.softRebalanceScore, s.softMergeScore)) + return res.String() +} + +// TODO: implement the dynamic merge and split checker +type dynamicMergeSplitChecker struct { + changefeedID common.ChangeFeedID + allTasks map[common.DispatcherID]*hotSpanStatus + nodeManager 
*watcher.NodeManager +} diff --git a/maintainer/replica/checker_test.go b/maintainer/replica/checker_test.go new file mode 100644 index 000000000..f20c3dfdf --- /dev/null +++ b/maintainer/replica/checker_test.go @@ -0,0 +1,270 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package replica + +import ( + "bytes" + "fmt" + "testing" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/scheduler/replica" + "github.com/pingcap/ticdc/server/watcher" + "github.com/stretchr/testify/require" +) + +func appendNew(origin []byte, c byte) []byte { + nb := bytes.Clone(origin) + return append(nb, c) +} + +func TestHotSpanChecker(t *testing.T) { + t.Parallel() + + db := newDBWithCheckerForTest(t) + absent := NewReplicaSet(db.changefeedID, common.NewDispatcherID(), db.ddlSpan.tsoClient, 1, getTableSpanByID(4), 1) + db.AddAbsentReplicaSet(absent) + // replicating and scheduling will be returned + replicaSpanID := common.NewDispatcherID() + replicaSpan := NewWorkingReplicaSet(db.changefeedID, replicaSpanID, + db.ddlSpan.tsoClient, 1, + getTableSpanByID(3), &heartbeatpb.TableSpanStatus{ + ID: replicaSpanID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1") + db.AddReplicatingSpan(replicaSpan) + require.Equal(t, 1, db.GetReplicatingSize()) + require.Equal(t, 1, db.GetAbsentSize()) + + require.Equal(t, replicaSpan.GetGroupID(), absent.GetGroupID()) + groupID := replicaSpan.GetGroupID() + checker := db.GetGroupChecker(groupID).(*hotSpanChecker) + require.Equal(t, 0, len(checker.hotTasks)) + + // test update status and track hot task dynamically + db.MarkSpanReplicating(absent) + db.UpdateStatus(absent, &heartbeatpb.TableSpanStatus{ + CheckpointTs: 9, + EventSizePerSecond: HotSpanWriteThreshold, + }) + require.Equal(t, 1, len(checker.hotTasks)) + for i := 0; i < HotSpanScoreThreshold; i++ { + db.UpdateStatus(replicaSpan, &heartbeatpb.TableSpanStatus{ + CheckpointTs: 9, + EventSizePerSecond: HotSpanWriteThreshold, + }) + } + require.Equal(t, 2, len(checker.hotTasks)) + + // test check + results := checker.Check(20).([]CheckResult) + require.Equal(t, 1, len(results)) + ret := results[0] + require.Equal(t, OpSplit, ret.OpType) + require.Equal(t, 1, len(ret.Replications)) + require.Equal(t, replicaSpan, ret.Replications[0]) + require.Equal(t, 2, len(checker.hotTasks)) + + // test remove + db.ReplaceReplicaSet(ret.Replications, nil, 10) + require.Equal(t, 1, len(checker.hotTasks)) +} + +// Not parallel because it will change the global node manager +func TestRebalanceChecker(t *testing.T) { + oldMinSpanNumberCoefficient := MinSpanNumberCoefficient + MinSpanNumberCoefficient = 1 + defer func() { + MinSpanNumberCoefficient = oldMinSpanNumberCoefficient + }() + nodeManager := watcher.NewNodeManager(nil, nil) + allNodes := nodeManager.GetAliveNodes() + for i := 0; i < 3; i++ { + idx := fmt.Sprintf("node%d", i) + allNodes[node.ID(idx)] = 
&node.Info{ID: node.ID(idx)} + } + + appcontext.SetService(watcher.NodeManagerName, nodeManager) + db := newDBWithCheckerForTest(t) + totalSpan := getTableSpanByID(4) + partialSpans := []*heartbeatpb.TableSpan{ + {StartKey: totalSpan.StartKey, EndKey: appendNew(totalSpan.StartKey, 'a')}, + {StartKey: appendNew(totalSpan.StartKey, 'a'), EndKey: appendNew(totalSpan.StartKey, 'b')}, + {StartKey: appendNew(totalSpan.StartKey, 'b'), EndKey: appendNew(totalSpan.StartKey, 'c')}, + } + allReplicas := make([]*SpanReplication, 0, len(partialSpans)) + for _, span := range partialSpans { + absent := NewReplicaSet(db.changefeedID, common.NewDispatcherID(), db.ddlSpan.tsoClient, 1, span, 1) + allReplicas = append(allReplicas, absent) + db.AddAbsentReplicaSet(absent) + } + replicaSpan := &heartbeatpb.TableSpan{ + StartKey: appendNew(totalSpan.StartKey, 'c'), + EndKey: totalSpan.EndKey, + } + replica := NewWorkingReplicaSet(db.changefeedID, common.NewDispatcherID(), db.ddlSpan.tsoClient, 1, replicaSpan, + &heartbeatpb.TableSpanStatus{CheckpointTs: 9, ComponentStatus: heartbeatpb.ComponentState_Working}, "node0") + allReplicas = append(allReplicas, replica) + db.AddReplicatingSpan(replica) + require.Equal(t, 3, db.GetAbsentSize()) + require.Equal(t, 1, db.GetReplicatingSize()) + + // test add replica + groupID := replica.GetGroupID() + for _, r := range allReplicas { + require.Equal(t, groupID, r.GetGroupID()) + } + checker := db.GetGroupChecker(groupID).(*rebalanceChecker) + require.Equal(t, 4, len(checker.allTasks)) + + // test update replica status + require.Nil(t, checker.Check(20)) + require.Panics(t, func() { + invalidReplica := NewReplicaSet(db.changefeedID, common.NewDispatcherID(), db.ddlSpan.tsoClient, 1, replicaSpan, 1) + db.UpdateStatus(invalidReplica, &heartbeatpb.TableSpanStatus{ + CheckpointTs: 9, + EventSizePerSecond: checker.softWriteThreshold, + }) + }) + for idx, r := range allReplicas { + r.SetNodeID(node.ID(fmt.Sprintf("node%d", idx%3))) + db.MarkSpanScheduling(r) + db.UpdateStatus(r, &heartbeatpb.TableSpanStatus{ + CheckpointTs: 9, + ComponentStatus: heartbeatpb.ComponentState_Working, + EventSizePerSecond: checker.softWriteThreshold, + }) + } + + // test hard threadhold + require.Nil(t, checker.Check(20)) + db.UpdateStatus(replica, &heartbeatpb.TableSpanStatus{ + CheckpointTs: 9, + ComponentStatus: heartbeatpb.ComponentState_Working, + EventSizePerSecond: checker.hardWriteThreshold, + }) + require.NotNil(t, checker.Check(20)) + + // test scale out too much nodes + db.UpdateStatus(replica, &heartbeatpb.TableSpanStatus{ + CheckpointTs: 9, + ComponentStatus: heartbeatpb.ComponentState_Working, + EventSizePerSecond: checker.softWriteThreshold, + }) + require.Nil(t, checker.Check(20)) + nodeManager.GetAliveNodes()["node3"] = &node.Info{ID: "node3"} + require.Nil(t, checker.Check(20)) + nodeManager.GetAliveNodes()["node4"] = &node.Info{ID: "node4"} + rets := checker.Check(20).([]CheckResult) + require.NotNil(t, rets) + ret := rets[0] + require.Equal(t, OpMergeAndSplit, ret.OpType) + + // test remove + db.ReplaceReplicaSet(ret.Replications, nil, 10) + require.Equal(t, 0, len(checker.allTasks)) +} + +// Not parallel because it will change the global node manager +func TestSoftRebalanceChecker(t *testing.T) { + oldMinSpanNumberCoefficient := MinSpanNumberCoefficient + MinSpanNumberCoefficient = 1 + defer func() { + MinSpanNumberCoefficient = oldMinSpanNumberCoefficient + }() + nodeManager := watcher.NewNodeManager(nil, nil) + allNodes := nodeManager.GetAliveNodes() + totalNodes := 3 + for i 
:= 0; i < totalNodes; i++ { + idx := fmt.Sprintf("node%d", i) + allNodes[node.ID(idx)] = &node.Info{ID: node.ID(idx)} + } + + appcontext.SetService(watcher.NodeManagerName, nodeManager) + db := newDBWithCheckerForTest(t) + totalSpan := getTableSpanByID(4) + partialSpans := []*heartbeatpb.TableSpan{ + {StartKey: totalSpan.StartKey, EndKey: appendNew(totalSpan.StartKey, 'a')}, + {StartKey: appendNew(totalSpan.StartKey, 'a'), EndKey: appendNew(totalSpan.StartKey, 'b')}, + {StartKey: appendNew(totalSpan.StartKey, 'b'), EndKey: appendNew(totalSpan.StartKey, 'c')}, + {StartKey: appendNew(totalSpan.StartKey, 'c'), EndKey: totalSpan.EndKey}, + } + + groupID := replica.GroupID(0) + allReplicas := make([]*SpanReplication, 0, len(partialSpans)) + for i, span := range partialSpans { + idx := node.ID(fmt.Sprintf("node%d", i%totalNodes)) + replicating := NewWorkingReplicaSet(db.changefeedID, common.NewDispatcherID(), db.ddlSpan.tsoClient, 1, span, + &heartbeatpb.TableSpanStatus{ + CheckpointTs: 9, + ComponentStatus: heartbeatpb.ComponentState_Working, + EventSizePerSecond: 0, // test the cornoer case that no write + }, idx) + if groupID == 0 { + groupID = replicating.GetGroupID() + } else { + require.Equal(t, groupID, replicating.GetGroupID()) + } + allReplicas = append(allReplicas, replicating) + db.AddReplicatingSpan(replicating) + } + + checker := db.GetGroupChecker(groupID).(*rebalanceChecker) + require.Equal(t, 4, len(checker.allTasks)) + + // test soft rebalance + replica := allReplicas[0] + require.Nil(t, checker.Check(20)) + require.Equal(t, 0, checker.softRebalanceScore) + require.Equal(t, 1, checker.softMergeScore) + db.UpdateStatus(replica, &heartbeatpb.TableSpanStatus{ + CheckpointTs: 9, + ComponentStatus: heartbeatpb.ComponentState_Working, + EventSizePerSecond: checker.softWriteThreshold * float32(checker.softImbalanceThreshold), + }) + for i := 1; i < checker.softRebalanceScoreThreshold; i++ { + checker.softMergeScore = 2 + require.Nil(t, checker.Check(20)) + require.Equal(t, i, checker.softRebalanceScore) + require.Zero(t, checker.softMergeScore) + } + ret := checker.Check(20).([]CheckResult)[0] + require.Equal(t, OpMergeAndSplit, ret.OpType) + require.Equal(t, 4, len(ret.Replications)) + require.Equal(t, 0, checker.softRebalanceScore) + require.Zero(t, checker.softMergeScore) + + // test soft merge + require.Nil(t, checker.Check(20)) + require.Equal(t, 1, checker.softRebalanceScore) + require.Equal(t, 0, checker.softMergeScore) + db.UpdateStatus(replica, &heartbeatpb.TableSpanStatus{ + CheckpointTs: 9, + ComponentStatus: heartbeatpb.ComponentState_Working, + EventSizePerSecond: checker.softWriteThreshold - 1, + }) + for i := 1; i < checker.softMergeScoreThreshold; i++ { + checker.softRebalanceScore = 2 + require.Nil(t, checker.Check(20)) + require.Equal(t, i, checker.softMergeScore) + require.Zero(t, checker.softRebalanceScore) + } + ret = checker.Check(20).([]CheckResult)[0] + require.Equal(t, OpMerge, ret.OpType) + require.Equal(t, 4, len(ret.Replications)) +} diff --git a/maintainer/replica/hot_span.go b/maintainer/replica/hot_span.go deleted file mode 100644 index bdf2843cb..000000000 --- a/maintainer/replica/hot_span.go +++ /dev/null @@ -1,232 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package replica - -import ( - "strconv" - "strings" - "sync" - "time" - - "github.com/pingcap/log" - "github.com/pingcap/ticdc/heartbeatpb" - "github.com/pingcap/ticdc/pkg/common" - "github.com/pingcap/ticdc/pkg/scheduler/replica" - "go.uber.org/zap" -) - -const ( - HotSpanWriteThreshold = 1024 * 1024 // 1MB per second - HotSpanScoreThreshold = 3 // TODO: bump to 10 befroe release - HotSpanMaxLevel = 1 - - EnableDynamicThreshold = false - ImbalanceThreshold = 3 // trigger merge after it is supported - - clearTimeout = 300 // seconds -) - -// TODO: extract group interface -func getImbalanceThreshold(id replica.GroupID) int { - if id == replica.DefaultGroupID { - return 1 - } - return ImbalanceThreshold -} - -type hotSpans struct { - lock sync.Mutex - hotSpanGroups map[replica.GroupID]map[common.DispatcherID]*HotSpan -} - -type HotSpan struct { - *SpanReplication - HintMaxSpanNum uint64 - - eventSizePerSecond float32 - writeThreshold float32 // maybe a dynamic value - imbalanceCoefficient int // fixed value for each group - // score add 1 when the eventSizePerSecond is larger than writeThreshold*imbalanceCoefficient - score int - lastUpdateTime time.Time -} - -func NewHotSpans() *hotSpans { - s := &hotSpans{ - hotSpanGroups: make(map[replica.GroupID]map[common.DispatcherID]*HotSpan), - } - return s -} - -func (s *hotSpans) getOrCreateGroup(groupID replica.GroupID) map[common.DispatcherID]*HotSpan { - group, ok := s.hotSpanGroups[groupID] - if !ok { - group = make(map[common.DispatcherID]*HotSpan) - s.hotSpanGroups[groupID] = group - } - return group -} - -func (s *hotSpans) getBatchByGroup(groupID replica.GroupID, cache []*HotSpan) []*HotSpan { - batchSize := cap(cache) - if batchSize == 0 { - batchSize = 1024 - cache = make([]*HotSpan, batchSize) - } - cache = cache[:0] - - outdatedSpans := make([]*HotSpan, 0) - s.lock.Lock() - defer s.lock.Unlock() - hotSpanCache := s.getOrCreateGroup(groupID) - if EnableDynamicThreshold && groupID != replica.DefaultGroupID { - totalEventSizePerSecond := float32(0) - for _, span := range hotSpanCache { - totalEventSizePerSecond += span.eventSizePerSecond - } - if totalEventSizePerSecond > 0 { - avg := float32(totalEventSizePerSecond / float32(len(hotSpanCache))) - for _, span := range hotSpanCache { - span.writeThreshold = avg - span.HintMaxSpanNum = uint64(span.eventSizePerSecond / avg) - } - } - } - - for _, span := range hotSpanCache { - if time.Since(span.lastUpdateTime) > clearTimeout*time.Second { - outdatedSpans = append(outdatedSpans, span) - } else if span.score >= HotSpanScoreThreshold { - cache = append(cache, span) - if len(cache) >= batchSize { - break - } - } - } - s.doClear(groupID, outdatedSpans...) 
- return cache -} - -func (s *hotSpans) updateHotSpan(span *SpanReplication, status *heartbeatpb.TableSpanStatus) { - if status.ComponentStatus != heartbeatpb.ComponentState_Working { - return - } - - s.lock.Lock() - defer s.lock.Unlock() - hotSpanCache := s.getOrCreateGroup(span.groupID) - if status.EventSizePerSecond < HotSpanWriteThreshold { - if span, ok := hotSpanCache[span.ID]; ok && status.EventSizePerSecond < span.writeThreshold { - if span.score > 0 { - span.score-- - } - if span.groupID == replica.DefaultGroupID && span.score == 0 { - delete(hotSpanCache, span.ID) - } - } - return - } - - cache, ok := hotSpanCache[span.ID] - if !ok { - cache = &HotSpan{ - SpanReplication: span, - writeThreshold: HotSpanWriteThreshold, - imbalanceCoefficient: getImbalanceThreshold(span.groupID), - score: 0, - } - hotSpanCache[span.ID] = cache - } - if status.EventSizePerSecond >= cache.writeThreshold*float32(cache.imbalanceCoefficient) { - cache.score++ - cache.lastUpdateTime = time.Now() - } -} - -func (s *hotSpans) clearHotSpansByGroup(groupID replica.GroupID, spans ...*HotSpan) { - // extract needClear method to simply the clear behavior - if groupID != replica.DefaultGroupID { - return - } - s.lock.Lock() - defer s.lock.Unlock() - s.doClear(groupID, spans...) -} - -func (s *hotSpans) doClear(groupID replica.GroupID, spans ...*HotSpan) { - log.Info("clear hot spans", zap.String("group", replica.GetGroupName(groupID)), zap.Int("count", len(spans))) - hotSpanCache := s.getOrCreateGroup(groupID) - for _, span := range spans { - if groupID == replica.DefaultGroupID { - delete(hotSpanCache, span.ID) - } else { - span.score = 0 - } - } -} - -func (s *hotSpans) stat() string { - s.lock.Lock() - defer s.lock.Unlock() - - var res strings.Builder - total := 0 - for groupID, hotSpanCache := range s.hotSpanGroups { - if total > 0 { - res.WriteString(" ") - } - res.WriteString(replica.GetGroupName(groupID)) - res.WriteString(": [") - cnts := [HotSpanScoreThreshold + 1]int{} - for _, span := range hotSpanCache { - score := min(HotSpanScoreThreshold, span.score) - cnts[score]++ - total++ - } - for i, cnt := range cnts { - if cnt == 0 { - continue - } - res.WriteString("score ") - res.WriteString(strconv.Itoa(i)) - res.WriteString("->") - res.WriteString(strconv.Itoa(cnt)) - res.WriteString(";") - if i < len(cnts)-1 { - res.WriteString(" ") - } - } - res.WriteString("] ") - } - if total == 0 { - return "No hot spans" - } - return res.String() -} - -func (db *ReplicationDB) UpdateHotSpan(span *SpanReplication, status *heartbeatpb.TableSpanStatus) { - db.hotSpans.updateHotSpan(span, status) -} - -func (db *ReplicationDB) ClearHotSpansByGroup(groupID replica.GroupID, spans ...*HotSpan) { - db.hotSpans.clearHotSpansByGroup(groupID, spans...) 
-} - -func (db *ReplicationDB) GetHotSpansByGroup(groupID replica.GroupID, cache []*HotSpan) []*HotSpan { - return db.hotSpans.getBatchByGroup(groupID, cache) -} - -func (db *ReplicationDB) GetHotSpanStat() string { - return db.hotSpans.stat() -} diff --git a/maintainer/replica/replication_db.go b/maintainer/replica/replication_db.go index 6bfad440c..50e877ce8 100644 --- a/maintainer/replica/replication_db.go +++ b/maintainer/replica/replication_db.go @@ -41,14 +41,19 @@ type ReplicationDB struct { ddlSpan *SpanReplication // LOCK protects the above maps - lock sync.RWMutex - - hotSpans *hotSpans + lock sync.RWMutex + newGroupChecker func(groupID replica.GroupID) replica.GroupChecker[common.DispatcherID, *SpanReplication] } // NewReplicaSetDB creates a new ReplicationDB and initializes the maps -func NewReplicaSetDB(changefeedID common.ChangeFeedID, ddlSpan *SpanReplication) *ReplicationDB { - db := &ReplicationDB{changefeedID: changefeedID, ddlSpan: ddlSpan} +func NewReplicaSetDB( + changefeedID common.ChangeFeedID, ddlSpan *SpanReplication, enableTableAcrossNodes bool, +) *ReplicationDB { + db := &ReplicationDB{ + changefeedID: changefeedID, + ddlSpan: ddlSpan, + newGroupChecker: getNewGroupChecker(changefeedID, enableTableAcrossNodes), + } db.reset() db.putDDLDispatcher(db.ddlSpan) return db @@ -184,19 +189,26 @@ func (db *ReplicationDB) GetTasksBySchemaID(schemaID int64) []*SpanReplication { } // ReplaceReplicaSet replaces the old replica set with the new spans -func (db *ReplicationDB) ReplaceReplicaSet(old *SpanReplication, newSpans []*heartbeatpb.TableSpan, checkpointTs uint64) bool { +func (db *ReplicationDB) ReplaceReplicaSet(oldReplications []*SpanReplication, newSpans []*heartbeatpb.TableSpan, checkpointTs uint64) { db.lock.Lock() defer db.lock.Unlock() // first check the old replica set exists, if not, return false - if _, ok := db.allTasks[old.ID]; !ok { - log.Warn("old replica set not found, skip", - zap.String("changefeed", db.changefeedID.Name()), - zap.String("span", old.ID.String())) - return false + for _, old := range oldReplications { + if _, ok := db.allTasks[old.ID]; !ok { + log.Panic("old replica set not found", + zap.String("changefeed", db.changefeedID.Name()), + zap.String("span", old.ID.String())) + } + oldCheckpointTs := old.GetStatus().GetCheckpointTs() + if checkpointTs > oldCheckpointTs { + checkpointTs = oldCheckpointTs + } + db.removeSpanUnLock(old) } var news []*SpanReplication + old := oldReplications[0] for _, span := range newSpans { new := NewReplicaSet( old.ChangefeedID, @@ -206,11 +218,8 @@ func (db *ReplicationDB) ReplaceReplicaSet(old *SpanReplication, newSpans []*hea span, checkpointTs) news = append(news, new) } - - // remove and insert the new replica set - db.removeSpanUnLock(old) + // insert the new replica set db.addAbsentReplicaSetUnLock(news...) 
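+	// Note: the new spans above are created with the minimum checkpointTs across the caller's value
+	// and all replaced replica sets, so the replacement never starts ahead of a replaced span.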
- return true } // AddReplicatingSpan adds a replicating the replicating map, that means the task is already scheduled to a dispatcher @@ -298,6 +307,15 @@ func (db *ReplicationDB) UpdateSchemaID(tableID, newSchemaID int64) { } } +func (db *ReplicationDB) UpdateStatus(task *SpanReplication, status *heartbeatpb.TableSpanStatus) { + task.UpdateStatus(status) + checker := db.GetGroupChecker(task.GetGroupID()) // Note: need RLock here + + db.lock.Lock() + defer db.lock.Unlock() + checker.UpdateStatus(task) +} + // BindSpanToNode binds the span to the node, it will remove the task from the old node and add it to the new node // ,and it also marks the task as scheduling func (db *ReplicationDB) BindSpanToNode(old, new node.ID, task *SpanReplication) { @@ -361,6 +379,15 @@ func (db *ReplicationDB) GetAbsentForTest(_ []*SpanReplication, maxSize int) []* return ret[:maxSize] } +// Optimize the lock usage, maybe control the lock within checker +func (db *ReplicationDB) CheckByGroup(groupID replica.GroupID, batch int) replica.GroupCheckResult { + checker := db.GetGroupChecker(groupID) + + db.lock.RLock() + defer db.lock.RUnlock() + return checker.Check(batch) +} + func (db *ReplicationDB) withRLock(action func()) { db.lock.RLock() defer db.lock.RUnlock() @@ -372,8 +399,8 @@ func (db *ReplicationDB) reset() { db.schemaTasks = make(map[int64]map[common.DispatcherID]*SpanReplication) db.tableTasks = make(map[int64]map[common.DispatcherID]*SpanReplication) db.allTasks = make(map[common.DispatcherID]*SpanReplication) - db.ReplicationDB = replica.NewReplicationDB[common.DispatcherID, *SpanReplication](db.changefeedID.String(), db.withRLock) - db.hotSpans = NewHotSpans() + db.ReplicationDB = replica.NewReplicationDB[common.DispatcherID, *SpanReplication](db.changefeedID.String(), + db.withRLock, db.newGroupChecker) } func (db *ReplicationDB) putDDLDispatcher(ddlSpan *SpanReplication) { diff --git a/maintainer/replica/replication_db_test.go b/maintainer/replica/replication_db_test.go index d2915842d..e5d7804f5 100644 --- a/maintainer/replica/replication_db_test.go +++ b/maintainer/replica/replication_db_test.go @@ -35,7 +35,9 @@ func getTableSpanByID(id common.TableID) *heartbeatpb.TableSpan { } func TestBasicFunction(t *testing.T) { - db := newDBForTest(t) + t.Parallel() + + db := newDBWithCheckerForTest(t) absent := NewReplicaSet(db.changefeedID, common.NewDispatcherID(), db.ddlSpan.tsoClient, 1, getTableSpanByID(4), 1) db.AddAbsentReplicaSet(absent) // replicating and scheduling will be returned @@ -106,7 +108,9 @@ func TestBasicFunction(t *testing.T) { } func TestReplaceReplicaSet(t *testing.T) { - db := newDBForTest(t) + t.Parallel() + + db := newDBWithCheckerForTest(t) // replicating and scheduling will be returned replicaSpanID := common.NewDispatcherID() replicaSpan := NewWorkingReplicaSet(db.changefeedID, replicaSpanID, @@ -119,18 +123,21 @@ func TestReplaceReplicaSet(t *testing.T) { db.AddReplicatingSpan(replicaSpan) notExists := &SpanReplication{ID: common.NewDispatcherID()} - ok := db.ReplaceReplicaSet(notExists, []*heartbeatpb.TableSpan{{}, {}}, 1) - require.False(t, ok) + require.PanicsWithValue(t, "old replica set not found", func() { + db.ReplaceReplicaSet([]*SpanReplication{notExists}, []*heartbeatpb.TableSpan{{}, {}}, 1) + }) require.Len(t, db.GetAllTasks(), 2) - db.ReplaceReplicaSet(replicaSpan, []*heartbeatpb.TableSpan{getTableSpanByID(3), getTableSpanByID(4)}, 5) + db.ReplaceReplicaSet([]*SpanReplication{replicaSpan}, []*heartbeatpb.TableSpan{getTableSpanByID(3), getTableSpanByID(4)}, 5) 
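+	// the replaced span is dropped and two absent spans are added, so the DDL span plus the two
+	// new spans leave 3 tasks in total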
require.Len(t, db.GetAllTasks(), 3) require.Equal(t, 2, db.GetAbsentSize()) require.Equal(t, 2, db.GetTaskSizeBySchemaID(1)) } func TestMarkSpanAbsent(t *testing.T) { - db := newDBForTest(t) + t.Parallel() + + db := newDBWithCheckerForTest(t) // replicating and scheduling will be returned replicaSpanID := common.NewDispatcherID() replicaSpan := NewWorkingReplicaSet(db.changefeedID, replicaSpanID, @@ -147,7 +154,9 @@ func TestMarkSpanAbsent(t *testing.T) { } func TestForceRemove(t *testing.T) { - db := newDBForTest(t) + t.Parallel() + + db := newDBWithCheckerForTest(t) // replicating and scheduling will be returned replicaSpanID := common.NewDispatcherID() replicaSpan := NewWorkingReplicaSet(db.changefeedID, replicaSpanID, @@ -165,7 +174,9 @@ func TestForceRemove(t *testing.T) { } func TestGetAbsents(t *testing.T) { - db := newDBForTest(t) + t.Parallel() + + db := newDBWithCheckerForTest(t) for i := 0; i < 10; i++ { absent := NewReplicaSet(db.changefeedID, common.NewDispatcherID(), db.ddlSpan.tsoClient, 1, getTableSpanByID(int64(i+1)), 1) db.AddAbsentReplicaSet(absent) @@ -175,7 +186,9 @@ func TestGetAbsents(t *testing.T) { } func TestRemoveAllTables(t *testing.T) { - db := newDBForTest(t) + t.Parallel() + + db := newDBWithCheckerForTest(t) // ddl span will not be removed removed := db.TryRemoveAll() require.Len(t, removed, 0) @@ -208,7 +221,7 @@ func TestRemoveAllTables(t *testing.T) { require.Len(t, db.GetAllTasks(), 1) } -func newDBForTest(t *testing.T) *ReplicationDB { +func newDBWithCheckerForTest(t *testing.T) *ReplicationDB { cfID := common.NewChangeFeedIDWithName("test") tableTriggerEventDispatcherID := common.NewDispatcherID() ctrl := gomock.NewController(t) @@ -220,5 +233,5 @@ func newDBForTest(t *testing.T) *ReplicationDB { ComponentStatus: heartbeatpb.ComponentState_Working, CheckpointTs: 1, }, "node1") - return NewReplicaSetDB(cfID, ddlSpan) + return NewReplicaSetDB(cfID, ddlSpan, true) } diff --git a/maintainer/replica/replication_span_test.go b/maintainer/replica/replication_span_test.go index 3cde8d016..f812de742 100644 --- a/maintainer/replica/replication_span_test.go +++ b/maintainer/replica/replication_span_test.go @@ -26,6 +26,8 @@ import ( ) func TestUpdateStatus(t *testing.T) { + t.Parallel() + replicaSet := NewReplicaSet(common.NewChangeFeedIDWithName("test"), common.NewDispatcherID(), nil, 1, getTableSpanByID(4), 10) replicaSet.UpdateStatus(&heartbeatpb.TableSpanStatus{CheckpointTs: 9}) require.Equal(t, uint64(10), replicaSet.status.Load().CheckpointTs) @@ -34,6 +36,8 @@ func TestUpdateStatus(t *testing.T) { } func TestNewRemoveDispatcherMessage(t *testing.T) { + t.Parallel() + replicaSet := NewReplicaSet(common.NewChangeFeedIDWithName("test"), common.NewDispatcherID(), nil, 1, getTableSpanByID(4), 10) msg := replicaSet.NewRemoveDispatcherMessage("node1") req := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest) @@ -43,6 +47,8 @@ func TestNewRemoveDispatcherMessage(t *testing.T) { } func TestSpanReplication_NewAddDispatcherMessage(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) tsoClient := replica_mock.NewMockTSOClient(ctrl) replicaSet := NewReplicaSet(common.NewChangeFeedIDWithName("test"), common.NewDispatcherID(), tsoClient, 1, getTableSpanByID(4), 10) diff --git a/maintainer/scheduler.go b/maintainer/scheduler.go index ca14bda53..5f132f9ed 100644 --- a/maintainer/scheduler.go +++ b/maintainer/scheduler.go @@ -18,12 +18,16 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/ticdc/heartbeatpb" 
"github.com/pingcap/ticdc/maintainer/operator" "github.com/pingcap/ticdc/maintainer/replica" "github.com/pingcap/ticdc/maintainer/split" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/scheduler" + pkgReplica "github.com/pingcap/ticdc/pkg/scheduler/replica" "github.com/pingcap/ticdc/server/watcher" + "github.com/pingcap/ticdc/utils" + "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) @@ -40,7 +44,7 @@ func NewScheduleController(changefeedID common.ChangeFeedID, scheduler.BalanceScheduler: scheduler.NewBalanceScheduler(changefeedID.String(), batchSize, oc, db, nodeM, balanceInterval, oc.NewMoveOperator), } if splitter != nil { - schedulers[scheduler.SplitScheduler] = newSplitScheduler(changefeedID, batchSize, splitter, oc, db, nodeM) + schedulers[scheduler.SplitScheduler] = newSplitScheduler(changefeedID, batchSize, splitter, oc, db, nodeM, balanceInterval) } return scheduler.NewController(schedulers) } @@ -58,23 +62,23 @@ type splitScheduler struct { checkInterval time.Duration lastCheckTime time.Time - cachedSpans []*replica.HotSpan + batchSize int } func newSplitScheduler( changefeedID common.ChangeFeedID, batchSize int, splitter *split.Splitter, oc *operator.Controller, db *replica.ReplicationDB, nodeManager *watcher.NodeManager, + checkInterval time.Duration, ) *splitScheduler { return &splitScheduler{ - changefeedID: changefeedID, - splitter: splitter, - opController: oc, - db: db, - nodeManager: nodeManager, - cachedSpans: make([]*replica.HotSpan, batchSize), - - maxCheckTime: time.Second * 5, - checkInterval: time.Second * 120, + changefeedID: changefeedID, + splitter: splitter, + opController: oc, + db: db, + nodeManager: nodeManager, + batchSize: batchSize, + maxCheckTime: time.Second * 500, + checkInterval: checkInterval, } } @@ -87,44 +91,102 @@ func (s *splitScheduler) Execute() time.Time { } log.Info("check split status", zap.String("changefeed", s.changefeedID.Name()), - zap.String("hotSpans", s.db.GetHotSpanStat()), zap.String("groupDistribution", s.db.GetGroupStat())) + zap.String("hotSpans", s.db.GetCheckerStat()), zap.String("groupDistribution", s.db.GetGroupStat())) - batch := cap(s.cachedSpans) + checked, batch, start := 0, s.batchSize, time.Now() needBreak := false for _, group := range s.db.GetGroups() { if needBreak || batch <= 0 { break } - s.cachedSpans = s.cachedSpans[:0] - cachedSpans := s.db.GetHotSpansByGroup(group, s.cachedSpans) - batch -= len(cachedSpans) + checkResults := s.db.CheckByGroup(group, s.batchSize) + checked, needBreak = s.doCheck(checkResults, start) + batch -= checked + s.lastCheckTime = time.Now() + } + return s.lastCheckTime.Add(s.checkInterval) +} - checkedIndex, start := 0, time.Now() - for ; checkedIndex < len(cachedSpans); checkedIndex++ { - if time.Since(start) > s.maxCheckTime { - needBreak = true - break - } - span := cachedSpans[checkedIndex] - if s.db.GetTaskByID(span.ID) == nil { - continue - } - spans := s.splitter.SplitSpans(context.Background(), span.Span, len(s.nodeManager.GetAliveNodes()), int(span.HintMaxSpanNum)) +func (s *splitScheduler) Name() string { + return scheduler.SplitScheduler +} + +func (s *splitScheduler) doCheck(ret pkgReplica.GroupCheckResult, start time.Time) (int, bool) { + if ret == nil { + return 0, false + } + checkResults := ret.([]replica.CheckResult) + + checkedIndex := 0 + for ; checkedIndex < len(checkResults); checkedIndex++ { + if time.Since(start) > s.maxCheckTime { + return checkedIndex, true + } + ret := checkResults[checkedIndex] + totalSpan, valid := s.valid(ret) + 
if !valid {
+			continue
+		}
+
+		switch ret.OpType {
+		case replica.OpMerge:
+			s.opController.AddMergeSplitOperator(ret.Replications, []*heartbeatpb.TableSpan{totalSpan})
+		case replica.OpSplit:
+			fallthrough
+		case replica.OpMergeAndSplit:
+			expectedSpanNum := split.NextExpectedSpansNumber(len(ret.Replications))
+			spans := s.splitter.SplitSpans(context.Background(), totalSpan, len(s.nodeManager.GetAliveNodes()), expectedSpanNum)
 			if len(spans) > 1 {
 				log.Info("split span",
 					zap.String("changefeed", s.changefeedID.Name()),
-					zap.String("span", span.ID.String()),
+					zap.String("span", totalSpan.String()),
 					zap.Int("span szie", len(spans)))
-				s.opController.AddOperator(operator.NewSplitDispatcherOperator(s.db, span.SpanReplication, span.GetNodeID(), spans))
+				s.opController.AddMergeSplitOperator(ret.Replications, spans)
 			}
 		}
-		s.lastCheckTime = time.Now()
-		s.db.ClearHotSpansByGroup(group, cachedSpans[:checkedIndex]...)
 	}
-	return s.lastCheckTime.Add(s.checkInterval)
+	return checkedIndex, false
 }
 
-func (s *splitScheduler) Name() string {
-	return scheduler.SplitScheduler
+func (s *splitScheduler) valid(c replica.CheckResult) (*heartbeatpb.TableSpan, bool) {
+	if c.OpType == replica.OpSplit && len(c.Replications) != 1 {
+		log.Panic("split operation should have only one replication",
+			zap.String("changefeed", s.changefeedID.Name()),
+			zap.Int64("tableId", c.Replications[0].Span.TableID),
+			zap.Stringer("checkResult", c))
+	}
+	span := spanz.TableIDToComparableSpan(c.Replications[0].Span.TableID)
+	totalSpan := &heartbeatpb.TableSpan{
+		TableID:  span.TableID,
+		StartKey: span.StartKey,
+		EndKey:   span.EndKey,
+	}
+
+	if c.OpType == replica.OpMergeAndSplit && len(c.Replications) >= split.DefaultMaxSpanNumber {
+		log.Debug("skip split operation since the replication number is too large",
+			zap.String("changefeed", s.changefeedID.Name()),
+			zap.Int64("tableId", c.Replications[0].Span.TableID), zap.Stringer("checkResult", c))
+		return totalSpan, false
+	}
+
+	if c.OpType == replica.OpMerge || c.OpType == replica.OpMergeAndSplit {
+		if len(c.Replications) <= 1 {
+			log.Panic("invalid replication size",
+				zap.String("changefeed", s.changefeedID.Name()),
+				zap.Int64("tableId", c.Replications[0].Span.TableID),
+				zap.Stringer("checkResult", c))
+		}
+		spanMap := utils.NewBtreeMap[*heartbeatpb.TableSpan, *replica.SpanReplication](heartbeatpb.LessTableSpan)
+		for _, r := range c.Replications {
+			spanMap.ReplaceOrInsert(r.Span, r)
+		}
+		holes := split.FindHoles(spanMap, totalSpan)
+		if len(holes) > 0 {
+			log.Warn("skip merge operation since there are holes",
+				zap.String("changefeed", s.changefeedID.Name()),
+				zap.Int64("tableId", c.Replications[0].Span.TableID),
+				zap.Int("holes", len(holes)), zap.Stringer("checkResult", c))
+		}
+		return totalSpan, len(holes) == 0
+	}
+	return totalSpan, true
 }
diff --git a/maintainer/split/region_count_splitter.go b/maintainer/split/region_count_splitter.go
index c266267bd..f18de5a83 100644
--- a/maintainer/split/region_count_splitter.go
+++ b/maintainer/split/region_count_splitter.go
@@ -42,7 +42,7 @@ func newRegionCountSplitter(
 }
 
 func (m *regionCountSplitter) split(
-	ctx context.Context, span *heartbeatpb.TableSpan, captureNum int, maxSpanNum int,
+	ctx context.Context, span *heartbeatpb.TableSpan, captureNum int, expectedSpanNum int,
 ) []*heartbeatpb.TableSpan {
 	bo := tikv.NewBackoffer(ctx, 500)
 	regions, err := m.regionCache.ListRegionIDsInKeyRange(bo, span.StartKey, span.EndKey)
@@ -64,7 +64,7 @@ func (m *regionCountSplitter) split(
 	}
 
 	stepper := newEvenlySplitStepper(
-		getSpansNumber(len(regions), 
captureNum, maxSpanNum), + getSpansNumber(len(regions), captureNum, expectedSpanNum, DefaultMaxSpanNumber), len(regions)) spans := make([]*heartbeatpb.TableSpan, 0, stepper.SpanCount()) @@ -169,12 +169,3 @@ func (e *evenlySplitStepper) Step() int { e.remain = e.remain - e.extraRegionPerSpan return e.regionPerSpan + e.extraRegionPerSpan } - -func getSpansNumber(regionNum, captureNum, maxSpanNum int) int { - coefficient := max(captureNum-1, baseSpanNumberCoefficient) - spanNum := 1 - if regionNum > 1 { - spanNum = max(captureNum*coefficient, regionNum/spanRegionLimit) - } - return min(spanNum, maxSpanNum) -} diff --git a/maintainer/split/region_count_splitter_test.go b/maintainer/split/region_count_splitter_test.go index 09d7d2c13..2c723ce69 100644 --- a/maintainer/split/region_count_splitter_test.go +++ b/maintainer/split/region_count_splitter_test.go @@ -128,7 +128,7 @@ func TestRegionCountSplitSpan(t *testing.T) { RegionThreshold: 1, } splitter := newRegionCountSplitter(cfID, cache, cfg.RegionThreshold) - spans := splitter.split(context.Background(), cs.span, cs.totalCaptures, defaultMaxSpanNumber) + spans := splitter.split(context.Background(), cs.span, cs.totalCaptures, 0) require.Equalf(t, cs.expectSpans, spans, "%d %s", i, cs.span.String()) } } @@ -206,7 +206,7 @@ func TestRegionCountEvenlySplitSpan(t *testing.T) { context.Background(), &heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, cs.totalCaptures, - defaultMaxSpanNumber, + 0, ) require.Equalf(t, cs.expectedSpans, len(spans), "%d %v", i, cs) @@ -242,7 +242,7 @@ func TestSplitSpanRegionOutOfOrder(t *testing.T) { cfID := common.NewChangeFeedIDWithName("test") splitter := newRegionCountSplitter(cfID, cache, cfg.RegionThreshold) span := &heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")} - spans := splitter.split(context.Background(), span, 1, defaultMaxSpanNumber) + spans := splitter.split(context.Background(), span, 1, 0) require.Equal( t, []*heartbeatpb.TableSpan{&heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}}, spans) } diff --git a/maintainer/split/splitter.go b/maintainer/split/splitter.go index a9fee13fd..187db7892 100644 --- a/maintainer/split/splitter.go +++ b/maintainer/split/splitter.go @@ -31,14 +31,15 @@ import ( const ( // spanRegionLimit is the maximum number of regions a span can cover. spanRegionLimit = 50000 - // baseSpanNumberCoefficient is the base coefficient that use to - // multiply the number of captures to get the number of spans. - baseSpanNumberCoefficient = 3 - // defaultMaxSpanNumber is the maximum number of spans that can be split + // DefaultMaxSpanNumber is the maximum number of spans that can be split // in single batch. - defaultMaxSpanNumber = 100 + DefaultMaxSpanNumber = 100 ) +// baseSpanNumberCoefficient is the base coefficient that use to +// multiply the number of captures to get the number of spans. +var baseSpanNumberCoefficient = replica.MinSpanNumberCoefficient + 1 + // RegionCache is a simplified interface of tikv.RegionCache. // It is useful to restrict RegionCache usage and mocking in tests. 
type RegionCache interface { @@ -52,7 +53,7 @@ type RegionCache interface { type splitter interface { split( - ctx context.Context, span *heartbeatpb.TableSpan, totalCaptures int, maxSpanNum int, + ctx context.Context, span *heartbeatpb.TableSpan, totalCaptures int, expectedSpanNum int, ) []*heartbeatpb.TableSpan } @@ -81,13 +82,10 @@ func NewSplitter( func (s *Splitter) SplitSpans(ctx context.Context, span *heartbeatpb.TableSpan, totalCaptures int, - maxSpanNum int) []*heartbeatpb.TableSpan { + expectedSpanNum int) []*heartbeatpb.TableSpan { spans := []*heartbeatpb.TableSpan{span} - if maxSpanNum <= 0 { - maxSpanNum = defaultMaxSpanNumber - } for _, sp := range s.splitters { - spans = sp.split(ctx, span, totalCaptures, maxSpanNum) + spans = sp.split(ctx, span, totalCaptures, expectedSpanNum) if len(spans) > 1 { return spans } @@ -132,3 +130,19 @@ func FindHoles(currentSpan utils.Map[*heartbeatpb.TableSpan, *replica.SpanReplic } return holes } + +func NextExpectedSpansNumber(oldNum int) int { + if oldNum < 64 { + return oldNum * 2 + } + return min(DefaultMaxSpanNumber, oldNum*3/2) +} + +func getSpansNumber(regionNum, captureNum, expectedNum, maxSpanNum int) int { + coefficient := max(captureNum-1, baseSpanNumberCoefficient) + spanNum := 1 + if regionNum > 1 { + spanNum = max(expectedNum, captureNum*coefficient, regionNum/spanRegionLimit) + } + return min(spanNum, maxSpanNum) +} diff --git a/maintainer/split/write_bytes_splitter.go b/maintainer/split/write_bytes_splitter.go index a7a68b03c..c55cd05db 100644 --- a/maintainer/split/write_bytes_splitter.go +++ b/maintainer/split/write_bytes_splitter.go @@ -56,7 +56,7 @@ func (m *writeSplitter) split( ctx context.Context, span *heartbeatpb.TableSpan, captureNum int, - maxSpanNum int, + expectedSpanNum int, ) []*heartbeatpb.TableSpan { if m.writeKeyThreshold == 0 { return nil @@ -76,7 +76,7 @@ func (m *writeSplitter) split( return []*heartbeatpb.TableSpan{span} } - spansNum := getSpansNumber(len(regions), captureNum, maxSpanNum) + spansNum := getSpansNumber(len(regions), captureNum, expectedSpanNum, DefaultMaxSpanNumber) if spansNum <= 1 { log.Warn("only one capture and the regions number less than"+ " the maxSpanRegionLimit, skip split span", diff --git a/maintainer/split/write_bytes_splitter_test.go b/maintainer/split/write_bytes_splitter_test.go index 98949f81a..cfd645a0c 100644 --- a/maintainer/split/write_bytes_splitter_test.go +++ b/maintainer/split/write_bytes_splitter_test.go @@ -173,7 +173,7 @@ func TestSplitRegionsByWrittenKeysCold(t *testing.T) { re := require.New(t) cfID := common.NewChangeFeedIDWithName("test") splitter := newWriteSplitter(cfID, nil, 0) - baseSpanNum := getSpansNumber(2, 1, defaultMaxSpanNumber) + baseSpanNum := getSpansNumber(2, 1, 0, DefaultMaxSpanNumber) require.Equal(t, 3, baseSpanNum) regions, startKeys, endKeys := prepareRegionsInfo(make([]int, 7)) info := splitter.splitRegionsByWrittenKeysV1(0, regions, baseSpanNum) // [2,3,4], [5,6,7], [8] @@ -199,7 +199,7 @@ func TestNotSplitRegionsByWrittenKeysCold(t *testing.T) { re := require.New(t) cfID := common.NewChangeFeedIDWithName("test") splitter := newWriteSplitter(cfID, nil, 1) - baseSpanNum := getSpansNumber(2, 1, defaultMaxSpanNumber) + baseSpanNum := getSpansNumber(2, 1, 0, DefaultMaxSpanNumber) require.Equal(t, 3, baseSpanNum) regions, startKeys, endKeys := prepareRegionsInfo(make([]int, 7)) info := splitter.splitRegionsByWrittenKeysV1(0, regions, baseSpanNum) // [2,3,4,5,6,7,8] @@ -230,7 +230,7 @@ func TestSplitRegionsByWrittenKeysConfig(t *testing.T) { 
re.EqualValues(1, info.Spans[0].TableID) splitter.writeKeyThreshold = 0 - spans := splitter.split(context.Background(), &heartbeatpb.TableSpan{}, 3, defaultMaxSpanNumber) + spans := splitter.split(context.Background(), &heartbeatpb.TableSpan{}, 3, DefaultMaxSpanNumber) require.Empty(t, spans) } @@ -269,7 +269,7 @@ func TestSpanRegionLimitBase(t *testing.T) { regions = append(regions, pdutil.NewTestRegionInfo(uint64(i+9), []byte("f"), []byte("f"), 100)) } captureNum := 2 - spanNum := getSpansNumber(len(regions), captureNum, defaultMaxSpanNumber) + spanNum := getSpansNumber(len(regions), captureNum, 0, DefaultMaxSpanNumber) info := splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), spanNum) require.Len(t, info.RegionCounts, spanNum) for _, c := range info.RegionCounts { @@ -341,7 +341,7 @@ func TestSpanRegionLimit(t *testing.T) { pdutil.NewTestRegionInfo(uint64(i+9), []byte("f"), []byte("f"), uint64(writtenKeys[i]))) } captureNum := 3 - spanNum := getSpansNumber(len(regions), captureNum, defaultMaxSpanNumber) + spanNum := getSpansNumber(len(regions), captureNum, 0, DefaultMaxSpanNumber) info := splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), spanNum) require.LessOrEqual(t, spanNum, len(info.RegionCounts)) for _, c := range info.RegionCounts { diff --git a/pkg/scheduler/balance.go b/pkg/scheduler/balance.go index 62bd02ab9..9b90dc964 100644 --- a/pkg/scheduler/balance.go +++ b/pkg/scheduler/balance.go @@ -28,7 +28,7 @@ import ( ) // balanceScheduler is used to check the balance status of all spans among all nodes -type balanceScheduler[T replica.ReplicationID, S any, R replica.Replication[T]] struct { +type balanceScheduler[T replica.ReplicationID, S replica.ReplicationStatus, R replica.Replication[T]] struct { id string batchSize int @@ -50,7 +50,7 @@ type balanceScheduler[T replica.ReplicationID, S any, R replica.Replication[T]] newMoveOperator func(r R, source, target node.ID) operator.Operator[T, S] } -func NewBalanceScheduler[T replica.ReplicationID, S any, R replica.Replication[T]]( +func NewBalanceScheduler[T replica.ReplicationID, S replica.ReplicationStatus, R replica.Replication[T]]( id string, batchSize int, oc operator.Controller[T, S], db replica.ScheduleGroup[T, R], nodeManager *watcher.NodeManager, balanceInterval time.Duration, @@ -92,7 +92,7 @@ func (s *balanceScheduler[T, S, R]) Execute() time.Time { } func (s *balanceScheduler[T, S, R]) schedulerGroup(nodes map[node.ID]*node.Info) int { - batch, moved := s.batchSize, 0 + availableSize, totalMoved := s.batchSize, 0 for _, group := range s.db.GetGroups() { // fast path, check the balance status moveSize := CheckBalanceStatus(s.db.GetTaskSizePerNodeByGroup(group), nodes) @@ -101,12 +101,14 @@ func (s *balanceScheduler[T, S, R]) schedulerGroup(nodes map[node.ID]*node.Info) continue } replicas := s.db.GetReplicatingByGroup(group) - moved += Balance(batch, s.random, nodes, replicas, s.doMove) - if moved >= batch { + moveSize = Balance(availableSize, s.random, nodes, replicas, s.doMove) + totalMoved += moveSize + if totalMoved >= s.batchSize { break } + availableSize -= moveSize } - return moved + return totalMoved } // TODO: refactor and simplify the implementation and limit max group size @@ -136,37 +138,39 @@ func (s *balanceScheduler[T, S, R]) schedulerGlobal(nodes map[node.ID]*node.Info } } lowerLimitPerNode := int(math.Floor(float64(totalTasks) / float64(len(nodes)))) - limitCnt := 0 - for _, size := range sizePerNode { - if size == lowerLimitPerNode { - limitCnt++ - } - } - if limitCnt == 
len(nodes) { - // all nodes are global balanced + + // fast path check again + moveSize = CheckBalanceStatus(sizePerNode, nodes) + if moveSize <= 0 { + // no need to do the balance, skip return 0 } moved := 0 for _, nodeTasks := range groupNodetasks { - availableNodes, victims, next := []node.ID{}, []node.ID{}, 0 + availableNodes, victims, nextVictim := []node.ID{}, []node.ID{}, 0 for id, task := range nodeTasks { if task != zero && sizePerNode[id] > lowerLimitPerNode { victims = append(victims, id) } else if task == zero && sizePerNode[id] < lowerLimitPerNode { + // Notice: do not handle equal case (sizePerNode[id] == lowerLimitPerNode). + // Otherwise, it will trigger unnecessary and infinite global balance. + // For example, + // Node A: 1, Node B: 1, Node C: 0, lowerLimitPerNode = 1 + // If Node C is recorded as a available node, a meaningless global balance will be triggered. availableNodes = append(availableNodes, id) } } - for _, new := range availableNodes { - if next >= len(victims) { + for _, target := range availableNodes { + if nextVictim >= len(victims) { break } - old := victims[next] - if s.doMove(nodeTasks[old], new) { - sizePerNode[old]-- - sizePerNode[new]++ - next++ + victim := victims[nextVictim] + if s.doMove(nodeTasks[victim], target) { + sizePerNode[victim]-- + sizePerNode[target]++ + nextVictim++ moved++ } } @@ -217,12 +221,12 @@ func Balance[T replica.ReplicationID, R replica.Replication[T]]( replicating []R, move func(R, node.ID) bool, ) (movedSize int) { nodeTasks := make(map[node.ID][]R) - for _, cf := range replicating { - nodeID := cf.GetNodeID() + for _, task := range replicating { + nodeID := task.GetNodeID() if _, ok := nodeTasks[nodeID]; !ok { nodeTasks[nodeID] = make([]R, 0) } - nodeTasks[nodeID] = append(nodeTasks[nodeID], cf) + nodeTasks[nodeID] = append(nodeTasks[nodeID], task) } absentNodeCnt := 0 diff --git a/pkg/scheduler/basic.go b/pkg/scheduler/basic.go index 18ef2539c..5892ecb34 100644 --- a/pkg/scheduler/basic.go +++ b/pkg/scheduler/basic.go @@ -27,7 +27,7 @@ import ( // basicScheduler generates operators for the spans, and push them to the operator controller // it generates add operator for the absent spans, and move operator for the unbalanced replicating spans // currently, it only supports balance the spans by size -type basicScheduler[T replica.ReplicationID, S any, R replica.Replication[T]] struct { +type basicScheduler[T replica.ReplicationID, S replica.ReplicationStatus, R replica.Replication[T]] struct { id string batchSize int @@ -39,14 +39,14 @@ type basicScheduler[T replica.ReplicationID, S any, R replica.Replication[T]] st newAddOperator func(r R, target node.ID) operator.Operator[T, S] // scheduler r to target node } -func NewBasicScheduler[T replica.ReplicationID, S any, R replica.Replication[T]]( - changefeedID string, batchSize int, +func NewBasicScheduler[T replica.ReplicationID, S replica.ReplicationStatus, R replica.Replication[T]]( + id string, batchSize int, oc operator.Controller[T, S], db replica.ScheduleGroup[T, R], nodeManager *watcher.NodeManager, newAddOperator func(R, node.ID) operator.Operator[T, S], ) *basicScheduler[T, S, R] { return &basicScheduler[T, S, R]{ - id: changefeedID, + id: id, batchSize: batchSize, operatorController: oc, db: db, diff --git a/pkg/scheduler/operator/operator.go b/pkg/scheduler/operator/operator.go index c23253096..d3819c7c4 100644 --- a/pkg/scheduler/operator/operator.go +++ b/pkg/scheduler/operator/operator.go @@ -19,7 +19,26 @@ import ( 
"github.com/pingcap/ticdc/pkg/scheduler/replica" ) -type Controller[T replica.ReplicationID, S any] interface { +// type OpType int + +// const ( +// OpAdd OpType = iota // Add a new task +// OpStop // Stop a task +// OpRemove // Remove a task +// OpMove // Move a task to another node +// OpSplit // Split one task to multiple subtasks +// OpMerge // merge multiple tasks to one task +// OpMergeAndSplit // remove old tasks and split to multiple subtasks +// ) + +// type OpOption[T replica.ReplicationID, R replica.Replication[T]] struct { +// OpType OpType +// Source node.ID +// Target node.ID +// OriginReplicas []R +// } + +type Controller[T replica.ReplicationID, S replica.ReplicationStatus] interface { // AddOperator adds an operator to the controller AddOperator(op Operator[T, S]) bool // GetOperator gets an operator by ID @@ -31,7 +50,7 @@ type Controller[T replica.ReplicationID, S any] interface { // Operator is the interface for the coordinator schedule maintainer // operator thread run Start -> Schedule -> PostFinish // Check, OnNodeRemove and OnTaskRemoved is called by the other thread when some event is triggered -type Operator[T replica.ReplicationID, S any] interface { +type Operator[T replica.ReplicationID, S replica.ReplicationStatus] interface { // ID returns the ID ID() T // Type returns the operator type diff --git a/pkg/scheduler/operator/operator_queue.go b/pkg/scheduler/operator/operator_queue.go index 50cf571b2..ce09ac5db 100644 --- a/pkg/scheduler/operator/operator_queue.go +++ b/pkg/scheduler/operator/operator_queue.go @@ -20,18 +20,18 @@ import ( "github.com/pingcap/ticdc/pkg/scheduler/replica" ) -type OperatorWithTime[T replica.ReplicationID, S any] struct { +type OperatorWithTime[T replica.ReplicationID, S replica.ReplicationStatus] struct { OP Operator[T, S] Time time.Time EnqueueTime time.Time Removed bool } -func NewOperatorWithTime[T replica.ReplicationID, S any](op Operator[T, S], time time.Time) *OperatorWithTime[T, S] { +func NewOperatorWithTime[T replica.ReplicationID, S replica.ReplicationStatus](op Operator[T, S], time time.Time) *OperatorWithTime[T, S] { return &OperatorWithTime[T, S]{OP: op, Time: time, EnqueueTime: time} } -type OperatorQueue[T replica.ReplicationID, S any] []*OperatorWithTime[T, S] +type OperatorQueue[T replica.ReplicationID, S replica.ReplicationStatus] []*OperatorWithTime[T, S] func (opn OperatorQueue[T, S]) Len() int { return len(opn) } diff --git a/pkg/scheduler/replica/checker.go b/pkg/scheduler/replica/checker.go index 3cea54520..051733024 100644 --- a/pkg/scheduler/replica/checker.go +++ b/pkg/scheduler/replica/checker.go @@ -13,24 +13,93 @@ package replica -// type OpType int - -// const ( -// OpSplit OpType = iota // Split one span to multiple subspans -// OpMerge // merge multiple spans to one span -// OpMergeAndSplit // remove old spans and split to multiple subspans -// ) - -// type CheckResult[R Replication] struct { -// OpType OpType -// Replications []R -// } - -// type Checker[R Replication, S any] interface { -// UpdateStatus(replication R, status S) -// Check() []CheckResult[R] -// } - -// define the check strategy -// soft/hard threadhold -// split/merge/mergeAndSplit result +import "fmt" + +type ( + GroupID = int64 + GroupTpye int8 + GroupCheckResult any + ReplicationStatus any +) + +const DefaultGroupID GroupID = 0 + +const ( + GroupDefault GroupTpye = iota + GroupTable + // add more group strategy later + // groupHotLevel1 +) + +// Notice: all methods are NOT thread-safe. 
+type GroupChecker[T ReplicationID, R Replication[T]] interface {
+	AddReplica(replication R)
+	RemoveReplica(replication R)
+	UpdateStatus(replication R)
+
+	Check(batch int) GroupCheckResult
+	Name() string
+	Stat() string
+}
+
+func NewEmptyChecker[T ReplicationID, R Replication[T]](GroupID) GroupChecker[T, R] {
+	return &EmptyStatusChecker[T, R]{}
+}
+
+// implement an empty status checker
+type EmptyStatusChecker[T ReplicationID, R Replication[T]] struct{}
+
+func (c *EmptyStatusChecker[T, R]) AddReplica(_ R) {}
+
+func (c *EmptyStatusChecker[T, R]) RemoveReplica(_ R) {}
+
+func (c *EmptyStatusChecker[T, R]) UpdateStatus(_ R) {}
+
+func (c *EmptyStatusChecker[T, R]) Check(_ int) GroupCheckResult {
+	return nil
+}
+
+func (c *EmptyStatusChecker[T, R]) Name() string {
+	return "empty checker"
+}
+
+func (c *EmptyStatusChecker[T, R]) Stat() string {
+	return ""
+}
+
+func GetGroupName(id GroupID) string {
+	gt := GroupTpye(id >> 56)
+	if gt == GroupTable {
+		return fmt.Sprintf("%s-%d", gt.String(), id&0x00FFFFFFFFFFFFFF)
+	}
+	return gt.String()
+}
+
+func (gt GroupTpye) Less(other GroupTpye) bool {
+	return gt < other
+}
+
+func (gt GroupTpye) String() string {
+	switch gt {
+	case GroupDefault:
+		return "default"
+	case GroupTable:
+		return "table"
+	default:
+		// return "HotLevel" + strconv.Itoa(int(gt-groupHotLevel1))
+		panic("unreachable")
+	}
+}
+
+func GenGroupID(gt GroupTpye, tableID int64) GroupID {
+	// use high 8 bits to store the group type
+	id := int64(gt) << 56
+	if gt == GroupTable {
+		return id | tableID
+	}
+	return id
+}
+
+func GetGroupType(id GroupID) GroupTpye {
+	return GroupTpye(id >> 56)
+}
diff --git a/pkg/scheduler/replica/replication.go b/pkg/scheduler/replica/replication.go
index 105c9edbf..669fd284c 100644
--- a/pkg/scheduler/replica/replication.go
+++ b/pkg/scheduler/replica/replication.go
@@ -14,6 +14,7 @@ package replica
 
 import (
+	"fmt"
 	"math"
 	"strconv"
 	"strings"
@@ -63,12 +64,19 @@ type ScheduleGroup[T ReplicationID, R Replication[T]] interface {
 	GetTaskSizePerNode() map[node.ID]int
 	GetImbalanceGroupNodeTask(nodes map[node.ID]*node.Info) (groups map[GroupID]map[node.ID]R, valid bool)
 	GetTaskSizePerNodeByGroup(groupID GroupID) map[node.ID]int
+
+	GetGroupChecker(groupID GroupID) GroupChecker[T, R]
+	GetCheckerStat() string
 }
 
+// ReplicationDB is responsible for managing the scheduling state of replication tasks.
+// 1. It provides the interface for the scheduler to query the scheduling information.
+// 2. It provides the interface for `Add/Remove` replication tasks and update the scheduling state.
+// 3. It maintains the scheduling group information internally.
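+//
+// Construction sketch, mirroring the call sites in this change (variable names are illustrative):
+//
+//	db := replica.NewReplicationDB[common.DispatcherID, *SpanReplication](
+//		changefeedID.String(), withRLock, newGroupChecker)
+//
+// where newGroupChecker builds a GroupChecker per group, or NewEmptyChecker can be passed when
+// no group-level checking is required.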
type ReplicationDB[T ReplicationID, R Replication[T]] interface {
 	ScheduleGroup[T, R]
 
-	// The flowing methods are not thread-safe
+	// The following methods are NOT thread-safe
 	GetReplicatingWithoutLock() []R
 	GetSchedulingWithoutLock() []R
 	AddAbsentWithoutLock(task R)
@@ -83,20 +91,22 @@ type ReplicationDB[T ReplicationID, R Replication[T]] interface {
 }
 
 func NewReplicationDB[T ReplicationID, R Replication[T]](
-	id string, withRLock func(action func()),
+	id string, withRLock func(action func()), newChecker func(GroupID) GroupChecker[T, R],
 ) ReplicationDB[T, R] {
 	r := &replicationDB[T, R]{
 		id:         id,
 		taskGroups: make(map[GroupID]*replicationGroup[T, R]),
 		withRLock:  withRLock,
+		newChecker: newChecker,
 	}
-	r.taskGroups[DefaultGroupID] = newReplicationGroup[T, R](id, DefaultGroupID)
+	r.taskGroups[DefaultGroupID] = newReplicationGroup(id, DefaultGroupID, r.newChecker(DefaultGroupID))
 	return r
 }
 
 type replicationDB[T ReplicationID, R Replication[T]] struct {
 	id        string
 	withRLock func(action func())
+	newChecker func(GroupID) GroupChecker[T, R]
 
 	taskGroups map[GroupID]*replicationGroup[T, R]
 }
@@ -110,6 +120,13 @@ func (db *replicationDB[T, R]) GetGroups() []GroupID {
 	return groups
 }
 
+func (db *replicationDB[T, R]) GetGroupChecker(groupID GroupID) (ret GroupChecker[T, R]) {
+	db.withRLock(func() {
+		ret = db.mustGetGroup(groupID).checker
+	})
+	return
+}
+
 func (db *replicationDB[T, R]) GetAbsent() []R {
 	var absent = make([]R, 0)
 	db.withRLock(func() {
@@ -248,8 +265,8 @@ func (db *replicationDB[T, R]) GetImbalanceGroupNodeTask(nodes map[node.ID]*node
 		case upperLimitPerNode - 1:
 			groupMap[nodeID] = zeroR
 		default:
-			// len(tasks) > upperLimitPerNode || len(tasks) < upperLimitPerNode-1
-			log.Error("scheduler: invalid group state",
+			// invalid state: len(tasks) > upperLimitPerNode || len(tasks) < upperLimitPerNode-1
+			log.Panic("scheduler: invalid group state",
 				zap.String("schedulerID", db.id), zap.String("group", GetGroupName(gid)),
 				zap.Int("totalSpan", totalSpan), zap.Int("nodesNum", nodesNum),
 				zap.Int("upperLimitPerNode", upperLimitPerNode),
@@ -334,17 +351,38 @@ func (db *replicationDB[T, R]) GetGroupStat() string {
 			distribute.WriteString(strconv.Itoa(size))
 			distribute.WriteString("; ")
 		}
-		distribute.WriteString("] ")
+		distribute.WriteString("]")
+		total++
 		}
 	})
 	return distribute.String()
 }
 
+func (db *replicationDB[T, R]) GetCheckerStat() string {
+	stat := strings.Builder{}
+	db.withRLock(func() {
+		total := 0
+		for groupID, group := range db.taskGroups {
+			if total > 0 {
+				stat.WriteString(" ")
+			}
+			stat.WriteString(GetGroupName(groupID))
+			stat.WriteString(fmt.Sprintf("(%s)", group.checker.Name()))
+			stat.WriteString(": [")
+			stat.WriteString(group.checker.Stat())
+			stat.WriteString("] ")
+			total++
+		}
+	})
+	return stat.String()
+}
+
 func (db *replicationDB[T, R]) getOrCreateGroup(task R) *replicationGroup[T, R] {
 	groupID := task.GetGroupID()
 	g, ok := db.taskGroups[groupID]
 	if !ok {
-		g = newReplicationGroup[T, R](db.id, groupID)
+		checker := db.newChecker(groupID)
+		g = newReplicationGroup(db.id, groupID, checker)
 		db.taskGroups[groupID] = g
 		log.Info("scheduler: add new task group",
 			zap.String("schedulerID", db.id), zap.String("group", GetGroupName(groupID)),
diff --git a/pkg/scheduler/replica/replication_group.go b/pkg/scheduler/replica/replication_group.go
index 79a7bed98..569d25ebe 100644
--- a/pkg/scheduler/replica/replication_group.go
+++ b/pkg/scheduler/replica/replication_group.go
@@ -14,27 +14,11 @@ package replica
 
 import (
-	"fmt"
-
 	"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/node" "go.uber.org/zap" ) -type ( - GroupID = int64 - GroupTpye int8 -) - -const DefaultGroupID GroupID = 0 - -const ( - GroupDefault GroupTpye = iota - GroupTable - // add more group strategy later - // groupHotLevel1 -) - // replicationGroup maintains a group of replication tasks. // All methods are not thread-safe. type replicationGroup[T ReplicationID, R Replication[T]] struct { @@ -49,10 +33,12 @@ type replicationGroup[T ReplicationID, R Replication[T]] struct { scheduling map[T]R absent map[T]R - // checker []replica.Checker + checker GroupChecker[T, R] } -func newReplicationGroup[T ReplicationID, R Replication[T]](id string, groupID GroupID) *replicationGroup[T, R] { +func newReplicationGroup[T ReplicationID, R Replication[T]]( + id string, groupID GroupID, checker GroupChecker[T, R], +) *replicationGroup[T, R] { return &replicationGroup[T, R]{ id: id, groupID: groupID, @@ -61,6 +47,7 @@ func newReplicationGroup[T ReplicationID, R Replication[T]](id string, groupID G replicating: make(map[T]R), scheduling: make(map[T]R), absent: make(map[T]R), + checker: checker, } } @@ -112,6 +99,7 @@ func (g *replicationGroup[T, R]) AddReplicatingReplica(replica R) { zap.String("replica", replica.GetID().String())) g.replicating[replica.GetID()] = replica g.updateNodeMap("", nodeID, replica) + g.checker.AddReplica(replica) } // MarkReplicaReplicating move the replica to the replicating map @@ -169,6 +157,7 @@ func (g *replicationGroup[T, R]) updateNodeMap(old, new node.ID, replica R) { func (g *replicationGroup[T, R]) AddAbsentReplica(replica R) { g.mustVerifyGroupID(replica.GetGroupID()) g.absent[replica.GetID()] = replica + g.checker.AddReplica(replica) } func (g *replicationGroup[T, R]) RemoveReplica(replica R) { @@ -185,6 +174,7 @@ func (g *replicationGroup[T, R]) RemoveReplica(replica R) { if len(nodeMap) == 0 { delete(g.nodeTasks, replica.GetNodeID()) } + g.checker.RemoveReplica(replica) } func (g *replicationGroup[T, R]) IsEmpty() bool { @@ -249,40 +239,3 @@ func (g *replicationGroup[T, R]) GetTaskSizePerNode() map[node.ID]int { } return res } - -func GetGroupName(id GroupID) string { - gt := GroupTpye(id >> 56) - if gt == GroupTable { - return fmt.Sprintf("%s-%d", gt.String(), id&0x00FFFFFFFFFFFFFF) - } - return gt.String() -} - -func (gt GroupTpye) Less(other GroupTpye) bool { - return gt < other -} - -func (gt GroupTpye) String() string { - switch gt { - case GroupDefault: - return "default" - case GroupTable: - return "table" - default: - // return "HotLevel" + strconv.Itoa(int(gt-groupHotLevel1)) - panic("unreachable") - } -} - -func GenGroupID(gt GroupTpye, tableID int64) GroupID { - // use high 8 bits to store the group type - id := int64(gt) << 56 - if gt == GroupTable { - return id | tableID - } - return id -} - -func GetGroupType(id GroupID) GroupTpye { - return GroupTpye(id >> 56) -} diff --git a/tools/workload/schema/large_row.go b/tools/workload/schema/large_row.go index 1bbc522ae..eceef147b 100644 --- a/tools/workload/schema/large_row.go +++ b/tools/workload/schema/large_row.go @@ -132,7 +132,7 @@ func (l *LargeRowWorkload) BuildInsertSql(tableN int, batchSize int) string { } } - log.Info("large row workload, insert the table", + log.Debug("large row workload, insert the table", zap.Int("table", tableN), zap.Int("batchSize", batchSize), zap.Int("largeRowCount", largeRowCount), zap.Int("length", len(insertSQL))) @@ -155,7 +155,7 @@ func (l *LargeRowWorkload) BuildUpdateSql(opts UpdateOption) string { } upsertSQL.WriteString(" ON DUPLICATE KEY 
UPDATE col_0=VALUES(col_0)") - log.Info("large row workload, upsert the table", + log.Debug("large row workload, upsert the table", zap.Int("table", opts.Table), zap.Int("batchSize", opts.Batch), zap.Int("largeRowCount", largeRowCount)) return upsertSQL.String()