
Merge #107567
107567: sql: fix CREATE AS [SHOW CREATE FUNCTION] job failure r=rafiss a=ecwall

Fixes #106268

Previously, `CREATE TABLE AS`/`CREATE MATERIALIZED VIEW AS` statements sourcing from `SHOW CREATE FUNCTION <function>` generated a failing schema change job with an `unknown function: f(): function undefined` error, because the job runs in the system database and functions cannot be referenced across databases.

This PR fixes the issue by using the original user's session data (which includes the current database) in the schema change job.

Release note (bug fix): Fixed a schema changer job failure that occurred when CREATE AS sourced from SHOW CREATE FUNCTION, for example:
CREATE TABLE t AS SELECT * FROM [SHOW CREATE FUNCTION f];
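
A minimal reproduction sketch, assuming a fresh cluster; the function definition mirrors the regression test in pkg/sql/create_as_test.go, and the table name `t` comes from the release note:

```sql
-- Setup: a trivial user-defined function.
CREATE FUNCTION show_create_fn(i INT) RETURNS INT AS 'SELECT i' LANGUAGE SQL;

-- Previously, the backfill for this statement ran in a schema change job that
-- executed against the system database and therefore failed with
-- "unknown function: show_create_fn(): function undefined".
CREATE TABLE t AS SELECT * FROM [SHOW CREATE FUNCTION show_create_fn];
```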

Co-authored-by: Evan Wall <[email protected]>
craig[bot] and ecwall committed Aug 2, 2023
2 parents a462321 + 0695f6a commit 89040b4
Showing 20 changed files with 171 additions and 46 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -549,6 +549,7 @@ ALL_TESTS = [
"//pkg/sql/sem/tree:tree_disallowed_imports_test",
"//pkg/sql/sem/tree:tree_test",
"//pkg/sql/sessiondata:sessiondata_test",
"//pkg/sql/sessiondatapb:sessiondatapb_test",
"//pkg/sql/sessioninit:sessioninit_test",
"//pkg/sql/span:span_test",
"//pkg/sql/sqlinstance/instancestorage:instancestorage_test",
@@ -2051,6 +2052,7 @@ GO_TARGETS = [
"//pkg/sql/sessiondata:sessiondata",
"//pkg/sql/sessiondata:sessiondata_test",
"//pkg/sql/sessiondatapb:sessiondatapb",
"//pkg/sql/sessiondatapb:sessiondatapb_test",
"//pkg/sql/sessioninit:sessioninit",
"//pkg/sql/sessioninit:sessioninit_test",
"//pkg/sql/sessionphase:sessionphase",
4 changes: 3 additions & 1 deletion pkg/jobs/jobspb/jobs.proto
@@ -887,7 +887,9 @@ message SchemaChangeDetails {
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
];

// Next id 13.
sessiondatapb.SessionData session_data = 13;

// Next id 14.
}

message SchemaChangeProgress {
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
@@ -257,7 +257,7 @@ var (
errExporterWrap = errors.New("core.Exporter is not supported (not an execinfra.RowSource)")
errSamplerWrap = errors.New("core.Sampler is not supported (not an execinfra.RowSource)")
errSampleAggregatorWrap = errors.New("core.SampleAggregator is not supported (not an execinfra.RowSource)")
errExperimentalWrappingProhibited = errors.New("wrapping for non-JoinReader and non-LocalPlanNode cores is prohibited in vectorize=experimental_always")
errExperimentalWrappingProhibited = errors.Newf("wrapping for non-JoinReader and non-LocalPlanNode cores is prohibited in vectorize=%s", sessiondatapb.VectorizeExperimentalAlways)
errWrappedCast = errors.New("mismatched types in NewColOperator and unsupported casts")
errLookupJoinUnsupported = errors.New("lookup join reader is unsupported in vectorized")
errFilteringAggregation = errors.New("filtering aggregation not supported")
7 changes: 0 additions & 7 deletions pkg/sql/create_as_test.go
@@ -129,7 +129,6 @@ func TestCreateAsShow(t *testing.T) {
testCases := []struct {
sql string
setup string
skip bool
}{
{
sql: "SHOW CLUSTER SETTINGS",
@@ -165,9 +164,6 @@ func TestCreateAsShow(t *testing.T) {
{
sql: "SHOW CREATE FUNCTION show_create_fn",
setup: "CREATE FUNCTION show_create_fn(i int) RETURNS INT AS 'SELECT i' LANGUAGE SQL",
// TODO(sql-foundations): Fix `unknown function: show_create_fn(): function undefined` error in job.
// See https://github.com/cockroachdb/cockroach/issues/106268.
skip: true,
},
{
sql: "SHOW CREATE ALL TYPES",
@@ -294,9 +290,6 @@

for i, testCase := range testCases {
t.Run(testCase.sql, func(t *testing.T) {
if testCase.skip {
return
}
if testCase.setup != "" {
if s.StartedDefaultTestTenant() && strings.Contains(testCase.setup, "create_tenant") {
// Only the system tenant has the ability to create other
26 changes: 24 additions & 2 deletions pkg/sql/delegate/show_function.go
@@ -27,12 +27,20 @@ FROM crdb_internal.create_function_statements
WHERE schema_name = %[1]s
AND function_name = %[2]s
`
un, ok := n.Name.FunctionReference.(*tree.UnresolvedName)
resolvableFunctionReference := &n.Name
un, ok := resolvableFunctionReference.FunctionReference.(*tree.UnresolvedName)
if !ok {
return nil, errors.AssertionFailedf("not a valid function name")
}

fn, err := d.catalog.ResolveFunction(d.ctx, un, &d.evalCtx.SessionData().SearchPath)
searchPath := &d.evalCtx.SessionData().SearchPath
var fn *tree.ResolvedFunctionDefinition
var err error
if d.qualifyDataSourceNamesInAST {
fn, err = resolvableFunctionReference.Resolve(d.ctx, searchPath, d.catalog)
} else {
fn, err = d.catalog.ResolveFunction(d.ctx, un, searchPath)
}
if err != nil {
return nil, err
}
@@ -41,12 +49,26 @@ AND function_name = %[2]s
for _, o := range fn.Overloads {
if o.IsUDF {
udfSchema = o.Schema
break
}
}
if udfSchema == "" {
return nil, errors.Errorf("function %s does not exist", tree.AsString(un))
}

if d.qualifyDataSourceNamesInAST {
referenceByName := resolvableFunctionReference.ReferenceByName
if !referenceByName.HasExplicitSchema() {
referenceByName.Parts[1] = udfSchema
}
if !referenceByName.HasExplicitCatalog() {
referenceByName.Parts[2] = d.evalCtx.SessionData().Database
}
if referenceByName.NumParts < 3 {
referenceByName.NumParts = 3
}
}

fullQuery := fmt.Sprintf(query, lexbase.EscapeSQLString(udfSchema), lexbase.EscapeSQLString(un.Parts[0]))
return d.parse(fullQuery)
}
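
For statements that store and later replay their source query (such as `CREATE ... AS`), the delegator appears to run with `qualifyDataSourceNamesInAST` set, so the new branch above resolves the function through the AST reference and then fills in the schema and database explicitly. Roughly, and only as an illustration (the current database `defaultdb` and the qualified spelling below are assumptions, not taken from this diff):

```sql
-- What the user writes inside CREATE TABLE ... AS:
SHOW CREATE FUNCTION show_create_fn;

-- Approximate fully qualified form after the rewrite above; with the schema
-- and database explicit, resolution no longer depends on the session that
-- later re-executes the query.
SHOW CREATE FUNCTION defaultdb.public.show_create_fn;
```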
1 change: 1 addition & 0 deletions pkg/sql/drop_function.go
@@ -279,6 +279,7 @@ func (p *planner) writeDropFuncSchemaChange(ctx context.Context, funcDesc *funcd
DescriptorIDs: descpb.IDs{funcDesc.ID},
Details: jobspb.SchemaChangeDetails{
DroppedFunctions: descpb.IDs{funcDesc.ID},
SessionData: &p.SessionData().SessionData,
},
Progress: jobspb.TypeSchemaChangeProgress{},
}
1 change: 1 addition & 0 deletions pkg/sql/drop_schema.go
@@ -258,6 +258,7 @@ func (p *planner) createDropSchemaJob(
// The version distinction for database jobs doesn't matter for jobs that
// drop schemas.
FormatVersion: jobspb.DatabaseJobFormatVersion,
SessionData: &p.SessionData().SessionData,
},
Progress: jobspb.SchemaChangeProgress{},
NonCancelable: true,
13 changes: 7 additions & 6 deletions pkg/sql/exec_util.go
@@ -588,12 +588,13 @@ var VectorizeClusterMode = settings.RegisterEnumSetting(
VectorizeClusterSettingName,
"default vectorize mode",
"on",
map[int64]string{
int64(sessiondatapb.VectorizeUnset): "on",
int64(sessiondatapb.VectorizeOn): "on",
int64(sessiondatapb.VectorizeExperimentalAlways): "experimental_always",
int64(sessiondatapb.VectorizeOff): "off",
},
func() map[int64]string {
m := make(map[int64]string, len(sessiondatapb.VectorizeExecMode_name))
for k := range sessiondatapb.VectorizeExecMode_name {
m[int64(k)] = sessiondatapb.VectorizeExecMode(k).String()
}
return m
}(),
).WithPublic()

// DistSQLClusterExecMode controls the cluster default for when DistSQL is used.
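
The exec_util.go change above derives the setting's legal values from the generated enum instead of a hand-maintained map, keeping them in sync with `VectorizeExecMode`; the `set` logic test below now also expects an `Available values:` hint listing the same spellings. A standalone sketch of the pattern, outside the CockroachDB build (it only reuses the generated `VectorizeExecMode_name` map and the enum's `String()` method):

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
)

func main() {
	// Build the legal-values map for the cluster setting from the proto enum so
	// that newly added enum values are picked up without editing this code.
	m := make(map[int64]string, len(sessiondatapb.VectorizeExecMode_name))
	for k := range sessiondatapb.VectorizeExecMode_name {
		m[int64(k)] = sessiondatapb.VectorizeExecMode(k).String()
	}
	// Values follow VectorizeExecMode.String(), e.g. "on", "off", "experimental_always".
	fmt.Println(m)
}
```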
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/builtin_function
@@ -2989,7 +2989,7 @@ select crdb_internal.json_to_pb('cockroach.sql.sqlbase.Descriptor', crdb_interna
----
true

query error pq: crdb_internal.json_to_pb\(\): invalid proto JSON: unmarshaling json to cockroach.sql.sqlbase.Descriptor: unknown field "__redacted__" in descpb.Descriptor
query error pq: crdb_internal.json_to_pb\(\): invalid proto JSON: unmarshaling to cockroach.sql.sqlbase.Descriptor json: .+ unknown field "__redacted__" in descpb.Descriptor
select crdb_internal.json_to_pb('cockroach.sql.sqlbase.Descriptor', crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', descriptor, true, true)) = descriptor from system.descriptor where id = 1

subtest regexp_split
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/set
@@ -243,7 +243,7 @@ SET vectorize = experimental_always
statement ok
SET vectorize = off

statement error invalid value for parameter "vectorize": "bogus"
statement error invalid value for parameter "vectorize": "bogus"\nHINT: Available values: off,on,experimental_always
SET vectorize = bogus

statement ok
47 changes: 47 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/vectorize
@@ -3,6 +3,8 @@
# Note that there is not much benefit from running this file with other logic
# test configurations because we override the vectorize setting.

subtest todo1

# NOTE: all queries in this file should run with vectorize=experimental_always
# unless they are known to be unsupported (like generate_series, etc). If you do
# need to execute an unsupported query, follow the pattern of
@@ -605,6 +607,14 @@ SELECT EXTRACT(YEAR FROM x) FROM extract_test
----
2017

statement ok
RESET vectorize

subtest end

subtest 38937


# Regression test for #38937
statement ok
CREATE TABLE t38937 (_int2) AS SELECT 1::INT2;
@@ -614,6 +624,13 @@ SELECT sum_int(_int2) FROM t38937
----
1

subtest end

subtest todo2

statement ok
SET vectorize=experimental_always

# Regression tests for #38959

statement ok
@@ -1167,6 +1184,13 @@ SELECT crdb_internal_mvcc_timestamp IS NOT NULL FROM mvcc@i
----
true

statement ok
RESET vectorize

subtest end

subtest 51841

# Regression test for builtin funcs that take no arguments, combined with
# data that can't be safely decoded as input.

@@ -1176,6 +1200,10 @@ CREATE TABLE t51841 (a) AS SELECT gen_random_uuid();
statement ok
SELECT random() from t51841

subtest end

subtest todo3

# Regression test to ensure that we can EXPLAIN (VEC) plans that contain user
# defined types. We aren't interested in the output here, but are just ensuring
# that we don't panic.
@@ -1247,6 +1275,13 @@ SELECT b FROM t66706@u WHERE NOT (b = 'foo')
bar
bar

statement ok
RESET vectorize

subtest end

subtest 68040

# Regression test for ignoring the escaping in the LIKE pattern (#68040).
statement ok
CREATE TABLE t68040 (c) AS SELECT 'string with \ backslash'
@@ -1256,6 +1291,13 @@ SELECT c FROM t68040 WHERE c LIKE '%\\%'
----
string with \ backslash

subtest end

subtest todo4

statement ok
SET vectorize=experimental_always

# Regression test for #68979. The IN operator should evaluate correctly when the
# tuple contents are not sorted by the optimizer.
statement ok
@@ -1296,3 +1338,8 @@ query I
SELECT _int2 * _int2 FROM ints WHERE _int4 + _int4 = _int8 + 2
----
4

statement ok
RESET vectorize

subtest end
5 changes: 3 additions & 2 deletions pkg/sql/protoreflect/utils.go
@@ -123,8 +123,9 @@ func MessageToJSON(msg protoutil.Message, flags FmtFlags) (jsonb.JSON, error) {
// Returns serialized byte representation of the protocol message.
func JSONBMarshalToMessage(input jsonb.JSON, target protoutil.Message) ([]byte, error) {
json := &jsonpb.Unmarshaler{}
if err := json.Unmarshal(strings.NewReader(input.String()), target); err != nil {
return nil, errors.Wrapf(err, "unmarshaling json to %s", proto.MessageName(target))
jsonString := input.String()
if err := json.Unmarshal(strings.NewReader(jsonString), target); err != nil {
return nil, errors.Wrapf(err, "unmarshaling to %s json: %s", proto.MessageName(target), jsonString)
}
data, err := protoutil.Marshal(target)
if err != nil {
1 change: 1 addition & 0 deletions pkg/sql/schema.go
@@ -73,6 +73,7 @@ func (p *planner) writeSchemaDescChange(
// The version distinction for database jobs doesn't matter for schema
// jobs.
FormatVersion: jobspb.DatabaseJobFormatVersion,
SessionData: &p.SessionData().SessionData,
},
Progress: jobspb.SchemaChangeProgress{},
NonCancelable: true,
10 changes: 9 additions & 1 deletion pkg/sql/schema_changer.go
@@ -51,6 +51,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -156,6 +157,7 @@ type SchemaChanger struct {
clock *hlc.Clock
settings *cluster.Settings
execCfg *ExecutorConfig
sessionData *sessiondatapb.SessionData
}

// NewSchemaChangerForTesting only for tests.
@@ -305,6 +307,8 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
return err
}

sd := NewInternalSessionData(ctx, sc.execCfg.Settings, "backfillQueryIntoTable")
sd.SessionData = *sc.sessionData
// Create an internal planner as the planner used to serve the user query
// would have committed by this point.
p, cleanup := NewInternalPlanner(
@@ -313,7 +317,7 @@
username.RootUserName(),
&MemoryMetrics{},
sc.execCfg,
NewInternalSessionData(ctx, sc.execCfg.Settings, "backfillQueryIntoTable"),
sd,
)

defer cleanup()
@@ -2155,6 +2159,7 @@ func (sc *SchemaChanger) updateJobForRollback(
TableMutationID: sc.mutationID,
ResumeSpanList: spanList,
FormatVersion: oldDetails.FormatVersion,
SessionData: sc.sessionData,
},
); err != nil {
return err
@@ -2611,6 +2616,7 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
settings: p.ExecCfg().Settings,
execCfg: p.ExecCfg(),
metrics: p.ExecCfg().SchemaChangerMetrics,
sessionData: details.SessionData,
}
opts := retry.Options{
InitialBackoff: 20 * time.Millisecond,
@@ -2828,6 +2834,7 @@ func (r schemaChangeResumer) OnFailOrCancel(
clock: p.ExecCfg().Clock,
settings: p.ExecCfg().Settings,
execCfg: p.ExecCfg(),
sessionData: details.SessionData,
}

if r.job.Payload().FinalResumeError == nil {
@@ -2931,6 +2938,7 @@ func (sc *SchemaChanger) queueCleanupJob(
// The version distinction for database jobs doesn't matter for jobs on
// tables.
FormatVersion: jobspb.DatabaseJobFormatVersion,
SessionData: sc.sessionData,
},
Progress: jobspb.SchemaChangeProgress{},
NonCancelable: true,
13 changes: 12 additions & 1 deletion pkg/sql/sessiondatapb/BUILD.bazel
@@ -1,6 +1,6 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "sessiondatapb",
@@ -54,3 +54,14 @@ go_proto_library(
"@com_github_gogo_protobuf//gogoproto",
],
)

go_test(
name = "sessiondatapb_test",
srcs = ["session_data_test.go"],
args = ["-test.timeout=295s"],
embed = [":sessiondatapb"],
deps = [
"//pkg/sql/protoreflect",
"@com_github_stretchr_testify//require",
],
)
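
The new pkg/sql/sessiondatapb/session_data_test.go is not rendered in this view, so the following is only a speculative sketch of what a test with these dependencies could cover: round-tripping `SessionData` through the `protoreflect` helpers touched above, which matters now that `SessionData` is embedded in `SchemaChangeDetails` job payloads. The test name, the chosen fields, and the zero-valued `FmtFlags` literal are assumptions, not taken from the commit:

```go
package sessiondatapb

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
	"github.com/stretchr/testify/require"
)

// Hypothetical: check that SessionData survives a proto -> JSON -> proto round trip.
func TestSessionDataJSONRoundTrip(t *testing.T) {
	in := SessionData{Database: "defaultdb", ApplicationName: "test"}

	j, err := protoreflect.MessageToJSON(&in, protoreflect.FmtFlags{})
	require.NoError(t, err)

	var out SessionData
	_, err = protoreflect.JSONBMarshalToMessage(j, &out)
	require.NoError(t, err)
	require.Equal(t, in, out)
}
```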