Skip to content

Commit

Permalink
Truncation vreplication_log message
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 27, 2024
1 parent fb4abd5 commit 0a5ca5e
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 2 deletions.
7 changes: 7 additions & 0 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,10 @@ func (dc *MockDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltype
}
return results, nil
}

// RemoveInvariant can be used to customize the behavior of the mock client.
func (dc *MockDBClient) RemoveInvariant(query string) {
dc.expectMu.Lock()
defer dc.expectMu.Unlock()
delete(dc.invariants, query)
}
22 changes: 20 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,24 @@ func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message st
query = fmt.Sprintf("update %s.vreplication_log set count = count + 1 where id = %d", sidecar.GetIdentifier(), id)
} else {
buf := sqlparser.NewTrackedBuffer(nil)
// The message column is a TEXT field and thus has a max length of 64KiB (2^16-1) so we truncate that if needed.
// See: https://dev.mysql.com/doc/refman/en/string-type-syntax.html and
// https://dev.mysql.com/doc/refman/en/storage-requirements.html#data-types-storage-reqs-strings
// We perform the truncation in the middle of the message as the end of the message is likely to be the most
// important part as it often explains WHY we e.g. failed to execute an INSERT in the workflow.
maxMessageLen := 65535
truncationStr := fmt.Sprintf(" ... %s ... ", sqlparser.TruncationText)
if len(message) > maxMessageLen {
mid := (len(message) / 2) - len(truncationStr)
for mid > (maxMessageLen / 2) {
mid = mid / 2
}
tail := (len(message) - (mid + len(truncationStr))) + 1
log.Errorf("BEFORE:: Message length: %d, mid: %d, sub: %d", len(message), mid, tail)
message = fmt.Sprintf("%s%s%s", message[:mid], truncationStr, message[tail:])
log.Errorf("AFTER:: Message length: %d, mid: %d, sub: %d", len(message), mid, tail)
log.Flush()
}
buf.Myprintf("insert into %s.vreplication_log(vrepl_id, type, state, message) values(%s, %s, %s, %s)",
sidecar.GetIdentifier(), strconv.Itoa(int(vreplID)), encodeString(typ), encodeString(state), encodeString(message))
query = buf.ParsedQuery().Query
Expand All @@ -108,7 +126,7 @@ func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message st
return nil
}

// insertLogWithParams is called when a stream is created. The attributes of the stream are stored as a json string
// insertLogWithParams is called when a stream is created. The attributes of the stream are stored as a json string.
func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, params map[string]string) error {
var message string
if params != nil {
Expand All @@ -121,7 +139,7 @@ func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, para
return nil
}

// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate
// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate.
func isUnrecoverableError(err error) bool {
if err == nil {
return false
Expand Down
104 changes: 104 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func TestInsertLogTruncation(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
defer dbClient.Close()
dbClient.RemoveInvariant("insert into _vt.vreplication_log")
stats := binlogplayer.NewStats()
defer stats.Stop()
vdbClient := newVDBClient(dbClient, stats)
defer vdbClient.Close()
vrID := int32(1)
typ := "Testing"
state := binlogdatapb.VReplicationWorkflowState_Error.String()
maxMessageLen := 65535
truncationStr := fmt.Sprintf(" ... %s ... ", sqlparser.TruncationText)

insertStmtf := "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(%d, '%s', '%s', %s)"

tests := []struct {
message string
expectTruncation bool
}{
{
message: "Simple message that's not truncated",
},
{
message: "Simple message that needs to be truncated " + strings.Repeat("a", 80000) + " cuz it's long",
expectTruncation: true,
},
{
message: "Simple message that doesn't need to be truncated " + strings.Repeat("b", 64000) + " cuz it's not quite too long",
},
{
message: "Message that is just barely short enough " + strings.Repeat("c", maxMessageLen-(len("Message that is just barely short enough ")+len(" so it doesn't get truncated"))) + " so it doesn't get truncated",
},
{
message: "Message that is just barely too long " + strings.Repeat("d", maxMessageLen-(len("Message that is just barely too long ")+len(" so it gets truncated"))+1) + " so it gets truncated",
expectTruncation: true,
},
{
message: "Super long message brosef wut r ya doin " + strings.Repeat("e", 60000) + strings.Repeat("f", 60000) + " so maybe don't do that to yourself and your friends",
expectTruncation: true,
},
{
message: "Super duper long message brosef wut r ya doin " + strings.Repeat("g", 120602) + strings.Repeat("h", 120001) + " so maybe really don't do that to yourself and your friends",
expectTruncation: true,
},
}
for _, tc := range tests {
t.Run("insertLog", func(t *testing.T) {
var messageOut string
if tc.expectTruncation {
log.Errorf("BEFORE:: Message length: %d", len(tc.message))
mid := (len(tc.message) / 2) - len(truncationStr)
for mid > (maxMessageLen / 2) {
mid = mid / 2
}
tail := (len(tc.message) - (mid + len(truncationStr))) + 1
messageOut = fmt.Sprintf("%s%s%s", tc.message[:mid], truncationStr, tc.message[tail:])
require.True(t, strings.HasPrefix(messageOut, tc.message[:10])) // Confirm we still have the same beginning
require.True(t, strings.HasSuffix(messageOut, tc.message[len(tc.message)-10:])) // Confirm we still have the same end
require.True(t, strings.Contains(messageOut, truncationStr)) // Confirm we have the truncation text
} else {
messageOut = tc.message
}
require.LessOrEqual(t, len(messageOut), 65535)
dbClient.ExpectRequest(fmt.Sprintf(insertStmtf, vrID, typ, state, encodeString(messageOut)), &sqltypes.Result{}, nil)
err := insertLog(vdbClient, typ, vrID, state, tc.message)
require.NoError(t, err)
dbClient.Wait()
})
}
}

0 comments on commit 0a5ca5e

Please sign in to comment.