diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index ca52a991af07..396fb35047af 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -2075,7 +2075,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) { `cannot create logical replication stream: destination table tab CHECK constraints do not match source table tab`, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), ) - // Allos user to create LDR stream with mismatched CHECK via SKIP SCHEMA CHECK. + // Allow user to create LDR stream with mismatched CHECK via SKIP SCHEMA CHECK. var jobIDSkipSchemaCheck jobspb.JobID dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH SKIP SCHEMA CHECK", @@ -2097,7 +2097,39 @@ func TestLogicalReplicationCreationChecks(t *testing.T) { dbA.Exec(t, "CANCEL JOB $1", jobAID) jobutils.WaitForJobToCancel(t, dbA, jobAID) + // Check if the table references a UDF. + dbA.Exec(t, "CREATE OR REPLACE FUNCTION my_udf() RETURNS INT AS $$ SELECT 1 $$ LANGUAGE SQL") + dbA.Exec(t, "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL") + dbA.Exec(t, "ALTER TABLE tab ALTER COLUMN udf_col SET DEFAULT my_udf()") + dbB.Exec(t, "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL DEFAULT 1") + dbA.ExpectErr(t, + `cannot create logical replication stream: table tab references functions with IDs \[[0-9]+\]`, + "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), + ) + + // Check if the table references a sequence. + dbA.Exec(t, "ALTER TABLE tab DROP COLUMN udf_col") + dbB.Exec(t, "ALTER TABLE tab DROP COLUMN udf_col") + dbA.Exec(t, "CREATE SEQUENCE my_seq") + dbA.Exec(t, "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT nextval('my_seq')") + dbB.Exec(t, "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT 1") + dbA.ExpectErr(t, + `cannot create logical replication stream: table tab references sequences with IDs \[[0-9]+\]`, + "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), + ) + + // Check if table has a trigger. + dbA.Exec(t, "ALTER TABLE tab DROP COLUMN seq_col") + dbB.Exec(t, "ALTER TABLE tab DROP COLUMN seq_col") + dbA.Exec(t, "CREATE OR REPLACE FUNCTION my_trigger() RETURNS TRIGGER AS $$ BEGIN RETURN NEW; END $$ LANGUAGE PLPGSQL") + dbA.Exec(t, "CREATE TRIGGER my_trigger BEFORE INSERT ON tab FOR EACH ROW EXECUTE FUNCTION my_trigger()") + dbA.ExpectErr(t, + `cannot create logical replication stream: table tab references triggers \[my_trigger\]`, + "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(), + ) + // Verify that the stream cannot be created with user defined types. + dbA.Exec(t, "DROP TRIGGER my_trigger ON tab") dbA.Exec(t, "CREATE TYPE mytype AS ENUM ('a', 'b', 'c')") dbB.Exec(t, "CREATE TYPE b.mytype AS ENUM ('a', 'b', 'c')") dbA.Exec(t, "ALTER TABLE tab ADD COLUMN enum_col mytype NOT NULL") diff --git a/pkg/sql/catalog/tabledesc/logical_replication_helpers.go b/pkg/sql/catalog/tabledesc/logical_replication_helpers.go index d9f36606d9c7..8e0928b88956 100644 --- a/pkg/sql/catalog/tabledesc/logical_replication_helpers.go +++ b/pkg/sql/catalog/tabledesc/logical_replication_helpers.go @@ -52,7 +52,31 @@ func CheckLogicalReplicationCompatibility( return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg) } } + if err := checkOutboundReferences(dst); err != nil { + return pgerror.Wrapf(err, pgcode.InvalidTableDefinition, cannotLDRMsg) + } + + return nil +} +// checkOutboundReferences verifies that the table descriptor does not +// reference any user-defined functions, sequences, or triggers. +func checkOutboundReferences(dst *descpb.TableDescriptor) error { + for _, col := range dst.Columns { + if len(col.UsesSequenceIds) > 0 { + return errors.Newf("table %s references sequences with IDs %v", dst.Name, col.UsesSequenceIds) + } + if len(col.UsesFunctionIds) > 0 { + return errors.Newf("table %s references functions with IDs %v", dst.Name, col.UsesFunctionIds) + } + } + if len(dst.Triggers) > 0 { + triggerNames := make([]string, len(dst.Triggers)) + for i, trigger := range dst.Triggers { + triggerNames[i] = trigger.Name + } + return errors.Newf("table %s references triggers [%s]", dst.Name, strings.Join(triggerNames, ", ")) + } return nil }