From 8fca15e936deaccdda2237678c283ebd6c0c9aac Mon Sep 17 00:00:00 2001
From: Travis Turner
Date: Thu, 30 Mar 2023 20:54:37 -0500
Subject: [PATCH] RetryWithTx (#2348)

* First pass at RetryWithTx

* Refactor RetryWithTx to take a writable bool (instead of reads, writes)

* Implement DirectiveMethodDiff

This commit adds support for a Directive to contain only the diffs (as
opposed to the full Directive).

* Update controller tests to allow for DirectiveMethodDiff (over Full)

* Update RetryWithTx to retry on duplicate key constraint.

If two concurrent processes call IngestShard() for the same shard, both
were trying to insert the same job into the jobs table. That resulted in
a duplicate key error from the database. We want to include that error
in the list of errors for which RetryWithTx should retry.

* Remove unused method: Directive.TranslatePartitions()

* Replace query in a loop with a single query

We had a query which was looking to see if a job already existed. That
query was inside a loop, and could potentially generate 256 queries (for
example). This commit replaces that logic so that we use a single query
with an `IN ()` clause.

* Convert to directive version-by-address

This commit uses a separate directive version per address. It moves the
version get/increment back inside the buildDirective method so that if
two concurrent processes are building a directive for the same address,
one of them will get rolled back trying to commit the version update.

* Migration for directive version by address

* Add a comment about DirectiveVersion lock/unlock logic

* Remove AddLastWins

* fix linter

* handle error in walkdir

* fix test failures from removing AddLastWins
---
 api_directive.go                              |   16 +
 api_directive_test.go                         |    8 +-
 dax/controller/balancer/balancer.go           |    4 +-
 dax/controller/controller.go                  | 1292 ++++++++++-------
 dax/controller/controller_test.go             |  233 +--
 dax/controller/sqldb/directiveversion.go      |   45 +-
 dax/controller/sqldb/directiveversion_test.go |   51 +
 dax/controller/sqldb/freejob.go               |   32 +-
 dax/controller/sqldb/migrator.go              |   67 +-
 dax/controller/sqldb/transactor.go            |   11 +-
 dax/controller/transactor.go                  |   17 -
 dax/directive.go                              |  265 +++-
 dax/directive_version_test.go                 |   32 -
 dax/migrations/001_initial.down.fizz          |   11 +-
 dax/migrations/001_initial.up.fizz            |   20 +-
 .../002_directiveversion_by_address.down.fizz |    0
 .../002_directiveversion_by_address.up.fizz   |   13 +
 dax/models/directiveversion.go                |    2 +-
 dax/models/job.go                             |   10 +
 dax/server/test/managed.go                    |   12 +-
 dax/test/dax/dax_test.go                      |    4 +-
 dax/transaction.go                            |   96 +-
 dax/transaction_test.go                       |  175 +++
 dax/workerjob.go                              |   37 +-
 dax/workerjob_test.go                         |   51 +
 idk/serverless/importer.go                    |   24 +-
 26 files changed, 1747 insertions(+), 781 deletions(-)
 create mode 100644 dax/controller/sqldb/directiveversion_test.go
 delete mode 100644 dax/controller/transactor.go
 delete mode 100644 dax/directive_version_test.go
 create mode 100644 dax/migrations/002_directiveversion_by_address.down.fizz
 create mode 100644 dax/migrations/002_directiveversion_by_address.up.fizz
 create mode 100644 dax/transaction_test.go

diff --git a/api_directive.go b/api_directive.go
index e04b4b096..2cebc7113 100644
--- a/api_directive.go
+++ b/api_directive.go
@@ -35,6 +35,22 @@ func (api *API) ApplyDirective(ctx context.Context, d *dax.Directive) error {
 	// Handle the operations based on the directive method.
 	switch d.Method {
 	case dax.DirectiveMethodDiff:
+		// In order to prevent adding too much code specific to handling a diff
+		// directive (e.g. 
adding something like an `enactDirectiveDiff()` + // method), we are instead going to build a full Directive based on the + // diff, and then proceed normally as if we had received a full + // Directive. We do that by copying the previous Directive and then + // applying the diffs to the copy. + newD := previousDirective.Copy() + + // Apply the diffs from the incoming Directive to the new, copied + // Directive. + newD.ApplyDiff(d) + + // Now proceed with the new diff as if we had received it as a full diff. + d = newD + + case dax.DirectiveMethodFull: // pass: normal operation case dax.DirectiveMethodReset: diff --git a/api_directive_test.go b/api_directive_test.go index a90a3b1ba..1ba5d0c75 100644 --- a/api_directive_test.go +++ b/api_directive_test.go @@ -30,7 +30,7 @@ func TestAPI_Directive(t *testing.T) { // Empty directive (and empty holder). { d := &dax.Directive{ - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Version: 1, } err := api.ApplyDirective(ctx, d) @@ -41,7 +41,7 @@ func TestAPI_Directive(t *testing.T) { // Add a new table. { d := &dax.Directive{ - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbl1, }, @@ -55,7 +55,7 @@ func TestAPI_Directive(t *testing.T) { // Add a new table, and keep the existing table. { d := &dax.Directive{ - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbl1, tbl2, @@ -70,7 +70,7 @@ func TestAPI_Directive(t *testing.T) { // Add a new table and remove one of the existing tables. { d := &dax.Directive{ - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbl2, tbl3, diff --git a/dax/controller/balancer/balancer.go b/dax/controller/balancer/balancer.go index b638b8d22..6e7686f23 100644 --- a/dax/controller/balancer/balancer.go +++ b/dax/controller/balancer/balancer.go @@ -169,7 +169,7 @@ func (b *Balancer) assignMinWorkers(tx dax.Transaction, roleType dax.RoleType) ( diffs := NewInternalDiffs() - // Create an ordered slice of map keys so that tests are predicatable. + // Create an ordered slice of map keys so that tests are predictable. qdbids := make([]dax.QualifiedDatabaseID, 0, len(m)) for qdbid := range m { qdbids = append(qdbids, qdbid) @@ -366,7 +366,7 @@ func (b *Balancer) addJobs(tx dax.Transaction, roleType dax.RoleType, qdbid dax. // assigned workers until it has at least one job (which this database // now has). if diff, err := b.balanceDatabaseForRole(tx, roleType, qdbid); err != nil { - return nil, errors.Wrapf(err, "assigning min workers: (%s)", roleType) + return nil, errors.Wrapf(err, "balancing database for role: (%s)", roleType) } else { diffs.Merge(diff) } diff --git a/dax/controller/controller.go b/dax/controller/controller.go index 130278791..e5afd553a 100644 --- a/dax/controller/controller.go +++ b/dax/controller/controller.go @@ -18,6 +18,10 @@ import ( "golang.org/x/sync/errgroup" ) +const ( + txRetry = 5 +) + // Ensure type implements interface. 
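An aside on the two Directive methods the diff branch above leans on: Copy and ApplyDiff live in dax/directive.go (per the diffstat), and their bodies are not shown in this section. The sketch below illustrates the implied semantics with simplified stand-in types — roles listed as added are unioned into the copy, roles listed as removed are filtered out. Everything here beyond the two method names and the Added/Removed field names introduced later in this patch is an assumption, not code from the patch.

package main

import "fmt"

// Simplified stand-ins for the dax types; the real Directive also
// carries translate roles, a table schema, a version, and more.
type ComputeRole struct {
	TableKey string
	Shards   []int
}

type Directive struct {
	ComputeRoles        []ComputeRole
	ComputeRolesAdded   []ComputeRole
	ComputeRolesRemoved []ComputeRole
}

// Copy returns a copy deep enough that applying a diff never mutates
// the cached previous Directive.
func (d *Directive) Copy() *Directive {
	out := &Directive{ComputeRoles: make([]ComputeRole, len(d.ComputeRoles))}
	copy(out.ComputeRoles, d.ComputeRoles)
	return out
}

// ApplyDiff folds a diff Directive into a full one: added shards are
// unioned in, removed shards are filtered out.
func (d *Directive) ApplyDiff(diff *Directive) {
	byTable := map[string][]int{}
	for _, r := range d.ComputeRoles {
		byTable[r.TableKey] = append([]int{}, r.Shards...)
	}
	for _, r := range diff.ComputeRolesAdded {
		byTable[r.TableKey] = append(byTable[r.TableKey], r.Shards...)
	}
	for _, r := range diff.ComputeRolesRemoved {
		drop := map[int]bool{}
		for _, s := range r.Shards {
			drop[s] = true
		}
		kept := byTable[r.TableKey][:0]
		for _, s := range byTable[r.TableKey] {
			if !drop[s] {
				kept = append(kept, s)
			}
		}
		byTable[r.TableKey] = kept
	}
	d.ComputeRoles = d.ComputeRoles[:0]
	for k, v := range byTable {
		d.ComputeRoles = append(d.ComputeRoles, ComputeRole{TableKey: k, Shards: v})
	}
}

func main() {
	prev := &Directive{ComputeRoles: []ComputeRole{{TableKey: "t1", Shards: []int{0, 1}}}}
	diff := &Directive{
		ComputeRolesAdded:   []ComputeRole{{TableKey: "t1", Shards: []int{2}}},
		ComputeRolesRemoved: []ComputeRole{{TableKey: "t1", Shards: []int{0}}},
	}
	newD := prev.Copy()
	newD.ApplyDiff(diff)
	fmt.Println(newD.ComputeRoles) // [{t1 [1 2]}]
}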
var _ computer.Registrar = (*Controller)(nil) var _ dax.Schemar = (*Controller)(nil) @@ -29,7 +33,7 @@ type Controller struct { Schemar schemar.Schemar Balancer Balancer - Transactor Transactor + Transactor dax.Transactor Snapshotter *snapshotter.Snapshotter Writelogger *writelogger.Writelogger @@ -141,12 +145,6 @@ func (c *Controller) Stop() error { func (c *Controller) RegisterNodes(ctx context.Context, nodes ...*dax.Node) error { c.logger.Printf("c.RegisterNodes(): %s", dax.Nodes(nodes)) - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() - // Validate input. for _, n := range nodes { if n.Address == "" { @@ -162,85 +160,92 @@ func (c *Controller) RegisterNodes(ctx context.Context, nodes ...*dax.Node) erro } } - // workerSet maintains the set of workers which have a job assignment change - // and therefore need to be sent an updated Directive. - workerSet := NewAddressSet() + var directives []*dax.Directive - // diffByAddr keeps track of the diffs that have been applied to each - // specific address. - // TODO(tlt): I don't understand why we're keeping track of the - // dax.WorkerDiff here (as opposed to just the unique Address) because it - // doesn't ever seem to be used. - diffByAddr := make(map[dax.Address]dax.WorkerDiff) + fn := func(tx dax.Transaction, writable bool) error { + // workerSet maintains the set of workers which have a job assignment change + // and therefore need to be sent an updated Directive. + workerSet := NewAddressSet() - // Create node if we don't already have it - for _, n := range nodes { - // If the node already exists, skip it. - if node, _ := c.Balancer.ReadNode(tx, n.Address); node != nil { - // If the node already exists, but it has indicated that it doesn't - // have a directive, then send it one. - if !n.HasDirective { - workerSet.Add(n.Address) + // diffByAddr keeps track of the diffs that have been applied to each + // specific address. + // TODO(tlt): I don't understand why we're keeping track of the + // dax.WorkerDiff here (as opposed to just the unique Address) because it + // doesn't ever seem to be used. + diffByAddr := make(map[dax.Address]dax.WorkerDiff) + + // Create node if we don't already have it + for _, n := range nodes { + // If the node already exists, skip it. + if node, _ := c.Balancer.ReadNode(tx, n.Address); node != nil { + // If the node already exists, but it has indicated that it doesn't + // have a directive, then send it one. + if !n.HasDirective { + workerSet.Add(n.Address) + } + continue } - continue - } - // Add the node to the workerSet so that it receives a directive. - // Even if there is currently no data for this worker (i.e. it - // doesn't result in a diffByAddr entry below), we still want to - // send it a "reset" directive so that in the off chance it has some - // local data, that data gets removed. - workerSet.Add(n.Address) + // Add the node to the workerSet so that it receives a directive. + // Even if there is currently no data for this worker (i.e. it + // doesn't result in a diffByAddr entry below), we still want to + // send it a "reset" directive so that in the off chance it has some + // local data, that data gets removed. 
+ workerSet.Add(n.Address) - adiffs, err := c.Balancer.AddWorker(tx, n) - if err != nil { - return errors.Wrap(err, "adding worker") - } + adiffs, err := c.Balancer.AddWorker(tx, n) + if err != nil { + return errors.Wrap(err, "adding worker") + } - for _, diff := range adiffs { - existingDiff, ok := diffByAddr[dax.Address(diff.Address)] - if !ok { - existingDiff.Address = diff.Address + for _, diff := range adiffs { + existingDiff, ok := diffByAddr[dax.Address(diff.Address)] + if !ok { + existingDiff.Address = diff.Address + } + existingDiff.Add(diff) + diffByAddr[dax.Address(diff.Address)] = existingDiff } - existingDiff.Add(diff) - diffByAddr[dax.Address(diff.Address)] = existingDiff } - } - // Add any worker which has a diff to the workerSet so that it receives a - // directive. - for addr := range diffByAddr { - workerSet.Add(addr) - } + // Add any worker which has a diff to the workerSet so that it receives a + // directive. + for addr := range diffByAddr { + workerSet.Add(addr) + } - // No need to send directives if the workerSet is empty. - if len(workerSet) == 0 { - return nil - } + // No need to send directives if the workerSet is empty. + if len(workerSet) == 0 { + return nil + } - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. - addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. + addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodFull) - // For the addresses which are being added, set their method to "reset". - for i := range addrMethods { - for j := range nodes { - if addrMethods[i].address == nodes[j].Address { - addrMethods[i].method = dax.DirectiveMethodReset + // For the addresses which are being added, set their method to "reset". + for i := range addrMethods { + for j := range nodes { + if addrMethods[i].address == nodes[j].Address { + addrMethods[i].method = dax.DirectiveMethodReset + } } } - } - directives, err := c.buildDirectives(tx, addrMethods) - if err != nil { - return errors.Wrap(err, "building directives") + var err error + directives, err = c.buildDirectives(ctx, tx, addrMethods) + if err != nil { + return errors.Wrap(err, "building directives") + } + + return nil } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "committing") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return errors.Wrap(err, "retry with tx: write") } - if err := c.sendDirectives2(ctx, directives); err != nil { + if err := c.sendDirectives(ctx, directives); err != nil { return NewErrDirectiveSendFailure(err.Error()) } @@ -320,94 +325,97 @@ func (c *Controller) CheckInNode(ctx context.Context, n *dax.Node) error { // DeregisterNodes removes nodes from the controller's list of registered nodes. // It sends directives to the removed nodes, but ignores errors. func (c *Controller) DeregisterNodes(ctx context.Context, addresses ...dax.Address) error { - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() + var directives []*dax.Directive - // workerSet maintains the set of workers which have a job assignment change - // and therefore need to be sent an updated Directive. 
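The fn closure above is the shape every write path in this file now takes: all reads and writes go through the transaction handed in by dax.RetryWithTx, and results that must outlive the transaction (the directives slice) are assigned to captured variables. RetryWithTx itself is defined in dax/transaction.go and is not shown in this section; the sketch below is inferred from its call sites — begin a transaction, run fn, commit, and retry the whole cycle when the failure looks like one a concurrent writer caused. The stand-in interfaces and the error classification (serialization failure plus the duplicate-key case named in the commit message) are assumptions.

package retrysketch

import (
	"context"
	"fmt"
	"strings"
)

// Minimal stand-ins for dax.Transactor / dax.Transaction, reduced to
// the methods this sketch needs.
type Transaction interface {
	Commit() error
	Rollback() error
}

type Transactor interface {
	BeginTx(ctx context.Context, writable bool) (Transaction, error)
}

// RetryWithTx runs fn inside a transaction and retries the whole
// begin/fn/commit cycle up to retries times on a conflict-style error.
func RetryWithTx(ctx context.Context, trans Transactor, fn func(tx Transaction, writable bool) error, writable bool, retries int) error {
	var err error
	for attempt := 0; attempt <= retries; attempt++ {
		err = func() error {
			tx, err := trans.BeginTx(ctx, writable)
			if err != nil {
				return fmt.Errorf("beginning tx: %w", err)
			}
			// Assumed harmless after a successful Commit.
			defer tx.Rollback()

			if err := fn(tx, writable); err != nil {
				return err
			}
			return tx.Commit()
		}()
		if err == nil || !retryable(err) {
			return err
		}
	}
	return err
}

// retryable reports whether an error is worth retrying. The concrete
// checks are assumptions: postgres 40001 is a serialization failure,
// 23505 the duplicate-key violation that the commit message says two
// concurrent IngestShard calls can produce.
func retryable(err error) bool {
	msg := err.Error()
	return strings.Contains(msg, "40001") ||
		strings.Contains(msg, "23505") ||
		strings.Contains(msg, "duplicate key")
}

Because fn may run more than once, everything it computes has to be rebuilt from scratch on each attempt — which is why workerSet, diffByAddr, and the directives slice are all populated inside the closure rather than before it.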
- workerSet := NewAddressSet() + fn := func(tx dax.Transaction, writable bool) error { + // workerSet maintains the set of workers which have a job assignment change + // and therefore need to be sent an updated Directive. + workerSet := NewAddressSet() - // diffByAddr keeps track of the diffs that have been applied to each - // specific address. - diffByAddr := make(map[dax.Address]dax.WorkerDiff) - - for _, address := range addresses { - // Add the removed node to the workerSet so that it receives a - // directive. Even if there is currently no data for the worker (i.e. it - // doesn't result in a diffByAddr entry below), we still want to send it - // a "reset" directive so that in the off chance it has some local data, - // that data gets removed. - // TODO(tlt): see below where we actually REMOVE this. We need to - // address this confusion. - // workerSet.Add(address) - - rdiffs, err := c.Balancer.RemoveWorker(tx, address) - if err != nil { - return errors.Wrapf(err, "removing worker: %s", address) - } + // diffByAddr keeps track of the diffs that have been applied to each + // specific address. + diffByAddr := make(map[dax.Address]dax.WorkerDiff) + + for _, address := range addresses { + // Add the removed node to the workerSet so that it receives a + // directive. Even if there is currently no data for the worker (i.e. it + // doesn't result in a diffByAddr entry below), we still want to send it + // a "reset" directive so that in the off chance it has some local data, + // that data gets removed. + // TODO(tlt): see below where we actually REMOVE this. We need to + // address this confusion. + // workerSet.Add(address) + + rdiffs, err := c.Balancer.RemoveWorker(tx, address) + if err != nil { + return errors.Wrapf(err, "removing worker: %s", address) + } - // we assume that the job names are different between the - // different role types so we don't have to track each - // role separately which would be annoying. - for _, diff := range rdiffs { - existingDiff, ok := diffByAddr[dax.Address(diff.Address)] - if !ok { - existingDiff.Address = diff.Address + // we assume that the job names are different between the + // different role types so we don't have to track each + // role separately which would be annoying. + for _, diff := range rdiffs { + existingDiff, ok := diffByAddr[dax.Address(diff.Address)] + if !ok { + existingDiff.Address = diff.Address + } + existingDiff.Add(diff) + diffByAddr[dax.Address(diff.Address)] = existingDiff } - existingDiff.Add(diff) - diffByAddr[dax.Address(diff.Address)] = existingDiff } - } - for addr := range diffByAddr { - workerSet.Add(addr) - } + for addr := range diffByAddr { + workerSet.Add(addr) + } - // Don't send a Directive to the removed nodes after all. - // TODO(tlt): we have to do this because otherwise the send request hangs - // while holding a mu.Lock on Controller. - for _, addr := range addresses { - workerSet.Remove(addr) - } + // Don't send a Directive to the removed nodes after all. + // TODO(tlt): we have to do this because otherwise the send request hangs + // while holding a mu.Lock on Controller. + for _, addr := range addresses { + workerSet.Remove(addr) + } - // No need to send Directives if nothing has ultimately changed. - if len(workerSet) == 0 { - return tx.Commit() - } + // No need to send Directives if nothing has ultimately changed. + if len(workerSet) == 0 { + return nil + } - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. 
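Both the register and deregister paths drive everything through the AddressSet helper. Its implementation is not shown here, but the usage — NewAddressSet, Add, Merge, Remove, SortedSlice, and ranging over it like a map — implies a map-backed set along these lines (a sketch, not the controller's actual code):

package main

import (
	"fmt"
	"sort"
)

// Address stands in for dax.Address.
type Address string

// AddressSet is a set of worker addresses, matching the usage in this file.
type AddressSet map[Address]struct{}

func NewAddressSet() AddressSet { return AddressSet{} }

func (s AddressSet) Add(a Address)    { s[a] = struct{}{} }
func (s AddressSet) Remove(a Address) { delete(s, a) }

func (s AddressSet) Merge(o AddressSet) {
	for a := range o {
		s[a] = struct{}{}
	}
}

// SortedSlice returns the addresses in a stable order so the directives
// built from them are deterministic (and therefore testable).
func (s AddressSet) SortedSlice() []Address {
	out := make([]Address, 0, len(s))
	for a := range s {
		out = append(out, a)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	ws := NewAddressSet()
	ws.Add("10.0.0.2:8080")
	ws.Add("10.0.0.1:8080")
	ws.Remove("10.0.0.2:8080")
	fmt.Println(ws.SortedSlice()) // [10.0.0.1:8080]
}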
- addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. + addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodFull) - // For the addresses which are being removed, set their method to "reset". - for i := range addrMethods { - for j := range addresses { - if addrMethods[i].address == addresses[j] { - addrMethods[i].method = dax.DirectiveMethodReset + // For the addresses which are being removed, set their method to "reset". + for i := range addrMethods { + for j := range addresses { + if addrMethods[i].address == addresses[j] { + addrMethods[i].method = dax.DirectiveMethodReset + } } } - } - directives, err := c.buildDirectives(tx, addrMethods) - if err != nil { - return errors.Wrap(err, "building directives") + var err error + directives, err = c.buildDirectives(ctx, tx, addrMethods) + if err != nil { + return errors.Wrap(err, "building directives") + } + + return nil } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "committing") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return errors.Wrap(err, "retry with tx: write") } - if err := c.sendDirectives2(ctx, directives); err != nil { + if err := c.sendDirectives(ctx, directives); err != nil { return NewErrDirectiveSendFailure(err.Error()) } return nil } -func (c *Controller) nodesTranslateReadOrWrite(tx dax.Transaction, role *dax.TranslateRole, qdbid dax.QualifiedDatabaseID, createMissing bool, asWrite bool) ([]dax.AssignedNode, bool, []*dax.Directive, error) { +// nodesTranslateReadOrWrite contains the logic for the c.nodesTranslate() +// method, but it supports being called with either a read or write lock. +func (c *Controller) nodesTranslateReadOrWrite(ctx context.Context, tx dax.Transaction, role *dax.TranslateRole, qdbid dax.QualifiedDatabaseID, createMissing bool, asWrite bool) ([]dax.AssignedNode, bool, []*dax.Directive, error) { qtid := role.TableKey.QualifiedTableID() roleType := dax.RoleTypeTranslate @@ -422,6 +430,7 @@ func (c *Controller) nodesTranslateReadOrWrite(tx dax.Transaction, role *dax.Tra return nil, false, nil, errors.Wrap(err, "getting workers for jobs") } + // figure out if any jobs in the role have no workers assigned outJobs := dax.NewSet[dax.Job]() for _, worker := range workers { for _, job := range worker.Jobs { @@ -449,7 +458,7 @@ func (c *Controller) nodesTranslateReadOrWrite(tx dax.Transaction, role *dax.Tra sort.Slice(missed, func(i, j int) bool { return missed[i] < missed[j] }) - workerSet := NewAddressSet() + workerDiffs := dax.WorkerDiffs{} for _, job := range missed { j, err := decodePartition(job) if err != nil { @@ -459,18 +468,12 @@ func (c *Controller) nodesTranslateReadOrWrite(tx dax.Transaction, role *dax.Tra if err != nil { return nil, false, nil, errors.Wrap(err, "adding job") } - for _, diff := range diffs { - workerSet.Add(dax.Address(diff.Address)) - } + workerDiffs = workerDiffs.Apply(diffs) } - // Convert the slice of addresses into a slice of addressMethod - // containing the appropriate method. 
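The missed-job loops above round-trip balancer jobs through helpers like partition(...).Job() and decodePartition(job) (and their shard counterparts). The actual dax.Job string format is defined elsewhere in the dax package and is not visible in this section, so the encoding below is purely illustrative of the pattern — an encode method paired with a validating decode:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Hypothetical job encoding: "<table key>|partition|<num>". The real
// format used by partition().Job() and decodePartition() may differ.
type partitionJob struct {
	tableKey string
	num      int
}

func (p partitionJob) Job() string {
	return fmt.Sprintf("%s|partition|%d", p.tableKey, p.num)
}

func decodePartition(job string) (partitionJob, error) {
	parts := strings.Split(job, "|")
	if len(parts) != 3 || parts[1] != "partition" {
		return partitionJob{}, fmt.Errorf("decoding partition job: %q", job)
	}
	num, err := strconv.Atoi(parts[2])
	if err != nil {
		return partitionJob{}, fmt.Errorf("decoding partition number: %q", parts[2])
	}
	return partitionJob{tableKey: parts[0], num: num}, nil
}

func main() {
	j := partitionJob{tableKey: "org__db__tbl__0001", num: 3}.Job()
	p, _ := decodePartition(j)
	fmt.Println(j, p.num) // org__db__tbl__0001|partition|3 3
}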
- addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) - - directives, err = c.buildDirectives(tx, addrMethods) + directives, err = c.buildDirectivesAsDiffs(ctx, tx, roleType, workerDiffs) if err != nil { - return nil, false, nil, errors.Wrap(err, "building directives") + return nil, false, nil, errors.Wrap(err, "building directives as diffs") } // Re-run WorkersForJobs. @@ -486,7 +489,7 @@ func (c *Controller) nodesTranslateReadOrWrite(tx dax.Transaction, role *dax.Tra // nodesComputeReadOrWrite contains the logic for the c.nodesCompute() method, // but it supports being called with either a read or write lock. -func (c *Controller) nodesComputeReadOrWrite(tx dax.Transaction, role *dax.ComputeRole, qdbid dax.QualifiedDatabaseID, createMissing bool, asWrite bool) ([]dax.AssignedNode, bool, []*dax.Directive, error) { +func (c *Controller) nodesComputeReadOrWrite(ctx context.Context, tx dax.Transaction, role *dax.ComputeRole, qdbid dax.QualifiedDatabaseID, createMissing bool, asWrite bool) ([]dax.AssignedNode, bool, []*dax.Directive, error) { qtid := role.TableKey.QualifiedTableID() roleType := dax.RoleTypeCompute @@ -529,7 +532,7 @@ func (c *Controller) nodesComputeReadOrWrite(tx dax.Transaction, role *dax.Compu sort.Slice(missed, func(i, j int) bool { return missed[i] < missed[j] }) - workerSet := NewAddressSet() + workerDiffs := dax.WorkerDiffs{} for _, job := range missed { j, err := decodeShard(job) if err != nil { @@ -539,18 +542,12 @@ func (c *Controller) nodesComputeReadOrWrite(tx dax.Transaction, role *dax.Compu if err != nil { return nil, false, nil, errors.Wrap(err, "adding job") } - for _, diff := range diffs { - workerSet.Add(dax.Address(diff.Address)) - } + workerDiffs = workerDiffs.Apply(diffs) } - // Convert the slice of addresses into a slice of addressMethod - // containing the appropriate method. - addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) - - directives, err = c.buildDirectives(tx, addrMethods) + directives, err = c.buildDirectivesAsDiffs(ctx, tx, roleType, workerDiffs) if err != nil { - return nil, false, nil, errors.Wrap(err, "building directives") + return nil, false, nil, errors.Wrap(err, "building directives as diffs") } // Re-run WorkersForJobs. @@ -626,77 +623,74 @@ func (c *Controller) translateWorkersToAssignedNodes(tx dax.Transaction, workers // CreateDatabase adds a database to the schemar. func (c *Controller) CreateDatabase(ctx context.Context, qdb *dax.QualifiedDatabase) error { - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() - // Create Database ID. if _, err := qdb.CreateID(); err != nil { return errors.Wrap(err, "creating database ID") } - if err := c.Schemar.CreateDatabase(tx, qdb); err != nil { - return errors.Wrap(err, "creating database in schemar") + fn := func(tx dax.Transaction, writable bool) error { + if err := c.Schemar.CreateDatabase(tx, qdb); err != nil { + return errors.Wrap(err, "creating database in schemar") + } + return nil } - return tx.Commit() + return dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry) } func (c *Controller) DropDatabase(ctx context.Context, qdbid dax.QualifiedDatabaseID) error { - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() + var directives []*dax.Directive - // Get all the tables for the database and call dropTable on each one. 
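Where the old code flattened balancer output straight into an address set, nodesTranslateReadOrWrite and nodesComputeReadOrWrite above now accumulate it with workerDiffs = workerDiffs.Apply(diffs), preserving exactly which jobs moved so buildDirectivesAsDiffs can emit diff directives. Apply's body lives in dax/workerjob.go (per the diffstat); a sketch of the merge-by-address behavior its usage implies, with simplified types:

package main

import "fmt"

// Simplified stand-ins for dax.WorkerDiff / dax.WorkerDiffs.
type WorkerDiff struct {
	Address     string
	AddedJobs   []string
	RemovedJobs []string
}

type WorkerDiffs []WorkerDiff

// Apply merges incoming diffs into the receiver, keyed by address, and
// returns the result: a diff for a new address is appended, a diff for
// a known address is folded into the existing entry.
func (w WorkerDiffs) Apply(in []WorkerDiff) WorkerDiffs {
	for _, d := range in {
		found := false
		for i := range w {
			if w[i].Address == d.Address {
				w[i].AddedJobs = append(w[i].AddedJobs, d.AddedJobs...)
				w[i].RemovedJobs = append(w[i].RemovedJobs, d.RemovedJobs...)
				found = true
				break
			}
		}
		if !found {
			w = append(w, d)
		}
	}
	return w
}

func main() {
	var diffs WorkerDiffs
	diffs = diffs.Apply([]WorkerDiff{{Address: "n1", AddedJobs: []string{"p0"}}})
	diffs = diffs.Apply([]WorkerDiff{{Address: "n1", AddedJobs: []string{"p1"}}})
	fmt.Printf("%+v\n", diffs)
}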
- qtbls, err := c.Schemar.Tables(tx, qdbid) - if err != nil { - return errors.Wrapf(err, "getting tables for database: %s", qdbid) - } + fn := func(tx dax.Transaction, writable bool) error { + // Get all the tables for the database and call dropTable on each one. + qtbls, err := c.Schemar.Tables(tx, qdbid) + if err != nil { + return errors.Wrapf(err, "getting tables for database: %s", qdbid) + } - // workerSet maintains the set of workers which have a job assignment change - // and therefore need to be sent an updated Directive. - workerSet := NewAddressSet() + // workerSet maintains the set of workers which have a job assignment change + // and therefore need to be sent an updated Directive. + workerSet := NewAddressSet() - for _, qtbl := range qtbls { - qtid := qtbl.QualifiedID() - addrs, err := c.dropTable(tx, qtid) - if err != nil { - return errors.Wrapf(err, "dropping table: %s", qtid) + for _, qtbl := range qtbls { + qtid := qtbl.QualifiedID() + addrs, err := c.dropTable(tx, qtid) + if err != nil { + return errors.Wrapf(err, "dropping table: %s", qtid) + } + workerSet.Merge(addrs) } - workerSet.Merge(addrs) - } - addrs := make([]dax.Address, 0, len(workerSet)) - for worker := range workerSet { - addrs = append(addrs, worker) - } - if err := c.Balancer.FreeWorkers(tx, addrs...); err != nil { - return errors.Wrap(err, "freeing workers") - } + addrs := make([]dax.Address, 0, len(workerSet)) + for worker := range workerSet { + addrs = append(addrs, worker) + } + if err := c.Balancer.FreeWorkers(tx, addrs...); err != nil { + return errors.Wrap(err, "freeing workers") + } - // Drop the database record from the schema. - if err := c.Schemar.DropDatabase(tx, qdbid); err != nil { - return errors.Wrap(err, "dropping database from schemar") - } + // Drop the database record from the schema. + if err := c.Schemar.DropDatabase(tx, qdbid); err != nil { + return errors.Wrap(err, "dropping database from schemar") + } - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. - addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. + addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodFull) - directives, err := c.buildDirectives(tx, addrMethods) - if err != nil { - return errors.Wrap(err, "building directives") + directives, err = c.buildDirectives(ctx, tx, addrMethods) + if err != nil { + return errors.Wrap(err, "building directives") + } + + return nil } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "committing") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return errors.Wrap(err, "retry with tx: write") } - if err := c.sendDirectives2(ctx, directives); err != nil { + if err := c.sendDirectives(ctx, directives); err != nil { return NewErrDirectiveSendFailure(err.Error()) } return nil @@ -736,39 +730,40 @@ func (c *Controller) DatabaseByID(ctx context.Context, qdbid dax.QualifiedDataba // SetDatabaseOption sets the option on the given database. 
func (c *Controller) SetDatabaseOption(ctx context.Context, qdbid dax.QualifiedDatabaseID, option string, value string) error { - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() + var directives []*dax.Directive - if err := c.Schemar.SetDatabaseOption(tx, qdbid, option, value); err != nil { - return errors.Wrapf(err, "setting database option: %s", option) - } + fn := func(tx dax.Transaction, writable bool) error { + if err := c.Schemar.SetDatabaseOption(tx, qdbid, option, value); err != nil { + return errors.Wrapf(err, "setting database option: %s", option) + } - diffs, err := c.Balancer.BalanceDatabase(tx, qdbid) - if err != nil { - return errors.Wrapf(err, "balancing database: %s", qdbid) - } + diffs, err := c.Balancer.BalanceDatabase(tx, qdbid) + if err != nil { + return errors.Wrapf(err, "balancing database: %s", qdbid) + } - workerSet := NewAddressSet() - for _, diff := range diffs { - workerSet.Add(dax.Address(diff.Address)) - } + workerSet := NewAddressSet() + for _, diff := range diffs { + workerSet.Add(dax.Address(diff.Address)) + } - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. - addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) - directives, err := c.buildDirectives(tx, addrMethods) - if err != nil { - return errors.Wrap(err, "building directives") + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. + addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodFull) + + directives, err = c.buildDirectives(ctx, tx, addrMethods) + if err != nil { + return errors.Wrap(err, "building directives") + } + + return nil } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "committing") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return errors.Wrap(err, "retry with tx: write") } - if err := c.sendDirectives2(ctx, directives); err != nil { + if err := c.sendDirectives(ctx, directives); err != nil { return NewErrDirectiveSendFailure(err.Error()) } return nil @@ -793,92 +788,94 @@ func (c *Controller) Databases(ctx context.Context, orgID dax.OrganizationID, id // affected nodes based on the change. func (c *Controller) CreateTable(ctx context.Context, qtbl *dax.QualifiedTable) error { c.logger.Debugf("CreateTable %+v", qtbl) - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() // Create Table ID. if _, err := qtbl.CreateID(); err != nil { return errors.Wrap(err, "creating table ID") } - // Create the table in schemar. - if err := c.Schemar.CreateTable(tx, qtbl); err != nil { - return errors.Wrapf(err, "creating table: %s", qtbl) - } - - var addrMethods []addressMethod - - // If the table is keyed, add partitions to the balancer. - if qtbl.StringKeys() { - // workerSet maintains the set of workers which have a job assignment change - // and therefore need to be sent an updated Directive. - workerSet := NewAddressSet() + var directives []*dax.Directive - // Generate the list of partitionsToAdd to be added. - partitionsToAdd := make(dax.PartitionNums, qtbl.PartitionN) - for partitionNum := 0; partitionNum < qtbl.PartitionN; partitionNum++ { - partitionsToAdd[partitionNum] = dax.PartitionNum(partitionNum) + fn := func(tx dax.Transaction, writable bool) error { + // Create the table in schemar. 
+ if err := c.Schemar.CreateTable(tx, qtbl); err != nil { + return errors.Wrapf(err, "creating table: %s", qtbl) } - jobs := make([]dax.Job, 0, len(partitionsToAdd)) - for _, p := range partitionsToAdd { - jobs = append(jobs, partition(qtbl.Key(), p).Job()) - } + var addrMethods []addressMethod - diffs, err := c.Balancer.AddJobs(tx, dax.RoleTypeTranslate, qtbl.QualifiedID(), jobs...) - if err != nil { - return errors.Wrap(err, "adding job") - } - for _, diff := range diffs { - workerSet.Add(dax.Address(diff.Address)) - } + // If the table is keyed, add partitions to the balancer. + if qtbl.StringKeys() { + // workerSet maintains the set of workers which have a job assignment change + // and therefore need to be sent an updated Directive. + workerSet := NewAddressSet() - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. - addrMethods = applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) - } + // Generate the list of partitionsToAdd to be added. + partitionsToAdd := make(dax.PartitionNums, qtbl.PartitionN) + for partitionNum := 0; partitionNum < qtbl.PartitionN; partitionNum++ { + partitionsToAdd[partitionNum] = dax.PartitionNum(partitionNum) + } - // This is more FieldVersion hackery. Even if the table is not keyed, we - // still want to manage partition 0 for the table in case any of the table's - // fields contain string keys (we use partition 0 for field string keys for - // now; in the future we should distribute/balance the field key translation - // like we do shards and partitions). - if !qtbl.StringKeys() { - // workerSet maintains the set of workers which have a job assignment change - // and therefore need to be sent an updated Directive. - workerSet := NewAddressSet() + jobs := make([]dax.Job, 0, len(partitionsToAdd)) + for _, p := range partitionsToAdd { + jobs = append(jobs, partition(qtbl.Key(), p).Job()) + } - p := dax.PartitionNum(0) + diffs, err := c.Balancer.AddJobs(tx, dax.RoleTypeTranslate, qtbl.QualifiedID(), jobs...) + if err != nil { + return errors.Wrap(err, "adding job") + } + for _, diff := range diffs { + workerSet.Add(dax.Address(diff.Address)) + } - // We don't currently use the returned diff, other than to determine - // which worker was affected, because we send the full Directive - // every time. - diffs, err := c.Balancer.AddJobs(tx, dax.RoleTypeTranslate, qtbl.QualifiedID(), partition(qtbl.Key(), p).Job()) - if err != nil { - return errors.Wrap(err, "adding job") + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. + addrMethods = applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodFull) } - for _, diff := range diffs { - workerSet.Add(dax.Address(diff.Address)) + + // This is more FieldVersion hackery. Even if the table is not keyed, we + // still want to manage partition 0 for the table in case any of the table's + // fields contain string keys (we use partition 0 for field string keys for + // now; in the future we should distribute/balance the field key translation + // like we do shards and partitions). + if !qtbl.StringKeys() { + // workerSet maintains the set of workers which have a job assignment change + // and therefore need to be sent an updated Directive. + workerSet := NewAddressSet() + + p := dax.PartitionNum(0) + + // We don't currently use the returned diff, other than to determine + // which worker was affected, because we send the full Directive + // every time. 
+ diffs, err := c.Balancer.AddJobs(tx, dax.RoleTypeTranslate, qtbl.QualifiedID(), partition(qtbl.Key(), p).Job()) + if err != nil { + return errors.Wrap(err, "adding job") + } + for _, diff := range diffs { + workerSet.Add(dax.Address(diff.Address)) + } + + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. + addrMethods = applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodFull) } - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. - addrMethods = applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) - } + var err error + directives, err = c.buildDirectives(ctx, tx, addrMethods) + if err != nil { + return errors.Wrap(err, "building directives") + } - directives, err := c.buildDirectives(tx, addrMethods) - if err != nil { - return errors.Wrap(err, "building directives") + return nil } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "committing") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return errors.Wrap(err, "retry with tx: write") } - if err := c.sendDirectives2(ctx, directives); err != nil { + if err := c.sendDirectives(ctx, directives); err != nil { return NewErrDirectiveSendFailure(err.Error()) } return nil @@ -887,31 +884,31 @@ func (c *Controller) CreateTable(ctx context.Context, qtbl *dax.QualifiedTable) // DropTable removes a table from the schema and sends directives to all affected // nodes based on the change. func (c *Controller) DropTable(ctx context.Context, qtid dax.QualifiedTableID) error { - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() + var directives []*dax.Directive - addrs, err := c.dropTable(tx, qtid) - if err != nil { - return errors.Wrapf(err, "dropping table: %s", qtid) - } + fn := func(tx dax.Transaction, writable bool) error { + addrs, err := c.dropTable(tx, qtid) + if err != nil { + return errors.Wrapf(err, "dropping table: %s", qtid) + } + + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. + addrMethods := applyAddressMethod(addrs.SortedSlice(), dax.DirectiveMethodFull) - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. - addrMethods := applyAddressMethod(addrs.SortedSlice(), dax.DirectiveMethodDiff) + directives, err = c.buildDirectives(ctx, tx, addrMethods) + if err != nil { + return errors.Wrap(err, "building directives") + } - directives, err := c.buildDirectives(tx, addrMethods) - if err != nil { - return errors.Wrap(err, "building directives") + return nil } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "committing") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return errors.Wrap(err, "retry with tx: write") } - if err := c.sendDirectives2(ctx, directives); err != nil { + if err := c.sendDirectives(ctx, directives); err != nil { return NewErrDirectiveSendFailure(err.Error()) } return nil @@ -988,50 +985,56 @@ func (c *Controller) Tables(ctx context.Context, qdbid dax.QualifiedDatabaseID, // RemoveShards deregisters the table/shard combinations with the controller and // sends the necessary directives. 
func (c *Controller) RemoveShards(ctx context.Context, qtid dax.QualifiedTableID, shards ...dax.ShardNum) error { - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() - // workerSet maintains the set of workers which have a job assignment change - // and therefore need to be sent an updated Directive. - workerSet := NewAddressSet() + var directives []*dax.Directive - for _, s := range shards { - // We don't currently use the returned diff, other than to determine - // which worker was affected, because we send the full Directive every - // time. - diffs, err := c.Balancer.RemoveJobs(tx, dax.RoleTypeCompute, qtid, shard(qtid.Key(), s).Job()) - if err != nil { - return errors.Wrap(err, "removing job") - } - for _, diff := range diffs { - workerSet.Add(dax.Address(diff.Address)) + fn := func(tx dax.Transaction, writable bool) error { + // workerSet maintains the set of workers which have a job assignment change + // and therefore need to be sent an updated Directive. + workerSet := NewAddressSet() + + for _, s := range shards { + // We don't currently use the returned diff, other than to determine + // which worker was affected, because we send the full Directive every + // time. + diffs, err := c.Balancer.RemoveJobs(tx, dax.RoleTypeCompute, qtid, shard(qtid.Key(), s).Job()) + if err != nil { + return errors.Wrap(err, "removing job") + } + for _, diff := range diffs { + workerSet.Add(dax.Address(diff.Address)) + } } - } - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. - addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. + addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodFull) - directives, err := c.buildDirectives(tx, addrMethods) - if err != nil { - return errors.Wrap(err, "building directives") + var err error + directives, err = c.buildDirectives(ctx, tx, addrMethods) + if err != nil { + return errors.Wrap(err, "building directives") + } + + return nil } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "committing") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return errors.Wrap(err, "retry with tx: write") } - if err := c.sendDirectives2(ctx, directives); err != nil { + if err := c.sendDirectives(ctx, directives); err != nil { return NewErrDirectiveSendFailure(err.Error()) } return nil } -func (c *Controller) sendDirectives2(ctx context.Context, directives []*dax.Directive) error { +func (c *Controller) sendDirectives(ctx context.Context, directives []*dax.Directive) error { + if len(directives) == 0 { + return nil + } + errs := make([]error, len(directives)) var eg errgroup.Group for i, dir := range directives { @@ -1096,8 +1099,8 @@ func applyAddressMethod(addrs []dax.Address, method dax.DirectiveMethod) []addre // buildDirectives builds a list of directives for the given addrs (i.e. nodes) // using information (i.e. current state) from the balancers. -func (c *Controller) buildDirectives(tx dax.Transaction, addrs []addressMethod) ([]*dax.Directive, error) { - // If nodes is empty, return early. +func (c *Controller) buildDirectives(ctx context.Context, tx dax.Transaction, addrs []addressMethod) ([]*dax.Directive, error) { + // If addrs is empty, return early. 
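sendDirectives (renamed from sendDirectives2, now with an early return for the empty case) fans directives out concurrently: the errs := make([]error, len(directives)) / errgroup.Group pairing visible above implies one goroutine per directive writing into its own error slot. The shape of that pattern, with a hypothetical sendOne standing in for the controller's actual per-node delivery (elided in this section):

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// sendOne stands in for however the controller delivers one directive.
func sendOne(ctx context.Context, addr string) error {
	if addr == "bad" {
		return errors.New("connection refused")
	}
	return nil
}

func sendDirectives(ctx context.Context, addrs []string) error {
	if len(addrs) == 0 {
		return nil // mirrors the early return added in this patch
	}

	// One error slot per directive: goroutines never share a slot, so
	// no locking is needed to collect partial failures.
	errs := make([]error, len(addrs))
	var eg errgroup.Group
	for i, addr := range addrs {
		i, addr := i, addr // capture loop variables (pre-Go 1.22)
		eg.Go(func() error {
			errs[i] = sendOne(ctx, addr)
			return errs[i]
		})
	}
	// Wait returns the first non-nil error; errs retains all of them.
	err := eg.Wait()
	fmt.Println(errs)
	return err
}

func main() {
	_ = sendDirectives(context.Background(), []string{"good", "bad"})
}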
 	if len(addrs) == 0 {
 		return nil, nil
 	}
@@ -1105,10 +1108,13 @@
 	directives := make([]*dax.Directive, len(addrs))
 
 	for i, addressMethod := range addrs {
-		dVersion, err := c.DirectiveVersion.Increment(tx, 1)
+		// Get the current directive version for address. If this address has
+		// never been sent a directive before, we should get 0 here.
+		currentDirectiveVersion, err := c.DirectiveVersion.GetCurrent(tx, addressMethod.address)
 		if err != nil {
-			return nil, errors.Wrap(err, "incrementing directive version")
+			return nil, errors.Wrap(err, "getting current directive version")
 		}
+		nextDirectiveVersion := currentDirectiveVersion + 1
 
 		d := &dax.Directive{
 			Address: addressMethod.address,
@@ -1116,7 +1122,7 @@
 			Tables:         []*dax.QualifiedTable{},
 			ComputeRoles:   []dax.ComputeRole{},
 			TranslateRoles: []dax.TranslateRole{},
-			Version:        dVersion,
+			Version:        nextDirectiveVersion,
 		}
 
 		// computeMap maps a table to a list of shards for that table. We need
@@ -1262,17 +1268,293 @@
 		sort.Slice(d.TranslateRoles, func(i, j int) bool { return d.TranslateRoles[i].TableKey < d.TranslateRoles[j].TableKey })
 
 		directives[i] = d
+
+		// Set directive version to nextDirectiveVersion where directiveVersion
+		// equals currentDirectiveVersion for this address.
+		if err := c.DirectiveVersion.SetNext(tx, addressMethod.address, currentDirectiveVersion, nextDirectiveVersion); err != nil {
+			return nil, errors.Wrap(err, "setting next directive version")
+		}
 	}
 
 	return directives, nil
 }
 
-// SnapshotTable snapshots a table. It might also snapshot everything
-// else... no guarantees here, only used in tests as of this writing.
-func (c *Controller) SnapshotTable(ctx context.Context, qtid dax.QualifiedTableID) error {
-	c.snapControl <- struct{}{}
-	return nil
-}
+// buildDirectivesAsDiffs builds a list of directives based on the given
+// WorkerDiffs. These are used for directives of type DirectiveMethodDiff.
+func (c *Controller) buildDirectivesAsDiffs(ctx context.Context, tx dax.Transaction, roleType dax.RoleType, diffs []dax.WorkerDiff) ([]*dax.Directive, error) {
+	// If diffs is empty, return early.
+	if len(diffs) == 0 {
+		return nil, nil
+	}
+
+	directives := make([]*dax.Directive, len(diffs))
+
+	for i, workerDiff := range diffs {
+		// Get the current directive version for address. If this address has
+		// never been sent a directive before, we should get 0 here.
+		currentDirectiveVersion, err := c.DirectiveVersion.GetCurrent(tx, workerDiff.Address)
+		if err != nil {
+			return nil, errors.Wrap(err, "getting current directive version")
+		}
+		nextDirectiveVersion := currentDirectiveVersion + 1
+
+		d := &dax.Directive{
+			Address:               workerDiff.Address,
+			Method:                dax.DirectiveMethodDiff,
+			Tables:                []*dax.QualifiedTable{},
+			ComputeRolesAdded:     []dax.ComputeRole{},
+			ComputeRolesRemoved:   []dax.ComputeRole{},
+			TranslateRolesAdded:   []dax.TranslateRole{},
+			TranslateRolesRemoved: []dax.TranslateRole{},
+			Version:               nextDirectiveVersion,
+		}
+
+		// tableSet maintains the set of tables which have a job assignment
+		// change and therefore need to be included in the Directive schema.
+		tableSet := NewTableSet()
+
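The GetCurrent/SetNext pair used in both directive builders is what makes per-address versions safe under concurrency: SetNext succeeds only if the stored version still equals the one read earlier in the same transaction, so when two processes build a directive for the same address, one fails to commit and RetryWithTx rolls it back and retries — the behavior described in the commit message. In SQL terms this is a compare-and-set; a sketch against the table added by migration 002 (table and column names are assumptions, and the real code goes through the sqldb package's own transaction type):

package sqldb

import (
	"database/sql"
	"fmt"
)

// SetNext bumps the directive version for one address, but only if it
// still holds the value the caller read with GetCurrent.
func SetNext(tx *sql.Tx, address string, current, next int) error {
	res, err := tx.Exec(
		`UPDATE directive_versions SET version = $1 WHERE address = $2 AND version = $3`,
		next, address, current,
	)
	if err != nil {
		return fmt.Errorf("updating directive version: %w", err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("rows affected: %w", err)
	}
	if n == 0 {
		// Someone else advanced the version first; erroring out aborts
		// the transaction so RetryWithTx can start over.
		return fmt.Errorf("stale directive version for %s: %d", address, current)
	}
	return nil
}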
+		// computeMapAdded maps a table to a list of shards added for that
+		// table. We need to aggregate them here because the list of jobs from
+		// WorkerDiff can contain a mixture of table/shards.
+		computeMapAdded := make(map[dax.TableKey][]dax.ShardNum)
+		computeMapRemoved := make(map[dax.TableKey][]dax.ShardNum)
+
+		// translateMapAdded maps a table to a list of partitions added for that
+		// table. We need to aggregate them here because the list of jobs from
+		// WorkerDiff can contain a mixture of table/partitions.
+		translateMapAdded := make(map[dax.TableKey][]dax.PartitionNum)
+		translateMapRemoved := make(map[dax.TableKey][]dax.PartitionNum)
+
+		// ownsPartition0Added is the set of tables for which this node owns
+		// partition 0, based on partition 0 being added. This is used to
+		// determine FieldVersion responsibility.
+		ownsPartition0Added := make(map[dax.TableKey]struct{}, 0)
+		ownsPartition0Removed := make(map[dax.TableKey]struct{}, 0)
+
+		switch roleType {
+		case dax.RoleTypeCompute:
+			for _, job := range workerDiff.AddedJobs {
+				j, err := decodeShard(job)
+				if err != nil {
+					return nil, errors.Wrapf(err, "decoding shard job added: %s", job)
+				}
+
+				tkey := j.table()
+				computeMapAdded[tkey] = append(computeMapAdded[tkey], j.shardNum())
+				tableSet.Add(tkey)
+			}
+			for _, job := range workerDiff.RemovedJobs {
+				j, err := decodeShard(job)
+				if err != nil {
+					return nil, errors.Wrapf(err, "decoding shard job removed: %s", job)
+				}
+
+				tkey := j.table()
+				computeMapRemoved[tkey] = append(computeMapRemoved[tkey], j.shardNum())
+				tableSet.Add(tkey)
+			}
+		case dax.RoleTypeTranslate:
+			for _, job := range workerDiff.AddedJobs {
+				j, err := decodePartition(job)
+				if err != nil {
+					return nil, errors.Wrapf(err, "decoding partition job added: %s", job)
+				}
+
+				// This check is related to the FieldVersion logic below.
+				// Basically, we need to determine if this node is
+				// responsible for partition 0 for any table(s), and if so,
+				// include FieldVersion in the directive for the node.
+				if j.partitionNum() == 0 {
+					ownsPartition0Added[j.table()] = struct{}{}
+				}
+
+				tkey := j.table()
+				translateMapAdded[tkey] = append(translateMapAdded[tkey], j.partitionNum())
+				tableSet.Add(tkey)
+			}
+			for _, job := range workerDiff.RemovedJobs {
+				j, err := decodePartition(job)
+				if err != nil {
+					return nil, errors.Wrapf(err, "decoding partition job removed: %s", job)
+				}
+
+				// This check is related to the FieldVersion logic below.
+				// Basically, we need to determine if this node is no longer
+				// responsible for partition 0 for any table(s), and if so,
+				// remove FieldVersion in the directive for the node.
+				if j.partitionNum() == 0 {
+					ownsPartition0Removed[j.table()] = struct{}{}
+				}
+
+				tkey := j.table()
+				translateMapRemoved[tkey] = append(translateMapRemoved[tkey], j.partitionNum())
+				tableSet.Add(tkey)
+			}
+		}
+
+		// Convert the computeMapAdded into a list of ComputeRole.
+		for k, v := range computeMapAdded {
+			// Because these were encoded as strings in the balancer and may be
+			// out of order numerically, sort them as integers.
+			//sort.Slice(v, func(i, j int) bool { return v[i] < v[j] })
+			sort.Sort(dax.ShardNums(v))
+
+			d.ComputeRolesAdded = append(d.ComputeRolesAdded, dax.ComputeRole{
+				TableKey: k,
+				Shards:   v,
+			})
+		}
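sort.Sort(dax.ShardNums(v)) in the loop above works because ShardNums implements sort.Interface; the commented-out sort.Slice line kept alongside it does the same thing inline. A minimal equivalent, assuming ShardNum is an integer type as its use here suggests:

package main

import (
	"fmt"
	"sort"
)

// ShardNum / ShardNums stand in for the dax types.
type ShardNum int

type ShardNums []ShardNum

func (s ShardNums) Len() int           { return len(s) }
func (s ShardNums) Less(i, j int) bool { return s[i] < s[j] }
func (s ShardNums) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

func main() {
	// Jobs come back from the balancer keyed by string, so shard numbers
	// can arrive in lexicographic order: 1, 10, 2, ...
	v := ShardNums{10, 2, 1}
	sort.Sort(v)
	fmt.Println(v) // [1 2 10]
}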
+
+		// Convert the computeMapRemoved into a list of ComputeRole.
+		for k, v := range computeMapRemoved {
+			// Because these were encoded as strings in the balancer and may be
+			// out of order numerically, sort them as integers.
+			//sort.Slice(v, func(i, j int) bool { return v[i] < v[j] })
+			sort.Sort(dax.ShardNums(v))
+
+			d.ComputeRolesRemoved = append(d.ComputeRolesRemoved, dax.ComputeRole{
+				TableKey: k,
+				Shards:   v,
+			})
+		}
+
+		// Convert the translateMapAdded into a list of TranslateRole.
+		for k, v := range translateMapAdded {
+			// Because these were encoded as strings in the balancer and may be
+			// out of order numerically, sort them as integers.
+			sort.Sort(dax.PartitionNums(v))
+
+			d.TranslateRolesAdded = append(d.TranslateRolesAdded, dax.TranslateRole{
+				TableKey:   k,
+				Partitions: v,
+			})
+		}
+
+		// Convert the translateMapRemoved into a list of TranslateRole.
+		for k, v := range translateMapRemoved {
+			// Because these were encoded as strings in the balancer and may be
+			// out of order numerically, sort them as integers.
+			sort.Sort(dax.PartitionNums(v))
+
+			d.TranslateRolesRemoved = append(d.TranslateRolesRemoved, dax.TranslateRole{
+				TableKey:   k,
+				Partitions: v,
+			})
+		}
+
+		// Add field-specific TranslateRoles to the node which is responsible
+		// for partition 0. This is a bit clunky; ideally we would handle this
+		// the same way we handle shards and partitions, by maintaining a
+		// distinct balancer for FieldVersions. But because the query side isn't
+		// currently set up to look for field translation anywhere but on the
+		// local node (or in the case of Serverless, on partition 0), we're
+		// keeping everything that way for now.
+		for tkey := range ownsPartition0Added {
+			qtid := tkey.QualifiedTableID()
+			table, err := c.Schemar.Table(tx, qtid)
+			if err != nil {
+				return nil, errors.Wrapf(err, "getting table: %s", tkey)
+			}
+
+			fieldNames := make([]dax.FieldName, 0)
+			for _, field := range table.Fields {
+				if !field.StringKeys() {
+					continue
+				}
+
+				// Skip the primary key field; it uses table translation.
+				if field.IsPrimaryKey() {
+					continue
+				}
+
+				fieldNames = append(fieldNames, field.Name)
+			}
+
+			if len(fieldNames) == 0 {
+				continue
+			}
+
+			d.TranslateRolesAdded = append(d.TranslateRolesAdded, dax.TranslateRole{
+				TableKey: tkey,
+				Fields:   fieldNames,
+			})
+
+			tableSet.Add(tkey)
+		}
+
+		// And in case partition 0 was removed.
+		for tkey := range ownsPartition0Removed {
+			qtid := tkey.QualifiedTableID()
+			table, err := c.Schemar.Table(tx, qtid)
+			if err != nil {
+				return nil, errors.Wrapf(err, "getting table: %s", tkey)
+			}
+
+			fieldNames := make([]dax.FieldName, 0)
+			for _, field := range table.Fields {
+				if !field.StringKeys() {
+					continue
+				}
+
+				// Skip the primary key field; it uses table translation.
+				if field.IsPrimaryKey() {
+					continue
+				}
+
+				fieldNames = append(fieldNames, field.Name)
+			}
+
+			if len(fieldNames) == 0 {
+				continue
+			}
+
+			d.TranslateRolesRemoved = append(d.TranslateRolesRemoved, dax.TranslateRole{
+				TableKey: tkey,
+				Fields:   fieldNames,
+			})
+
+			tableSet.Add(tkey)
+		}
+		/////////////// end of FieldVersion logic //////////////////////
+
+		if len(tableSet) > 0 {
+			dTables := make([]*dax.QualifiedTable, 0)
+			for qdbid, tblIDs := range tableSet.QualifiedSortedSlice() {
+				qtbls, err := c.Schemar.Tables(tx, qdbid, tblIDs...)
+				if err != nil {
+					return nil, errors.Wrapf(err, "getting directive tables for qdbid: %s", qdbid)
+				}
+				dTables = append(dTables, qtbls...)
+			}
+			d.Tables = dTables
+		}
+
+		// Sort ComputeRolesAdded by table. 
+ sort.Slice(d.ComputeRolesAdded, func(i, j int) bool { return d.ComputeRolesAdded[i].TableKey < d.ComputeRolesAdded[j].TableKey }) + sort.Slice(d.ComputeRolesRemoved, func(i, j int) bool { return d.ComputeRolesRemoved[i].TableKey < d.ComputeRolesRemoved[j].TableKey }) + + // Sort TranslateRolesAdded by table. + sort.Slice(d.TranslateRolesAdded, func(i, j int) bool { return d.TranslateRolesAdded[i].TableKey < d.TranslateRolesAdded[j].TableKey }) + sort.Slice(d.TranslateRolesRemoved, func(i, j int) bool { return d.TranslateRolesRemoved[i].TableKey < d.TranslateRolesRemoved[j].TableKey }) + + directives[i] = d + + // Set directive version to nextDirectiveVersion where directiveVersion + // equals currentDirectiveVersion for this address. + if err := c.DirectiveVersion.SetNext(tx, workerDiff.Address, currentDirectiveVersion, nextDirectiveVersion); err != nil { + return nil, errors.Wrap(err, "setting next directive version") + } + } + + return directives, nil +} + +// SnapshotTable snapshots a table. It might also snapshot everything +// else... no guarantees here, only used in tests as of this writing. +func (c *Controller) SnapshotTable(ctx context.Context, qtid dax.QualifiedTableID) error { + c.snapControl <- struct{}{} + return nil +} // SnapshotShardData forces the compute node responsible for the given shard to // snapshot that shard, then increment its shard version for logs written to the @@ -1438,7 +1720,7 @@ func (c *Controller) ComputeNodes(ctx context.Context, qtid dax.QualifiedTableID return computeNodes, nil } - assignedNodes, _, _, err := c.nodesComputeReadOrWrite(tx, role, qdbid, false, false) + assignedNodes, _, _, err := c.nodesComputeReadOrWrite(ctx, tx, role, qdbid, false, false) if err != nil { return nil, errors.Wrap(err, "getting compute nodes read or write") } @@ -1500,7 +1782,7 @@ func (c *Controller) TranslateNodes(ctx context.Context, qtid dax.QualifiedTable return translateNodes, nil } - assignedNodes, _, _, err := c.nodesTranslateReadOrWrite(tx, role, qdbid, false, false) + assignedNodes, _, _, err := c.nodesTranslateReadOrWrite(ctx, tx, role, qdbid, false, false) if err != nil { return nil, errors.Wrap(err, "getting translate nodes read or write") } @@ -1554,42 +1836,39 @@ func (c *Controller) IngestPartition(ctx context.Context, qtid dax.QualifiedTabl } qdbid := qtid.QualifiedDatabaseID - // Try with a read transaction first. - tx, err := c.Transactor.BeginTx(ctx, false) - if err != nil { - return "", errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() + var nodes []dax.AssignedNode + var retryAsWrite bool + var directives []*dax.Directive - if err := c.sanitizeQTID(tx, &qtid); err != nil { - return "", errors.Wrap(err, "sanitizing") - } + fn := func(tx dax.Transaction, writable bool) error { + if err := c.sanitizeQTID(tx, &qtid); err != nil { + return errors.Wrap(err, "sanitizing") + } - // Verify that the table exists. - if _, err := c.Schemar.Table(tx, qtid); err != nil { - return "", errors.Wrap(err, "getting table") + // Verify that the table exists. 
+ if _, err := c.Schemar.Table(tx, qtid); err != nil { + return errors.Wrap(err, "getting table") + } + + var err error + nodes, retryAsWrite, directives, err = c.nodesTranslateReadOrWrite(ctx, tx, role, qdbid, true, writable) + if err != nil { + return errors.Wrap(err, "getting translate nodes read or write") + } + + return nil } - nodes, retryAsWrite, _, err := c.nodesTranslateReadOrWrite(tx, role, qdbid, true, false) - if err != nil { - return "", errors.Wrap(err, "getting translate nodes read or write") + // Try with a read transaction first. + if err := dax.RetryWithTx(ctx, c.Transactor, fn, false, 1); err != nil { + return "", errors.Wrap(err, "retry with tx: read") } - var directives []*dax.Directive // If it's writable, and we couldn't find all the partitions with just a // read, try again with a write transaction. if retryAsWrite { - tx.Rollback() - - tx, err = c.Transactor.BeginTx(ctx, true) - if err != nil { - return "", errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() - - nodes, _, directives, err = c.nodesTranslateReadOrWrite(tx, role, qdbid, true, true) - if err != nil { - return "", errors.Wrap(err, "getting translate nodes read or write retry") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return "", errors.Wrap(err, "retry with tx: write") } } @@ -1623,17 +1902,8 @@ func (c *Controller) IngestPartition(ctx context.Context, qtid dax.QualifiedTabl fmt.Sprintf("partition returned (%d) does not match requested (%d)", p, partition)) } - // Only commit if the transaction is writable. - if retryAsWrite { - if err := tx.Commit(); err != nil { - return node.Address, errors.Wrap(err, "committing") - } - - if err := c.sendDirectives2(ctx, directives); err != nil { - return node.Address, NewErrDirectiveSendFailure(err.Error()) - } - - return node.Address, nil + if err := c.sendDirectives(ctx, directives); err != nil { + return node.Address, NewErrDirectiveSendFailure(err.Error()) } return node.Address, nil @@ -1647,44 +1917,41 @@ func (c *Controller) IngestShard(ctx context.Context, qtid dax.QualifiedTableID, } qdbid := qtid.QualifiedDatabaseID - // Try with a read transaction first. - tx, err := c.Transactor.BeginTx(ctx, false) - if err != nil { - return "", errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() + var nodes []dax.AssignedNode + var retryAsWrite bool + var directives []*dax.Directive - if err := c.sanitizeQTID(tx, &qtid); err != nil { - return "", errors.Wrap(err, "sanitizing") - } + fn := func(tx dax.Transaction, writable bool) error { + if err := c.sanitizeQTID(tx, &qtid); err != nil { + return errors.Wrap(err, "sanitizing") + } - // Verify that the table exists. - if _, err := c.Schemar.Table(tx, qtid); err != nil { - return "", err - } + // Verify that the table exists. + if _, err := c.Schemar.Table(tx, qtid); err != nil { + return err + } - nodes, retryAsWrite, _, err := c.nodesComputeReadOrWrite(tx, role, qdbid, true, false) - if err != nil { - return "", errors.Wrap(err, "getting compute nodes read or write") + var err error + nodes, retryAsWrite, directives, err = c.nodesComputeReadOrWrite(ctx, tx, role, qdbid, true, writable) + if err != nil { + return errors.Wrap(err, "getting compute nodes read or write") + } + + return nil } - var directives []*dax.Directive + // Try with a read transaction first. 
+ if err := dax.RetryWithTx(ctx, c.Transactor, fn, false, 1); err != nil { + return "", errors.Wrap(err, "retry with tx: read") + } // If it's writable, and we couldn't find all the partitions with just a // read, try again with a write transaction. if retryAsWrite { - tx.Rollback() - - tx, err = c.Transactor.BeginTx(ctx, true) - if err != nil { - return "", errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() - - nodes, _, directives, err = c.nodesComputeReadOrWrite(tx, role, qdbid, true, true) - if err != nil { - return "", errors.Wrap(err, "getting compute nodes read or write retry") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return "", errors.Wrap(err, "retry with tx: write") } + retryAsWrite = true } computeNodes, err := c.assignedToComputeNodes(nodes) @@ -1714,13 +1981,8 @@ func (c *Controller) IngestShard(ctx context.Context, qtid dax.QualifiedTableID, fmt.Sprintf("shard returned (%d) does not match requested (%d)", s, shrdNum)) } - // Only commit if the transaction is writable. if retryAsWrite { - if err := tx.Commit(); err != nil { - return node.Address, errors.Wrap(err, "committing") - } - - if err := c.sendDirectives2(ctx, directives); err != nil { + if err := c.sendDirectives(ctx, directives); err != nil { return node.Address, NewErrDirectiveSendFailure(err.Error()) } } @@ -1731,136 +1993,136 @@ func (c *Controller) IngestShard(ctx context.Context, qtid dax.QualifiedTableID, //// func (c *Controller) CreateField(ctx context.Context, qtid dax.QualifiedTableID, fld *dax.Field) error { - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() + var directives []*dax.Directive - if err := c.sanitizeQTID(tx, &qtid); err != nil { - return errors.Wrap(err, "sanitizing") - } + fn := func(tx dax.Transaction, writable bool) error { + if err := c.sanitizeQTID(tx, &qtid); err != nil { + return errors.Wrap(err, "sanitizing") + } - // Create the field in schemar. - if err := c.Schemar.CreateField(tx, qtid, fld); err != nil { - return errors.Wrapf(err, "creating field: %s, %s", qtid, fld) - } + // Create the field in schemar. + if err := c.Schemar.CreateField(tx, qtid, fld); err != nil { + return errors.Wrapf(err, "creating field: %s, %s", qtid, fld) + } - // workerSet maintains the set of workers which have a job assignment change - // and therefore need to be sent an updated Directive. - workerSet := NewAddressSet() + // workerSet maintains the set of workers which have a job assignment change + // and therefore need to be sent an updated Directive. + workerSet := NewAddressSet() - qdbid := qtid.QualifiedDatabaseID + qdbid := qtid.QualifiedDatabaseID - // Get the worker(s) responsible for partition 0. - job := partition(qtid.Key(), 0).Job() - workers, err := c.Balancer.WorkersForJobs(tx, dax.RoleTypeTranslate, qdbid, job) - if err != nil { - return errors.Wrapf(err, "getting workers for job: %s", job) - } + // Get the worker(s) responsible for partition 0. + job := partition(qtid.Key(), 0).Job() + workers, err := c.Balancer.WorkersForJobs(tx, dax.RoleTypeTranslate, qdbid, job) + if err != nil { + return errors.Wrapf(err, "getting workers for job: %s", job) + } - for _, w := range workers { - workerSet.Add(dax.Address(w.Address)) - } + for _, w := range workers { + workerSet.Add(dax.Address(w.Address)) + } - // Get the list of workers responsible for shard data for this table. 
- if state, err := c.Balancer.CurrentState(tx, dax.RoleTypeCompute, qdbid); err != nil { - return errors.Wrap(err, "getting current compute state") - } else { - for _, worker := range state { - for _, job := range worker.Jobs { - if shard, err := decodeShard(job); err != nil { - return errors.Wrapf(err, "decoding shard: %s", job) - } else if shard.table() == qtid.Key() { - workerSet.Add(dax.Address(worker.Address)) - break + // Get the list of workers responsible for shard data for this table. + if state, err := c.Balancer.CurrentState(tx, dax.RoleTypeCompute, qdbid); err != nil { + return errors.Wrap(err, "getting current compute state") + } else { + for _, worker := range state { + for _, job := range worker.Jobs { + if shard, err := decodeShard(job); err != nil { + return errors.Wrapf(err, "decoding shard: %s", job) + } else if shard.table() == qtid.Key() { + workerSet.Add(dax.Address(worker.Address)) + break + } } } } - } - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. - addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. + addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodFull) - directives, err := c.buildDirectives(tx, addrMethods) - if err != nil { - return errors.Wrap(err, "building directives") + directives, err = c.buildDirectives(ctx, tx, addrMethods) + if err != nil { + return errors.Wrap(err, "building directives") + } + + return nil } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "committing") + if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil { + return errors.Wrap(err, "retry with tx: write") } - if err := c.sendDirectives2(ctx, directives); err != nil { + if err := c.sendDirectives(ctx, directives); err != nil { return NewErrDirectiveSendFailure(err.Error()) } return nil } func (c *Controller) DropField(ctx context.Context, qtid dax.QualifiedTableID, fldName dax.FieldName) error { - tx, err := c.Transactor.BeginTx(ctx, true) - if err != nil { - return errors.Wrap(err, "beginning tx") - } - defer tx.Rollback() + var directives []*dax.Directive - if err := c.sanitizeQTID(tx, &qtid); err != nil { - return errors.Wrap(err, "sanitizing") - } + fn := func(tx dax.Transaction, writable bool) error { + if err := c.sanitizeQTID(tx, &qtid); err != nil { + return errors.Wrap(err, "sanitizing") + } - // Drop the field from schemar. - if err := c.Schemar.DropField(tx, qtid, fldName); err != nil { - return errors.Wrapf(err, "dropping field: %s, %s", qtid, fldName) - } + // Drop the field from schemar. + if err := c.Schemar.DropField(tx, qtid, fldName); err != nil { + return errors.Wrapf(err, "dropping field: %s, %s", qtid, fldName) + } - // workerSet maintains the set of workers which have a job assignment change - // and therefore need to be sent an updated Directive. - workerSet := NewAddressSet() + // workerSet maintains the set of workers which have a job assignment change + // and therefore need to be sent an updated Directive. + workerSet := NewAddressSet() - qdbid := qtid.QualifiedDatabaseID + qdbid := qtid.QualifiedDatabaseID - // Get the worker(s) responsible for partition 0. 
- job := partition(qtid.Key(), 0).Job() - workers, err := c.Balancer.WorkersForJobs(tx, dax.RoleTypeTranslate, qdbid, job) - if err != nil { - return errors.Wrapf(err, "getting workers for job: %s", job) - } + // Get the worker(s) responsible for partition 0. + job := partition(qtid.Key(), 0).Job() + workers, err := c.Balancer.WorkersForJobs(tx, dax.RoleTypeTranslate, qdbid, job) + if err != nil { + return errors.Wrapf(err, "getting workers for job: %s", job) + } - for _, w := range workers { - workerSet.Add(dax.Address(w.Address)) - } + for _, w := range workers { + workerSet.Add(dax.Address(w.Address)) + } - // Get the list of workers responsible for shard data for this table. - if state, err := c.Balancer.CurrentState(tx, dax.RoleTypeCompute, qdbid); err != nil { - return errors.Wrap(err, "getting current compute state") - } else { - for _, worker := range state { - for _, job := range worker.Jobs { - if shard, err := decodeShard(job); err != nil { - return errors.Wrapf(err, "decoding shard: %s", job) - } else if shard.table() == qtid.Key() { - workerSet.Add(dax.Address(worker.Address)) - break + // Get the list of workers responsible for shard data for this table. + if state, err := c.Balancer.CurrentState(tx, dax.RoleTypeCompute, qdbid); err != nil { + return errors.Wrap(err, "getting current compute state") + } else { + for _, worker := range state { + for _, job := range worker.Jobs { + if shard, err := decodeShard(job); err != nil { + return errors.Wrapf(err, "decoding shard: %s", job) + } else if shard.table() == qtid.Key() { + workerSet.Add(dax.Address(worker.Address)) + break + } } } } - } - // Convert the slice of addresses into a slice of addressMethod containing - // the appropriate method. - addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodDiff) + // Convert the slice of addresses into a slice of addressMethod containing + // the appropriate method. 
+ addrMethods := applyAddressMethod(workerSet.SortedSlice(), dax.DirectiveMethodFull)

- directives, err := c.buildDirectives(tx, addrMethods)
- if err != nil {
- return errors.Wrap(err, "building directives")
+ directives, err = c.buildDirectives(ctx, tx, addrMethods)
+ if err != nil {
+ return errors.Wrap(err, "building directives")
+ }
+
+ return nil
 }

- if err := tx.Commit(); err != nil {
- return errors.Wrap(err, "committing")
+ if err := dax.RetryWithTx(ctx, c.Transactor, fn, true, txRetry); err != nil {
+ return errors.Wrap(err, "retry with tx: write")
 }

- if err := c.sendDirectives2(ctx, directives); err != nil {
+ if err := c.sendDirectives(ctx, directives); err != nil {
 return NewErrDirectiveSendFailure(err.Error())
 }
 return nil
diff --git a/dax/controller/controller_test.go b/dax/controller/controller_test.go
index cd6a27867..304180086 100644
--- a/dax/controller/controller_test.go
+++ b/dax/controller/controller_test.go
@@ -159,14 +159,16 @@ func TestController(t *testing.T) {
 Tables: []*dax.QualifiedTable{
 tbl0,
 },
- ComputeRoles: []dax.ComputeRole{
+ ComputeRolesAdded: []dax.ComputeRole{
 {
 TableKey: tbl0.Key(),
 Shards: dax.NewShardNums(0),
 },
 },
- TranslateRoles: []dax.TranslateRole{},
- Version: 2,
+ ComputeRolesRemoved: []dax.ComputeRole{},
+ TranslateRolesAdded: []dax.TranslateRole{},
+ TranslateRolesRemoved: []dax.TranslateRole{},
+ Version: 2,
 },
 }
 got = director.flush()
@@ -194,7 +196,7 @@ func TestController(t *testing.T) {
 Tables: []*dax.QualifiedTable{},
 ComputeRoles: []dax.ComputeRole{},
 TranslateRoles: []dax.TranslateRole{},
- Version: 3,
+ Version: 1,
 },
 }
 got = director.flush()
@@ -216,7 +218,7 @@ func TestController(t *testing.T) {
 Tables: []*dax.QualifiedTable{},
 ComputeRoles: []dax.ComputeRole{},
 TranslateRoles: []dax.TranslateRole{},
- Version: 4,
+ Version: 1,
 },
 }
 got = director.flush()
@@ -224,68 +226,89 @@ func TestController(t *testing.T) {
 assert.Equal(t, exp, got)

 // Add more shards.
- addShards(t, ctx, con, tbl0.QualifiedID(), dax.NewShardNums(1, 2, 3, 5, 8)...)
+ // Because addShards is a helper function which adds each shard one at a
+ // time, the controller builds separate directives for each call to
+ // IngestShard. In other words, this test is ensuring that the directives
+ // which are sent are what you would get if you added one shard at a time.
+ // So here, we just send in three shards at a time. We don't want more than
+ // one directive per address in the same test check, otherwise we can't
+ // guarantee an order.
+ addShards(t, ctx, con, tbl0.QualifiedID(), dax.NewShardNums(1, 2, 3)...)
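The addShards helper referenced in the comment above isn't shown in this patch. A minimal sketch of the shape it presumably has (the signature and the type of con are assumptions), which is what makes the one-directive-per-IngestShard behavior described above hold:

	// Hypothetical sketch, not part of this patch: one IngestShard call per
	// shard, so the controller builds and sends at most one directive per
	// address for each call.
	func addShards(t *testing.T, ctx context.Context, con *Controller, qtid dax.QualifiedTableID, shards ...dax.ShardNum) {
		t.Helper()
		for _, shard := range shards {
			if _, err := con.IngestShard(ctx, qtid, shard); err != nil {
				t.Fatalf("ingesting shard %d: %v", shard, err)
			}
		}
	}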
exp = []*dax.Directive{ { - Address: node1.Address, + Address: node0.Address, Method: dax.DirectiveMethodDiff, Tables: []*dax.QualifiedTable{ tbl0, }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbl0.Key(), - Shards: dax.NewShardNums(1), + Shards: dax.NewShardNums(3), }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 5, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 3, }, { - Address: node2.Address, + Address: node1.Address, Method: dax.DirectiveMethodDiff, Tables: []*dax.QualifiedTable{ tbl0, }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbl0.Key(), - Shards: dax.NewShardNums(2), + Shards: dax.NewShardNums(1), }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 6, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 2, }, { - Address: node0.Address, + Address: node2.Address, Method: dax.DirectiveMethodDiff, Tables: []*dax.QualifiedTable{ tbl0, }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbl0.Key(), - Shards: dax.NewShardNums(0, 3), + Shards: dax.NewShardNums(2), }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 7, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 2, }, + } + assert.Equal(t, exp, director.flush()) + + addShards(t, ctx, con, tbl0.QualifiedID(), dax.NewShardNums(5, 8)...) + + exp = []*dax.Directive{ { Address: node1.Address, Method: dax.DirectiveMethodDiff, Tables: []*dax.QualifiedTable{ tbl0, }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbl0.Key(), - Shards: dax.NewShardNums(1, 5), + Shards: dax.NewShardNums(5), }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 8, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 3, }, { Address: node2.Address, @@ -293,14 +316,16 @@ func TestController(t *testing.T) { Tables: []*dax.QualifiedTable{ tbl0, }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbl0.Key(), - Shards: dax.NewShardNums(2, 8), + Shards: dax.NewShardNums(8), }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 9, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 3, }, } got = director.flush() @@ -311,11 +336,14 @@ func TestController(t *testing.T) { tbl1 := daxtest.TestQualifiedTable(t, qdbid, "bar", 0, false) assert.NoError(t, con.CreateTable(ctx, tbl1)) + exp = []*dax.Directive{} + assert.Equal(t, exp, director.flush()) + tbls = append(tbls, tbl1) sort.Sort(tbls) // Add more shards. - addShards(t, ctx, con, tbl1.QualifiedID(), dax.NewShardNums(3, 5, 8, 13)...) + addShards(t, ctx, con, tbl1.QualifiedID(), dax.NewShardNums(3, 5, 8)...) 
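The expectations above carry only the Added/Removed role fields; a compute node holding a full cached directive folds such a diff in using the Copy and ApplyDiff methods added to dax.Directive later in this patch. A minimal sketch of that fold (the function and variable names are illustrative):

	// Sketch: fold an incoming diff directive into the cached full directive.
	func foldDirective(cached, incoming *dax.Directive) *dax.Directive {
		if incoming.Method != dax.DirectiveMethodDiff {
			// Full (or reset) directives replace the cached state wholesale.
			return incoming
		}
		newD := cached.Copy()           // Added/Removed members are not carried in the copy
		return newD.ApplyDiff(incoming) // union in added roles, delete removed ones
	}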
exp = []*dax.Directive{ { @@ -323,80 +351,76 @@ func TestController(t *testing.T) { Method: dax.DirectiveMethodDiff, Tables: []*dax.QualifiedTable{ tbls[0], - tbls[1], }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbls[0].Key(), Shards: dax.NewShardNums(3), }, - { - TableKey: tbls[1].Key(), - Shards: dax.NewShardNums(0, 3), - }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 10, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 4, }, { Address: node1.Address, Method: dax.DirectiveMethodDiff, Tables: []*dax.QualifiedTable{ tbls[0], - tbls[1], }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbls[0].Key(), Shards: dax.NewShardNums(5), }, - { - TableKey: tbls[1].Key(), - Shards: dax.NewShardNums(1, 5), - }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 11, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 4, }, { Address: node2.Address, Method: dax.DirectiveMethodDiff, Tables: []*dax.QualifiedTable{ tbls[0], - tbls[1], }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbls[0].Key(), Shards: dax.NewShardNums(8), }, - { - TableKey: tbls[1].Key(), - Shards: dax.NewShardNums(2, 8), - }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 12, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 4, }, + } + got = director.flush() + require.Equal(t, len(exp), len(got)) + require.Equal(t, exp, got) + + addShards(t, ctx, con, tbl1.QualifiedID(), dax.NewShardNums(13)...) 
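The Version fields asserted in these directives now advance independently per address. A sketch of the per-address bump that buildDirective performs with the DirectiveVersion interface defined later in this patch (the wrapper function name here is illustrative):

	// Sketch: optimistic per-address version bump inside a transaction that
	// is managed by RetryWithTx.
	func bumpVersion(tx dax.Transaction, dv dax.DirectiveVersion, addr dax.Address) (uint64, error) {
		current, err := dv.GetCurrent(tx, addr) // creates a version-0 record on first use
		if err != nil {
			return 0, err
		}
		next := current + 1
		// Under repeatable read, a concurrent bump for the same address makes
		// this conditional UPDATE match nothing, so the losing transaction is
		// rolled back and retried by RetryWithTx.
		if err := dv.SetNext(tx, addr, current, next); err != nil {
			return 0, err
		}
		return next, nil
	}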
+ + exp = []*dax.Directive{ { Address: node0.Address, Method: dax.DirectiveMethodDiff, Tables: []*dax.QualifiedTable{ tbls[0], - tbls[1], }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbls[0].Key(), - Shards: dax.NewShardNums(3, 13), - }, - { - TableKey: tbls[1].Key(), - Shards: dax.NewShardNums(0, 3), + Shards: dax.NewShardNums(13), }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 13, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 5, }, } got = director.flush() @@ -411,21 +435,18 @@ func TestController(t *testing.T) { Address: node0.Address, Method: dax.DirectiveMethodDiff, Tables: []*dax.QualifiedTable{ - tbls[0], tbls[1], }, - ComputeRoles: []dax.ComputeRole{ - { - TableKey: tbls[0].Key(), - Shards: dax.NewShardNums(3, 13), - }, + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbls[1].Key(), - Shards: dax.NewShardNums(0, 1, 3), + Shards: dax.NewShardNums(1), }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 14, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 6, }, { Address: node2.Address, @@ -434,18 +455,20 @@ func TestController(t *testing.T) { tbls[0], tbls[1], }, - ComputeRoles: []dax.ComputeRole{ + ComputeRolesAdded: []dax.ComputeRole{ { TableKey: tbls[0].Key(), - Shards: dax.NewShardNums(5, 8), + Shards: dax.NewShardNums(5), }, { TableKey: tbls[1].Key(), - Shards: dax.NewShardNums(2, 5, 8), + Shards: dax.NewShardNums(5), }, }, - TranslateRoles: []dax.TranslateRole{}, - Version: 15, + ComputeRolesRemoved: []dax.ComputeRole{}, + TranslateRolesAdded: []dax.TranslateRole{}, + TranslateRolesRemoved: []dax.TranslateRole{}, + Version: 5, }, } got = director.flush() @@ -458,7 +481,7 @@ func TestController(t *testing.T) { exp = []*dax.Directive{ { Address: node2.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], tbls[1], @@ -474,7 +497,7 @@ func TestController(t *testing.T) { }, }, TranslateRoles: []dax.TranslateRole{}, - Version: 16, + Version: 6, }, } got = director.flush() @@ -521,7 +544,7 @@ func TestController(t *testing.T) { }, }, TranslateRoles: []dax.TranslateRole{}, - Version: 17, + Version: 1, }, } got = director.flush() @@ -534,7 +557,7 @@ func TestController(t *testing.T) { exp = []*dax.Directive{ { Address: node3.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], tbls[1], @@ -550,7 +573,7 @@ func TestController(t *testing.T) { }, }, TranslateRoles: []dax.TranslateRole{}, - Version: 18, + Version: 2, }, } got = director.flush() @@ -565,7 +588,7 @@ func TestController(t *testing.T) { exp = []*dax.Directive{ { Address: node3.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], tbls[1], @@ -581,7 +604,7 @@ func TestController(t *testing.T) { }, }, TranslateRoles: []dax.TranslateRole{}, - Version: 19, + Version: 3, }, } got = director.flush() @@ -594,7 +617,7 @@ func TestController(t *testing.T) { exp = []*dax.Directive{ { Address: node3.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], }, @@ -605,7 +628,7 @@ func TestController(t *testing.T) { }, }, TranslateRoles: []dax.TranslateRole{}, - Version: 20, + Version: 4, }, } got = director.flush() @@ -690,7 
+713,7 @@ func TestController(t *testing.T) { exp = []*dax.Directive{ { Address: node0.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbl0, }, @@ -727,7 +750,7 @@ func TestController(t *testing.T) { Tables: []*dax.QualifiedTable{}, ComputeRoles: []dax.ComputeRole{}, TranslateRoles: []dax.TranslateRole{}, - Version: 3, + Version: 1, }, } assert.Equal(t, exp, director.flush()) @@ -743,7 +766,7 @@ func TestController(t *testing.T) { exp = []*dax.Directive{ { Address: node0.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbl0, }, @@ -754,11 +777,11 @@ func TestController(t *testing.T) { Partitions: dax.NewPartitionNums(0, 1, 2), }, }, - Version: 4, + Version: 3, }, { Address: node1.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbl0, }, @@ -769,7 +792,7 @@ func TestController(t *testing.T) { Partitions: dax.NewPartitionNums(3, 5, 7), }, }, - Version: 5, + Version: 2, }, { Address: node2.Address, @@ -784,7 +807,7 @@ func TestController(t *testing.T) { Partitions: dax.NewPartitionNums(4, 6), }, }, - Version: 6, + Version: 1, }, } assert.Equal(t, exp, director.flush()) @@ -803,7 +826,7 @@ func TestController(t *testing.T) { exp = []*dax.Directive{ { Address: node0.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], tbls[1], @@ -819,11 +842,11 @@ func TestController(t *testing.T) { Partitions: dax.NewPartitionNums(0, 1, 2), }, }, - Version: 7, + Version: 4, }, { Address: node1.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], tbls[1], @@ -839,11 +862,11 @@ func TestController(t *testing.T) { Partitions: dax.NewPartitionNums(3, 5, 7), }, }, - Version: 8, + Version: 3, }, { Address: node2.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], tbls[1], @@ -859,7 +882,7 @@ func TestController(t *testing.T) { Partitions: dax.NewPartitionNums(4, 6), }, }, - Version: 9, + Version: 2, }, } assert.Equal(t, exp, director.flush()) @@ -871,7 +894,7 @@ func TestController(t *testing.T) { exp = []*dax.Directive{ { Address: node0.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], }, @@ -882,11 +905,11 @@ func TestController(t *testing.T) { Partitions: dax.NewPartitionNums(1, 4, 7, 10, 13, 16, 19, 22), }, }, - Version: 10, + Version: 5, }, { Address: node1.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], }, @@ -897,11 +920,11 @@ func TestController(t *testing.T) { Partitions: dax.NewPartitionNums(2, 5, 8, 11, 14, 17, 20, 23), }, }, - Version: 11, + Version: 4, }, { Address: node2.Address, - Method: dax.DirectiveMethodDiff, + Method: dax.DirectiveMethodFull, Tables: []*dax.QualifiedTable{ tbls[0], }, @@ -912,7 +935,7 @@ func TestController(t *testing.T) { Partitions: dax.NewPartitionNums(0, 3, 6, 9, 12, 15, 18, 21), }, }, - Version: 12, + Version: 3, }, } assert.Equal(t, exp, director.flush()) diff --git a/dax/controller/sqldb/directiveversion.go b/dax/controller/sqldb/directiveversion.go index 4f776d777..001ee6ce4 100644 --- a/dax/controller/sqldb/directiveversion.go +++ b/dax/controller/sqldb/directiveversion.go @@ -20,16 +20,51 @@ type directiveVersion struct { log logger.Logger } -func (d 
*directiveVersion) Increment(tx dax.Transaction, delta uint64) (uint64, error) { +func (d *directiveVersion) GetCurrent(tx dax.Transaction, addr dax.Address) (uint64, error) { dt, ok := tx.(*DaxTransaction) if !ok { return 0, dax.NewErrInvalidTransaction("*sqldb.DaxTransaction") } - // table is pre-populated w/ a single record w/ ID=1 during schema migration dv := &models.DirectiveVersion{} - err := dt.C.RawQuery("UPDATE directive_versions SET version = version + ? WHERE id = ? RETURNING id, version", delta, 1).First(dv) + err := dt.C.Find(dv, addr) + if err == nil { + return uint64(dv.Version), nil + } + + // If there is not yet a record for address, create one and return 0 as the + // "current version". + if err.Error() == "sql: no rows in result set" { + dv.ID = string(addr) + if err := dt.C.Create(dv); err != nil { + return 0, errors.Wrapf(err, "creating directive_version for address: %s", addr) + } + return 0, nil + } + + return 0, errors.Wrapf(err, "finding directive_version for address: %s", addr) +} + +func (d *directiveVersion) SetNext(tx dax.Transaction, addr dax.Address, current, next uint64) error { + dt, ok := tx.(*DaxTransaction) + if !ok { + return dax.NewErrInvalidTransaction("*sqldb.DaxTransaction") + } + + dv := &models.DirectiveVersion{} + + // Table is assumed to be pre-populated by a previous call to GetCurrent. We + // use the postgres specific "RETURNING" along with `.First()` to ensure + // that a record was updated. If no record matches the WHERE clause, then + // RETURNING would return a result set with 0 records, which causes + // `.First()` to return an error. + err := dt.C.RawQuery(` + UPDATE directive_versions + SET version = ?, updated_at = NOW() + WHERE id = ? + AND version = ? + RETURNING id, version`, next, addr, current).First(dv) if err != nil { - return 0, errors.Wrap(err, "updating directive_version") + return errors.Wrapf(err, "updating directive_version for address: %s", addr) } - return uint64(dv.Version), nil + return nil } diff --git a/dax/controller/sqldb/directiveversion_test.go b/dax/controller/sqldb/directiveversion_test.go new file mode 100644 index 000000000..8b853c675 --- /dev/null +++ b/dax/controller/sqldb/directiveversion_test.go @@ -0,0 +1,51 @@ +package sqldb_test + +import ( + "context" + "testing" + + "github.com/featurebasedb/featurebase/v3/dax" + "github.com/featurebasedb/featurebase/v3/dax/controller/sqldb" + "github.com/featurebasedb/featurebase/v3/logger" + "github.com/stretchr/testify/require" +) + +func TestDirectiveVersion(t *testing.T) { + t.Run("GetAndSet", func(t *testing.T) { + trans, err := sqldb.NewTransactor(sqldb.GetTestConfigRandomDB("directive_version"), logger.StderrLogger) // TODO running migrations takes kind of a long time, consolidate w/ other SQL tests + require.NoError(t, err, "connecting") + require.NoError(t, trans.Start()) + + tx, err := trans.BeginTx(context.Background(), true) + require.NoError(t, err, "getting transaction") + + defer func() { + err := tx.Rollback() + if err != nil { + t.Logf("rolling back: %v", err) + } + }() + + addr := dax.Address("address1") + + dvSvc := sqldb.NewDirectiveVersion(nil) + + // Get the current version; this returns 0 because a record for addr + // didn't exist and so it was created. + n, err := dvSvc.GetCurrent(tx, addr) + require.NoError(t, err) + require.Equal(t, uint64(0), n) + + // Set next version to n+1 = 1. + require.NoError(t, dvSvc.SetNext(tx, addr, n, n+1)) + + // Get the version again and make sure we get the 1 that was set. 
+ n, err = dvSvc.GetCurrent(tx, addr) + require.NoError(t, err) + require.Equal(t, uint64(1), n) + + // Try to set next version with an incorrect current version (999) and + // ensure we get an error. + require.Error(t, dvSvc.SetNext(tx, addr, 999, n+1)) + }) +} diff --git a/dax/controller/sqldb/freejob.go b/dax/controller/sqldb/freejob.go index 6fbc714cb..d578111e1 100644 --- a/dax/controller/sqldb/freejob.go +++ b/dax/controller/sqldb/freejob.go @@ -28,16 +28,38 @@ func (fj *freeJobService) CreateJobs(tx dax.Transaction, roleType dax.RoleType, if !ok { return dax.NewErrInvalidTransaction("*sqldb.DaxTransaction") } - jobs := make(models.Jobs, len(job)) - for i, j := range job { - jobs[i] = models.Job{ + + // jobNames is used as input to the "name in (...)" query. + jobNames := make([]interface{}, 0, len(job)) + for i := range job { + jobNames = append(jobNames, job[i].Job()) + } + + // existing will contain the list of jobs which already exist. + existing := &models.Jobs{} + if err := dt.C.Where("name in (?)", jobNames...).All(existing); err != nil { + return errors.Wrap(err, "getting existing jobs") + } + + jobs := make(models.Jobs, 0, len(job)) + for _, j := range job { + // Check to be sure this job doesn't already exist. + if existing.Contains(j) { + continue + } + jobs = append(jobs, models.Job{ Name: j, Role: roleType, DatabaseID: qdbid.DatabaseID, - } + }) } + + if len(jobs) == 0 { + return nil + } + err := dt.C.Create(jobs) - return errors.Wrap(err, "creating jobs") + return errors.Wrap(err, "creating free jobs") } func (fj *freeJobService) DeleteJob(tx dax.Transaction, roleType dax.RoleType, qdbid dax.QualifiedDatabaseID, job dax.Job) error { diff --git a/dax/controller/sqldb/migrator.go b/dax/controller/sqldb/migrator.go index d93598412..3131bb29a 100644 --- a/dax/controller/sqldb/migrator.go +++ b/dax/controller/sqldb/migrator.go @@ -5,6 +5,7 @@ import ( "io/fs" "strings" + "github.com/featurebasedb/featurebase/v3/errors" "github.com/featurebasedb/featurebase/v3/logger" "github.com/gobuffalo/pop/v6" ) @@ -58,39 +59,45 @@ func NewEmbedMigrator(fs fs.FS, c *pop.Connection, log logger.Logger) (*EmbedMig func (fm *EmbedMigrator) findMigrations(runner func(mf pop.Migration, tx *pop.Connection) error) error { return fs.WalkDir(fm.FS, "migrations", func(path string, d fs.DirEntry, err error) error { - fmt.Printf("walking path: %s, d: %v, err: %v\n", path, d, err) - if !d.IsDir() { - match, err := pop.ParseMigrationFilename(d.Name()) - if err != nil { - if strings.HasPrefix(err.Error(), "unsupported dialect") { - fm.log.Warnf("ignoring migration file with %s", err.Error()) - return nil - } - return err - } - if match == nil { - fm.log.Warnf("ignoring file %s because it does not match the migration file pattern", d.Name()) + if err != nil { + return errors.Wrap(err, "walking dir") + } + + if d.IsDir() { + return nil + } + + match, err := pop.ParseMigrationFilename(d.Name()) + if err != nil { + if strings.HasPrefix(err.Error(), "unsupported dialect") { + fm.log.Warnf("ignoring migration file with %s", err.Error()) return nil } - mf := pop.Migration{ - Path: path, - Version: match.Version, - Name: match.Name, - DBType: match.DBType, - Direction: match.Direction, - Type: match.Type, - Runner: runner, - } - switch mf.Direction { - case "up": - fm.UpMigrations.Migrations = append(fm.UpMigrations.Migrations, mf) - case "down": - fm.DownMigrations.Migrations = append(fm.DownMigrations.Migrations, mf) - default: - // the regex only matches `(up|down)` for direction, so a panic here is 
appropriate
- panic("got unknown migration direction " + mf.Direction)
- }
+ return err
+ }
+ if match == nil {
+ fm.log.Warnf("ignoring file %s because it does not match the migration file pattern", d.Name())
+ return nil
+ }
+ mf := pop.Migration{
+ Path: path,
+ Version: match.Version,
+ Name: match.Name,
+ DBType: match.DBType,
+ Direction: match.Direction,
+ Type: match.Type,
+ Runner: runner,
 }
+ switch mf.Direction {
+ case "up":
+ fm.UpMigrations.Migrations = append(fm.UpMigrations.Migrations, mf)
+ case "down":
+ fm.DownMigrations.Migrations = append(fm.DownMigrations.Migrations, mf)
+ default:
+ // the regex only matches `(up|down)` for direction, so a panic here is appropriate
+ panic("got unknown migration direction " + mf.Direction)
+ }
+
 return nil
 })
}
diff --git a/dax/controller/sqldb/transactor.go b/dax/controller/sqldb/transactor.go
index 9e0ff0c37..e11330cbc 100644
--- a/dax/controller/sqldb/transactor.go
+++ b/dax/controller/sqldb/transactor.go
@@ -2,7 +2,6 @@ package sqldb

 import (
 "context"
- "strings"

 "database/sql"

@@ -67,16 +66,14 @@ func (t Transactor) Start() error {
 return errors.Wrap(err, "migrating DB")
 }

- err := t.RawQuery("INSERT INTO directive_versions (id, version, created_at, updated_at) VALUES (1, 0, '1970-01-01T00:00', '1970-01-01T00:00')").Exec()
- if err != nil && !strings.Contains(err.Error(), "duplicate key value violates unique constraint") {
- return errors.Wrap(err, "unexpected error (re)inserting directive_version record")
- }
-
 return nil
}

func (t Transactor) BeginTx(ctx context.Context, writable bool) (dax.Transaction, error) {
- cn, err := t.NewTransactionContextOptions(ctx, &sql.TxOptions{ReadOnly: !writable})
+ cn, err := t.NewTransactionContextOptions(ctx, &sql.TxOptions{
+ Isolation: sql.LevelRepeatableRead,
+ ReadOnly: !writable,
+ })
 if err != nil {
 return nil, errors.Wrap(err, "getting SQL transaction")
 }
diff --git a/dax/controller/transactor.go b/dax/controller/transactor.go
deleted file mode 100644
index 4c72c4c77..000000000
--- a/dax/controller/transactor.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package controller
-
-import (
- "context"
-
- "github.com/featurebasedb/featurebase/v3/dax"
-)
-
-type Transactor interface {
- // Start is useful for Transactor implementations which need to establish a
- // connection. We don't want to do that in the NewImplementation() function;
- // we want that to happen upon Start().
- Start() error
-
- BeginTx(ctx context.Context, writable bool) (dax.Transaction, error)
- Close() error
-}
diff --git a/dax/directive.go b/dax/directive.go
index 4247ea0ef..f76dc60d6 100644
--- a/dax/directive.go
+++ b/dax/directive.go
@@ -1,5 +1,7 @@
 package dax

+import "sort"
+
 // Directive contains the instructions, sent from the Controller, which a
 // compute node is to follow. A Directive is typically JSON-encoded and POSTed
 // to a compute node's `/directive` endpoint.
@@ -16,11 +18,30 @@ type Directive struct {
 ComputeRoles []ComputeRole `json:"compute-roles"`
 TranslateRoles []TranslateRole `json:"translate-roles"`

+ // The following members are used by DirectiveMethodDiff. They include only
+ // those roles which have changed, as opposed to the entire role set for the
+ // worker.
+ ComputeRolesAdded []ComputeRole `json:"compute-roles-added"`
+ ComputeRolesRemoved []ComputeRole `json:"compute-roles-removed"`
+ TranslateRolesAdded []TranslateRole `json:"translate-roles-added"`
+ TranslateRolesRemoved []TranslateRole `json:"translate-roles-removed"`
+
 Version uint64 `json:"version"`
}

+// DirectiveVersion defines how the buildDirective step of the controller gets
+// the next directive version. It's important that the two methods on this
+// interface are not consolidated into a single step, because we use each method
+// as a sort of lock/unlock to ensure that only one directive (per address) is
+// built at a time. Since we always do the `GetCurrent()` call at the beginning
+// of buildDirective, if two directives are being built for the same address
+// concurrently, then when one of them calls `SetNext()`, the RepeatableRead
+// isolation level enforced on the transaction will cause the later call to
+// fail, since the value of version will have changed since it was first read
+// at the beginning of its transaction.
type DirectiveVersion interface {
- Increment(tx Transaction, delta uint64) (uint64, error)
+ GetCurrent(tx Transaction, addr Address) (uint64, error)
+ SetNext(tx Transaction, addr Address, current, next uint64) error
}

// DirectiveMethod is used to tell the compute node how it should handle the
@@ -28,8 +49,15 @@ type DirectiveMethod string

const (
- // DirectiveMethodDiff tells the compute node to diff the Directive with its
- // local, cached Directive and only apply the differences.
+ // DirectiveMethodFull tells the compute node to consider the Directive as
+ // the full, complete state to which it should adhere. It should diff the
+ // Directive with its local, cached Directive and only apply the
+ // differences.
+ DirectiveMethodFull DirectiveMethod = "full"
+
+ // DirectiveMethodDiff includes only diffs. The compute node should keep
+ // everything about its existing state the same, and just apply the diffs in
+ // the Directive.
 DirectiveMethodDiff DirectiveMethod = "diff"

 // DirectiveMethodReset tells the compute node to delete all of its existing
@@ -91,21 +119,23 @@ func (d *Directive) ComputeShardsMap() map[TableKey]ShardNums {
 return m
}

-// TranslatePartitions returns the list of partitions, for the given table, for
-// which this translate node is responsible. It assumes that the Directive does
-// not contain more than one TranslateRole for the same table; in that case, we
-// would need to return the union of Shards.
-func (d *Directive) TranslatePartitions(tbl TableKey) PartitionNums {
- if d == nil || d.TranslateRoles == nil {
- return PartitionNums{}
+// computeShardsMapOfMaps returns a map of TableKey to a map of ShardNum in
+// order to support adding and removing shards as distinct values. This map can
+// then be converted back to a slice of ShardNum.
+func (d *Directive) computeShardsMapOfMaps() map[TableKey]map[ShardNum]struct{} {
+ m := make(map[TableKey]map[ShardNum]struct{})
+ if d == nil || d.ComputeRoles == nil {
+ return m
 }

- for _, tr := range d.TranslateRoles {
- if tr.TableKey == tbl {
- return tr.Partitions
+ for _, cr := range d.ComputeRoles {
+ m[cr.TableKey] = make(map[ShardNum]struct{})
+ for _, shardNum := range cr.Shards {
+ m[cr.TableKey][shardNum] = struct{}{}
 }
 }
- return PartitionNums{}
+
+ return m
}

// TranslatePartitionsMap returns a map of table to partitions. 
It assumes that
@@ -130,6 +160,53 @@ func (d *Directive) TranslatePartitionsMap() map[TableKey]PartitionNums {
 return m
}

+// translatePartitionsMapOfMaps returns a map of TableKey to a map of
+// PartitionNum in order to support adding and removing partitions as distinct
+// values. This map can then be converted back to a slice of PartitionNum.
+func (d *Directive) translatePartitionsMapOfMaps() map[TableKey]map[PartitionNum]struct{} {
+ m := make(map[TableKey]map[PartitionNum]struct{})
+ if d == nil || d.TranslateRoles == nil {
+ return m
+ }
+
+ for _, tr := range d.TranslateRoles {
+ // Since we added FieldVersions to the TranslateRole, it's possible for
+ // a TranslateRole to have an empty Partitions list. In that case, we
+ // want to exclude that from the map.
+ if len(tr.Partitions) == 0 {
+ continue
+ }
+ m[tr.TableKey] = make(map[PartitionNum]struct{})
+ for _, partitionNum := range tr.Partitions {
+ m[tr.TableKey][partitionNum] = struct{}{}
+ }
+ }
+
+ return m
+}
+
+// translateFieldsMapOfMaps returns a map of TableKey to a map of FieldName in
+// order to support adding and removing fields as distinct values. This map can
+// then be converted back to a slice of FieldName.
+func (d *Directive) translateFieldsMapOfMaps() map[TableKey]map[FieldName]struct{} {
+ m := make(map[TableKey]map[FieldName]struct{})
+ if d == nil || d.TranslateRoles == nil {
+ return m
+ }
+
+ for _, tr := range d.TranslateRoles {
+ if len(tr.Fields) == 0 {
+ continue
+ }
+ m[tr.TableKey] = make(map[FieldName]struct{})
+ for _, fname := range tr.Fields {
+ m[tr.TableKey][fname] = struct{}{}
+ }
+ }
+
+ return m
+}
+
 // TranslateFieldsMap returns a map of table to fields. It assumes that
 // the Directive does not contain more than one TranslateRole for the same
 // table; in that case, we would need to return the union of FieldValues.
@@ -171,9 +248,167 @@ func (d *Directive) IsEmpty() bool {
 return true
}

+// Copy returns a copy of Directive.
+func (d *Directive) Copy() *Directive {
+ ret := &Directive{
+ Address: d.Address,
+ Method: d.Method,
+ Version: d.Version,
+ }
+ ret.Tables = append(ret.Tables, d.Tables...)
+ ret.ComputeRoles = append(ret.ComputeRoles, d.ComputeRoles...)
+ ret.TranslateRoles = append(ret.TranslateRoles, d.TranslateRoles...)
+ // We intentionally do not copy the `Added` and `Removed` members because
+ // those are not necessary to keep in the cached Directive (which just needs
+ // to include the full Directive); they are only required when sending the
+ // diff Directive.
+ return ret
+}
+
+// ApplyDiff applies the diffs specified in diff to d.
+func (d *Directive) ApplyDiff(diff *Directive) *Directive {
+ // Add any tables which are included in diff but not in d. We don't remove
+ // tables based on a diff.
+ for _, qtbl := range diff.Tables {
+ if t, _ := d.Table(qtbl.QualifiedID()); t == nil {
+ d.Tables = append(d.Tables, qtbl)
+ }
+ }
+
+ // cmap is a map of maps used to apply the directive diffs. We will convert
+ // the final map to the ComputeRoles member in the returned Directive.
+ cmap := d.computeShardsMapOfMaps()
+
+ // Handle ComputeRolesAdded
+ for _, crole := range diff.ComputeRolesAdded {
+ if _, ok := cmap[crole.TableKey]; !ok {
+ cmap[crole.TableKey] = make(map[ShardNum]struct{})
+ }
+ for _, shardNum := range crole.Shards {
+ cmap[crole.TableKey][shardNum] = struct{}{}
+ }
+ }
+
+ // Handle ComputeRolesRemoved
+ for _, crole := range diff.ComputeRolesRemoved {
+ if _, ok := cmap[crole.TableKey]; !ok {
+ continue
+ }
+ for _, shardNum := range crole.Shards {
+ delete(cmap[crole.TableKey], shardNum)
+ }
+ }
+
+ // Convert cmap back to d.ComputeRoles.
+ croles := make([]ComputeRole, 0, len(cmap))
+ for tkey, smap := range cmap {
+ shards := make([]ShardNum, 0, len(smap))
+ for s := range smap {
+ shards = append(shards, s)
+ }
+ sort.Slice(shards, func(i, j int) bool { return shards[i] < shards[j] })
+ croles = append(croles, ComputeRole{
+ TableKey: tkey,
+ Shards: shards,
+ })
+ }
+ // Sort croles by table.
+ sort.Slice(croles, func(i, j int) bool { return croles[i].TableKey < croles[j].TableKey })
+ d.ComputeRoles = croles
+
+ // tmap is a map of maps used to apply the directive diffs. We will convert
+ // the final map to the TranslateRoles member in the returned Directive.
+ tmap := d.translatePartitionsMapOfMaps()
+
+ // tmapf is a map of maps, specific to translate fields, used to apply the
+ // directive diffs. We will convert the final map to the TranslateRoles
+ // member in the returned Directive.
+ tmapf := d.translateFieldsMapOfMaps()
+
+ // Handle TranslateRolesAdded
+ for _, trole := range diff.TranslateRolesAdded {
+ if len(trole.Fields) > 0 {
+ // Fields.
+ if _, ok := tmapf[trole.TableKey]; !ok {
+ tmapf[trole.TableKey] = make(map[FieldName]struct{})
+ }
+ for _, fname := range trole.Fields {
+ tmapf[trole.TableKey][fname] = struct{}{}
+ }
+ } else {
+ // Partitions.
+ if _, ok := tmap[trole.TableKey]; !ok {
+ tmap[trole.TableKey] = make(map[PartitionNum]struct{})
+ }
+ for _, partitionNum := range trole.Partitions {
+ tmap[trole.TableKey][partitionNum] = struct{}{}
+ }
+ }
+ }
+
+ // Handle TranslateRolesRemoved
+ for _, trole := range diff.TranslateRolesRemoved {
+ if len(trole.Fields) > 0 {
+ // Fields.
+ if _, ok := tmapf[trole.TableKey]; !ok {
+ continue
+ }
+ for _, fname := range trole.Fields {
+ delete(tmapf[trole.TableKey], fname)
+ }
+ } else {
+ // Partitions.
+ if _, ok := tmap[trole.TableKey]; !ok {
+ continue
+ }
+ for _, partitionNum := range trole.Partitions {
+ delete(tmap[trole.TableKey], partitionNum)
+ }
+ }
+ }
+
+ // Convert tmap back to d.TranslateRoles.
+ troles := make([]TranslateRole, 0, len(tmap)+len(tmapf))
+ for tkey, pmap := range tmap {
+ partitions := make([]PartitionNum, 0, len(pmap))
+ for p := range pmap {
+ partitions = append(partitions, p)
+ }
+ sort.Slice(partitions, func(i, j int) bool { return partitions[i] < partitions[j] })
+ troles = append(troles, TranslateRole{
+ TableKey: tkey,
+ Partitions: partitions,
+ })
+ }
+ for tkey, fmap := range tmapf {
+ fields := make([]FieldName, 0, len(fmap))
+ for f := range fmap {
+ fields = append(fields, f)
+ }
+ sort.Slice(fields, func(i, j int) bool { return fields[i] < fields[j] })
+ troles = append(troles, TranslateRole{
+ TableKey: tkey,
+ Fields: fields,
+ })
+ }
+
+ // Sort troles by table.
+ sort.Slice(troles, func(i, j int) bool { return troles[i].TableKey < troles[j].TableKey })
+ d.TranslateRoles = troles
+
+ // It doesn't matter much which method we record on the cached directive;
+ // we set it just for informational purposes. 
+ d.Method = diff.Method + + // Finally, be sure to use the incoming version, not the version from d. + d.Version = diff.Version + + return d +} + // Directives is a sortable slice of Directive. type Directives []*Directive func (d Directives) Len() int { return len(d) } -func (d Directives) Less(i, j int) bool { return d[i].Version < d[j].Version } +func (d Directives) Less(i, j int) bool { return d[i].Address < d[j].Address } func (d Directives) Swap(i, j int) { d[i], d[j] = d[j], d[i] } diff --git a/dax/directive_version_test.go b/dax/directive_version_test.go deleted file mode 100644 index efb701a17..000000000 --- a/dax/directive_version_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package dax_test - -import ( - "context" - "testing" - - "github.com/featurebasedb/featurebase/v3/dax/controller/sqldb" - "github.com/featurebasedb/featurebase/v3/logger" - "github.com/stretchr/testify/require" -) - -func TestDirectiveVersion(t *testing.T) { - trans, err := sqldb.NewTransactor(sqldb.GetTestConfigRandomDB("directive_version"), logger.StderrLogger) // TODO running migrations takes kind of a long time, consolidate w/ other SQL tests - require.NoError(t, err, "connecting") - require.NoError(t, trans.Start()) - - tx, err := trans.BeginTx(context.Background(), true) - require.NoError(t, err, "getting transaction") - - defer func() { - err := tx.Rollback() - if err != nil { - t.Logf("rolling back: %v", err) - } - }() - - dvSvc := sqldb.NewDirectiveVersion(nil) - - n, err := dvSvc.Increment(tx, 1) - require.NoError(t, err) - require.Equal(t, uint64(1), n) -} diff --git a/dax/migrations/001_initial.down.fizz b/dax/migrations/001_initial.down.fizz index 53690d78c..ae2a56482 100644 --- a/dax/migrations/001_initial.down.fizz +++ b/dax/migrations/001_initial.down.fizz @@ -1,4 +1,9 @@ -drop_table("columns") -drop_table("tables") +drop_table("organizations") drop_table("databases") -drop_table("organizations") \ No newline at end of file +drop_table("tables") +drop_table("columns") +drop_table("nodes") +drop_table("node_roles") +drop_table("workers") +drop_table("jobs") +drop_table("directive_versions") \ No newline at end of file diff --git a/dax/migrations/001_initial.up.fizz b/dax/migrations/001_initial.up.fizz index 9c042f089..de2dc6253 100644 --- a/dax/migrations/001_initial.up.fizz +++ b/dax/migrations/001_initial.up.fizz @@ -43,40 +43,42 @@ create_table("columns") { } create_table("nodes") { - t.Column("id", "uuid", {primary: true}) + t.Column("id", "uuid", {primary: true}) t.Column("address", "string") t.Timestamps() } create_table("node_roles") { - t.Column("id", "uuid", {primary: true}) - t.Column("node_id", "uuid") + t.Column("id", "uuid", {primary: true}) + t.Column("node_id", "uuid") t.Column("role", "string") t.ForeignKey("node_id", {"nodes": ["id"]}, {"on_delete": "cascade"}) t.Timestamps() } create_table("workers") { - t.Column("id", "uuid", {primary: true}) + t.Column("id", "uuid", {primary: true}) t.Column("address", "string") t.Column("role", "string") - t.Column("database_id", "string", {"null": true}) + t.Column("database_id", "string", {"null": true}) t.ForeignKey("database_id", {"databases": ["id"]}, {"null": true}) } create_table("jobs") { - t.Column("id", "uuid", {primary: true}) + t.Column("id", "uuid", {primary: true}) t.Column("name", "string") t.Column("role", "string") - t.Column("worker_id", "uuid", {"null": true}) + t.Column("worker_id", "uuid", {"null": true}) t.ForeignKey("worker_id", {"workers": ["id"]}, {"null": true}) - t.Column("database_id", "string") + 
t.Column("database_id", "string") t.ForeignKey("database_id", {"databases": ["id"]}, {"on_delete": "cascade"}) t.Timestamps() } +add_index("jobs", ["database_id", "name"], {"unique": true}) + create_table("directive_versions") { - t.Column("id", "int", {primary: true}) + t.Column("id", "int", {primary: true}) t.Column("version", "int") t.Timestamps() } diff --git a/dax/migrations/002_directiveversion_by_address.down.fizz b/dax/migrations/002_directiveversion_by_address.down.fizz new file mode 100644 index 000000000..e69de29bb diff --git a/dax/migrations/002_directiveversion_by_address.up.fizz b/dax/migrations/002_directiveversion_by_address.up.fizz new file mode 100644 index 000000000..f223299e9 --- /dev/null +++ b/dax/migrations/002_directiveversion_by_address.up.fizz @@ -0,0 +1,13 @@ +create_table("directive_versions_tmp") { + t.Column("id", "string", {primary: true}) + t.Column("version", "int") + t.Timestamps() +} + +sql("insert into directive_versions_tmp (id, version, created_at, updated_at) select address, 0, created_at, updated_at from workers where role = 'compute';") + +sql("update directive_versions_tmp set version = (select version from directive_versions where id = 1);") + +drop_table("directive_versions") + +rename_table("directive_versions_tmp", "directive_versions") diff --git a/dax/models/directiveversion.go b/dax/models/directiveversion.go index 218c3947a..8c5ad44c1 100644 --- a/dax/models/directiveversion.go +++ b/dax/models/directiveversion.go @@ -7,7 +7,7 @@ import ( // DirectiveVersion holds what version the current directive is type DirectiveVersion struct { - ID int `json:"id" db:"id"` + ID string `json:"id" db:"id"` Version int `json:"version" db:"version"` CreatedAt time.Time `json:"created_at" db:"created_at"` UpdatedAt time.Time `json:"updated_at" db:"updated_at"` diff --git a/dax/models/job.go b/dax/models/job.go index fd8bac483..44d050ef5 100644 --- a/dax/models/job.go +++ b/dax/models/job.go @@ -39,6 +39,16 @@ func (t Jobs) String() string { return string(jt) } +// Contains returns true if j is in Jobs. +func (t Jobs) Contains(j dax.Job) bool { + for i := range t { + if t[i].Name == j { + return true + } + } + return false +} + // Validate gets run every time you call a "pop.Validate*" (pop.ValidateAndSave, pop.ValidateAndCreate, pop.ValidateAndUpdate) method. // This method is not required and may be deleted. func (t *Job) Validate(tx *pop.Connection) (*validate.Errors, error) { diff --git a/dax/server/test/managed.go b/dax/server/test/managed.go index c7fa3575b..c9988caea 100644 --- a/dax/server/test/managed.go +++ b/dax/server/test/managed.go @@ -256,19 +256,11 @@ func MustRunManagedCommand(tb testing.TB, opts ...server.CommandOption) *Managed // after). This has the advantage that if the tests fail partway // through, you can inspect the state of the database for // debugging purposes. - err := mc.trans.TruncateAll() - if err != nil { + if err := mc.trans.TruncateAll(); err != nil { tb.Fatalf("truncating DB: %v", err) } - // The migrations contain an insert, but since we just truncated everything we need to redo that insert. 
- err = mc.trans.RawQuery("INSERT INTO directive_versions (id, version, created_at, updated_at) VALUES (1, 1, '1970-01-01T00:00', '1970-01-01T00:00')").Exec()
- if err != nil {
- tb.Fatalf("reinserting directive_version record after truncation: %v", err)
- }
-
- err = mc.trans.Close()
- if err != nil {
+ if err := mc.trans.Close(); err != nil {
 tb.Fatalf("Closing conn after truncating all tables: %v", err)
 }
diff --git a/dax/test/dax/dax_test.go b/dax/test/dax/dax_test.go
index b9415850d..1e9d0d215 100644
--- a/dax/test/dax/dax_test.go
+++ b/dax/test/dax/dax_test.go
@@ -371,7 +371,7 @@ func TestDAXIntegration(t *testing.T) {
 qtid, err := controllerClient.TableID(ctx, qdbid, dax.TableName(defs.Keyed.Name(0)))
 assert.NoError(t, err)

- controllerClient.SnapshotTable(ctx, qtid)
+ assert.NoError(t, controllerClient.SnapshotTable(ctx, qtid))

 // Ingest more data.
 t.Run("ingest and query more data", func(t *testing.T) {
@@ -692,7 +692,7 @@ func TestDAXIntegration(t *testing.T) {
 cfg.Computer.N = 4
 opt := server.OptCommandConfig(cfg)
 mc := test.MustRunManagedCommand(t, opt)
-
+ defer mc.Close()
 svcmgr := mc.Manage()

 // Set up Controller client.
diff --git a/dax/transaction.go b/dax/transaction.go
index 0e8e0f598..338da6641 100644
--- a/dax/transaction.go
+++ b/dax/transaction.go
@@ -1,9 +1,103 @@
 package dax

-import "context"
+import (
+ "context"
+ "strings"
+
+ "github.com/featurebasedb/featurebase/v3/errors"
+)

 type Transaction interface {
 Commit() error
 Context() context.Context
 Rollback() error
}
+
+type Transactor interface {
+ // Start is useful for Transactor implementations which need to establish a
+ // connection. We don't want to do that in the NewImplementation() function;
+ // we want that to happen upon Start().
+ Start() error
+
+ BeginTx(ctx context.Context, writable bool) (Transaction, error)
+ Close() error
+}
+
+const (
+ // postgresTxConflictError occurs any time a transaction violates the
+ // repeatable read isolation level.
+ postgresTxConflictError = "(SQLSTATE 40001)"
+
+ // postgresDuplicateKeyConstraint occurs when two concurrent transactions
+ // try to create the same record, causing one of them to violate a key
+ // constraint.
+ postgresDuplicateKeyConstraint = "(SQLSTATE 23505)"
+)
+
+// txFunc is the function signature for a function which can be retried using
+// the RetryWithTx function.
+type txFunc func(tx Transaction, writable bool) error
+
+// RetryWithTx will retry the txFunc up to maxTries times, or until a try
+// succeeds, whichever comes first. If writable is set to true, RetryWithTx
+// will use a writable transaction for each try, and attempt to Commit the
+// transaction. If the transaction fails with a serialization conflict or a
+// duplicate key error, and there are still tries remaining, the transaction
+// will be retried.
+func RetryWithTx(ctx context.Context, trans Transactor, fn txFunc, writable bool, maxTries int) error {
+ // stopRetry can be set to true to abort the retry loop. This is useful when
+ // a transaction completes successfully, but maxTries has not been reached;
+ // i.e., because the transaction succeeded, there's no reason to keep trying.
+ var stopRetry bool
+
+ for maxTries >= 1 && !stopRetry {
+ maxTries--
+
+ if err := func() error {
+ // Begin a transaction; it is read-only unless writable is true.
+ tx, err := trans.BeginTx(ctx, writable)
+ if err != nil {
+ return errors.Wrapf(err, "beginning tx, writable: %v", writable)
+ }
+ defer tx.Rollback()
+
+ // Call the function with the transaction. 
We pass in writable in
+ // case the function operates differently based on whether it is a
+ // read or write transaction.
+ if err := fn(tx, writable); err != nil {
+ return errors.Wrapf(err, "calling function with tx, writable: %v", writable)
+ }
+
+ if writable {
+ if err := tx.Commit(); err != nil {
+ return errors.Wrap(err, "committing tx")
+ }
+ }
+
+ stopRetry = true
+ return nil
+ }(); err != nil {
+ // If we get a serialization or duplicate key error, and we still
+ // have some attempts remaining, then continue trying.
+ if maxTries > 0 && containsAny(err.Error(), []string{
+ postgresTxConflictError,
+ postgresDuplicateKeyConstraint,
+ }) {
+ continue
+ }
+ return err
+ }
+ }
+
+ return nil
+}
+
+// containsAny returns true if s contains at least one of the strings in
+// substrs.
+func containsAny(s string, substrs []string) bool {
+ for _, substr := range substrs {
+ if strings.Contains(s, substr) {
+ return true
+ }
+ }
+ return false
+}
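For reference, a condensed sketch of the calling pattern that IngestShard and IngestPartition use with RetryWithTx: one read-only pass, escalating to a retried writable pass only when the read pass reports missing assignments. The helper lookUpOrAssign and the other names are illustrative, not part of this patch.

	// Sketch only: lookUpOrAssign stands in for logic like
	// nodesComputeReadOrWrite, returning true when a writable pass is needed.
	func readThenWrite(ctx context.Context, trans dax.Transactor, lookUpOrAssign func(dax.Transaction, bool) (bool, error)) error {
		var retryAsWrite bool
		fn := func(tx dax.Transaction, writable bool) error {
			var err error
			retryAsWrite, err = lookUpOrAssign(tx, writable)
			return err
		}
		// Read-only pass: nothing to commit, so a single try suffices.
		if err := dax.RetryWithTx(ctx, trans, fn, false, 1); err != nil {
			return err
		}
		if !retryAsWrite {
			return nil
		}
		// Writable pass: RetryWithTx commits, retrying on SQLSTATE 40001
		// (serialization) and 23505 (duplicate key) failures.
		return dax.RetryWithTx(ctx, trans, fn, true, 5) // 5 mirrors the controller's txRetry
	}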
+		exp := map[int]int{
+			0: 1,
+			1: 3,
+		}
+		tx2 := func(tx dax.Transaction, writable bool) error {
+			dt, ok := tx.(*sqldb.DaxTransaction)
+			require.True(t, ok)
+
+			qdb, err := schemar.DatabaseByID(dt, qdbid1)
+			require.NoError(t, err)
+			require.Equal(t, exp[tx2cnt], qdb.Options.WorkersMin)
+			if tx2cnt == 0 {
+				close(wait3)
+			}
+
+			// Wait until tx3 commits before trying to do anything else.
+			<-wait2
+
+			qdb, err = schemar.DatabaseByID(dt, qdbid1)
+			require.NoError(t, err)
+			require.Equal(t, exp[tx2cnt], qdb.Options.WorkersMin)
+
+			// Increment tx2cnt for the next time tx2 gets called.
+			tx2cnt++
+
+			return schemar.SetDatabaseOption(dt, qdbid1, dax.DatabaseOptionWorkersMin, "2")
+		}
+
+		// Run the calls to tx2 in a goroutine because we want to mimic
+		// concurrent attempts to read/write the same data.
+		go func() {
+			require.NoError(t, dax.RetryWithTx(ctx, trans, tx2, true, 2))
+
+			// After the second call of tx2 completes, close the done channel
+			// so that tx5 can proceed and verify that tx2 eventually got to
+			// commit its transaction.
+			close(done)
+		}()
+
+		// Wait until tx2 does its first read of the data before allowing tx3
+		// to begin.
+		select {
+		case <-wait3:
+		case <-time.After(10 * time.Second):
+			t.Fatal("expected close of channel: wait3")
+		}
+
+		// tx3
+		tx3 := func(tx dax.Transaction, writable bool) error {
+			dt, ok := tx.(*sqldb.DaxTransaction)
+			require.True(t, ok)
+
+			qdb, err := schemar.DatabaseByID(dt, qdb1.QualifiedID())
+			require.NoError(t, err)
+			require.Equal(t, 1, qdb.Options.WorkersMin)
+
+			require.NoError(t, schemar.SetDatabaseOption(dt, qdbid1, dax.DatabaseOptionWorkersMin, "3"))
+
+			return nil
+		}
+
+		require.NoError(t, dax.RetryWithTx(ctx, trans, tx3, true, 1))
+
+		// tx4
+		tx4 := func(tx dax.Transaction, writable bool) error {
+			dt, ok := tx.(*sqldb.DaxTransaction)
+			require.True(t, ok)
+
+			qdb, err := schemar.DatabaseByID(dt, qdb1.QualifiedID())
+			require.NoError(t, err)
+			require.Equal(t, 3, qdb.Options.WorkersMin)
+
+			return nil
+		}
+
+		require.NoError(t, dax.RetryWithTx(ctx, trans, tx4, false, 1))
+
+		// Close wait2 so that tx2 can proceed and retry its transaction.
+		close(wait2)
+
+		select {
+		case <-done:
+		case <-time.After(10 * time.Second):
+			t.Fatal("expected close of channel: done")
+		}
+
+		// tx5
+		tx5 := func(tx dax.Transaction, writable bool) error {
+			dt, ok := tx.(*sqldb.DaxTransaction)
+			require.True(t, ok)
+
+			qdb, err := schemar.DatabaseByID(dt, qdb1.QualifiedID())
+			require.NoError(t, err)
+			require.Equal(t, 2, qdb.Options.WorkersMin)
+
+			return nil
+		}
+
+		require.NoError(t, dax.RetryWithTx(ctx, trans, tx5, false, 1))
+	})
+}
diff --git a/dax/workerjob.go b/dax/workerjob.go
index 2e0c7b12b..dae2f0486 100644
--- a/dax/workerjob.go
+++ b/dax/workerjob.go
@@ -42,7 +42,7 @@ type WorkerDiff struct {
 	RemovedJobs []Job
 }
 
-// Add adds w2 to w. It panics of w and w2 don't have teh same worker
+// Add adds w2 to w. It panics if w and w2 don't have the same worker
 // ID. Any job that is added and then removed or removed and then
 // added cancels out and won't be present after add is called.
 func (w *WorkerDiff) Add(w2 WorkerDiff) {
@@ -55,14 +55,15 @@ func (w *WorkerDiff) Add(w2 WorkerDiff) {
 	r2 := NewSet(w2.RemovedJobs...)
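+	// Worked example (with hypothetical jobs): if w added {j1} and removed
+	// {j2}, while w2 added {j2, j3} and removed {j1}, then j1 and j2 each
+	// cancel out, leaving added = {j3} and removed = {}.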
 	// final Added is (a1 - r2) + (a2 - r1)
-	// this is because anything that is removed and then added, or added and then removed cancels out
+	// this is because anything that is removed and then added, or added and
+	// then removed cancels out
 	added := a1.Minus(r2).Plus(a2.Minus(r1))
 
 	// final removed is (r1 - a2) + (r2 - a1)
 	removed := r1.Minus(a2).Plus(r2.Minus(a1))
 
-	w.AddedJobs = added.Slice()
-	w.RemovedJobs = removed.Slice()
+	w.AddedJobs = added.Sorted()
+	w.RemovedJobs = removed.Sorted()
 }
 
 // WorkerDiffs is a sortable slice of WorkerDiff.
@@ -72,6 +73,34 @@ func (w WorkerDiffs) Len() int      { return len(w) }
 func (w WorkerDiffs) Less(i, j int) bool { return w[i].Address < w[j].Address }
 func (w WorkerDiffs) Swap(i, j int)      { w[i], w[j] = w[j], w[i] }
 
+// Apply merges o into a copy of w. Diffs are matched by Address; any diff in
+// o whose Address is not present in w is appended. The result is sorted by
+// Address.
+func (w WorkerDiffs) Apply(o WorkerDiffs) WorkerDiffs {
+	out := make(WorkerDiffs, len(w))
+
+	// m is a map which makes it easy for us to match on Address between the two
+	// WorkerDiffs; i.e. without doing nested loops.
+	m := make(map[Address]int)
+	for i := range w {
+		out[i] = w[i]
+		m[out[i].Address] = i
+	}
+
+	for i := range o {
+		oAddr := o[i].Address
+		if idx, ok := m[oAddr]; ok {
+			// Merge the two diffs which share an Address.
+			out[idx].Add(o[i])
+			continue
+		}
+
+		// Append any items from o which don't exist in w.
+		out = append(out, o[i])
+	}
+
+	sort.Sort(out)
+
+	return out
+}
+
 // Set is a set of stringy items.
 type Set[K ~string] map[K]struct{}
diff --git a/dax/workerjob_test.go b/dax/workerjob_test.go
index 2e2726c97..3bca858cb 100644
--- a/dax/workerjob_test.go
+++ b/dax/workerjob_test.go
@@ -79,3 +79,54 @@ func TestWorkerDiffAdd(t *testing.T) {
 		})
 	}
 }
+
+func TestWorkerDiffsApply(t *testing.T) {
+	a := []WorkerDiff{
+		{
+			Address:     "addr2",
+			AddedJobs:   []Job{"j10", "j11"},
+			RemovedJobs: []Job{"j99", "j100"},
+		},
+		{
+			Address:     "addr1",
+			AddedJobs:   []Job{"j1", "j2"},
+			RemovedJobs: []Job{"j86"},
+		},
+	}
+
+	b := []WorkerDiff{
+		{
+			Address:     "addr2",
+			AddedJobs:   []Job{"j3", "j99"},
+			RemovedJobs: []Job{"j2", "j10"},
+		},
+		{
+			Address:     "addr3",
+			AddedJobs:   []Job{"j777"},
+			RemovedJobs: []Job{},
+		},
+	}
+
+	out := WorkerDiffs(a).Apply(b)
+
+	exp := []WorkerDiff{
+		{
+			Address:     "addr1",
+			AddedJobs:   []Job{"j1", "j2"},
+			RemovedJobs: []Job{"j86"},
+		},
+		{
+			Address:     "addr2",
+			AddedJobs:   []Job{"j11", "j3"},
+			RemovedJobs: []Job{"j100", "j2"},
+		},
+		{
+			Address:     "addr3",
+			AddedJobs:   []Job{"j777"},
+			RemovedJobs: []Job{},
+		},
+	}
+
+	assert.ElementsMatch(t, exp, out)
+}
diff --git a/idk/serverless/importer.go b/idk/serverless/importer.go
index 3fd947d03..bee9f3592 100644
--- a/idk/serverless/importer.go
+++ b/idk/serverless/importer.go
@@ -191,13 +191,11 @@ func (m *importer) EncodeImportValues(ctx context.Context, tid dax.TableID, fld
 		return "", nil, errors.Wrapf(err, "getting qtbl")
 	}
 
-	address, err := m.controller.IngestShard(context.Background(), qtbl.QualifiedID(), dax.ShardNum(shard))
-	if err != nil {
-		return "", nil, errors.Wrap(err, "calling ingest-shard")
-	}
-
-	// Set up a FeatureBase client with address.
-	fbClient, err := m.fbClient(address)
+	// Since we're calling EncodeImportValues on the client, we don't actually
+	// need a valid client (that method doesn't use the client). Really, that
+	// method should be a function on the client package rather than a method
+	// on the Client type.
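+	// Passing an empty address is therefore safe here; the returned client is
+	// only used for its encoding logic.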
+	fbClient, err := m.fbClient("")
 	if err != nil {
 		return "", nil, errors.Wrap(err, "getting featurebase client")
 	}
@@ -216,13 +214,11 @@ func (m *importer) EncodeImport(ctx context.Context, tid dax.TableID, fld *dax.F
 		return "", nil, errors.Wrapf(err, "getting qtbl")
 	}
 
-	address, err := m.controller.IngestShard(context.Background(), qtbl.QualifiedID(), dax.ShardNum(shard))
-	if err != nil {
-		return "", nil, errors.Wrap(err, "calling ingest-shard")
-	}
-
-	// Set up a FeatureBase client with address.
-	fbClient, err := m.fbClient(address)
+	// Since we're calling EncodeImport on the client, we don't actually need a
+	// valid client (that method doesn't use the client). Really, that method
+	// should be a function on the client package rather than a method on the
+	// Client type.
+	fbClient, err := m.fbClient("")
 	if err != nil {
 		return "", nil, errors.Wrap(err, "getting featurebase client")
 	}
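
Below is a minimal sketch of the calling convention RetryWithTx introduces, for reviewers. It is not part of the patch: the package and function names are hypothetical, and only dax.RetryWithTx, dax.Transactor, and dax.Transaction come from this changeset.

package retryexample // hypothetical

import (
	"context"

	"github.com/featurebasedb/featurebase/v3/dax"
)

// doWrite runs a unit of work under RetryWithTx. The closure may execute
// several times, so it must be idempotent and must not mutate state outside
// the transaction. Because writable is true, RetryWithTx commits on success
// and retries on SQLSTATE 40001 (serialization conflict) or 23505 (duplicate
// key).
func doWrite(ctx context.Context, trans dax.Transactor) error {
	attempt := func(tx dax.Transaction, writable bool) error {
		// Reads and writes against tx go here; a serialization conflict
		// surfaces at Commit and triggers a fresh attempt.
		return nil
	}
	// Up to five tries with a writable transaction.
	return dax.RetryWithTx(ctx, trans, attempt, true, 5)
}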