Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
RetryWithTx (#2348)
Browse files Browse the repository at this point in the history
* First pass at RetryWithTx

* Refactor RetryWithTx to take a writable bool (instead of reads, writes)

* Implment DirectiveMethodDiff

This commit adds support for a Directive to contain only the diffs (as
opposed to the full Directive).

* Update controller tests to allow for DirectiveMethodDiff (over Full)

* Update RetryWithTx to retry on duplicate key constraint.

If two concurrent processes call IngestShard() for the same shard, both
were trying to insert the same job into the jobs table. That resulted in
a duplicate key error from the database. We want to include that error
in the list of errors for which RetryWithTx should retry.

* Remove unused method: Directive.TranslatePartitions()

* Replace query in a loop with a single query

We had a query which was looking to see if a job already existed. That
query was inside a loop, and could potentially generate 256 queries (for
example). This commit replaces that logic so that we use a single query
wiht an `IN ()` clause.

* Convert to directive version-by-address

This commit uses a separate directive version per address. It moves the
version get/increment back inside the buildDirective method so that if
two concurrent processes are building a directive for the same address,
one of them will get rolled back trying to commit the version update.

* Migration for directive version by address

* Add a comment about DirectiveVersion lock/unlock logic

* Remove AddLastWins

* fix linter

* handle error in walkdir

* fix test failures from removing AddLastWins
  • Loading branch information
travisturner authored Mar 31, 2023
1 parent a5dda0c commit 8fca15e
Show file tree
Hide file tree
Showing 26 changed files with 1,747 additions and 781 deletions.
16 changes: 16 additions & 0 deletions api_directive.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ func (api *API) ApplyDirective(ctx context.Context, d *dax.Directive) error {
// Handle the operations based on the directive method.
switch d.Method {
case dax.DirectiveMethodDiff:
// In order to prevent adding too much code specific to handling a diff
// directive (e.g. adding something like an `enactDirectiveDiff()`
// method), we are instead going to build a full Directive based on the
// diff, and then proceed normally as if we had received a full
// Directive. We do that by copying the previous Directive and then
// applying the diffs to the copy.
newD := previousDirective.Copy()

// Apply the diffs from the incoming Directive to the new, copied
// Directive.
newD.ApplyDiff(d)

// Now proceed with the new diff as if we had received it as a full diff.
d = newD

case dax.DirectiveMethodFull:
// pass: normal operation

case dax.DirectiveMethodReset:
Expand Down
8 changes: 4 additions & 4 deletions api_directive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestAPI_Directive(t *testing.T) {
// Empty directive (and empty holder).
{
d := &dax.Directive{
Method: dax.DirectiveMethodDiff,
Method: dax.DirectiveMethodFull,
Version: 1,
}
err := api.ApplyDirective(ctx, d)
Expand All @@ -41,7 +41,7 @@ func TestAPI_Directive(t *testing.T) {
// Add a new table.
{
d := &dax.Directive{
Method: dax.DirectiveMethodDiff,
Method: dax.DirectiveMethodFull,
Tables: []*dax.QualifiedTable{
tbl1,
},
Expand All @@ -55,7 +55,7 @@ func TestAPI_Directive(t *testing.T) {
// Add a new table, and keep the existing table.
{
d := &dax.Directive{
Method: dax.DirectiveMethodDiff,
Method: dax.DirectiveMethodFull,
Tables: []*dax.QualifiedTable{
tbl1,
tbl2,
Expand All @@ -70,7 +70,7 @@ func TestAPI_Directive(t *testing.T) {
// Add a new table and remove one of the existing tables.
{
d := &dax.Directive{
Method: dax.DirectiveMethodDiff,
Method: dax.DirectiveMethodFull,
Tables: []*dax.QualifiedTable{
tbl2,
tbl3,
Expand Down
4 changes: 2 additions & 2 deletions dax/controller/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (b *Balancer) assignMinWorkers(tx dax.Transaction, roleType dax.RoleType) (

diffs := NewInternalDiffs()

// Create an ordered slice of map keys so that tests are predicatable.
// Create an ordered slice of map keys so that tests are predictable.
qdbids := make([]dax.QualifiedDatabaseID, 0, len(m))
for qdbid := range m {
qdbids = append(qdbids, qdbid)
Expand Down Expand Up @@ -366,7 +366,7 @@ func (b *Balancer) addJobs(tx dax.Transaction, roleType dax.RoleType, qdbid dax.
// assigned workers until it has at least one job (which this database
// now has).
if diff, err := b.balanceDatabaseForRole(tx, roleType, qdbid); err != nil {
return nil, errors.Wrapf(err, "assigning min workers: (%s)", roleType)
return nil, errors.Wrapf(err, "balancing database for role: (%s)", roleType)
} else {
diffs.Merge(diff)
}
Expand Down
Loading

0 comments on commit 8fca15e

Please sign in to comment.