From 3b5ba5c5459469bac0c8748da5881c1d730a61e5 Mon Sep 17 00:00:00 2001 From: wlwilliamx <53336371+wlwilliamx@users.noreply.github.com> Date: Fri, 20 Dec 2024 22:07:30 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #11869 Signed-off-by: ti-chi-bot --- cdc/entry/schema/snapshot.go | 7 +++++++ cdc/entry/schema_storage.go | 35 ++++++++++++++++++++++++++++++----- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 5fb9fb85d91..8af74188e49 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -103,6 +103,7 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error { return nil } +<<<<<<< HEAD // GetSchemaVersion returns the schema version of the meta. func GetSchemaVersion(meta *timeta.Meta) (int64, error) { // After we get the schema version at startTs, if the diff corresponding to that version does not exist, @@ -124,6 +125,12 @@ func GetSchemaVersion(meta *timeta.Meta) (int64, error) { // NewSingleSnapshotFromMeta creates a new single schema snapshot from a tidb meta func NewSingleSnapshotFromMeta( meta *timeta.Meta, +======= +// NewSnapshotFromMeta creates a schema snapshot from meta. +func NewSnapshotFromMeta( + id model.ChangeFeedID, + meta timeta.Reader, +>>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) currentTs uint64, forceReplicate bool, filter filter.Filter, diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index f5061ad03e8..4c846a4f297 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -53,12 +53,21 @@ type SchemaStorage interface { DoGC(ts uint64) (lastSchemaTs uint64) } +<<<<<<< HEAD type schemaStorageImpl struct { snaps []*schema.Snapshot snapsMu sync.RWMutex gcTs uint64 resolvedTs uint64 schemaVersion int64 +======= +type schemaStorage struct { + snaps []*schema.Snapshot + snapsMu sync.RWMutex + + gcTs uint64 + resolvedTs uint64 +>>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) forceReplicate bool @@ -73,9 +82,14 @@ func NewSchemaStorage( role util.Role, filter filter.Filter, ) (SchemaStorage, error) { var ( +<<<<<<< HEAD snap *schema.Snapshot err error version int64 +======= + snap *schema.Snapshot + err error +>>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) ) if meta == nil { snap = schema.NewEmptySnapshot(forceReplicate) @@ -84,7 +98,6 @@ func NewSchemaStorage( if err != nil { return nil, errors.Trace(err) } - version, err = schema.GetSchemaVersion(meta) if err != nil { return nil, errors.Trace(err) } @@ -98,7 +111,6 @@ func NewSchemaStorage( resolvedTs: startTs, forceReplicate: forceReplicate, id: id, - schemaVersion: version, role: role, } return schema, nil @@ -177,7 +189,6 @@ func (s *schemaStorageImpl) GetLastSnapshot() *schema.Snapshot { // HandleDDLJob creates a new snapshot in storage and handles the ddl job func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { if s.skipJob(job) { - s.schemaVersion = job.BinlogInfo.SchemaVersion s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) return nil } @@ -186,16 +197,20 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { var snap *schema.Snapshot if len(s.snaps) > 0 { lastSnap := s.snaps[len(s.snaps)-1] +<<<<<<< HEAD // We use schemaVersion to check if an already-executed DDL job is processed for a second time. // Unexecuted DDL jobs should have largest schemaVersions. if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion { log.Info("ignore foregone DDL", +======= + if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() { + log.Info("schemaStorage: ignore foregone DDL", +>>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), zap.String("DDL", job.Query), zap.Int64("jobID", job.ID), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), - zap.Int64("schemaVersion", s.schemaVersion), zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion), zap.String("role", s.role.String())) return nil @@ -223,8 +238,18 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { zap.String("role", s.role.String())) s.snaps = append(s.snaps, snap) - s.schemaVersion = job.BinlogInfo.SchemaVersion s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) +<<<<<<< HEAD +======= + log.Info("schemaStorage: update snapshot by the DDL job", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + zap.String("role", s.role.String())) +>>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) return nil } From 7796345bf8616fcb60c2e8751ed6b45c73fc5a50 Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Tue, 24 Dec 2024 18:42:15 +0800 Subject: [PATCH 2/2] schema storage: resolve conflicts --- cdc/entry/schema/snapshot.go | 25 ------------------------ cdc/entry/schema_storage.go | 38 ++---------------------------------- 2 files changed, 2 insertions(+), 61 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 8af74188e49..29a86222926 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -103,34 +103,9 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error { return nil } -<<<<<<< HEAD -// GetSchemaVersion returns the schema version of the meta. -func GetSchemaVersion(meta *timeta.Meta) (int64, error) { - // After we get the schema version at startTs, if the diff corresponding to that version does not exist, - // it means that the job is not committed yet, so we should subtract one from the version, i.e., version--. - version, err := meta.GetSchemaVersion() - if err != nil { - return 0, errors.Trace(err) - } - diff, err := meta.GetSchemaDiff(version) - if err != nil { - return 0, errors.Trace(err) - } - if diff == nil { - version-- - } - return version, nil -} - // NewSingleSnapshotFromMeta creates a new single schema snapshot from a tidb meta func NewSingleSnapshotFromMeta( meta *timeta.Meta, -======= -// NewSnapshotFromMeta creates a schema snapshot from meta. -func NewSnapshotFromMeta( - id model.ChangeFeedID, - meta timeta.Reader, ->>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) currentTs uint64, forceReplicate bool, filter filter.Filter, diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 4c846a4f297..1d4aedad5bf 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -53,21 +53,11 @@ type SchemaStorage interface { DoGC(ts uint64) (lastSchemaTs uint64) } -<<<<<<< HEAD type schemaStorageImpl struct { - snaps []*schema.Snapshot - snapsMu sync.RWMutex - gcTs uint64 - resolvedTs uint64 - schemaVersion int64 -======= -type schemaStorage struct { - snaps []*schema.Snapshot - snapsMu sync.RWMutex - + snaps []*schema.Snapshot + snapsMu sync.RWMutex gcTs uint64 resolvedTs uint64 ->>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) forceReplicate bool @@ -82,14 +72,8 @@ func NewSchemaStorage( role util.Role, filter filter.Filter, ) (SchemaStorage, error) { var ( -<<<<<<< HEAD - snap *schema.Snapshot - err error - version int64 -======= snap *schema.Snapshot err error ->>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) ) if meta == nil { snap = schema.NewEmptySnapshot(forceReplicate) @@ -197,15 +181,8 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { var snap *schema.Snapshot if len(s.snaps) > 0 { lastSnap := s.snaps[len(s.snaps)-1] -<<<<<<< HEAD - // We use schemaVersion to check if an already-executed DDL job is processed for a second time. - // Unexecuted DDL jobs should have largest schemaVersions. - if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion { - log.Info("ignore foregone DDL", -======= if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() { log.Info("schemaStorage: ignore foregone DDL", ->>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), zap.String("DDL", job.Query), @@ -239,17 +216,6 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { s.snaps = append(s.snaps, snap) s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) -<<<<<<< HEAD -======= - log.Info("schemaStorage: update snapshot by the DDL job", - zap.String("namespace", s.id.Namespace), - zap.String("changefeed", s.id.ID), - zap.String("schema", job.SchemaName), - zap.String("table", job.TableName), - zap.String("query", job.Query), - zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), - zap.String("role", s.role.String())) ->>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) return nil }