Skip to content

Commit

Permalink
[release-21.0] fix: flaky test on twopc transaction (vitessio#17068) (v…
Browse files Browse the repository at this point in the history
…itessio#17070)

Signed-off-by: Harshit Gangal <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
  • Loading branch information
vitess-bot[bot] authored Oct 29, 2024
1 parent a3d9fdc commit 510faa8
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions go/test/endtoend/transaction/twopc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,24 +168,21 @@ func getDTID(dtidMap map[string]string, dtKey string) string {
}

func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent, vtgateConn *vtgateconn.VTGateConn) {
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{Keyspace: keyspaceName, Shard: "-40", Gtid: "current"},
{Keyspace: keyspaceName, Shard: "40-80", Gtid: "current"},
{Keyspace: keyspaceName, Shard: "80-", Gtid: "current"},
}}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*/",
}},
shards := []string{"-40", "40-80", "80-"}
shardGtids := make([]*binlogdatapb.ShardGtid, 0, len(shards))
var seen = make(map[string]bool, len(shards))
var wg sync.WaitGroup
for _, shard := range shards {
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{Keyspace: keyspaceName, Shard: shard, Gtid: "current"})
seen[shard] = false
wg.Add(1)
}
vgtid := &binlogdatapb.VGtid{ShardGtids: shardGtids}
filter := &binlogdatapb.Filter{Rules: []*binlogdatapb.Rule{{Match: "/.*/"}}}

vReader, err := vtgateConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil)
require.NoError(t, err)

// Use a channel to signal that the first VGTID event has been processed
firstEventProcessed := make(chan struct{})
var once sync.Once

go func() {
for {
evs, err := vReader.Recv()
Expand All @@ -195,9 +192,12 @@ func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent,
require.NoError(t, err)

for _, ev := range evs {
// Signal the first event has been processed using sync.Once
// Mark VGTID event from each shard seen.
if ev.Type == binlogdatapb.VEventType_VGTID {
once.Do(func() { close(firstEventProcessed) })
if !seen[ev.Shard] {
seen[ev.Shard] = true
wg.Done()
}
}
if ev.Type == binlogdatapb.VEventType_ROW || ev.Type == binlogdatapb.VEventType_FIELD {
ch <- ev
Expand All @@ -206,8 +206,8 @@ func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent,
}
}()

// Wait for the first event to be processed
<-firstEventProcessed
// Wait for VGTID event from all shards
wg.Wait()
}

func retrieveTransitions(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap map[string][]*querypb.Field, dtMap map[string]string) map[string][]string {
Expand Down

0 comments on commit 510faa8

Please sign in to comment.