diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index 5d472230d0e..f79593003d9 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -48,7 +48,7 @@ type BinlogEvent interface { IsValid() bool // General protocol events. - NextPosition() uint32 + NextPosition() uint64 // IsFormatDescription returns true if this is a // FORMAT_DESCRIPTION_EVENT. Do not call StripChecksum before diff --git a/go/mysql/binlog_event_common.go b/go/mysql/binlog_event_common.go index c95873614f0..b69dc5b30d4 100644 --- a/go/mysql/binlog_event_common.go +++ b/go/mysql/binlog_event_common.go @@ -47,7 +47,7 @@ import ( // +----------------------------+ // | extra_headers 19 : x-19 | // +============================+ -// http://dev.mysql.com/doc/internals/en/event-header-fields.html +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header type binlogEvent []byte const ( @@ -119,8 +119,9 @@ func (ev binlogEvent) Length() uint32 { } // NextPosition returns the nextPosition field from the header -func (ev binlogEvent) NextPosition() uint32 { - return binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4]) +func (ev binlogEvent) NextPosition() uint64 { + // Only 4 bytes are used for the next_position field in the header. + return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17])) } // IsFormatDescription implements BinlogEvent.IsFormatDescription(). diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index c71c8346964..f587f201677 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -75,12 +75,13 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte // nextPosition returns the next file position of the binlog. // If no information is available, it returns 0. -func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 { +func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 { if f.HeaderLength <= 13 { // Dead code. This is just a failsafe. return 0 } - return binary.LittleEndian.Uint32(ev.Bytes()[13:17]) + // The header only uses 4 bytes for the next_position. + return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17])) } // rotate implements BinlogEvent.Rotate(). @@ -139,7 +140,7 @@ type filePosFakeEvent struct { timestamp uint32 } -func (ev filePosFakeEvent) NextPosition() uint32 { +func (ev filePosFakeEvent) NextPosition() uint64 { return 0 } @@ -279,7 +280,7 @@ type filePosGTIDEvent struct { gtid replication.FilePosGTID } -func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent { +func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent { return filePosGTIDEvent{ filePosFakeEvent: filePosFakeEvent{ timestamp: timestamp, diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 84c65842c7e..4c5a0c9523e 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -18,6 +18,7 @@ package mysql import ( "fmt" + "math" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -32,9 +33,14 @@ const ( // This file contains the methods related to replication. // WriteComBinlogDump writes a ComBinlogDump command. -// See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax. // Returns a SQLError. -func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error { +// See: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_binlog_dump.html +func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error { + // The binary log file position is a uint64, but the protocol command + // only uses 4 bytes for the file position. + if binlogPos > math.MaxUint32 { + return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos) + } c.sequence = 0 length := 1 + // ComBinlogDump 4 + // binlog-pos @@ -43,7 +49,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog len(binlogFilename) // binlog-filename data, pos := c.startEphemeralPacketWithHeader(length) pos = writeByte(data, pos, ComBinlogDump) - pos = writeUint32(data, pos, binlogPos) + pos = writeUint32(data, pos, uint32(binlogPos)) pos = writeUint16(data, pos, flags) pos = writeUint32(data, pos, serverID) _ = writeEOFString(data, pos, binlogFilename) diff --git a/go/mysql/replication/filepos_gtid.go b/go/mysql/replication/filepos_gtid.go index 850fb421915..95c7efcd3b1 100644 --- a/go/mysql/replication/filepos_gtid.go +++ b/go/mysql/replication/filepos_gtid.go @@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) { return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s) } - pos, err := strconv.ParseUint(parts[1], 0, 32) + pos, err := strconv.ParseUint(parts[1], 0, 64) if err != nil { return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s) } return FilePosGTID{ File: parts[0], - Pos: uint32(pos), + Pos: pos, }, nil } @@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) { // FilePosGTID implements GTID. type FilePosGTID struct { File string - Pos uint32 + Pos uint64 } // String implements GTID.String(). diff --git a/go/mysql/replication/filepos_gtid_test.go b/go/mysql/replication/filepos_gtid_test.go index 174aed6ccf9..6cef4756af2 100644 --- a/go/mysql/replication/filepos_gtid_test.go +++ b/go/mysql/replication/filepos_gtid_test.go @@ -23,7 +23,7 @@ import ( func Test_filePosGTID_String(t *testing.T) { type fields struct { file string - pos uint32 + pos uint64 } tests := []struct { name string @@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) { fields{file: "mysql-bin.166031", pos: 192394}, "mysql-bin.166031:192394", }, + { + "handles large position correctly", + fields{file: "vt-1448040107-bin.003222", pos: 4663881395}, + "vt-1448040107-bin.003222:4663881395", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) { func Test_filePosGTID_ContainsGTID(t *testing.T) { type fields struct { file string - pos uint32 + pos uint64 } type args struct { other GTID diff --git a/go/mysql/replication_test.go b/go/mysql/replication_test.go index c397bc71b45..29f04a6f9fa 100644 --- a/go/mysql/replication_test.go +++ b/go/mysql/replication_test.go @@ -17,6 +17,7 @@ limitations under the License. package mysql import ( + "math" "reflect" "testing" @@ -34,6 +35,10 @@ func TestComBinlogDump(t *testing.T) { cConn.Close() }() + // Try to write a ComBinlogDump packet with a position greater than 4 bytes. + err := cConn.WriteComBinlogDump(1, "moofarm", math.MaxInt64, 0x0d0e) + require.Error(t, err) + // Write ComBinlogDump packet, read it, compare. if err := cConn.WriteComBinlogDump(0x01020304, "moofarm", 0x05060708, 0x090a); err != nil { t.Fatalf("WriteComBinlogDump failed: %v", err) diff --git a/go/test/endtoend/migration/migration_test.go b/go/test/endtoend/migration/migration_test.go index eca112e388d..54afd2cb3ee 100644 --- a/go/test/endtoend/migration/migration_test.go +++ b/go/test/endtoend/migration/migration_test.go @@ -132,10 +132,6 @@ three streams although only two are required. This is to show that there can exi streams from the same source. The main difference between an external source vs a vitess source is that the source proto contains an "external_mysql" field instead of keyspace and shard. That field is the key into the externalConnections section of the input yaml. - -VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('product', 'vt_commerce', 'filter: > external_mysql:\"product\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running') -VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('customer', 'vt_commerce', 'filter: > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running') -VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('orders', 'vt_commerce', 'filter: > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running') */ func TestMigration(t *testing.T) { yamlFile := startCluster(t) @@ -155,7 +151,7 @@ func TestMigration(t *testing.T) { migrate(t, "customer", "commerce", []string{"customer"}) migrate(t, "customer", "commerce", []string{"orders"}) vttablet := keyspaces["commerce"].Shards[0].Vttablets[0].VttabletProcess - waitForVReplicationToCatchup(t, vttablet, 1*time.Second) + waitForVReplicationToCatchup(t, vttablet, 30*time.Second) testcases := []struct { query string @@ -217,11 +213,11 @@ func migrate(t *testing.T, fromdb, toks string, tables []string) { var sqlEscaped bytes.Buffer val.EncodeSQL(&sqlEscaped) query := fmt.Sprintf("insert into _vt.vreplication "+ - "(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+ - "('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running')", tables[0], "vt_"+toks, sqlEscaped.String()) - fmt.Printf("VReplicationExec: %s\n", query) - vttablet := keyspaces[toks].Shards[0].Vttablets[0].VttabletProcess - err := clusterInstance.VtctldClientProcess.ExecuteCommand("VReplicationExec", vttablet.TabletPath, query) + "(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, options) values"+ + "('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running', '{}')", tables[0], "vt_"+toks, sqlEscaped.String()) + fmt.Printf("VReplication insert: %s\n", query) + vttablet := keyspaces[toks].Shards[0].Vttablets[0].Alias + err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", vttablet, query) require.NoError(t, err) } diff --git a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go index fa8dc116782..44fa272df8e 100644 --- a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go +++ b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go @@ -87,7 +87,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.Equal(t, "ON", primaryInstance.GTIDMode) assert.Equal(t, "FULL", primaryInstance.BinlogRowImage) assert.Contains(t, primaryInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", primary.TabletUID)) - assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint64(0)) assert.True(t, primaryInstance.SemiSyncPrimaryEnabled) assert.True(t, primaryInstance.SemiSyncReplicaEnabled) assert.True(t, primaryInstance.SemiSyncPrimaryStatus) @@ -139,7 +139,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.Equal(t, utils.Hostname, replicaInstance.SourceHost) assert.Equal(t, primary.MySQLPort, replicaInstance.SourcePort) assert.Contains(t, replicaInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", replica.TabletUID)) - assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint64(0)) assert.False(t, replicaInstance.SemiSyncPrimaryEnabled) assert.True(t, replicaInstance.SemiSyncReplicaEnabled) assert.False(t, replicaInstance.SemiSyncPrimaryStatus) @@ -157,11 +157,11 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.True(t, replicaInstance.ReplicationIOThreadRuning) assert.True(t, replicaInstance.ReplicationSQLThreadRuning) assert.Equal(t, replicaInstance.ReadBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile) - assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint64(0)) assert.Equal(t, replicaInstance.ExecBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile) - assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint64(0)) assert.Contains(t, replicaInstance.RelaylogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-relay", replica.TabletUID)) - assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint64(0)) assert.Empty(t, replicaInstance.LastIOError) assert.Empty(t, replicaInstance.LastSQLError) assert.EqualValues(t, 0, replicaInstance.SQLDelay) diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 99df358e330..93d3bc6a3a6 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -323,7 +323,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna a.AnalyzedInstancePrimaryAlias = topoproto.TabletAliasString(primaryTablet.Alias) a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{ LogFile: m.GetString("binary_log_file"), - LogPos: m.GetUint32("binary_log_pos"), + LogPos: m.GetUint64("binary_log_pos"), Type: BinaryLog, } isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates") diff --git a/go/vt/vtorc/inst/binlog.go b/go/vt/vtorc/inst/binlog.go index 9c115e4e457..b4abf34ac7e 100644 --- a/go/vt/vtorc/inst/binlog.go +++ b/go/vt/vtorc/inst/binlog.go @@ -40,22 +40,23 @@ const ( // BinlogCoordinates described binary log coordinates in the form of log file & log position. type BinlogCoordinates struct { LogFile string - LogPos uint32 + LogPos uint64 Type BinlogType } -// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306 +// ParseBinlogCoordinates will parse a string representation such as "mysql-bin.000001:12345" +// into a BinlogCoordinates struct. func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) { tokens := strings.SplitN(logFileLogPos, ":", 2) if len(tokens) != 2 { return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos) } - logPos, err := strconv.ParseUint(tokens[1], 10, 32) + logPos, err := strconv.ParseUint(tokens[1], 10, 64) if err != nil { return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1]) } - return &BinlogCoordinates{LogFile: tokens[0], LogPos: uint32(logPos)}, nil + return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil } // DisplayString returns a user-friendly string representation of these coordinates @@ -177,6 +178,6 @@ func (binlogCoordinates *BinlogCoordinates) ExtractDetachedCoordinates() (isDeta } detachedCoordinates.LogFile = detachedCoordinatesSubmatch[1] logPos, _ := strconv.ParseUint(detachedCoordinatesSubmatch[2], 10, 32) - detachedCoordinates.LogPos = uint32(logPos) + detachedCoordinates.LogPos = logPos return true, detachedCoordinates } diff --git a/go/vt/vtorc/inst/binlog_test.go b/go/vt/vtorc/inst/binlog_test.go index bc0110e981c..1f73f3c0029 100644 --- a/go/vt/vtorc/inst/binlog_test.go +++ b/go/vt/vtorc/inst/binlog_test.go @@ -41,7 +41,7 @@ func TestPreviousFileCoordinates(t *testing.T) { require.NoError(t, err) require.Equal(t, previous.LogFile, "mysql-bin.000009") - require.Equal(t, previous.LogPos, uint32(0)) + require.Equal(t, previous.LogPos, uint64(0)) } func TestNextFileCoordinates(t *testing.T) { @@ -49,7 +49,7 @@ func TestNextFileCoordinates(t *testing.T) { require.NoError(t, err) require.Equal(t, next.LogFile, "mysql-bin.000011") - require.Equal(t, next.LogPos, uint32(0)) + require.Equal(t, next.LogPos, uint64(0)) } func TestBinlogCoordinates(t *testing.T) { diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index ec0288cc423..46be8689c0a 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -516,14 +516,14 @@ func readInstanceRow(m sqlutils.RowMap) *Instance { instance.GtidErrant = m.GetString("gtid_errant") instance.UsingMariaDBGTID = m.GetBool("mariadb_gtid") instance.SelfBinlogCoordinates.LogFile = m.GetString("binary_log_file") - instance.SelfBinlogCoordinates.LogPos = m.GetUint32("binary_log_pos") + instance.SelfBinlogCoordinates.LogPos = m.GetUint64("binary_log_pos") instance.ReadBinlogCoordinates.LogFile = m.GetString("source_log_file") - instance.ReadBinlogCoordinates.LogPos = m.GetUint32("read_source_log_pos") + instance.ReadBinlogCoordinates.LogPos = m.GetUint64("read_source_log_pos") instance.ExecBinlogCoordinates.LogFile = m.GetString("relay_source_log_file") - instance.ExecBinlogCoordinates.LogPos = m.GetUint32("exec_source_log_pos") + instance.ExecBinlogCoordinates.LogPos = m.GetUint64("exec_source_log_pos") instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() instance.RelaylogCoordinates.LogFile = m.GetString("relay_log_file") - instance.RelaylogCoordinates.LogPos = m.GetUint32("relay_log_pos") + instance.RelaylogCoordinates.LogPos = m.GetUint64("relay_log_pos") instance.RelaylogCoordinates.Type = RelayLog instance.LastSQLError = m.GetString("last_sql_error") instance.LastIOError = m.GetString("last_io_error") diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index 3167be5e512..8983f6f447e 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -132,7 +132,8 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455") + goodReplica1RelayLogPos, err := replication.ParseFilePosGTIDSet("relay-bin.003222:18321744073709551612") // Requires all 64 bits or uint64 + require.NoError(t, err) goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: goodReplica1RelayLogPos, } @@ -181,7 +182,7 @@ func TestEmergencyReparentShard(t *testing.T) { // run EmergencyReparentShard waitReplicaTimeout := time.Second * 2 - err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, + err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, topoproto.TabletAliasString(newPrimary.Tablet.Alias)}) require.NoError(t, err) // check what was run