Skip to content

Commit

Permalink
Make vreplication log message failures non-fatal
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 27, 2024
1 parent 0a5ca5e commit 8d01c20
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 59 deletions.
12 changes: 3 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, err
}
vre.controllers[id] = ct
if err := insertLogWithParams(vdbc, LogStreamCreate, id, params); err != nil {
return nil, err
}
insertLogWithParams(vdbc, LogStreamCreate, id, params)
}
return qr, nil
case updateQuery:
Expand Down Expand Up @@ -475,9 +473,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, err
}
vre.controllers[id] = ct
if err := insertLog(vdbc, LogStateChange, id, params["state"], ""); err != nil {
return nil, err
}
insertLog(vdbc, LogStateChange, id, params["state"], "")
}
return qr, nil
case deleteQuery:
Expand All @@ -495,9 +491,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
ct.Stop()
delete(vre.controllers, id)
}
if err := insertLogWithParams(vdbc, LogStreamDelete, id, nil); err != nil {
return nil, err
}
insertLogWithParams(vdbc, LogStreamDelete, id, nil)
}
if err := dbClient.Begin(); err != nil {
return nil, err
Expand Down
24 changes: 9 additions & 15 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,17 @@ func getLastLog(dbClient *vdbClient, vreplID int32) (id int64, typ, state, messa
return id, typ, state, message, nil
}

func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message string) error {
func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message string) {
// getLastLog returns the last log for a stream. During insertion, if the type/state/message match we do not insert
// a new log but increment the count. This prevents spamming of the log table in case the same message is logged continuously.
id, _, lastLogState, lastLogMessage, err := getLastLog(dbClient, vreplID)
if err != nil {
return err
log.Errorf("Could not insert vreplication_log record because we failed to get the last log record: %v", err)
return
}
if typ == LogStateChange && state == lastLogState {
// handles case where current state is Running, controller restarts after an error and initializes the state Running
return nil
return
}
var query string
if id > 0 && message == lastLogMessage {
Expand All @@ -106,37 +107,30 @@ func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message st
maxMessageLen := 65535
truncationStr := fmt.Sprintf(" ... %s ... ", sqlparser.TruncationText)
if len(message) > maxMessageLen {
mid := (len(message) / 2) - len(truncationStr)
mid := ((len(message) / 2) - len(truncationStr)) - 1
for mid > (maxMessageLen / 2) {
mid = mid / 2
}
tail := (len(message) - (mid + len(truncationStr))) + 1
log.Errorf("BEFORE:: Message length: %d, mid: %d, sub: %d", len(message), mid, tail)
tail := (len(message) - mid + len(truncationStr))
message = fmt.Sprintf("%s%s%s", message[:mid], truncationStr, message[tail:])
log.Errorf("AFTER:: Message length: %d, mid: %d, sub: %d", len(message), mid, tail)
log.Flush()
}
buf.Myprintf("insert into %s.vreplication_log(vrepl_id, type, state, message) values(%s, %s, %s, %s)",
sidecar.GetIdentifier(), strconv.Itoa(int(vreplID)), encodeString(typ), encodeString(state), encodeString(message))
query = buf.ParsedQuery().Query
}
if _, err = dbClient.ExecuteFetch(query, 10000); err != nil {
return fmt.Errorf("could not insert into log table: %v: %v", query, err)
log.Errorf("Could not insert into vreplication_log table: %v: %v", query, err)
}
return nil
}

// insertLogWithParams is called when a stream is created. The attributes of the stream are stored as a json string.
func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, params map[string]string) error {
func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, params map[string]string) {
var message string
if params != nil {
obj, _ := json.Marshal(params)
message = string(obj)
}
if err := insertLog(dbClient, action, vreplID, params["state"], message); err != nil {
return err
}
return nil
insertLog(dbClient, action, vreplID, params["state"], message)
}

// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate.
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletmanager/vreplication/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ func TestInsertLogTruncation(t *testing.T) {
}
require.LessOrEqual(t, len(messageOut), 65535)
dbClient.ExpectRequest(fmt.Sprintf(insertStmtf, vrID, typ, state, encodeString(messageOut)), &sqltypes.Result{}, nil)
err := insertLog(vdbClient, typ, vrID, state, tc.message)
require.NoError(t, err)
insertLog(vdbClient, typ, vrID, state, tc.message)
dbClient.Wait()
})
}
Expand Down
16 changes: 4 additions & 12 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,31 +246,23 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
if err := vc.vr.setState(binlogdatapb.VReplicationWorkflowState_Copying, ""); err != nil {
return err
}
if err := vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase started for %d table(s)",
len(plan.TargetTables))); err != nil {
return err
}
vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase started for %d table(s)", len(plan.TargetTables)))

if vc.vr.supportsDeferredSecondaryKeys() {
settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id)
if err != nil {
return err
}
if settings.DeferSecondaryKeys {
if err := vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase temporarily dropping secondary keys for %d table(s)",
len(plan.TargetTables))); err != nil {
return err
}
vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase temporarily dropping secondary keys for %d table(s)", len(plan.TargetTables)))
for _, tableName := range tableNames {
if err := vc.vr.stashSecondaryKeys(ctx, tableName); err != nil {
return err
}
}
if err := vc.vr.insertLog(LogCopyStart,
vc.vr.insertLog(LogCopyStart,
fmt.Sprintf("Copy phase finished dropping secondary keys and saving post copy actions to restore them for %d table(s)",
len(plan.TargetTables))); err != nil {
return err
}
len(plan.TargetTables)))
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2799,8 +2799,7 @@ func TestVReplicationLogs(t *testing.T) {

for _, want := range expected {
t.Run("", func(t *testing.T) {
err = insertLog(vdbc, LogMessage, 1, binlogdatapb.VReplicationWorkflowState_Running.String(), "message1")
require.NoError(t, err)
insertLog(vdbc, LogMessage, 1, binlogdatapb.VReplicationWorkflowState_Running.String(), "message1")
qr, err := env.Mysqld.FetchSuperQuery(context.Background(), query)
require.NoError(t, err)
require.Equal(t, want, fmt.Sprintf("%v", qr.Rows))
Expand Down
26 changes: 7 additions & 19 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,7 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
return err
}
if numTablesToCopy == 0 {
if err := vr.insertLog(LogCopyEnd, fmt.Sprintf("Copy phase completed at gtid %s", settings.StartPos)); err != nil {
return err
}
vr.insertLog(LogCopyEnd, fmt.Sprintf("Copy phase completed at gtid %s", settings.StartPos))
}
}
case settings.StartPos.IsZero():
Expand Down Expand Up @@ -464,14 +462,12 @@ func (vr *vreplicator) setMessage(message string) error {
if _, err := vr.dbClient.Execute(query); err != nil {
return fmt.Errorf("could not set message: %v: %v", query, err)
}
if err := insertLog(vr.dbClient, LogMessage, vr.id, vr.state.String(), message); err != nil {
return err
}
insertLog(vr.dbClient, LogMessage, vr.id, vr.state.String(), message)
return nil
}

func (vr *vreplicator) insertLog(typ, message string) error {
return insertLog(vr.dbClient, typ, vr.id, vr.state.String(), message)
func (vr *vreplicator) insertLog(typ, message string) {
insertLog(vr.dbClient, typ, vr.id, vr.state.String(), message)
}

func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, message string) error {
Expand All @@ -489,9 +485,7 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
if state == vr.state {
return nil
}
if err := insertLog(vr.dbClient, LogStateChange, vr.id, state.String(), message); err != nil {
return err
}
insertLog(vr.dbClient, LogStateChange, vr.id, state.String(), message)
vr.state = state

return nil
Expand Down Expand Up @@ -815,10 +809,7 @@ func (vr *vreplicator) execPostCopyActions(ctx context.Context, tableName string
return nil
}

if err := vr.insertLog(LogCopyStart, fmt.Sprintf("Executing %d post copy action(s) for %s table",
len(qr.Rows), tableName)); err != nil {
return err
}
vr.insertLog(LogCopyStart, fmt.Sprintf("Executing %d post copy action(s) for %s table", len(qr.Rows), tableName))

// Save our connection ID so we can use it to easily KILL any
// running SQL action we may perform later if needed.
Expand Down Expand Up @@ -1039,10 +1030,7 @@ func (vr *vreplicator) execPostCopyActions(ctx context.Context, tableName string
}
}

if err := vr.insertLog(LogCopyStart, fmt.Sprintf("Completed all post copy actions for %s table",
tableName)); err != nil {
return err
}
vr.insertLog(LogCopyStart, fmt.Sprintf("Completed all post copy actions for %s table", tableName))

return nil
}
Expand Down

0 comments on commit 8d01c20

Please sign in to comment.