Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VStream API [Backport]: Allow Keyspace-Level Heartbeats to be Streamed (#16593) #586

Open
wants to merge 2 commits into
base: slack-19.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type ClusterConfig struct {
vtorcPort int

vreplicationCompressGTID bool
// Set overrideHeartbeatOptions to true to override the default heartbeat options:
// which are set to only on demand (5s) and 250ms interval.
overrideHeartbeatOptions bool
}

// enableGTIDCompression enables GTID compression for the cluster and returns a function
Expand Down Expand Up @@ -519,12 +522,15 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string,
// AddTablet creates new tablet with specified attributes
func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace, shard *Shard, tabletType string, tabletID int) (*Tablet, *exec.Cmd, error) {
tablet := &Tablet{}

options := []string{
"--queryserver-config-schema-reload-time", "5s",
var options []string
defaultHeartbeatOptions := []string{
"--heartbeat_on_demand_duration", "5s",
"--heartbeat_interval", "250ms",
} // FIXME: for multi-cell initial schema doesn't seem to load without "--queryserver-config-schema-reload-time"
}
if !mainClusterConfig.overrideHeartbeatOptions {
options = append(options, defaultHeartbeatOptions...)
}

options = append(options, extraVTTabletArgs...)

if mainClusterConfig.vreplicationCompressGTID {
Expand Down
136 changes: 136 additions & 0 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,139 @@ func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) {
require.NotZero(t, ne.numDash40Events)
require.NotZero(t, ne.num40DashEvents)
}

// Tunables for the VStream heartbeat test.
const (
	// numExpectedHeartbeats is the minimum number of heartbeat row events required when
	// keyspace heartbeats are streamed. With the heartbeat interval set to 1s the count
	// should ideally be close to the stream timeout in seconds, but it is kept at 10 to
	// be conservative and avoid CI flakiness.
	numExpectedHeartbeats = 10
	// vstreamHeartbeatsTestContextTimeout bounds how long doVStream keeps its stream open.
	vstreamHeartbeatsTestContextTimeout = 20 * time.Second
)

// doVStream opens a VStream for shard "0" of the "product" keyspace (PRIMARY tablet type),
// filtered to the customer table, and consumes events until Recv returns io.EOF or any
// other error (for example once the vstreamHeartbeatsTestContextTimeout deadline expires).
// The given flags are passed through to the VStream call unchanged.
//
// It returns the number of ROW and FIELD events received, each keyed by the unqualified
// table name, i.e. the part after the "." in the event's table name. Events of any other
// type are ignored. Every counted event is required to belong to keyspace "product" and
// shard "0".
func doVStream(t *testing.T, vc *VitessCluster, flags *vtgatepb.VStreamFlags) (numRowEvents map[string]int, numFieldEvents map[string]int) {
	// Stream for a while to ensure heartbeats are sent.
	ctx, cancel := context.WithTimeout(context.Background(), vstreamHeartbeatsTestContextTimeout)
	defer cancel()

	numRowEvents = make(map[string]int)
	numFieldEvents = make(map[string]int)
	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
	require.NoError(t, err)
	defer vstreamConn.Close()

	vgtid := &binlogdatapb.VGtid{
		ShardGtids: []*binlogdatapb.ShardGtid{{
			Keyspace: "product",
			Shard:    "0",
			Gtid:     "",
		}}}

	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{
			Match:  "customer",
			Filter: "select * from customer",
		}},
	}

	// count validates one ROW/FIELD event's origin and bumps the counter for its table.
	// The table name must have exactly two dot-separated parts; the second is the key.
	count := func(counts map[string]int, qualifiedName, keyspace, shard string) {
		parts := strings.Split(qualifiedName, ".")
		// require.Len reports expected vs. actual correctly, unlike Equal(len(x), 2).
		require.Len(t, parts, 2)
		require.Equal(t, "product", keyspace)
		require.Equal(t, "0", shard)
		counts[parts[1]]++
	}

	// Stream events from the VStream API.
	reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
	require.NoError(t, err)
	for {
		evs, err := reader.Recv()
		switch err {
		case nil:
			for _, ev := range evs {
				switch ev.Type {
				case binlogdatapb.VEventType_ROW:
					rowEvent := ev.RowEvent
					count(numRowEvents, rowEvent.TableName, rowEvent.Keyspace, rowEvent.Shard)
				case binlogdatapb.VEventType_FIELD:
					fieldEvent := ev.FieldEvent
					count(numFieldEvents, fieldEvent.TableName, fieldEvent.Keyspace, fieldEvent.Shard)
				default:
				}
			}
		case io.EOF:
			log.Infof("Stream Ended")
			return numRowEvents, numFieldEvents
		default:
			// Any other error ends the stream too; the caller judges the collected counts.
			log.Errorf("remote error: %v", err)
			return numRowEvents, numFieldEvents
		}
	}
}

// TestVStreamHeartbeats enables streaming of the internal Vitess heartbeat tables in the VStream API and
// ensures that the heartbeat events are received as expected by the client: at least
// numExpectedHeartbeats heartbeat row events when StreamKeyspaceHeartbeats is set, and none
// when no flags are passed.
func TestVStreamHeartbeats(t *testing.T) {
	// This test changes package-level settings. Put them back on exit so that other tests
	// in the package do not inherit the heartbeat flags or the tablet counts chosen here.
	origExtraVTTabletArgs := extraVTTabletArgs
	origReplicas, origRdonly := defaultReplicas, defaultRdonly
	defer func() {
		extraVTTabletArgs = origExtraVTTabletArgs
		defaultReplicas, defaultRdonly = origReplicas, origRdonly
	}()

	// Enable continuous heartbeats.
	extraVTTabletArgs = append(extraVTTabletArgs,
		"--heartbeat_enable",
		"--heartbeat_interval", "1s",
		"--heartbeat_on_demand_duration", "0",
	)
	setSidecarDBName("_vt")
	// Work on a copy so that the shared mainClusterConfig value is left untouched.
	config := *mainClusterConfig
	config.overrideHeartbeatOptions = true
	vc = NewVitessCluster(t, &clusterOptions{
		clusterConfig: &config,
	})
	// Check the cluster before registering the teardown that dereferences it.
	require.NotNil(t, vc)
	defer vc.TearDown()

	defaultReplicas = 0
	defaultRdonly = 0

	defaultCell := vc.Cells[vc.CellNames[0]]
	vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema,
		defaultReplicas, defaultRdonly, 100, nil)
	verifyClusterHealth(t, vc)
	insertInitialData(t)

	expectedNumRowEvents := make(map[string]int)
	expectedNumRowEvents["customer"] = 3 // 3 rows inserted in the customer table in insertInitialData()

	type testCase struct {
		name  string
		flags *vtgatepb.VStreamFlags
		// expectedHeartbeats is a lower bound when positive; zero means none may be received.
		expectedHeartbeats int
	}
	testCases := []testCase{
		{
			name: "With Keyspace Heartbeats On",
			flags: &vtgatepb.VStreamFlags{
				StreamKeyspaceHeartbeats: true,
			},
			expectedHeartbeats: numExpectedHeartbeats,
		},
		{
			name:               "With Keyspace Heartbeats Off",
			flags:              nil,
			expectedHeartbeats: 0,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			gotNumRowEvents, gotNumFieldEvents := doVStream(t, vc, tc.flags)
			for k := range expectedNumRowEvents {
				require.Equalf(t, 1, gotNumFieldEvents[k], "incorrect number of field events for table %s, got %d", k, gotNumFieldEvents[k])
			}

			numHeartbeats := gotNumRowEvents["heartbeat"]
			log.Infof("Total number of heartbeat events received: %v", numHeartbeats)
			if tc.expectedHeartbeats == 0 {
				// A ">= 0" check can never fail, so assert the absence of heartbeats explicitly.
				require.Zero(t, numHeartbeats, "received heartbeat events although keyspace heartbeats were not requested")
			} else {
				require.GreaterOrEqual(t, numHeartbeats, tc.expectedHeartbeats, "incorrect number of heartbeat events received")
			}

			// Heartbeat rows are accounted for above; the rest must match the inserted data exactly.
			delete(gotNumRowEvents, "heartbeat")
			require.Equal(t, expectedNumRowEvents, gotNumRowEvents)
		})
	}
}
Loading
Loading