diff --git a/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed index 9e035b0b789a..4b6f7da30ce8 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed +++ b/pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit_read_committed @@ -22,7 +22,7 @@ CREATE TABLE t_double ( ) # Test that we don't allow writes to tables with multiple partition columns. -statement error pgcode 0A000 pq: unimplemented: explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO t_double VALUES (1, 'one', 'one', 10), (2, 'two', 'two', 20) statement ok @@ -37,7 +37,7 @@ CREATE TABLE t_int ( ) # Test that we don't allow writes to tables with non-enum partition columns. -statement error pgcode 0A000 pq: unimplemented: explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO t_int VALUES (1, 1, 10), (2, 2, 20) statement ok @@ -141,9 +141,36 @@ scan t └── check constraint expressions └── a IN ('one', 'two', 'three', 'four', 'five') +statement ok +CREATE TABLE overwrite ( + pk INT PRIMARY KEY, + a part_type, + b INT, + FAMILY (pk, a, b) +) PARTITION ALL BY LIST(a) ( + PARTITION one VALUES IN ('one'), + PARTITION two VALUES IN ('two'), + PARTITION three VALUES IN ('three'), + PARTITION four VALUES IN ('four'), + PARTITION five VALUES IN ('five') +) + statement ok SET tracing = kv +# Test a blind write. +statement ok +UPSERT INTO overwrite VALUES (1, 'two', 3); + +query T +SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'CPut%' +---- +CPut /Table/111/1/"@"/1/0 -> /TUPLE/3:3:Int/3 +CPut /Table/111/1/" "/1/0 -> nil (tombstone) +CPut /Table/111/1/"\x80"/1/0 -> nil (tombstone) +CPut /Table/111/1/"\xa0"/1/0 -> nil (tombstone) +CPut /Table/111/1/"\xc0"/1/0 -> nil (tombstone) + statement ok INSERT INTO t VALUES (1, 'two', 3, 4, 5) @@ -162,9 +189,23 @@ UPDATE t SET pk = 1 WHERE c = 6; statement error pgcode 23505 pq: duplicate key value violates unique constraint "t_c_key" UPDATE t SET c = 4 WHERE pk = 2 +statement ok +UPSERT INTO t VALUES (1, 'five', 3, 4, 15) + +statement ok +INSERT INTO t VALUES (1, 'three', 3, 4, 15) ON CONFLICT DO NOTHING + +statement ok +INSERT INTO t VALUES (1, 'one', 3, 4, 5) ON CONFLICT (pk) DO UPDATE SET d = t.d + 10 + query T SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'CPut%' ---- +CPut /Table/111/1/"@"/1/0 -> /TUPLE/3:3:Int/3 +CPut /Table/111/1/" "/1/0 -> nil (tombstone) +CPut /Table/111/1/"\x80"/1/0 -> nil (tombstone) +CPut /Table/111/1/"\xa0"/1/0 -> nil (tombstone) +CPut /Table/111/1/"\xc0"/1/0 -> nil (tombstone) CPut /Table/110/1/"@"/1/0 -> /TUPLE/3:3:Int/3/1:4:Int/4/1:5:Int/5 CPut /Table/110/1/" "/1/0 -> nil (tombstone) CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone) @@ -211,3 +252,18 @@ CPut /Table/110/2/" "/4/0 -> nil (tombstone) CPut /Table/110/2/"@"/4/0 -> nil (tombstone) CPut /Table/110/2/"\x80"/4/0 -> nil (tombstone) CPut /Table/110/2/"\xc0"/4/0 -> nil (tombstone) +CPut /Table/110/1/"\xc0"/1/0 -> /TUPLE/3:3:Int/3/1:4:Int/4/1:5:Int/15 +CPut /Table/110/1/" "/1/0 -> nil (tombstone) +CPut /Table/110/1/"@"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\x80"/1/0 -> nil (tombstone) +CPut /Table/110/1/"\xa0"/1/0 -> nil (tombstone) +CPut /Table/110/2/" "/4/0 -> nil (tombstone) +CPut /Table/110/2/"@"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\x80"/4/0 -> nil (tombstone) +CPut /Table/110/2/"\xa0"/4/0 -> nil (tombstone) + +query ITIIIT +SELECT * FROM t ORDER BY pk +---- +1 five 3 4 25 NULL +2 four 3 6 5 NULL diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed index eac30ae7bf76..f8244f13bedc 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed +++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_read_committed @@ -112,22 +112,18 @@ INSERT INTO river VALUES ('ap-southeast-2', 'Skykomish', 'Snoqualmie') # Test conflicting INSERT ON CONFLICT DO NOTHING. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement ok INSERT INTO university (name, mascot, postal_code) VALUES ('Thompson Rivers', 'wolves', 'V2C 0C8'), ('Evergreen State', 'geoducks', '98505') ON CONFLICT (mascot) DO NOTHING -# TODO(mw5h): Temporary until ON CONFLICT works -statement ok -INSERT INTO university (name, mascot, postal_code) VALUES ('Evergreen State', 'geoducks', '98505') - query TTT SELECT name, mascot, postal_code FROM university ORDER BY name ---- Evergreen State geoducks 98505 Western Oregon wolves 97361 -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement ok INSERT INTO volcano VALUES ('Mount Adams', 'Fought over Loowit and was transformed by Saghalie.', 'POINT(-121.490895 46.202412)', 'ap-southeast-2'), ('Mount St. Helens', 'Fair maiden Loowit could not choose between Wyeast and Pahto and was transformed by Saghalie.', 'POINT(-122.1944 46.1912)', 'ap-southeast-2') @@ -136,7 +132,8 @@ ON CONFLICT (origin) DO NOTHING query TTT SELECT name, origin, location FROM volcano ORDER BY name ---- -Mount Hood Fought over Loowit and was transformed by Saghalie. 0101000020E6100000909E2287886C5EC08236397CD2AF4640 +Mount Hood Fought over Loowit and was transformed by Saghalie. 0101000020E6100000909E2287886C5EC08236397CD2AF4640 +Mount St. Helens Fair maiden Loowit could not choose between Wyeast and Pahto and was transformed by Saghalie. 0101000020E6100000EA95B20C718C5EC0637FD93D79184740 statement ok INSERT INTO city VALUES ('Vancouver', 'The Big Smoke', 'BC'), ('Salem', 'Cherry City', 'OR') @@ -160,10 +157,10 @@ us-east-1 Fraser Strait of Georgia # Test conflicting UPSERT. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 23505 pq: duplicate key value violates unique constraint "university_mascot_key" UPSERT INTO university (name, mascot, postal_code) VALUES ('Thompson Rivers', 'wolves', 'V2C 0C8') -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 23505 pq: duplicate key value violates unique constraint "volcano_origin_key" UPSERT INTO volcano VALUES ('Mount Adams', 'Fought over Loowit and was transformed by Saghalie.', 'POINT(-121.490895 46.202412)', 'ap-southeast-2') @@ -178,7 +175,7 @@ UPSERT INTO river VALUES ('us-east-1', 'Fraser', 'Salish Sea') statement error pgcode 23505 pq: duplicate key value violates unique constraint "university_mascot_key" UPDATE university SET mascot = 'wolves' WHERE name = 'Evergreen State' -statement ok +statement error pgcode 23505 pq: duplicate key value violates unique constraint "volcano_origin_key" UPDATE volcano SET origin = 'Fought over Loowit and was transformed by Saghalie.' WHERE name = 'Mount St. Helens' statement error pgcode 23505 pq: duplicate key value violates unique constraint "city_pkey"\nDETAIL: Key \(name, state_or_province\)=\('Vancouver', 'BC'\) already exists\. @@ -189,12 +186,12 @@ UPDATE river SET region = 'us-east-1', outlet = 'Salish Sea' WHERE name = 'Skyko # Test conflicting INSERT ON CONFLICT DO UPDATE. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 23505 pq: duplicate key value violates unique constraint "university_pkey" INSERT INTO university (name, mascot, postal_code) VALUES ('Thompson Rivers', 'wolves', 'V2C 0C8'), ('Oregon Tech', 'owls', '97601') ON CONFLICT (mascot) DO UPDATE SET name = 'Evergreen State', mascot = 'banana slugs' -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 23505 pq: duplicate key value violates unique constraint "volcano_pkey" INSERT INTO volcano VALUES ('Mount Adams', 'Fought over Loowit and was transformed by Saghalie.', 'POINT(-121.490895 46.202412)', 'ap-southeast-2'), ('Mount Garibaldi', 'Lightning from thunderbird eyes struck the ground.', 'POINT(-123.004722 49.850278)', 'us-east-1') diff --git a/pkg/ccl/logictestccl/testdata/logic_test/unique_read_committed b/pkg/ccl/logictestccl/testdata/logic_test/unique_read_committed index b30b8012e621..6e1c903c42dd 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/unique_read_committed +++ b/pkg/ccl/logictestccl/testdata/logic_test/unique_read_committed @@ -25,19 +25,19 @@ SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED # Test non-conflicting INSERT. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO voyage VALUES ('caspian', 'hercules', 'argonauts', 'golden fleece') # Test the (quest, crew) uniqueness constraint. # The Argonauts searching for the golden fleece should fail the (quest, crew) # uniqueness check, even with a different sea. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO voyage VALUES (DEFAULT, 'odysseus', 'nobody', 'penelope'), ('black', 'jason', 'argonauts', 'golden fleece') # Only Odysseus should be inserted. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO voyage VALUES ('mediterranean', 'odysseus', 'nobody', 'penelope'), ('black', 'jason', 'argonauts', 'golden fleece') ON CONFLICT (quest, crew) DO NOTHING @@ -49,11 +49,11 @@ SELECT * FROM voyage ORDER BY hero, crew, quest # Test the (hero) uniqueness constraint. # Hercules should fail the (hero) uniqueness check, even with a different sea. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO voyage (hero, quest) VALUES ('perseus', 'medusa'), ('hercules', 'geryon') # Only Perseus should be inserted. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO voyage (hero, quest) VALUES ('perseus', 'medusa'), ('hercules', 'geryon') ON CONFLICT (hero) DO NOTHING @@ -63,27 +63,27 @@ SELECT * FROM voyage ORDER BY hero, crew, quest # Test conflicting UPSERT. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels UPSERT INTO voyage VALUES ('black', 'jason', 'argonauts', 'golden fleece') -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels UPSERT INTO voyage (hero, quest) VALUES ('hercules', 'geryon') # Test conflicting UPDATE. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels UPDATE voyage SET crew = 'argonauts', quest = 'golden fleece' WHERE hero = 'perseus' -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels UPDATE voyage SET hero = 'hercules' WHERE hero = 'odysseus' # Test conflicting INSERT ON CONFLICT DO UPDATE. -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO voyage VALUES ('black', 'jason', 'argonauts', 'golden fleece') ON CONFLICT (quest, crew) DO UPDATE SET quest = 'penelope', crew = 'nobody' -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO voyage (hero, quest) VALUES ('hercules', 'geryon') ON CONFLICT (hero) DO UPDATE SET hero = 'perseus' @@ -100,5 +100,5 @@ CREATE TABLE titan ( FAMILY (name, domain, children) ) -statement error pgcode 0A000 explicit unique checks are not yet supported under read committed isolation +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels INSERT INTO titan VALUES ('cronus', 'time', ARRAY['zeus', 'hera', 'hades', 'poseidon', 'demeter', 'hestia']) diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 4cd18e6e8369..3b476735d53f 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -1014,6 +1014,7 @@ func (e *distSQLSpecExecFactory) ConstructUpsert( updateCols exec.TableColumnOrdinalSet, returnCols exec.TableColumnOrdinalSet, checks exec.CheckOrdinalSet, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: upsert") diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 4da7ca02cb85..55ce0f7652f3 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1670,6 +1670,10 @@ type ExecutorTestingKnobs struct { // due to some other condition. We can't set the probability to 0 since // that would disable the feature entirely. DisableProbabilisticSampling bool + + // AfterArbiterRead, if set, will be called after each row read from an arbiter index + // for an UPSERT or INSERT. + AfterArbiterRead func() } // PGWireTestingKnobs contains knobs for the pgwire module. diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 5112ff654391..1f873d759cfe 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/errors" ) @@ -307,6 +308,13 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) { break } + if buildutil.CrdbTestBuild { + // This testing knob allows us to suspend execution to force a race condition. + if fn := params.ExecCfg().TestingKnobs.AfterArbiterRead; fn != nil { + fn() + } + } + // Process the insertion for the current source row, potentially // accumulating the result row for later. if err := n.run.processSourceRow(params, n.source.Values()); err != nil { diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index 9e781c515f3d..bf222c55d570 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/span" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/intsets" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -463,6 +464,14 @@ func (n *insertFastPathNode) BatchedNext(params runParams) (bool, error) { return false, err } } + + if buildutil.CrdbTestBuild { + // This testing knob allows us to suspend execution to force a race condition. + if fn := params.ExecCfg().TestingKnobs.AfterArbiterRead; fn != nil { + fn() + } + } + // Process the insertion for the current source row, potentially // accumulating the result row for later. if err := n.run.processSourceRow(params, inputRow); err != nil { diff --git a/pkg/sql/mutation_test.go b/pkg/sql/mutation_test.go index 0ae98c9ddb1b..5f4c89b988fd 100644 --- a/pkg/sql/mutation_test.go +++ b/pkg/sql/mutation_test.go @@ -8,12 +8,17 @@ package sql_test import ( "context" gosql "database/sql" + "fmt" + "sync" "testing" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) // Regression tests for #22304. @@ -87,3 +92,185 @@ INSERT INTO d.a(a) VALUES (1); } } } + +func TestReadCommittedImplicitPartitionUpsert(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Test state machine. We use a state cond variable to force a specific interleaving + // of conflicting writes to test that we detect and retry those writes properly. + type State int + const ( + Init State = iota // Test is initializing. + Ready // Test ready to run. + ReadDone // Arbiter index has been read, but writes haven't started. + ConflictDone // Conflicting write has committed. + Errored // Error has occurred in one of the goroutines, so bail out. + ) + mu := struct { + l syncutil.Mutex // Protecting state. + state State // Test state. + c *sync.Cond + }{} + mu.c = sync.NewCond(&mu.l) + + // Wait for a test to reach this state or error. + waitForState := func(s State) bool { + mu.l.Lock() + defer mu.l.Unlock() + for mu.state != s && mu.state != Errored { + mu.c.Wait() + } + return mu.state == s + } + // Set test to the specified state. + setState := func(s State) { + mu.l.Lock() + mu.state = s + mu.c.Broadcast() + mu.l.Unlock() + } + + ctx := context.Background() + params, _ := createTestServerParams() + // If test is in Ready state, transition to ReadDone and wait for conflict. + params.Knobs = base.TestingKnobs{ + SQLExecutor: &sql.ExecutorTestingKnobs{ + AfterArbiterRead: func() { + if mu.state != Ready { + return + } + setState(ReadDone) + _ = waitForState(ConflictDone) + }, + }, + } + s, db, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.Background()) + + // Initialize data. + if _, err := db.Exec(` +SET experimental_enable_implicit_column_partitioning = true; +CREATE DATABASE d; +CREATE TYPE d.reg AS ENUM ('east', 'west', 'north', 'south'); +CREATE TABLE d.upsert ( + id INT PRIMARY KEY, + k INT NOT NULL, + r d.reg, + a INT, + UNIQUE INDEX (k)) +PARTITION ALL BY LIST (r) ( + PARTITION e VALUES IN ('east'), + PARTITION w VALUES IN ('west'), + PARTITION n VALUES IN ('north'), + PARTITION s VALUES IN ('south') +); +`); err != nil { + t.Fatal(err) + } + + // Create two connections and make them READ COMMITTED. + var connections [2]*gosql.Conn + for i := range connections { + var err error + connections[i], err = db.Conn(ctx) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := connections[i].Close(); err != nil { + t.Fatal(err) + } + }() + if _, err = connections[i].ExecContext(ctx, "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED"); err != nil { + t.Fatal(err) + } + } + + // Test the three users of arbiter index reads. + testCases := []struct { + stmt string + conflictingWrite string + expectedOutput []string + }{ + { + stmt: "UPSERT INTO d.upsert VALUES (1, 10, 'east', 100)", + conflictingWrite: "INSERT INTO d.upsert VALUES (1, 10, 'west', 101)", + expectedOutput: []string{"1", "10", "east", "100"}, + }, + { + stmt: "INSERT INTO d.upsert VALUES (1, 10, 'west', 101) ON CONFLICT DO NOTHING", + conflictingWrite: "INSERT INTO d.upsert VALUES (1, 10, 'south', 100)", + expectedOutput: []string{"1", "10", "south", "100"}, + }, + { + stmt: "INSERT INTO d.upsert VALUES (1, 11, 'east', 100) ON CONFLICT (id) DO UPDATE SET a = upsert.a + 1", + conflictingWrite: "INSERT INTO d.upsert VALUES (1, 10, 'north', 100)", + expectedOutput: []string{"1", "10", "north", "101"}, + }, + } + + // Execute the test statement. + runTestStmt := func(stmt string, wg *sync.WaitGroup, ch chan error) { + defer wg.Done() + _, err := connections[0].ExecContext(ctx, stmt) + if err != nil { + ch <- err + setState(Errored) + } + } + // Wait for the arbiter read to be done, then execute the conflicting write. + runConflictingWrite := func(stmt string, wg *sync.WaitGroup, ch chan error) { + defer wg.Done() + if !waitForState(ReadDone) { + return + } + _, err := connections[1].ExecContext(ctx, stmt) + if err != nil { + ch <- err + setState(Errored) + } else { + setState(ConflictDone) + } + } + + for idx, tc := range testCases { + fmt.Printf("Starting test %d -- %q\n", idx+1, tc.stmt) + setState(Ready) + + ch := make(chan error, len(connections)) + wg := sync.WaitGroup{} + wg.Add(len(connections)) + + go runTestStmt(tc.stmt, &wg, ch) + go runConflictingWrite(tc.conflictingWrite, &wg, ch) + + // Wait for test to complete and read any errors. + wg.Wait() + select { + case err := <-ch: + t.Fatal(err) + default: + } + + // Verifty that the write completed correctly. + rows, err := db.QueryContext(ctx, "SELECT * FROM d.upsert") + if err != nil { + t.Fatal(err) + } + for i := 0; rows.Next(); i++ { + var id, k, r, a string + if err := rows.Scan(&id, &k, &r, &a); err != nil { + t.Fatal(err) + } + if id != tc.expectedOutput[0] || k != tc.expectedOutput[1] || r != tc.expectedOutput[2] || a != tc.expectedOutput[3] { + t.Fatalf("%d: expected %v, got %v", idx, tc.expectedOutput, []string{id, k, r, a}) + } + } + rows.Close() + + if _, err := db.Exec(`TRUNCATE TABLE d.upsert`); err != nil { + t.Fatal(err) + } + } +} diff --git a/pkg/sql/opt/exec/execbuilder/mutation.go b/pkg/sql/opt/exec/execbuilder/mutation.go index b46bb271f41e..96ce4191094a 100644 --- a/pkg/sql/opt/exec/execbuilder/mutation.go +++ b/pkg/sql/opt/exec/execbuilder/mutation.go @@ -532,6 +532,7 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (_ execPlan, outputCols colO updateColOrds, returnColOrds, checkOrds, + ups.UniqueWithTombstoneIndexes, b.allowAutoCommit && len(ups.UniqueChecks) == 0 && len(ups.FKChecks) == 0 && len(ups.FKCascades) == 0, ) diff --git a/pkg/sql/opt/exec/execbuilder/testdata/unique_read_committed b/pkg/sql/opt/exec/execbuilder/testdata/unique_read_committed index abb4377f4dc7..b43fa97c8b6d 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/unique_read_committed +++ b/pkg/sql/opt/exec/execbuilder/testdata/unique_read_committed @@ -25,176 +25,22 @@ CREATE TABLE uniq_enum ( statement ok SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED -query T +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels EXPLAIN (OPT) INSERT INTO uniq_enum VALUES ('us-west', 'foo', 1, 1), ('us-east', 'bar', 2, 2) ----- -insert uniq_enum - ├── project - │ ├── values - │ │ ├── ('us-west', 'foo', 1, 1) - │ │ └── ('us-east', 'bar', 2, 2) - │ └── projections - │ └── column1 IN ('us-east', 'us-west', 'eu-west') - └── unique-checks - ├── unique-checks-item: uniq_enum(i) - │ └── project - │ └── semi-join (lookup uniq_enum) - │ ├── flags: prefer lookup join (into right side) - │ ├── locking: for-share,predicate,durability-guaranteed - │ ├── with-scan &1 - │ └── filters - │ └── r != uniq_enum.r - └── unique-checks-item: uniq_enum(s,j) - └── project - └── semi-join (lookup uniq_enum@uniq_enum_r_s_j_key) - ├── flags: prefer lookup join (into right side) - ├── locking: for-share,predicate,durability-guaranteed - ├── with-scan &1 - └── filters - └── (r != uniq_enum.r) OR (i != uniq_enum.i) -query T +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels EXPLAIN (OPT) INSERT INTO uniq_enum (s, i) VALUES ('foo', 1), ('bar', 2) ----- -insert uniq_enum - ├── project - │ ├── project - │ │ ├── values - │ │ │ ├── ('foo', 1) - │ │ │ └── ('bar', 2) - │ │ └── projections - │ │ ├── CASE (random() * 3.0)::INT8 WHEN 0 THEN 'us-east' WHEN 1 THEN 'us-west' ELSE 'eu-west' END - │ │ └── CAST(NULL AS INT8) - │ └── projections - │ └── r_default IN ('us-east', 'us-west', 'eu-west') - └── unique-checks - └── unique-checks-item: uniq_enum(i) - └── project - └── semi-join (lookup uniq_enum) - ├── flags: prefer lookup join (into right side) - ├── locking: for-share,predicate,durability-guaranteed - ├── with-scan &1 - └── filters - └── r != uniq_enum.r -query T +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels EXPLAIN (OPT) INSERT INTO uniq_enum VALUES ('us-west', 'foo', 1, 1), ('us-east', 'bar', 2, 2) ON CONFLICT DO NOTHING ----- -insert uniq_enum - ├── arbiter constraints: unique_i unique_s_j - └── project - ├── anti-join (lookup uniq_enum@uniq_enum_r_s_j_key) - │ ├── flags: prefer lookup join (into right side) - │ ├── lookup columns are key - │ ├── locking: for-share,predicate,durability-guaranteed - │ ├── anti-join (lookup uniq_enum) - │ │ ├── flags: prefer lookup join (into right side) - │ │ ├── lookup columns are key - │ │ ├── locking: for-share,predicate,durability-guaranteed - │ │ ├── values - │ │ │ ├── ('us-west', 'foo', 1, 1) - │ │ │ └── ('us-east', 'bar', 2, 2) - │ │ └── filters (true) - │ └── filters (true) - └── projections - └── column1 IN ('us-east', 'us-west', 'eu-west') -query T +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels EXPLAIN (OPT) UPDATE uniq_enum SET r = DEFAULT, s = 'baz', i = 3 WHERE r = 'eu-west' AND i > 10 AND i <= 20 ----- -update uniq_enum - ├── project - │ ├── project - │ │ ├── scan uniq_enum - │ │ │ └── constraint: /9/11: [/'eu-west'/11 - /'eu-west'/20] - │ │ └── projections - │ │ ├── CASE (random() * 3.0)::INT8 WHEN 0 THEN 'us-east' WHEN 1 THEN 'us-west' ELSE 'eu-west' END - │ │ ├── 'baz' - │ │ └── 3 - │ └── projections - │ └── r_new IN ('us-east', 'us-west', 'eu-west') - └── unique-checks - ├── unique-checks-item: uniq_enum(i) - │ └── project - │ └── semi-join (lookup uniq_enum) - │ ├── flags: prefer lookup join (into right side) - │ ├── locking: for-share,predicate,durability-guaranteed - │ ├── with-scan &1 - │ └── filters - │ └── r != uniq_enum.r - └── unique-checks-item: uniq_enum(s,j) - └── project - └── semi-join (lookup uniq_enum@uniq_enum_r_s_j_key) - ├── flags: prefer lookup join (into right side) - ├── locking: for-share,predicate,durability-guaranteed - ├── with-scan &1 - └── filters - └── (r != uniq_enum.r) OR (i != uniq_enum.i) -query T +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels EXPLAIN (OPT) UPSERT INTO uniq_enum VALUES ('us-west', 'foo', 1, 1), ('us-east', 'bar', 2, 2) ----- -upsert uniq_enum - ├── arbiter indexes: uniq_enum_pkey - ├── project - │ ├── project - │ │ ├── left-join (lookup uniq_enum) - │ │ │ ├── lookup columns are key - │ │ │ ├── values - │ │ │ │ ├── ('us-west', 'foo', 1, 1) - │ │ │ │ └── ('us-east', 'bar', 2, 2) - │ │ │ └── filters (true) - │ │ └── projections - │ │ ├── CASE WHEN uniq_enum.r IS NULL THEN column1 ELSE uniq_enum.r END - │ │ └── CASE WHEN uniq_enum.r IS NULL THEN column3 ELSE uniq_enum.i END - │ └── projections - │ └── upsert_r IN ('us-east', 'us-west', 'eu-west') - └── unique-checks - ├── unique-checks-item: uniq_enum(i) - │ └── project - │ └── semi-join (lookup uniq_enum) - │ ├── flags: prefer lookup join (into right side) - │ ├── locking: for-share,predicate,durability-guaranteed - │ ├── with-scan &1 - │ └── filters - │ └── r != uniq_enum.r - └── unique-checks-item: uniq_enum(s,j) - └── project - └── semi-join (lookup uniq_enum@uniq_enum_r_s_j_key) - ├── flags: prefer lookup join (into right side) - ├── locking: for-share,predicate,durability-guaranteed - ├── with-scan &1 - └── filters - └── (r != uniq_enum.r) OR (i != uniq_enum.i) -query T +statement error pgcode 0A000 pq: unimplemented: unique without index constraint under non-serializable isolation levels EXPLAIN (OPT) INSERT INTO uniq_enum VALUES ('us-west', 'foo', 1, 1), ('us-east', 'bar', 2, 2) ON CONFLICT (s, j) DO UPDATE SET i = 3 ----- -upsert uniq_enum - ├── arbiter constraints: unique_s_j - ├── project - │ ├── project - │ │ ├── left-join (lookup uniq_enum@uniq_enum_r_s_j_key) - │ │ │ ├── flags: prefer lookup join (into right side) - │ │ │ ├── lookup columns are key - │ │ │ ├── locking: for-update,predicate,durability-guaranteed - │ │ │ ├── values - │ │ │ │ ├── ('us-west', 'foo', 1, 1) - │ │ │ │ └── ('us-east', 'bar', 2, 2) - │ │ │ └── filters (true) - │ │ └── projections - │ │ ├── CASE WHEN uniq_enum.r IS NULL THEN column1 ELSE uniq_enum.r END - │ │ └── CASE WHEN uniq_enum.r IS NULL THEN column3 ELSE 3 END - │ └── projections - │ └── upsert_r IN ('us-east', 'us-west', 'eu-west') - └── unique-checks - └── unique-checks-item: uniq_enum(i) - └── project - └── semi-join (lookup uniq_enum) - ├── flags: prefer lookup join (into right side) - ├── locking: for-share,predicate,durability-guaranteed - ├── with-scan &1 - └── filters - └── r != uniq_enum.r diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index 5c1210a07cd7..d49720933175 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -561,6 +561,7 @@ define Upsert { UpdateCols exec.TableColumnOrdinalSet ReturnCols exec.TableColumnOrdinalSet Checks exec.CheckOrdinalSet + UniqueWithTombstonesIndexes cat.IndexOrdinals # If set, the operator will commit the transaction as part of its execution. # This is false when executing inside an explicit transaction, or there are diff --git a/pkg/sql/opt/optbuilder/mutation_builder_arbiter.go b/pkg/sql/opt/optbuilder/mutation_builder_arbiter.go index 6abf5ed27b57..15597611b65a 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder_arbiter.go +++ b/pkg/sql/opt/optbuilder/mutation_builder_arbiter.go @@ -304,11 +304,6 @@ func (mb *mutationBuilder) buildAntiJoinForDoNothingArbiter( Strength: tree.ForShare, Targets: []tree.TableName{tree.MakeUnqualifiedTableName(mb.tab.Name())}, WaitPolicy: tree.LockWaitBlock, - // Unique arbiters must ensure the non-existence of certain rows, so - // we use predicate locks instead of record locks to prevent - // insertion of new rows into the locked span(s) by other concurrent - // transactions. - Form: tree.LockPredicate, }, }, } @@ -439,11 +434,6 @@ func (mb *mutationBuilder) buildLeftJoinForUpsertArbiter( Strength: tree.ForUpdate, Targets: []tree.TableName{tree.MakeUnqualifiedTableName(mb.tab.Name())}, WaitPolicy: tree.LockWaitBlock, - // Unique arbiters must ensure the non-existence of certain rows, so - // we use predicate locks instead of record locks to prevent - // insertion of new rows into the locked span(s) by other concurrent - // transactions. - Form: tree.LockPredicate, }, }, } diff --git a/pkg/sql/opt/optbuilder/mutation_builder_unique.go b/pkg/sql/opt/optbuilder/mutation_builder_unique.go index f7fde1d64df4..de6e7b2d71d0 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder_unique.go +++ b/pkg/sql/opt/optbuilder/mutation_builder_unique.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/intsets" ) @@ -47,6 +48,18 @@ func (mb *mutationBuilder) buildUniqueChecksForInsert() { if !u.WithoutIndex() || u.UniquenessGuaranteedByAnotherIndex() { continue } + + // For non-serializable transactions, we guarantee uniqueness by writing tombstones in all + // partitions of a unique index with implicit partitioning columns. + if mb.b.evalCtx.TxnIsoLevel != isolation.Serializable { + if !mb.tab.Unique(i).CanUseTombstones() { + panic(unimplemented.NewWithIssue(126592, + "unique without index constraint under non-serializable isolation levels")) + } + mb.uniqueWithTombstoneIndexes.Add(i) + continue + } + // If this constraint is an arbiter of an INSERT ... ON CONFLICT ... DO // NOTHING clause, we don't need to plan a check (ON CONFLICT ... DO UPDATE // does not go through this code path; that's handled by @@ -55,13 +68,6 @@ func (mb *mutationBuilder) buildUniqueChecksForInsert() { continue } - // For non-serializable transactions, we guarantee uniqueness by writing tombstones in all - // partitions of a unique index with implicit partitioning columns. - if mb.b.evalCtx.TxnIsoLevel != isolation.Serializable && mb.tab.Unique(i).CanUseTombstones() { - mb.uniqueWithTombstoneIndexes.Add(i) - continue - } - if h.init(mb, i) { uniqueChecksItem, fastPathUniqueChecksItem := h.buildInsertionCheck(buildFastPathCheck) if fastPathUniqueChecksItem == nil { @@ -104,7 +110,11 @@ func (mb *mutationBuilder) buildUniqueChecksForUpdate() { } // For non-serializable transactions, we guarantee uniqueness by writing tombstones in all // partitions of a unique index with implicit partitioning columns. - if mb.b.evalCtx.TxnIsoLevel != isolation.Serializable && mb.tab.Unique(i).CanUseTombstones() { + if mb.b.evalCtx.TxnIsoLevel != isolation.Serializable { + if !mb.tab.Unique(i).CanUseTombstones() { + panic(unimplemented.NewWithIssue(126592, + "unique without index constraint under non-serializable isolation levels")) + } mb.uniqueWithTombstoneIndexes.Add(i) continue } @@ -139,10 +149,20 @@ func (mb *mutationBuilder) buildUniqueChecksForUpsert() { if !u.WithoutIndex() || u.UniquenessGuaranteedByAnotherIndex() { continue } + // For non-serializable transactions, we guarantee uniqueness by writing tombstones in all + // partitions of a unique index with implicit partitioning columns. + if mb.b.evalCtx.TxnIsoLevel != isolation.Serializable { + if !mb.tab.Unique(i).CanUseTombstones() { + panic(unimplemented.NewWithIssue(126592, + "unique without index constraint under non-serializable isolation levels")) + } + mb.uniqueWithTombstoneIndexes.Add(i) + continue + } // If this constraint is an arbiter of an INSERT ... ON CONFLICT ... DO // UPDATE clause and not updated by the DO UPDATE clause, we don't need to // plan a check (ON CONFLICT ... DO NOTHING does not go through this code - // path; that's handled by buildUniqueChecksForInsert). Note that that if + // path; that's handled by buildUniqueChecksForInsert). Note that if // the constraint is partial and columns referenced in the predicate are // updated, we'll still plan the check (this is handled correctly by // mb.uniqueColsUpdated). diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 9ab2e30c0bde..a4c78e000c27 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -1640,6 +1640,7 @@ func (ef *execFactory) ConstructUpsert( updateColOrdSet exec.TableColumnOrdinalSet, returnColOrdSet exec.TableColumnOrdinalSet, checks exec.CheckOrdinalSet, + uniqueWithTombstoneIndexes cat.IndexOrdinals, autoCommit bool, ) (exec.Node, error) { // Derive table and column descriptors. @@ -1656,7 +1657,7 @@ func (ef *execFactory) ConstructUpsert( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, - nil, /* uniqueWithTombstoneIndexes */ + ordinalsToIndexes(table, uniqueWithTombstoneIndexes), insertCols, ef.getDatumAlloc(), &ef.planner.ExecCfg().Settings.SV, @@ -1673,7 +1674,7 @@ func (ef *execFactory) ConstructUpsert( ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, - nil, /* uniqueWithTombstoneIndexes */ + ordinalsToIndexes(table, uniqueWithTombstoneIndexes), updateCols, fetchCols, row.UpdaterDefault, diff --git a/pkg/sql/upsert.go b/pkg/sql/upsert.go index 727d760b38c7..0b633b013f30 100644 --- a/pkg/sql/upsert.go +++ b/pkg/sql/upsert.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" ) var upsertNodePool = sync.Pool{ @@ -175,6 +176,13 @@ func (n *upsertNode) processSourceRow(params runParams, rowVals tree.Datums) err rowVals = rowVals[:ord] } + if buildutil.CrdbTestBuild { + // This testing knob allows us to suspend execution to force a race condition. + if fn := params.ExecCfg().TestingKnobs.AfterArbiterRead; fn != nil { + fn() + } + } + // Process the row. This is also where the tableWriter will accumulate // the row for later. return n.run.tw.row(params.ctx, rowVals, pm, n.run.traceKV)