From 0a5ca5ee82a22fb615f863261ce58e4967d25990 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 27 Feb 2024 17:44:54 -0500 Subject: [PATCH] Truncation vreplication_log message Signed-off-by: Matt Lord --- go/vt/binlog/binlogplayer/mock_dbclient.go | 7 ++ .../tabletmanager/vreplication/utils.go | 22 +++- .../tabletmanager/vreplication/utils_test.go | 104 ++++++++++++++++++ 3 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 go/vt/vttablet/tabletmanager/vreplication/utils_test.go diff --git a/go/vt/binlog/binlogplayer/mock_dbclient.go b/go/vt/binlog/binlogplayer/mock_dbclient.go index abc170ed493..f8552734dd9 100644 --- a/go/vt/binlog/binlogplayer/mock_dbclient.go +++ b/go/vt/binlog/binlogplayer/mock_dbclient.go @@ -244,3 +244,10 @@ func (dc *MockDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltype } return results, nil } + +// RemoveInvariant can be used to customize the behavior of the mock client. +func (dc *MockDBClient) RemoveInvariant(query string) { + dc.expectMu.Lock() + defer dc.expectMu.Unlock() + delete(dc.invariants, query) +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils.go b/go/vt/vttablet/tabletmanager/vreplication/utils.go index 21c3a61c9f1..bac5e278ebe 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/utils.go +++ b/go/vt/vttablet/tabletmanager/vreplication/utils.go @@ -98,6 +98,24 @@ func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message st query = fmt.Sprintf("update %s.vreplication_log set count = count + 1 where id = %d", sidecar.GetIdentifier(), id) } else { buf := sqlparser.NewTrackedBuffer(nil) + // The message column is a TEXT field and thus has a max length of 64KiB (2^16-1) so we truncate that if needed. + // See: https://dev.mysql.com/doc/refman/en/string-type-syntax.html and + // https://dev.mysql.com/doc/refman/en/storage-requirements.html#data-types-storage-reqs-strings + // We perform the truncation in the middle of the message as the end of the message is likely to be the most + // important part as it often explains WHY we e.g. failed to execute an INSERT in the workflow. + maxMessageLen := 65535 + truncationStr := fmt.Sprintf(" ... %s ... ", sqlparser.TruncationText) + if len(message) > maxMessageLen { + mid := (len(message) / 2) - len(truncationStr) + for mid > (maxMessageLen / 2) { + mid = mid / 2 + } + tail := (len(message) - (mid + len(truncationStr))) + 1 + log.Errorf("BEFORE:: Message length: %d, mid: %d, sub: %d", len(message), mid, tail) + message = fmt.Sprintf("%s%s%s", message[:mid], truncationStr, message[tail:]) + log.Errorf("AFTER:: Message length: %d, mid: %d, sub: %d", len(message), mid, tail) + log.Flush() + } buf.Myprintf("insert into %s.vreplication_log(vrepl_id, type, state, message) values(%s, %s, %s, %s)", sidecar.GetIdentifier(), strconv.Itoa(int(vreplID)), encodeString(typ), encodeString(state), encodeString(message)) query = buf.ParsedQuery().Query @@ -108,7 +126,7 @@ func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message st return nil } -// insertLogWithParams is called when a stream is created. The attributes of the stream are stored as a json string +// insertLogWithParams is called when a stream is created. The attributes of the stream are stored as a json string. func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, params map[string]string) error { var message string if params != nil { @@ -121,7 +139,7 @@ func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, para return nil } -// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate +// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate. func isUnrecoverableError(err error) bool { if err == nil { return false diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils_test.go b/go/vt/vttablet/tabletmanager/vreplication/utils_test.go new file mode 100644 index 00000000000..fbe2576af30 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/utils_test.go @@ -0,0 +1,104 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/sqlparser" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +func TestInsertLogTruncation(t *testing.T) { + dbClient := binlogplayer.NewMockDBClient(t) + defer dbClient.Close() + dbClient.RemoveInvariant("insert into _vt.vreplication_log") + stats := binlogplayer.NewStats() + defer stats.Stop() + vdbClient := newVDBClient(dbClient, stats) + defer vdbClient.Close() + vrID := int32(1) + typ := "Testing" + state := binlogdatapb.VReplicationWorkflowState_Error.String() + maxMessageLen := 65535 + truncationStr := fmt.Sprintf(" ... %s ... ", sqlparser.TruncationText) + + insertStmtf := "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(%d, '%s', '%s', %s)" + + tests := []struct { + message string + expectTruncation bool + }{ + { + message: "Simple message that's not truncated", + }, + { + message: "Simple message that needs to be truncated " + strings.Repeat("a", 80000) + " cuz it's long", + expectTruncation: true, + }, + { + message: "Simple message that doesn't need to be truncated " + strings.Repeat("b", 64000) + " cuz it's not quite too long", + }, + { + message: "Message that is just barely short enough " + strings.Repeat("c", maxMessageLen-(len("Message that is just barely short enough ")+len(" so it doesn't get truncated"))) + " so it doesn't get truncated", + }, + { + message: "Message that is just barely too long " + strings.Repeat("d", maxMessageLen-(len("Message that is just barely too long ")+len(" so it gets truncated"))+1) + " so it gets truncated", + expectTruncation: true, + }, + { + message: "Super long message brosef wut r ya doin " + strings.Repeat("e", 60000) + strings.Repeat("f", 60000) + " so maybe don't do that to yourself and your friends", + expectTruncation: true, + }, + { + message: "Super duper long message brosef wut r ya doin " + strings.Repeat("g", 120602) + strings.Repeat("h", 120001) + " so maybe really don't do that to yourself and your friends", + expectTruncation: true, + }, + } + for _, tc := range tests { + t.Run("insertLog", func(t *testing.T) { + var messageOut string + if tc.expectTruncation { + log.Errorf("BEFORE:: Message length: %d", len(tc.message)) + mid := (len(tc.message) / 2) - len(truncationStr) + for mid > (maxMessageLen / 2) { + mid = mid / 2 + } + tail := (len(tc.message) - (mid + len(truncationStr))) + 1 + messageOut = fmt.Sprintf("%s%s%s", tc.message[:mid], truncationStr, tc.message[tail:]) + require.True(t, strings.HasPrefix(messageOut, tc.message[:10])) // Confirm we still have the same beginning + require.True(t, strings.HasSuffix(messageOut, tc.message[len(tc.message)-10:])) // Confirm we still have the same end + require.True(t, strings.Contains(messageOut, truncationStr)) // Confirm we have the truncation text + } else { + messageOut = tc.message + } + require.LessOrEqual(t, len(messageOut), 65535) + dbClient.ExpectRequest(fmt.Sprintf(insertStmtf, vrID, typ, state, encodeString(messageOut)), &sqltypes.Result{}, nil) + err := insertLog(vdbClient, typ, vrID, state, tc.message) + require.NoError(t, err) + dbClient.Wait() + }) + } +}