Skip to content

Commit

Permalink
TestMYLogical/consistent passed
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhouXing19 committed Nov 6, 2024
1 parent d1f24b5 commit d2e6bec
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 7 deletions.
3 changes: 1 addition & 2 deletions .github/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ services:
ports:
- "3306:3306"
mysql-v8:
image: mysql:8-debian
platform: linux/x86_64
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: SoupOrSecret
MYSQL_DATABASE: _replicator
Expand Down
3 changes: 1 addition & 2 deletions internal/sequencer/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ func (s *Core) Start(
// don't want to lose this update.
// Synthesize a fake table name from the
// group.
stat.Progress().Put(ident.NewTable(
group.Enclosing, group.Name), advanceTo)
stat.Progress().Put(group.GroupTable(), advanceTo)
} else {
for _, table := range group.Tables {
stat.Progress().Put(table, advanceTo)
Expand Down
19 changes: 16 additions & 3 deletions internal/sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,35 @@ func NewStat(group *types.TableGroup, progress *ident.TableMap[hlc.Range]) Stat
// in the group, or if the group is empty, [hlc.RangeEmpty] will be
// returned.
func CommonProgress(s Stat) hlc.Range {
if s == nil || len(s.Group().Tables) == 0 {
if s == nil {
return hlc.RangeEmpty()
}

group := s.Group()
progress := s.Progress()

commonMax := hlc.New(math.MaxInt64, math.MaxInt)
for _, table := range group.Tables {
ts, ok := progress.Get(table)

if len(group.Tables) == 0 {
ts, ok := progress.Get(group.GroupTable())
if !ok {
return hlc.RangeEmpty()
}
if hlc.Compare(ts.Max(), commonMax) < 0 {
commonMax = ts.Max()
}
} else {
for _, table := range group.Tables {
ts, ok := progress.Get(table)
if !ok {
return hlc.RangeEmpty()
}
if hlc.Compare(ts.Max(), commonMax) < 0 {
commonMax = ts.Max()
}
}
}

return hlc.RangeExcluding(hlc.Zero(), commonMax)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/sequencer/sequtil/lease_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func LeaseGroup(
names[idx] = fmt.Sprintf("sequtil.Lease.%s", table.Canonical().Raw())
}

if len(names) == 0 {
names = append(names, fmt.Sprintf("sequtil.Lease.%s", group.Name.Raw()))
}

// Run this in a loop in case of non-renewal. This is likely
// caused by database overload or any other case where we can't
// run SQL in a timely fashion.
Expand Down
7 changes: 7 additions & 0 deletions internal/types/table_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,10 @@ func (g *TableGroup) String() string {
sb.WriteString(" ]")
return sb.String()
}

// GroupTable is only used as a key when there is no table in this group
// (i.e. len(g.Tables()) == 0) but we still want to track progress of
// of the whole group.
func (g *TableGroup) GroupTable() ident.Table {
return ident.NewTable(g.Enclosing, g.Name)
}

0 comments on commit d2e6bec

Please sign in to comment.