Skip to content

Commit

Permalink
Cleanup errors
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Oct 7, 2024
1 parent 435ff92 commit 6064bad
Showing 1 changed file with 14 additions and 19 deletions.
33 changes: 14 additions & 19 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2154,7 +2154,8 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
// We don't cleanup other related artifacts since they are not tied to the tenant.
if !req.GetKeepData() {
if err := s.DeleteTenantData(ctx, ts); err != nil {
return nil, vterrors.Wrapf(err, "failed to delete tenant data, please retry the operation")
return nil, vterrors.Wrapf(err, "failed to fully delete all migrated data for tenant %s, please retry the operation",
ts.options.TenantId)
}
}
if err := delFunc(); err != nil {
Expand Down Expand Up @@ -2788,12 +2789,12 @@ func (s *Server) DeleteTenantData(ctx context.Context, ts *trafficSwitcher) erro
}
stmt, err := s.env.Parser().Parse(stream.Streams[0].BinlogSource.Filter.Rules[0].Filter)
if err != nil {
return vterrors.Wrapf(err, "failed to parse query filter for tenant %s", ts.options.TenantId)
return vterrors.Wrap(err, "failed to parse query filters")
}
sel, ok := stmt.(*sqlparser.Select)
if !ok || sel.Where == nil {
return vterrors.Wrapf(err, "unexpected query filter for tenant %s: %v",
ts.options.TenantId, stream.Streams[0].BinlogSource.Filter.Rules[0].Filter)
return vterrors.Wrapf(err, "unexpected query filter: %v",
stream.Streams[0].BinlogSource.Filter.Rules[0].Filter)
}
exprs := make([]sqlparser.Expr, 0, 1)
for _, subexpr := range sqlparser.SplitAndExpression(nil, sel.Where.Expr) {
Expand All @@ -2803,29 +2804,29 @@ func (s *Server) DeleteTenantData(ctx context.Context, ts *trafficSwitcher) erro
exprs = append(exprs, subexpr)
}
if len(exprs) == 0 {
return vterrors.Wrapf(err, "unexpected query filter for tenant %s: %v",
ts.options.TenantId, stream.Streams[0].BinlogSource.Filter.Rules[0].Filter)
return vterrors.Wrapf(err, "unexpected query filter: %v",
stream.Streams[0].BinlogSource.Filter.Rules[0].Filter)
}
where = &sqlparser.Where{Expr: sqlparser.AndExpressions(exprs...)}
break // We only needed to examine the first stream
}
if where == nil {
return vterrors.Wrapf(err, "no filter found for tenant %s", ts.options.TenantId)
return vterrors.Wrap(err, "no delete filter found")
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
return ts.ForAllTargets(func(target *MigrationTarget) error {
for _, table := range ts.tables {
escapedTable, err := sqlescape.EnsureEscaped(table)
if err != nil {
return vterrors.Wrapf(err, "unable to escape table name %q", table)
}
stmt, err := s.env.Parser().Parse(fmt.Sprintf("delete from %s", escapedTable))
if err != nil {
return vterrors.Wrapf(err, "unable to build delete query for tenant %s", ts.options.TenantId)
return vterrors.Wrap(err, "unable to build delete query")
}
del, ok := stmt.(*sqlparser.Delete)
if !ok {
return vterrors.Wrapf(err, "unable to build delete query for tenant %s", ts.options.TenantId)
return vterrors.Wrap(err, "unable to build delete query")
}
del.Where = where
del.Limit = &sqlparser.Limit{Rowcount: sqlparser.NewIntLiteral("1000")}
Expand All @@ -2838,26 +2839,20 @@ func (s *Server) DeleteTenantData(ctx context.Context, ts *trafficSwitcher) erro
DbName: target.primary.DbName(),
})
if err != nil {
return vterrors.Wrapf(err, "error deleting data for tenant %s", ts.options.TenantId)
return vterrors.Wrapf(err, "error deleting data using query %q", query)
}
if res.RowsAffected == 0 { // We're done
return nil
}
select {
case <-ctx.Done():
return vterrors.Wrapf(ctx.Err(), "context cancelled while deleting data for tenant %s",
ts.options.TenantId)
return vterrors.Wrap(ctx.Err(), "context cancelled while deleting")
default:
}
}
}
return err
return nil
})
if err != nil {
return vterrors.Wrapf(err, "failed to delete data for tenant %s", ts.options.TenantId)
}

return nil
}

func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, error) {
Expand Down

0 comments on commit 6064bad

Please sign in to comment.