Skip to content

Commit

Permalink
Refactoring BinlogStream class to replace with BinlogMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
fulghum committed Jun 1, 2024
1 parent 05c840d commit e621e26
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 80 deletions.
105 changes: 47 additions & 58 deletions go/mysql/binlog_event_make.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,32 +72,22 @@ func NewMariaDBBinlogFormat() BinlogFormat {
}
}

// BinlogStream is used to generate consistent BinlogEvent packets
// for a stream. It stores the ServerID, log position, and timestamp
// fields which are needed for a binlog event's outer packet.
type BinlogStream struct {
// ServerID is the server ID of the originating mysql-server.
// BinlogEventMetadata is a container for various metadata needed to create
// a binlog event.
type BinlogEventMetadata struct {
// ServerID is the server ID of the server that created this event.
ServerID uint32

// LogPosition is an incrementing log position.
LogPosition uint32

// Timestamp is a uint32 indicating when the events occurred.
// Timestamp is a uint32 indicating when the event occurred.
Timestamp uint32
}

// NewFakeBinlogStream returns a simple BinlogStream with hardcoded values for testing.
func NewFakeBinlogStream() *BinlogStream {
return &BinlogStream{
ServerID: 1,
LogPosition: 4,
Timestamp: 1407805592,
}
// NextLogPosition indicates the position in the binlog file for the start of the next event.
NextLogPosition uint32
}

// Packetize adds the binlog event header to a packet, and optionally
// packetize adds the binlog event header to a packet, and optionally
// the checksum.
func (s *BinlogStream) Packetize(f BinlogFormat, typ byte, flags uint16, data []byte) []byte {
func packetize(f BinlogFormat, typ byte, flags uint16, data []byte, m BinlogEventMetadata) []byte {
length := int(f.HeaderLength) + len(data)
if typ == eFormatDescriptionEvent || f.ChecksumAlgorithm == BinlogChecksumAlgCRC32 {
// Just add 4 zeroes to the end.
Expand All @@ -109,13 +99,13 @@ func (s *BinlogStream) Packetize(f BinlogFormat, typ byte, flags uint16, data []
case eRotateEvent, eHeartbeatEvent:
// timestamp remains zero
default:
binary.LittleEndian.PutUint32(result[0:4], s.Timestamp)
binary.LittleEndian.PutUint32(result[0:4], m.Timestamp)
}
result[4] = typ
binary.LittleEndian.PutUint32(result[5:9], s.ServerID)
binary.LittleEndian.PutUint32(result[5:9], m.ServerID)
binary.LittleEndian.PutUint32(result[9:13], uint32(length))
if f.HeaderLength >= 19 {
binary.LittleEndian.PutUint32(result[13:17], s.LogPosition)
binary.LittleEndian.PutUint32(result[13:17], m.NextLogPosition)
binary.LittleEndian.PutUint16(result[17:19], flags)
}
copy(result[f.HeaderLength:], data)
Expand All @@ -137,7 +127,7 @@ func NewInvalidEvent() BinlogEvent {
// NewFormatDescriptionEvent creates a new FormatDescriptionEvent
// based on the provided BinlogFormat. It uses a mysql56BinlogEvent
// but could use a MariaDB one.
func NewFormatDescriptionEvent(f BinlogFormat, s *BinlogStream) BinlogEvent {
func NewFormatDescriptionEvent(f BinlogFormat, m BinlogEventMetadata) BinlogEvent {
length := 2 + // binlog-version
50 + // server version
4 + // create timestamp
Expand All @@ -147,70 +137,70 @@ func NewFormatDescriptionEvent(f BinlogFormat, s *BinlogStream) BinlogEvent {
data := make([]byte, length)
binary.LittleEndian.PutUint16(data[0:2], f.FormatVersion)
copy(data[2:52], f.ServerVersion)
binary.LittleEndian.PutUint32(data[52:56], s.Timestamp)
binary.LittleEndian.PutUint32(data[52:56], m.Timestamp)
data[56] = f.HeaderLength
copy(data[57:], f.HeaderSizes)
data[57+len(f.HeaderSizes)] = f.ChecksumAlgorithm

ev := s.Packetize(f, eFormatDescriptionEvent, 0, data)
ev := packetize(f, eFormatDescriptionEvent, 0, data, m)
return NewMysql56BinlogEvent(ev)
}

// NewInvalidFormatDescriptionEvent returns an invalid FormatDescriptionEvent.
// The binlog version is set to 3. It IsValid() though.
func NewInvalidFormatDescriptionEvent(f BinlogFormat, s *BinlogStream) BinlogEvent {
func NewInvalidFormatDescriptionEvent(f BinlogFormat, m BinlogEventMetadata) BinlogEvent {
length := 75
data := make([]byte, length)
data[0] = 3

ev := s.Packetize(f, eFormatDescriptionEvent, 0, data)
ev := packetize(f, eFormatDescriptionEvent, 0, data, m)
return NewMysql56BinlogEvent(ev)
}

// NewRotateEvent returns a RotateEvent.
// The timestamp of such an event should be zero, so we patch it in.
func NewRotateEvent(f BinlogFormat, s *BinlogStream, position uint64, filename string) BinlogEvent {
func NewRotateEvent(f BinlogFormat, m BinlogEventMetadata, position uint64, filename string) BinlogEvent {
length := 8 + // position
len(filename)
data := make([]byte, length)
binary.LittleEndian.PutUint64(data[0:8], position)
copy(data[8:], filename)

ev := s.Packetize(f, eRotateEvent, 0, data)
ev := packetize(f, eRotateEvent, 0, data, m)
return NewMysql56BinlogEvent(ev)
}

func NewFakeRotateEvent(f BinlogFormat, s *BinlogStream, filename string) BinlogEvent {
func NewFakeRotateEvent(f BinlogFormat, m BinlogEventMetadata, filename string) BinlogEvent {
length := 8 + // position
len(filename)
data := make([]byte, length)
binary.LittleEndian.PutUint64(data[0:8], 4)
copy(data[8:], filename)

ev := s.Packetize(f, eRotateEvent, FlagLogEventArtificial, data)
ev := packetize(f, eRotateEvent, FlagLogEventArtificial, data, m)
return NewMysql56BinlogEvent(ev)
}

// NewHeartbeatEvent returns a HeartbeatEvent.
// see https://dev.mysql.com/doc/internals/en/heartbeat-event.html
func NewHeartbeatEvent(f BinlogFormat, s *BinlogStream) BinlogEvent {
ev := s.Packetize(f, eHeartbeatEvent, 0, []byte{})
func NewHeartbeatEvent(f BinlogFormat, m BinlogEventMetadata) BinlogEvent {
ev := packetize(f, eHeartbeatEvent, 0, []byte{}, m)
return NewMysql56BinlogEvent(ev)
}

// NewHeartbeatEvent returns a HeartbeatEvent.
// see https://dev.mysql.com/doc/internals/en/heartbeat-event.html
func NewHeartbeatEventWithLogFile(f BinlogFormat, s *BinlogStream, filename string) BinlogEvent {
func NewHeartbeatEventWithLogFile(f BinlogFormat, m BinlogEventMetadata, filename string) BinlogEvent {
length := len(filename)
data := make([]byte, length)
copy(data, filename)

ev := s.Packetize(f, eHeartbeatEvent, 0, data)
ev := packetize(f, eHeartbeatEvent, 0, data, m)
return NewMysql56BinlogEvent(ev)
}

// NewQueryEvent makes up a QueryEvent based on the Query structure.
func NewQueryEvent(f BinlogFormat, s *BinlogStream, q Query) BinlogEvent {
func NewQueryEvent(f BinlogFormat, m BinlogEventMetadata, q Query) BinlogEvent {
statusVarLength := 0
if q.Charset != nil {
statusVarLength += 1 + 2 + 2 + 2
Expand Down Expand Up @@ -247,32 +237,32 @@ func NewQueryEvent(f BinlogFormat, s *BinlogStream, q Query) BinlogEvent {
pos++
copy(data[pos:], q.SQL)

ev := s.Packetize(f, eQueryEvent, 0, data)
ev := packetize(f, eQueryEvent, 0, data, m)
return NewMysql56BinlogEvent(ev)
}

// NewInvalidQueryEvent returns an invalid QueryEvent. IsValid is however true.
// sqlPos is out of bounds.
func NewInvalidQueryEvent(f BinlogFormat, s *BinlogStream) BinlogEvent {
func NewInvalidQueryEvent(f BinlogFormat, m BinlogEventMetadata) BinlogEvent {
length := 100
data := make([]byte, length)
data[4+4] = 200 // > 100

ev := s.Packetize(f, eQueryEvent, 0, data)
ev := packetize(f, eQueryEvent, 0, data, m)
return NewMysql56BinlogEvent(ev)
}

// NewXIDEvent returns a XID event. We do not use the data, so keep it 0.
func NewXIDEvent(f BinlogFormat, s *BinlogStream) BinlogEvent {
func NewXIDEvent(f BinlogFormat, m BinlogEventMetadata) BinlogEvent {
length := 8
data := make([]byte, length)

ev := s.Packetize(f, eXIDEvent, 0, data)
ev := packetize(f, eXIDEvent, 0, data, m)
return NewMysql56BinlogEvent(ev)
}

// NewIntVarEvent returns an IntVar event.
func NewIntVarEvent(f BinlogFormat, s *BinlogStream, typ byte, value uint64) BinlogEvent {
func NewIntVarEvent(f BinlogFormat, m BinlogEventMetadata, typ byte, value uint64) BinlogEvent {
length := 9
data := make([]byte, length)

Expand All @@ -286,13 +276,13 @@ func NewIntVarEvent(f BinlogFormat, s *BinlogStream, typ byte, value uint64) Bin
data[7] = byte(value >> 48)
data[8] = byte(value >> 56)

ev := s.Packetize(f, eIntVarEvent, 0, data)
ev := packetize(f, eIntVarEvent, 0, data, m)
return NewMysql56BinlogEvent(ev)
}

// NewMariaDBGTIDEvent returns a MariaDB specific GTID event.
// It ignores the Server in the gtid, instead uses the BinlogStream.ServerID.
func NewMariaDBGTIDEvent(f BinlogFormat, s *BinlogStream, gtid MariadbGTID, hasBegin bool) BinlogEvent {
func NewMariaDBGTIDEvent(f BinlogFormat, m BinlogEventMetadata, gtid MariadbGTID, hasBegin bool) BinlogEvent {
length := 8 + // sequence
4 + // domain
1 // flags2
Expand All @@ -318,12 +308,12 @@ func NewMariaDBGTIDEvent(f BinlogFormat, s *BinlogStream, gtid MariadbGTID, hasB
}
data[12] = flags2

ev := s.Packetize(f, eMariaGTIDEvent, 0, data)
ev := packetize(f, eMariaGTIDEvent, 0, data, m)
return NewMariadbBinlogEvent(ev)
}

// NewMySQLGTIDEvent returns a MySQL specific GTID event.
func NewMySQLGTIDEvent(f BinlogFormat, s *BinlogStream, gtid Mysql56GTID, hasBegin bool) BinlogEvent {
func NewMySQLGTIDEvent(f BinlogFormat, m BinlogEventMetadata, gtid Mysql56GTID, hasBegin bool) BinlogEvent {
length := 1 + // flags
16 + // SID (server UUID)
8 // GNO (sequence number, signed int)
Expand All @@ -347,14 +337,13 @@ func NewMySQLGTIDEvent(f BinlogFormat, s *BinlogStream, gtid Mysql56GTID, hasBeg
}
data[0] = flags2

ev := s.Packetize(f, eGTIDEvent, 0, data)
ev := packetize(f, eGTIDEvent, 0, data, m)
return NewMysql56BinlogEvent(ev)
}


// NewTableMapEvent returns a TableMap event.
// Only works with post_header_length=8.
func NewTableMapEvent(f BinlogFormat, s *BinlogStream, tableID uint64, tm *TableMap) BinlogEvent {
func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm *TableMap) BinlogEvent {
if f.HeaderSize(eTableMapEvent) != 8 {
panic("Not implemented, post_header_length!=8")
}
Expand Down Expand Up @@ -406,30 +395,30 @@ func NewTableMapEvent(f BinlogFormat, s *BinlogStream, tableID uint64, tm *Table
panic("bad encoding")
}

ev := s.Packetize(f, eTableMapEvent, 0, data)
ev := packetize(f, eTableMapEvent, 0, data, m)
return NewMariadbBinlogEvent(ev)
}

// NewWriteRowsEvent returns a WriteRows event. Uses v2.
func NewWriteRowsEvent(f BinlogFormat, s *BinlogStream, tableID uint64, rows Rows) BinlogEvent {
return newRowsEvent(f, s, eWriteRowsEventV2, tableID, rows)
func NewWriteRowsEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, rows Rows) BinlogEvent {
return newRowsEvent(f, m, eWriteRowsEventV2, tableID, rows)
}

// NewUpdateRowsEvent returns an UpdateRows event. Uses v2.
func NewUpdateRowsEvent(f BinlogFormat, s *BinlogStream, tableID uint64, rows Rows) BinlogEvent {
return newRowsEvent(f, s, eUpdateRowsEventV2, tableID, rows)
func NewUpdateRowsEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, rows Rows) BinlogEvent {
return newRowsEvent(f, m, eUpdateRowsEventV2, tableID, rows)
}

// NewDeleteRowsEvent returns an DeleteRows event. Uses v2.
func NewDeleteRowsEvent(f BinlogFormat, s *BinlogStream, tableID uint64, rows Rows) BinlogEvent {
return newRowsEvent(f, s, eDeleteRowsEventV2, tableID, rows)
func NewDeleteRowsEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, rows Rows) BinlogEvent {
return newRowsEvent(f, m, eDeleteRowsEventV2, tableID, rows)
}

// newRowsEvent can create an event of type:
// eWriteRowsEventV1, eWriteRowsEventV2,
// eUpdateRowsEventV1, eUpdateRowsEventV2,
// eDeleteRowsEventV1, eDeleteRowsEventV2.
func newRowsEvent(f BinlogFormat, s *BinlogStream, typ byte, tableID uint64, rows Rows) BinlogEvent {
func newRowsEvent(f BinlogFormat, m BinlogEventMetadata, typ byte, tableID uint64, rows Rows) BinlogEvent {
if f.HeaderSize(typ) == 6 {
panic("Not implemented, post_header_length==6")
}
Expand Down Expand Up @@ -489,6 +478,6 @@ func newRowsEvent(f BinlogFormat, s *BinlogStream, typ byte, tableID uint64, row
}
}

ev := s.Packetize(f, typ, 0, data)
ev := packetize(f, typ, 0, data, m)
return NewMysql56BinlogEvent(ev)
}
Loading

0 comments on commit e621e26

Please sign in to comment.