diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index beb556f21f5..eaef5504329 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -162,15 +162,15 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func(events []*binlogdatapb.VEvent) error) error { vgtid, filter, flags, err := vsm.resolveParams(ctx, tabletType, vgtid, filter, flags) if err != nil { - return err + return vterrors.Wrap(err, "failed to resolve vstream parameters") } ts, err := vsm.toposerv.GetTopoServer() if err != nil { - return err + return vterrors.Wrap(err, "failed to get topology server") } if ts == nil { log.Errorf("unable to get topo server in VStream()") - return fmt.Errorf("unable to get topo server") + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "unable to get topoology server") } vs := &vstream{ vgtid: vgtid, @@ -215,7 +215,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat flags = &vtgatepb.VStreamFlags{} } if vgtid == nil || len(vgtid.ShardGtids) == 0 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position") + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position in ShardGtids") } // To fetch from all keyspaces, the input must contain a single ShardGtid // that has an empty keyspace, and the Gtid must be "current". @@ -228,7 +228,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat newvgtid := &binlogdatapb.VGtid{} keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, vterrors.Wrapf(err, "failed to get keyspace names in cell %s", vsm.cell) } if isEmpty { @@ -244,7 +244,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat } else { re, err := regexp.Compile(strings.Trim(inputKeyspace, "/")) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, vterrors.Wrapf(err, "failed to compile regexp using %s", inputKeyspace) } for _, keyspace := range keyspaces { if re.MatchString(keyspace) { @@ -262,12 +262,13 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat for _, sgtid := range vgtid.ShardGtids { if sgtid.Shard == "" { if sgtid.Gtid != "current" && sgtid.Gtid != "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %v", vgtid) + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %+v", + vgtid) } // TODO(sougou): this should work with the new Migrate workflow _, _, allShards, err := vsm.resolver.GetKeyspaceShards(ctx, sgtid.Keyspace, tabletType) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, vterrors.Wrapf(err, "failed to get shards in keyspace %s", sgtid.Keyspace) } for _, shard := range allShards { newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ @@ -329,9 +330,9 @@ func (vs *vstream) sendEvents(ctx context.Context) { send := func(evs []*binlogdatapb.VEvent) error { if err := vs.send(evs); err != nil { vs.once.Do(func() { - vs.setError(err) + vs.setError(err, "error sending events") }) - return err + return vterrors.Wrap(err, "error sending events") } return nil } @@ -339,13 +340,13 @@ func (vs *vstream) sendEvents(ctx context.Context) { select { case <-ctx.Done(): vs.once.Do(func() { - vs.setError(fmt.Errorf("context canceled")) + vs.setError(ctx.Err(), "context ended while sending events") }) return case evs := <-vs.eventCh: if err := send(evs); err != nil { vs.once.Do(func() { - vs.setError(err) + vs.setError(err, "error sending events") }) return } @@ -359,7 +360,7 @@ func (vs *vstream) sendEvents(ctx context.Context) { }} if err := send(evs); err != nil { vs.once.Do(func() { - vs.setError(err) + vs.setError(err, "error sending heartbeat") }) return } @@ -378,7 +379,7 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard if err != nil { log.Errorf("Error in vstream for %+v: %s", sgtid, err) vs.once.Do(func() { - vs.setError(err) + vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid)) vs.cancel() }) } @@ -464,10 +465,12 @@ func (vs *vstream) alignStreams(ctx context.Context, event *binlogdatapb.VEvent, } select { case <-ctx.Done(): - return ctx.Err() + return vterrors.Wrapf(ctx.Err(), "context ended while waiting for skew to reduce for stream %s from %s/%s", + streamID, keyspace, shard) case <-time.After(time.Duration(vs.skewTimeoutSeconds) * time.Second): log.Errorf("timed out while waiting for skew to reduce: %s", streamID) - return fmt.Errorf("timed out while waiting for skew to reduce: %s", streamID) + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "timed out while waiting for skew to reduce for stream %s from %s/%s", + streamID, keyspace, shard) case <-vs.skewCh: // once skew is fixed the channel is closed and all waiting streams "wake up" } @@ -500,7 +503,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha for { select { case <-ctx.Done(): - return ctx.Err() + return vterrors.Wrapf(ctx.Err(), "context ended while streaming from %s/%s", sgtid.Keyspace, sgtid.Shard) case <-journalDone: // Unreachable. // This can happen if a server misbehaves and does not end @@ -544,8 +547,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if err != nil { return tabletPickerErr(err) } - log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)", - vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + tabletAliasString := topoproto.TabletAliasString(tablet.Alias) + log.Infof("Picked %s tablet %s for VStream in %s/%s within the %s cell(s)", + vs.tabletType.String(), tabletAliasString, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) target := &querypb.Target{ Keyspace: sgtid.Keyspace, @@ -556,7 +560,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha tabletConn, err := vs.vsm.resolver.GetGateway().QueryServiceByAlias(ctx, tablet.Alias, target) if err != nil { log.Errorf(err.Error()) - return err + return vterrors.Wrapf(err, "failed to get tablet connection to %s", tabletAliasString) } errCh := make(chan error, 1) @@ -565,9 +569,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var err error switch { case ctx.Err() != nil: - err = fmt.Errorf("context has ended") + err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", tabletAliasString) case shr == nil || shr.RealtimeStats == nil || shr.Target == nil: - err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias)) + err = fmt.Errorf("health check failed on %s", tabletAliasString) case vs.tabletType != shr.Target.TabletType: err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream", topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType) @@ -580,6 +584,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } if err != nil { log.Warningf("Tablet state changed: %s, attempting to restart", err) + err = vterrors.Wrapf(err, "error streaming tablet health from %s", tabletAliasString) errCh <- err return err } @@ -604,7 +609,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha Options: options, } var vstreamCreatedOnce sync.Once - log.Infof("Starting to vstream from %s, with req %+v", topoproto.TabletAliasString(tablet.Alias), req) + log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req) err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 @@ -617,9 +622,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha select { case <-ctx.Done(): - return ctx.Err() + return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s", + tabletAliasString, sgtid.Keyspace, sgtid.Shard) case streamErr := <-errCh: - return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error()) + return vterrors.Wrapf(streamErr, "error streaming from tablet %s in %s/%s", + tabletAliasString, sgtid.Keyspace, sgtid.Shard) case <-journalDone: // Unreachable. // This can happen if a server misbehaves and does not end @@ -628,6 +635,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha default: } + aligningStreamsErr := fmt.Sprintf("error aligning streams across %s/%s", sgtid.Keyspace, sgtid.Shard) + sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletAliasString) + sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) for i, event := range events { switch event.Type { @@ -648,11 +658,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha eventss = append(eventss, sendevents) if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { - return err + return vterrors.Wrap(err, aligningStreamsErr) } if err := vs.sendAll(ctx, sgtid, eventss); err != nil { - return err + return vterrors.Wrap(err, sendingEventsErr) } eventss = nil sendevents = nil @@ -664,11 +674,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha eventss = append(eventss, sendevents) if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { - return err + return vterrors.Wrap(err, aligningStreamsErr) } if err := vs.sendAll(ctx, sgtid, eventss); err != nil { - return err + return vterrors.Wrap(err, sendingEventsErr) } eventss = nil sendevents = nil @@ -677,7 +687,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Otherwise they can accumulate indefinitely if there are no real events. // TODO(sougou): figure out a model for this. if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { - return err + return vterrors.Wrap(err, aligningStreamsErr) } case binlogdatapb.VEventType_JOURNAL: journal := event.Journal @@ -698,14 +708,15 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } eventss = append(eventss, sendevents) if err := vs.sendAll(ctx, sgtid, eventss); err != nil { - return err + return vterrors.Wrap(err, sendingEventsErr) } eventss = nil sendevents = nil } je, err := vs.getJournalEvent(ctx, sgtid, journal) if err != nil { - return err + return vterrors.Wrapf(err, "error getting journal event for shard GTID %+v on tablet %s", + sgtid, tabletAliasString) } if je != nil { var endTimer *time.Timer @@ -725,7 +736,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha journalDone = je.done select { case <-ctx.Done(): - return ctx.Err() + return vterrors.Wrapf(ctx.Err(), "context ended while waiting for journal event for shard GTID %+v on tablet %s", + sgtid, tabletAliasString) case <-journalDone: if endTimer != nil { <-endTimer.C @@ -752,13 +764,15 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } if err == nil { // Unreachable. - err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") + err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly on tablet %s in %s/%s", + tabletAliasString, sgtid.Keyspace, sgtid.Shard) } retry, ignoreTablet := vs.shouldRetry(err) if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) - return err + return vterrors.Wrapf(err, "error in vstream for %s/%s on tablet %s", + sgtid.Keyspace, sgtid.Shard, tabletAliasString) } if ignoreTablet { ignoreTablets = append(ignoreTablets, tablet.GetAlias()) @@ -768,7 +782,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Retry, at most, 3 times if the error can be retried. if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) - return err + return vterrors.Wrapf(err, "persistent error in vstream for %s/%s on tablet %s; giving up", + sgtid.Keyspace, sgtid.Shard, tabletAliasString) } log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err) } @@ -884,10 +899,10 @@ func (vs *vstream) getError() error { return vs.err } -func (vs *vstream) setError(err error) { +func (vs *vstream) setError(err error, msg string) { vs.errMu.Lock() defer vs.errMu.Unlock() - vs.err = err + vs.err = vterrors.Wrap(err, msg) } // getJournalEvent returns a journalEvent. The caller has to wait on its done channel.