Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient: adjust logs to reduce duplication and make not too verbose #11789

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,14 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
log.Info("schemaStorage: ignore foregone DDL",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("DDL", job.Query),
zap.String("role", s.role.String()),
zap.Int64("jobID", job.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Int64("schemaVersion", s.schemaVersion),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
zap.String("role", s.role.String()))
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion))
return nil
}
snap = lastSnap.Copy()
Expand All @@ -225,11 +227,14 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
log.Error("schemaStorage: update snapshot by the DDL job failed",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("role", s.role.String()),
zap.Int64("jobID", job.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
zap.String("role", s.role.String()),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Int64("schemaVersion", s.schemaVersion),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
zap.Error(err))
return errors.Trace(err)
}
Expand All @@ -239,12 +244,15 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
log.Info("schemaStorage: update snapshot by the DDL job",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("role", s.role.String()),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Int64("jobID", job.ID),
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
zap.Uint64("schemaVersion", uint64(s.schemaVersion)),
zap.String("role", s.role.String()))
zap.String("query", job.Query))

return nil
}

Expand Down
4 changes: 1 addition & 3 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func (s *SharedClient) Subscribe(subID SubscriptionID, span tablepb.Span, startT
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.Uint64("startTs", startTs),
zap.String("span", rt.span.String()))
}

Expand Down Expand Up @@ -823,9 +824,6 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error {
return ctx.Err()
case <-ticker.C:
}
log.Info("event feed starts to check locked regions",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID))

currTime := s.pdClock.CurrentTime()
s.totalSpans.RLock()
Expand Down
8 changes: 5 additions & 3 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,16 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
} else if cc, err = getTableExclusiveConn(subscriptionID); err != nil {
return err
}
if err = cc.Client().Send(c.createRegionRequest(region)); err != nil {
request := c.createRegionRequest(region)
if err = cc.Client().Send(request); err != nil {
log.Warn("event feed send request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("regionID", request.RegionId),
zap.Uint64("checkpointTs", request.CheckpointTs),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", region.verID.GetID()),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
Expand Down
16 changes: 7 additions & 9 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,20 +293,18 @@ func (m *ddlManager) tick(
continue
}

// Note: do not change the key words in the log, it is used to search the
// FinishTS of the DDL job. Some integration tests and users depend on it.
log.Info("handle a ddl job",
events, err := m.schema.BuildDDLEvents(ctx, job)
if err != nil {
return nil, nil, err
}

log.Info("build ddl events from the ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.Int64("tableID", job.TableID),
zap.Int64("jobID", job.ID),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
)
events, err := m.schema.BuildDDLEvents(ctx, job)
if err != nil {
return nil, nil, err
}
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS))

for _, event := range events {
snap := m.schema.GetLastSnapshot()
Expand Down
12 changes: 1 addition & 11 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,7 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model
log.Info("a new ddl job is received",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
zap.String("query", job.Query),
zap.Any("job", job))
zap.Int64("jobID", job.ID))
}

jobEntry := &model.DDLJobEntry{
Expand Down Expand Up @@ -700,11 +695,6 @@ func (h *ddlPullerImpl) addToPending(job *timodel.Job) {
log.Info("ddl puller receives new pending job",
zap.String("namespace", h.changefeedID.Namespace),
zap.String("changefeed", h.changefeedID.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Int64("jobID", job.ID))
}

Expand Down
7 changes: 3 additions & 4 deletions cdc/puller/multiplexing_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,10 @@ func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.Resolv
log.Info("puller is initialized",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID),
zap.String("tableName", p.tableName),
zap.Any("tableID", p.spans),
zap.Uint64("resolvedTs", resolvedTs),
zap.Duration("duration", time.Since(p.start)),
)
zap.Uint64("resolvedTs", resolvedTs),
zap.Int64("tableID", p.spans[0].TableID),
zap.String("tableName", p.tableName))
}
if resolvedTs > p.resolvedTs.Load() {
p.resolvedTs.Store(resolvedTs)
Expand Down
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/member/capture_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ func (c *CaptureManager) HandleAliveCaptureUpdate(
log.Info("schedulerv3: find a new capture",
zap.String("namespace", c.changefeedID.Namespace),
zap.String("changefeed", c.changefeedID.ID),
zap.String("captureAddr", info.AdvertiseAddr),
zap.String("capture", id))
zap.String("capture", id),
zap.String("captureAddr", info.AdvertiseAddr))
msgs = append(msgs, &schedulepb.Message{
To: id,
MsgType: schedulepb.MsgHeartbeat,
Expand Down
2 changes: 1 addition & 1 deletion pkg/spanz/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func Intersect(lhs tablepb.Span, rhs tablepb.Span) (span tablepb.Span, err error
end = rhs.EndKey
}

return tablepb.Span{StartKey: start, EndKey: end}, nil
return tablepb.Span{TableID: lhs.TableID, StartKey: start, EndKey: end}, nil
}

// IsSubSpan returns true if the sub span is parents spans
Expand Down
Loading