Skip to content

Commit

Permalink
Use uint64 for binary log file position
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 7, 2025
1 parent 68861b1 commit 8fe55af
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 105 deletions.
6 changes: 3 additions & 3 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte

// nextPosition returns the next file position of the binlog.
// If no information is available, it returns 0.
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 {
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 {
if f.HeaderLength <= 13 {
// Dead code. This is just a failsafe.
return 0
}
return binary.LittleEndian.Uint32(ev.Bytes()[13:17])
return binary.LittleEndian.Uint64(ev.Bytes()[13:21])
}

// rotate implements BinlogEvent.Rotate().
Expand Down Expand Up @@ -283,7 +283,7 @@ type filePosGTIDEvent struct {
gtid replication.FilePosGTID
}

func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent {
func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent {
return filePosGTIDEvent{
filePosFakeEvent: filePosFakeEvent{
timestamp: timestamp,
Expand Down
6 changes: 2 additions & 4 deletions go/mysql/endtoend/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -58,9 +57,8 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor
status, err := conn.ShowPrimaryStatus()
require.NoError(t, err, "retrieving primary status failed: %v", err)

filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID)
file := filePos.File
position := filePos.Pos
file := status.FilePosition.File
position := status.FilePosition.Pos

// Tell the server that we understand the format of events
// that will be used if binlog_checksum is enabled on the server.
Expand Down
10 changes: 8 additions & 2 deletions go/mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mysql

import (
"fmt"
"math"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand All @@ -34,7 +35,12 @@ const (
// WriteComBinlogDump writes a ComBinlogDump command.
// See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax.
// Returns a SQLError.
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error {
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error {
// The binary log file position is a uint64, but the protocol command
// only uses 4 bytes for the file position.
if binlogPos > math.MaxUint32 {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos)
}
c.sequence = 0
length := 1 + // ComBinlogDump
4 + // binlog-pos
Expand All @@ -43,7 +49,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog
len(binlogFilename) // binlog-filename
data, pos := c.startEphemeralPacketWithHeader(length)
pos = writeByte(data, pos, ComBinlogDump)
pos = writeUint32(data, pos, binlogPos)
pos = writeUint32(data, pos, uint32(binlogPos))
pos = writeUint16(data, pos, flags)
pos = writeUint32(data, pos, serverID)
_ = writeEOFString(data, pos, binlogFilename)
Expand Down
66 changes: 66 additions & 0 deletions go/mysql/replication/binlog_file_position.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2025 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package replication

import (
"fmt"
"strconv"
"strings"
)

type BinlogFilePos struct {
File string
Pos uint64
}

// ParseBinlogFilePos parses a binlog file and position in the input
// format used by commands such as SHOW REPLICA STATUS and SHOW BINARY
// LOG STATUS.
func ParseBinlogFilePos(s string) (BinlogFilePos, error) {
bfp := BinlogFilePos{}

// Split into parts.
file, posStr, ok := strings.Cut(s, ":")
if !ok {
return bfp, fmt.Errorf("invalid binlog file position (%v): expecting file:pos", s)
}

pos, err := strconv.ParseUint(posStr, 0, 64)
if err != nil {
return bfp, fmt.Errorf("invalid binlog file position (%v): expecting position to be an unsigned 64 bit integer", posStr)
}

bfp.File = file
bfp.Pos = pos

return bfp, nil
}

// String returns the string representation of the BinlogFilePos
// using a colon as the seperator.
func (bfp BinlogFilePos) String() string {
return fmt.Sprintf("%s:%d", bfp.File, bfp.Pos)
}

func (bfp BinlogFilePos) IsZero() bool {
return bfp.File == "" && bfp.Pos == 0
}

func (bfp BinlogFilePos) ConvertToFlavorPosition() (pos Position, err error) {
pos.GTIDSet, err = ParseFilePosGTIDSet(bfp.String())
return pos, err
}
6 changes: 3 additions & 3 deletions go/mysql/replication/filepos_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) {
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s)
}

pos, err := strconv.ParseUint(parts[1], 0, 32)
pos, err := strconv.ParseUint(parts[1], 0, 64)
if err != nil {
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s)
}

return FilePosGTID{
File: parts[0],
Pos: uint32(pos),
Pos: pos,
}, nil
}

Expand All @@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) {
// FilePosGTID implements GTID.
type FilePosGTID struct {
File string
Pos uint32
Pos uint64
}

// String implements GTID.String().
Expand Down
9 changes: 7 additions & 2 deletions go/mysql/replication/filepos_gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func Test_filePosGTID_String(t *testing.T) {
type fields struct {
file string
pos uint32
pos uint64
}
tests := []struct {
name string
Expand All @@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) {
fields{file: "mysql-bin.166031", pos: 192394},
"mysql-bin.166031:192394",
},
{
"handles large position correctly",
fields{file: "vt-1448040107-bin.003222", pos: 4663881395},
"vt-1448040107-bin.003222:4663881395",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) {
func Test_filePosGTID_ContainsGTID(t *testing.T) {
type fields struct {
file string
pos uint32
pos uint64
}
type args struct {
other GTID
Expand Down
9 changes: 5 additions & 4 deletions go/mysql/replication/primary_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
type PrimaryStatus struct {
// Position represents the server's GTID based position.
Position Position
// FilePosition represents the server's file based position.
FilePosition Position
// FilePosition represents the server's current binary log
// file and position.
FilePosition BinlogFilePos
// ServerUUID is the UUID of the server.
ServerUUID string
}
Expand All @@ -38,7 +39,7 @@ type PrimaryStatus struct {
func PrimaryStatusToProto(s PrimaryStatus) *replicationdatapb.PrimaryStatus {
return &replicationdatapb.PrimaryStatus{
Position: EncodePosition(s.Position),
FilePosition: EncodePosition(s.FilePosition),
FilePosition: s.FilePosition.String(),
ServerUuid: s.ServerUUID,
}
}
Expand All @@ -63,7 +64,7 @@ func ParsePrimaryStatus(fields map[string]string) PrimaryStatus {
file := fields["File"]
if file != "" && fileExecPosStr != "" {
var err error
status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, fileExecPosStr))
status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, fileExecPosStr))
if err != nil {
log.Warningf("Error parsing GTID set %s:%s: %v", file, fileExecPosStr, err)
}
Expand Down
58 changes: 28 additions & 30 deletions go/mysql/replication/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,17 @@ type ReplicationStatus struct {
// it is the executed GTID set. For file replication implementation, it is same as
// FilePosition
Position Position
// RelayLogPosition is the Position that the replica would be at if it
// were to finish executing everything that's currently in its relay log.
// However, some MySQL flavors don't expose this information,
// in which case RelayLogPosition.IsZero() will be true.
// If ReplicationLagUnknown is true then we should not rely on the seconds
// behind value and we can instead try to calculate the lag ourselves when
// appropriate. For MySQL GTID replication implementation it is the union of
// executed GTID set and retrieved GTID set. For file replication implementation,
// it is same as RelayLogSourceBinlogEquivalentPosition
// RelayLogPosition is the relay log file and position that the replica would be
// at if it were to finish executing everything that's currently in its relay log.
RelayLogPosition Position
// FilePosition stores the position of the source tablets binary log
// upto which the SQL thread of the replica has run.
FilePosition Position
FilePosition BinlogFilePos
// RelayLogSourceBinlogEquivalentPosition stores the position of the source tablets binary log
// upto which the IO thread has read and added to the relay log
RelayLogSourceBinlogEquivalentPosition Position
RelayLogSourceBinlogEquivalentPosition BinlogFilePos
// RelayLogFilePosition stores the position in the relay log file
RelayLogFilePosition Position
RelayLogFilePosition BinlogFilePos
SourceServerID uint32
IOState ReplicationState
LastIOError string
Expand Down Expand Up @@ -96,14 +89,14 @@ func (s *ReplicationStatus) SQLHealthy() bool {
func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
replstatuspb := &replicationdatapb.Status{
Position: EncodePosition(s.Position),
RelayLogPosition: EncodePosition(s.RelayLogPosition),
FilePosition: EncodePosition(s.FilePosition),
RelayLogSourceBinlogEquivalentPosition: EncodePosition(s.RelayLogSourceBinlogEquivalentPosition),
RelayLogPosition: s.RelayLogPosition.String(),
FilePosition: s.FilePosition.String(),
RelayLogSourceBinlogEquivalentPosition: s.RelayLogSourceBinlogEquivalentPosition.String(),
SourceServerId: s.SourceServerID,
ReplicationLagSeconds: s.ReplicationLagSeconds,
ReplicationLagUnknown: s.ReplicationLagUnknown,
SqlDelay: s.SQLDelay,
RelayLogFilePosition: EncodePosition(s.RelayLogFilePosition),
RelayLogFilePosition: s.RelayLogFilePosition.String(),
SourceHost: s.SourceHost,
SourceUser: s.SourceUser,
SourcePort: s.SourcePort,
Expand Down Expand Up @@ -131,15 +124,15 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode RelayLogPosition"))
}
filePos, err := DecodePosition(s.FilePosition)
filePos, err := ParseBinlogFilePos(s.FilePosition)
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode FilePosition"))
}
fileRelayPos, err := DecodePosition(s.RelayLogSourceBinlogEquivalentPosition)
fileRelayPos, err := ParseBinlogFilePos(s.RelayLogSourceBinlogEquivalentPosition)
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode RelayLogSourceBinlogEquivalentPosition"))
}
relayFilePos, err := DecodePosition(s.RelayLogFilePosition)
relayFilePos, err := ParseBinlogFilePos(s.RelayLogFilePosition)
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode RelayLogFilePosition"))
}
Expand Down Expand Up @@ -270,18 +263,23 @@ func ParseMariadbReplicationStatus(resultMap map[string]string) (ReplicationStat
func ParseFilePosReplicationStatus(resultMap map[string]string) (ReplicationStatus, error) {
status := ParseReplicationStatus(resultMap, false)

status.Position = status.FilePosition
status.RelayLogPosition = status.RelayLogSourceBinlogEquivalentPosition
var err error
status.Position, err = status.FilePosition.ConvertToFlavorPosition()
if err != nil {
return status, err
}
status.RelayLogPosition, err = status.RelayLogSourceBinlogEquivalentPosition.ConvertToFlavorPosition()

return status, nil
return status, err
}

func ParseFilePosPrimaryStatus(resultMap map[string]string) (PrimaryStatus, error) {
status := ParsePrimaryStatus(resultMap)

status.Position = status.FilePosition
var err error
status.Position, err = status.FilePosition.ConvertToFlavorPosition()

return status, nil
return status, err
}

// ParseReplicationStatus parses the common (non-flavor-specific) fields of ReplicationStatus
Expand Down Expand Up @@ -348,27 +346,27 @@ func ParseReplicationStatus(fields map[string]string, replica bool) ReplicationS
executedPosStr := fields[execSourceLogPosField]
file := fields[relaySourceLogFileField]
if file != "" && executedPosStr != "" {
status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, executedPosStr))
status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, executedPosStr))
if err != nil {
log.Warningf("Error parsing GTID set %s:%s: %v", file, executedPosStr, err)
log.Warningf("Error parsing binlog file and position %s:%s: %v", file, executedPosStr, err)
}
}

readPosStr := fields[readSourceLogPosField]
file = fields[sourceLogFileField]
if file != "" && readPosStr != "" {
status.RelayLogSourceBinlogEquivalentPosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, readPosStr))
status.RelayLogSourceBinlogEquivalentPosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, readPosStr))
if err != nil {
log.Warningf("Error parsing GTID set %s:%s: %v", file, readPosStr, err)
log.Warningf("Error parsing relay log file and position %s:%s: %v", file, readPosStr, err)
}
}

relayPosStr := fields["Relay_Log_Pos"]
file = fields["Relay_Log_File"]
if file != "" && relayPosStr != "" {
status.RelayLogFilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, relayPosStr))
status.RelayLogFilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, relayPosStr))
if err != nil {
log.Warningf("Error parsing GTID set %s:%s: %v", file, relayPosStr, err)
log.Warningf("Error parsing relay log file and position %s:%s: %v", file, relayPosStr, err)
}
}
return status
Expand Down
Loading

0 comments on commit 8fe55af

Please sign in to comment.