fix: rebalance by shardingkey
YenchangChan committed Mar 5, 2024
1 parent ebc4114 commit e9fd481
Showing 4 changed files with 43 additions and 39 deletions.
ckconfig/users.go (2 additions & 1 deletion)
@@ -84,7 +84,8 @@ func profiles(userProfiles []model.Profile, info HostInfo) map[string]interface{
defaultProfile["distributed_ddl_task_timeout"] = 15
defaultProfile["allow_drop_detached"] = 1
defaultProfile["use_uncompressed_cache"] = 0
defaultProfile["max_execution_time"] = 300
defaultProfile["max_execution_time"] = 3600 // 1 hour
defaultProfile["max_partitions_per_insert_block"] = 500
profileMap["default"] = defaultProfile

//normal
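
The two profile changes above raise the default query timeout from 300 to 3600 seconds and allow up to 500 partitions per insert block; both values land in the `default` profile that this file generates. The snippet below is a minimal sketch, not part of the commit, for confirming the effective values on a node once the new configuration is deployed; it reuses the common.GetConnection and conn.Query helpers that appear elsewhere in this commit, while the package name and import paths are assumptions.

// Minimal sketch, not part of the commit: check the effective settings on a
// node once the new users.xml profile is in place. Import paths and the
// package name are assumptions; conn.Query/Scan mirror the helpers used in
// this repository's diff below.
package sketch

import (
	"fmt"

	"github.com/housepower/ckman/common" // assumed module path
	"github.com/housepower/ckman/log"    // assumed module path
)

func checkDefaultProfile(host string) error {
	conn := common.GetConnection(host)
	if conn == nil {
		return fmt.Errorf("cannot get connection for host %s", host)
	}
	query := "SELECT name, value FROM system.settings WHERE name IN ('max_execution_time', 'max_partitions_per_insert_block')"
	rows, err := conn.Query(query)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var name, value string
		if err := rows.Scan(&name, &value); err != nil {
			return err
		}
		// Expected after this change: max_execution_time = 3600,
		// max_partitions_per_insert_block = 500.
		log.Logger.Infof("[%s] %s = %s", host, name, value)
	}
	return nil
}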
service/clickhouse/archive.go (2 additions & 2 deletions)
@@ -235,7 +235,7 @@ func (p *ArchiveParams) GetSlots(host, table string) (slots []time.Time, err err
log.Logger.Infof("host %s: total rows to export: %d, estimated size (in bytes): %d", host, totalRowsCnt, tblEstSize)
atomic.AddUint64(&p.EstSize, tblEstSize)

sqlTmpl3 := "SELECT toStartOfInterval(`%s`, INTERVAL %s) AS slot, count() FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s GROUP BY slot ORDER BY slot"
sqlTmpl3 := "SELECT toStartOfInterval(`%s`, INTERVAL %s) AS slot, count() FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s GROUP BY slot ORDER BY slot SETTINGS max_execution_time=0"
var tryIntervals []string
if colType == "Date" {
// remove 4 hour, 1 hour
@@ -287,7 +287,7 @@ func (p *ArchiveParams) ExportSlot(host, table string, seq int, slotBeg, slotEnd
queries := []string{
fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", p.Database, tmpTbl),
fmt.Sprintf("CREATE TABLE `%s`.`%s` AS `%s`.`%s` ENGINE=%s", p.Database, tmpTbl, p.Database, table, engine),
fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s", p.Database, tmpTbl, p.Database, table, colName, formatTimestamp(slotBeg, colType), colName, formatTimestamp(slotEnd, colType)),
fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s SETTINGS max_execution_time=0", p.Database, tmpTbl, p.Database, table, colName, formatTimestamp(slotBeg, colType), colName, formatTimestamp(slotEnd, colType)),
}
conn := common.GetConnection(host)
if conn == nil {
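
Both archive statements above append SETTINGS max_execution_time=0 because a statement-level SETTINGS clause overrides the profile value, so only these known long-running queries (the per-slot count in GetSlots and the slot copy in ExportSlot) are exempt from the new 3600-second cap. Below is a minimal sketch of the pattern with placeholder identifiers rather than the commit's own variables.

// Sketch only: the SETTINGS clause on the statement takes precedence over the
// profile's max_execution_time, so the timeout is lifted for this copy alone.
// Identifiers, package name and import paths are placeholders/assumptions.
package sketch

import (
	"fmt"

	"github.com/housepower/ckman/common" // assumed module path
	"github.com/pkg/errors"              // assumed to match the repo's errors package
)

func copySlot(host, database, table, tmpTable, colName, begTs, endTs string) error {
	conn := common.GetConnection(host)
	if conn == nil {
		return fmt.Errorf("cannot get connection for host %s", host)
	}
	query := fmt.Sprintf(
		"INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s SETTINGS max_execution_time=0",
		database, tmpTable, database, table, colName, begTs, colName, endTs)
	// errors.Wrap returns nil when the query succeeds.
	return errors.Wrap(conn.Exec(query), host)
}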
service/clickhouse/clickhouse_service.go (10 additions & 5 deletions)
@@ -926,7 +926,7 @@ func SetTableOrderBy(conf *model.CKManClickHouseConfig, req model.OrderbyReq) er

queries := []string{
tmpSql,
fmt.Sprintf("INSERT INTO `%s`.`tmp_%s` SELECT * FROM `%s`.`%s` SETTINGS max_insert_threads=%d", req.Database, local, req.Database, local, max_insert_threads),
fmt.Sprintf("INSERT INTO `%s`.`tmp_%s` SELECT * FROM `%s`.`%s` SETTINGS max_insert_threads=%d, max_execution_time=0", req.Database, local, req.Database, local, max_insert_threads),
}

for _, query := range queries {
@@ -969,7 +969,7 @@ func SetTableOrderBy(conf *model.CKManClickHouseConfig, req model.OrderbyReq) er
return
}
if lastError == nil {
query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`tmp_%s` SETTINGS max_insert_threads=%d", req.Database, local, req.Database, local, max_insert_threads)
query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`tmp_%s` SETTINGS max_insert_threads=%d,max_execution_time=0", req.Database, local, req.Database, local, max_insert_threads)
log.Logger.Debugf("%s: %s", host, query)
err = ck.Conn.Exec(query)
if err != nil {
@@ -1764,24 +1764,29 @@ func RebalanceByPartition(conf *model.CKManClickHouseConfig, rebalancer *CKRebal
// 2 million rows take about 4 seconds
func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKRebalance) error {
var err error
start := time.Now()
log.Logger.Info("[rebalance] STEP InitCKConns")
if err = rebalancer.InitCKConns(); err != nil {
log.Logger.Errorf("got error %+v", err)
return err
}

log.Logger.Info("[rebalance] STEP CreateTemporaryTable")
if err = rebalancer.CreateTemporaryTable(); err != nil {
return err
}

log.Logger.Info("[rebalance] STEP InsertPlan")
if err = rebalancer.InsertPlan(); err != nil {
return err
}

log.Logger.Info("[rebalance] STEP MoveBack")
if err = rebalancer.MoveBack(); err != nil {
return errors.Wrapf(err, "table %s.%s rebalance failed, data can be corrupted, please move back from temp table[%s] manually", rebalancer.Database, rebalancer.Table, fmt.Sprintf("tmp_%s", rebalancer.Table))
}

log.Logger.Info("[rebalance] STEP Cleanup")
rebalancer.Cleanup()

log.Logger.Infof("[rebalance] DONE, Elapsed: %v sec", time.Since(start).Seconds())
return nil
}

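
With the added log lines, RebalanceByShardingkey now reports each stage of a fixed pipeline (InitCKConns, CreateTemporaryTable, InsertPlan, MoveBack, Cleanup) plus the total elapsed time. The commit logs the steps inline; the sketch below only illustrates the same pattern in table-driven form, and the rebalanceStep type and runRebalanceSteps helper are hypothetical, not code from the repository.

// Illustrative only: a table-driven equivalent of the step logging added to
// RebalanceByShardingkey. rebalanceStep and runRebalanceSteps do not exist in
// the repository; log.Logger mirrors the logger used throughout the diff.
package sketch

import (
	"time"

	"github.com/housepower/ckman/log" // assumed module path
)

type rebalanceStep struct {
	Name string
	Run  func() error
}

func runRebalanceSteps(steps []rebalanceStep) error {
	start := time.Now()
	for _, s := range steps {
		log.Logger.Infof("[rebalance] STEP %s", s.Name)
		if err := s.Run(); err != nil {
			log.Logger.Errorf("got error %+v", err)
			return err
		}
	}
	log.Logger.Infof("[rebalance] DONE, Elapsed: %v sec", time.Since(start).Seconds())
	return nil
}

The steps slice would then carry the five methods in order, for example {"MoveBack", rebalancer.MoveBack}, keeping the timing and error logging in one place.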
service/clickhouse/rebalance.go (29 additions & 31 deletions)
@@ -398,7 +398,7 @@ func (r *CKRebalance) InsertPlan() error {
conn := common.GetConnection(host)

tableName := fmt.Sprintf("tmp_%s", r.Table)
query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE %s %% %d = %d SETTINGS max_insert_threads=%d",
query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE %s %% %d = %d SETTINGS max_insert_threads=%d, max_execution_time=0",
r.Database, tableName, r.Database, r.DistTable, ShardingFunc(r.Shardingkey), len(r.Hosts), idx, max_insert_threads)
log.Logger.Debugf("[%s]%s", host, query)
if err := conn.Exec(query); err != nil {
@@ -426,34 +426,14 @@ func (r *CKRebalance) MoveBack() error {
defer wg.Done()
// truncate and detach ori table
tableName := fmt.Sprintf("tmp_%s", r.Table)
query := fmt.Sprintf("TRUNCATE TABLE `%s`.`%s` SETTINGS alter_sync = 0", r.Database, r.Table)
query := fmt.Sprintf("TRUNCATE TABLE `%s`.`%s`", r.Database, r.Table)
conn := common.GetConnection(host)
log.Logger.Debugf("[%s]%s", host, query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
}

var partitions []string
query = fmt.Sprintf("SELECT DISTINCT partition FROM system.parts WHERE database = '%s' AND table = '%s' AND active=1", r.Database, tableName)
log.Logger.Debugf("[%s]%s", host, query)
rows, err := conn.Query(query)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
defer rows.Close()
for rows.Next() {
var partition string
if err := rows.Scan(&partition); err != nil {
lastError = errors.Wrap(err, host)
return
}
if partition != "" {
partitions = append(partitions, partition)
}
}

// copy data
cmd := fmt.Sprintf("ls -l %sclickhouse/data/%s/%s/ |grep -v total |awk '{print $9}'", r.DataDir, r.Database, tableName)
sshOpts := common.SshOptions{
@@ -477,26 +457,44 @@ func (r *CKRebalance) MoveBack() error {
lastError = errors.Wrap(err, host)
return
}
if reg.MatchString(file) {
if reg.MatchString(file) && !strings.HasPrefix(file, "tmp_merge") {
parts = append(parts, file)
}
}
log.Logger.Infof("host:[%s], parts: %v", host, parts)
var cmds []string
for _, part := range parts {
cmds = append(cmds, fmt.Sprintf("cp -prf %sclickhouse/data/%s/%s/%s %sclickhouse/data/%s/%s/detached/", r.DataDir, r.Database, tableName, part, r.DataDir, r.Database, r.Table))
}
_, err = common.RemoteExecute(sshOpts, strings.Join(cmds, ";"))
if err != nil {
lastError = errors.Wrap(err, host)
return
if len(cmds) > 0 {
log.Logger.Infof("host:[%s], cmds: %v", host, cmds)
_, err = common.RemoteExecute(sshOpts, strings.Join(cmds, ";"))
if err != nil {
lastError = errors.Wrap(err, host)
return
}
}

for _, partiton := range partitions {
query = fmt.Sprintf("ALTER TABLE `%s`.`%s` ATTACH PARTITION '%s'", r.Database, r.Table, partiton)
var failedParts []string
for _, part := range parts {
query = fmt.Sprintf("ALTER TABLE `%s`.`%s` ATTACH PART '%s'", r.Database, r.Table, part)
log.Logger.Debugf("[%s]%s", host, query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
failedParts = append(failedParts, part)
continue
}
}

if len(failedParts) > 0 {
max_insert_threads := runtime.NumCPU()*3/4 + 1
log.Logger.Infof("[%s]failed parts: %v, retry again", host, failedParts)
for _, part := range failedParts {
query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS max_insert_threads=%d, max_execution_time=0", r.Database, r.Table, r.Database, tableName, part, max_insert_threads)
log.Logger.Debugf("[%s]%s", host, query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
}
}
}
})
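
The reworked MoveBack no longer attaches whole partitions: it lists the active part directories of tmp_<table> (skipping transient tmp_merge* directories), copies them into the original table's detached/ directory, attaches them one by one with ALTER TABLE ... ATTACH PART, and re-inserts any part that fails to attach by filtering the temporary table on ClickHouse's virtual _part column. Below is a minimal sketch of that attach-with-fallback loop, with placeholder identifiers and an assumed function wrapper.

// Sketch only: attach the copied parts, then recover any part that failed to
// attach by re-inserting its rows. The virtual column _part carries the source
// part name, so the fallback INSERT copies exactly the rows of that part.
// Package name, import paths and the function signature are assumptions.
package sketch

import (
	"fmt"
	"runtime"

	"github.com/housepower/ckman/common" // assumed module path
	"github.com/pkg/errors"              // assumed to match the repo's errors package
)

func attachOrReinsert(host, database, table string, parts []string) error {
	conn := common.GetConnection(host)
	if conn == nil {
		return fmt.Errorf("cannot get connection for host %s", host)
	}
	tmpTable := fmt.Sprintf("tmp_%s", table)
	var failedParts []string
	for _, part := range parts {
		query := fmt.Sprintf("ALTER TABLE `%s`.`%s` ATTACH PART '%s'", database, table, part)
		if err := conn.Exec(query); err != nil {
			// Keep going; this part is recovered by the INSERT fallback below.
			failedParts = append(failedParts, part)
		}
	}
	maxInsertThreads := runtime.NumCPU()*3/4 + 1
	for _, part := range failedParts {
		query := fmt.Sprintf(
			"INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS max_insert_threads=%d, max_execution_time=0",
			database, table, database, tmpTable, part, maxInsertThreads)
		if err := conn.Exec(query); err != nil {
			return errors.Wrap(err, host)
		}
	}
	return nil
}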
