From e7e0d8a47ef282671eb29108b1e592f01cc0b3ca Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 21 Nov 2024 16:32:33 +0800 Subject: [PATCH 1/5] adjus logs --- cdc/kv/shared_client.go | 1 + cdc/kv/shared_stream.go | 19 ++++++++++++++++--- cdc/puller/multiplexing_puller.go | 6 +++--- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 789fbfa6f30..36029763afd 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -293,6 +293,7 @@ func (s *SharedClient) Subscribe(subID SubscriptionID, span tablepb.Span, startT zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Any("subscriptionID", rt.subscriptionID), + zap.Uint64("startTs", startTs), zap.String("span", rt.span.String())) } diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index 7a9b1204f89..03cfae9c82c 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -362,18 +362,31 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request } else if cc, err = getTableExclusiveConn(subscriptionID); err != nil { return err } - if err = cc.Client().Send(c.createRegionRequest(region)); err != nil { + request := c.createRegionRequest(region) + if err = cc.Client().Send(request); err != nil { log.Warn("event feed send request to grpc stream failed", zap.String("namespace", c.changefeed.Namespace), zap.String("changefeed", c.changefeed.ID), + zap.Int64("tableID", region.span.TableID), + zap.Uint64("regionID", request.RegionId), + zap.Uint64("checkpointTs", request.CheckpointTs), zap.Uint64("streamID", s.streamID), zap.Any("subscriptionID", subscriptionID), - zap.Uint64("regionID", region.verID.GetID()), - zap.Int64("tableID", region.span.TableID), zap.Uint64("storeID", rs.storeID), zap.String("addr", rs.storeAddr), zap.Error(err)) } + log.Info("event feed send request to grpc stream", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Int64("tableID", region.span.TableID), + zap.Uint64("regionID", request.RegionId), + zap.Uint64("checkpointTs", request.CheckpointTs), + zap.Uint64("streamID", s.streamID), + zap.Any("subscriptionID", subscriptionID), + zap.Uint64("storeID", rs.storeID), + zap.String("addr", rs.storeAddr), + zap.Error(err)) } if region, err = fetchMoreReq(); err != nil { diff --git a/cdc/puller/multiplexing_puller.go b/cdc/puller/multiplexing_puller.go index a7821a04b95..2cbce99b8dc 100644 --- a/cdc/puller/multiplexing_puller.go +++ b/cdc/puller/multiplexing_puller.go @@ -95,11 +95,11 @@ func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.Resolv log.Info("puller is initialized", zap.String("namespace", p.changefeed.Namespace), zap.String("changefeed", p.changefeed.ID), - zap.String("tableName", p.tableName), - zap.Any("tableID", p.spans), zap.Uint64("resolvedTs", resolvedTs), zap.Duration("duration", time.Since(p.start)), - ) + zap.Int64("tableID", p.spans[0].TableID), + zap.String("tableName", p.tableName), + zap.Any("tableID", p.spans)) } if resolvedTs > p.resolvedTs.Load() { p.resolvedTs.Store(resolvedTs) From c801725603634c7367ad08e321514108fd98f3d6 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 21 Nov 2024 19:15:49 +0800 Subject: [PATCH 2/5] add table id to span --- pkg/spanz/span.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/spanz/span.go b/pkg/spanz/span.go index 43b715c1a93..96f9f67cd14 100644 --- a/pkg/spanz/span.go +++ b/pkg/spanz/span.go @@ -148,7 +148,7 @@ func Intersect(lhs tablepb.Span, rhs tablepb.Span) (span tablepb.Span, err error end = rhs.EndKey } - return tablepb.Span{StartKey: start, EndKey: end}, nil + return tablepb.Span{TableID: lhs.TableID, StartKey: start, EndKey: end}, nil } // IsSubSpan returns true if the sub span is parents spans From fd5a37a73e121b3e9dea0441ac6321a770f24adc Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 22 Nov 2024 16:55:09 +0800 Subject: [PATCH 3/5] adjust the log --- cdc/puller/multiplexing_puller.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cdc/puller/multiplexing_puller.go b/cdc/puller/multiplexing_puller.go index 2cbce99b8dc..fea155eb61e 100644 --- a/cdc/puller/multiplexing_puller.go +++ b/cdc/puller/multiplexing_puller.go @@ -98,8 +98,7 @@ func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.Resolv zap.Uint64("resolvedTs", resolvedTs), zap.Duration("duration", time.Since(p.start)), zap.Int64("tableID", p.spans[0].TableID), - zap.String("tableName", p.tableName), - zap.Any("tableID", p.spans)) + zap.String("tableName", p.tableName)) } if resolvedTs > p.resolvedTs.Load() { p.resolvedTs.Store(resolvedTs) From bc738f4bef7793e5f7e7629e037743599c3e0154 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 22 Nov 2024 17:38:04 +0800 Subject: [PATCH 4/5] adjust the log --- cdc/kv/shared_stream.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index 03cfae9c82c..baa1c6e46d2 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -376,17 +376,6 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request zap.String("addr", rs.storeAddr), zap.Error(err)) } - log.Info("event feed send request to grpc stream", - zap.String("namespace", c.changefeed.Namespace), - zap.String("changefeed", c.changefeed.ID), - zap.Int64("tableID", region.span.TableID), - zap.Uint64("regionID", request.RegionId), - zap.Uint64("checkpointTs", request.CheckpointTs), - zap.Uint64("streamID", s.streamID), - zap.Any("subscriptionID", subscriptionID), - zap.Uint64("storeID", rs.storeID), - zap.String("addr", rs.storeAddr), - zap.Error(err)) } if region, err = fetchMoreReq(); err != nil { From 5742e3ab56de00ebf836280719eb6af3986659ed Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 22 Nov 2024 17:56:26 +0800 Subject: [PATCH 5/5] adjust the log --- cdc/entry/schema_storage.go | 22 +++++++++++++------ cdc/kv/shared_client.go | 3 --- cdc/owner/ddl_manager.go | 14 +++++------- cdc/puller/ddl_puller.go | 12 +--------- cdc/puller/multiplexing_puller.go | 2 +- .../internal/v3/member/capture_manager.go | 4 ++-- 6 files changed, 25 insertions(+), 32 deletions(-) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 999c8734b5a..06b3dd107f3 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -209,12 +209,14 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { log.Info("schemaStorage: ignore foregone DDL", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), - zap.String("DDL", job.Query), + zap.String("role", s.role.String()), zap.Int64("jobID", job.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), zap.Int64("schemaVersion", s.schemaVersion), - zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion), - zap.String("role", s.role.String())) + zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion)) return nil } snap = lastSnap.Copy() @@ -225,11 +227,14 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { log.Error("schemaStorage: update snapshot by the DDL job failed", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), + zap.String("role", s.role.String()), + zap.Int64("jobID", job.ID), zap.String("schema", job.SchemaName), zap.String("table", job.TableName), zap.String("query", job.Query), - zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), - zap.String("role", s.role.String()), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), + zap.Int64("schemaVersion", s.schemaVersion), + zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion), zap.Error(err)) return errors.Trace(err) } @@ -239,12 +244,15 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { log.Info("schemaStorage: update snapshot by the DDL job", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), + zap.String("role", s.role.String()), zap.String("schema", job.SchemaName), zap.String("table", job.TableName), - zap.String("query", job.Query), + zap.Int64("jobID", job.ID), + zap.Uint64("startTs", job.StartTS), zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), zap.Uint64("schemaVersion", uint64(s.schemaVersion)), - zap.String("role", s.role.String())) + zap.String("query", job.Query)) + return nil } diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 36029763afd..570d8351bf4 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -824,9 +824,6 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error { return ctx.Err() case <-ticker.C: } - log.Info("event feed starts to check locked regions", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID)) currTime := s.pdClock.CurrentTime() s.totalSpans.RLock() diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index e709d086213..b8e4cce998b 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -293,19 +293,17 @@ func (m *ddlManager) tick( continue } - log.Info("handle a ddl job", - zap.String("namespace", m.changfeedID.Namespace), - zap.String("changefeed", m.changfeedID.ID), - zap.Int64("tableID", job.TableID), - zap.Int64("jobID", job.ID), - zap.String("query", job.Query), - zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), - ) events, err := m.schema.BuildDDLEvents(ctx, job) if err != nil { return nil, nil, err } + log.Info("build ddl events from the ddl job", + zap.String("namespace", m.changfeedID.Namespace), + zap.String("changefeed", m.changfeedID.ID), + zap.Int64("tableID", job.TableID), + zap.Int64("jobID", job.ID)) + for _, event := range events { tableName := event.TableInfo.TableName m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event) diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index f9fc521f3d8..57c5f103308 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -210,12 +210,7 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model log.Info("a new ddl job is received", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.String("schema", job.SchemaName), - zap.String("table", job.TableName), - zap.Uint64("startTs", job.StartTS), - zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), - zap.String("query", job.Query), - zap.Any("job", job)) + zap.Int64("jobID", job.ID)) } jobEntry := &model.DDLJobEntry{ @@ -671,11 +666,6 @@ func (h *ddlPullerImpl) addToPending(job *timodel.Job) { log.Info("ddl puller receives new pending job", zap.String("namespace", h.changefeedID.Namespace), zap.String("changefeed", h.changefeedID.ID), - zap.String("schema", job.SchemaName), - zap.String("table", job.TableName), - zap.String("query", job.Query), - zap.Uint64("startTs", job.StartTS), - zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), zap.Int64("jobID", job.ID)) } diff --git a/cdc/puller/multiplexing_puller.go b/cdc/puller/multiplexing_puller.go index fea155eb61e..9e5a5c1115c 100644 --- a/cdc/puller/multiplexing_puller.go +++ b/cdc/puller/multiplexing_puller.go @@ -95,8 +95,8 @@ func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.Resolv log.Info("puller is initialized", zap.String("namespace", p.changefeed.Namespace), zap.String("changefeed", p.changefeed.ID), - zap.Uint64("resolvedTs", resolvedTs), zap.Duration("duration", time.Since(p.start)), + zap.Uint64("resolvedTs", resolvedTs), zap.Int64("tableID", p.spans[0].TableID), zap.String("tableName", p.tableName)) } diff --git a/cdc/scheduler/internal/v3/member/capture_manager.go b/cdc/scheduler/internal/v3/member/capture_manager.go index 651b28be71b..63f5f361e3d 100644 --- a/cdc/scheduler/internal/v3/member/capture_manager.go +++ b/cdc/scheduler/internal/v3/member/capture_manager.go @@ -246,8 +246,8 @@ func (c *CaptureManager) HandleAliveCaptureUpdate( log.Info("schedulerv3: find a new capture", zap.String("namespace", c.changefeedID.Namespace), zap.String("changefeed", c.changefeedID.ID), - zap.String("captureAddr", info.AdvertiseAddr), - zap.String("capture", id)) + zap.String("capture", id), + zap.String("captureAddr", info.AdvertiseAddr)) msgs = append(msgs, &schedulepb.Message{ To: id, MsgType: schedulepb.MsgHeartbeat,