Skip to content

Commit

Permalink
Change DB owner with ALTER AUTHORIZATION (#1954)
Browse files Browse the repository at this point in the history
Implement ALTER AUTHORIZATION to change database ownership.

Syntax: ALTER AUTHORIZATION ON DATABASE::<dbname> TO <login-name>

It is a common requirement for the DBA to create a database to be owned by a login. Thus far in Babelfish this was
possible only by (temporarily) making the target login a member of sysadmin role, which is undesirable.
To change the database owner to a different login, all that is required is to update sys.babelfish_sysdatabases.owner to
the new owner's login name. There are some restrictions such as that the new owner cannot be a user in the database
already (guest user does not count). A non-sysadmin DB owner cannot grant ownership to anyone else, including to a
sysadmin login. (however, it is OK to grant ownership to yourself although there seems little point in that).
When the current DB owner has an active session in the database with the current DB set to that same database, and at
the same time a different session change the database ownership, the now-previous owner retains access rights in the
session as long as the database context remains unchanged; as soon as the database context is changed, the access
rights from the DB ownership are lost: Babelfish behaves identically to T-SQL here.
To implement this feature, statement type PLTSQL_STMT_CHANGE_DBOWNER was introduced rather than something
more generic-looking like PLTSQL_STMT_ALTERAUTH: other cases of ALTER AUTHORIZATION may affect the
ownership/permissions-related artefacts that are created by Babelfish, or require different catalog modifications;
therefore, the DB ownership change is more of a one-off case.

BABEL-2121: Babel does not support ALTER AUTHORIZATION syntax to change database owner
Signed-off-by: Rob Verschoor <[email protected]>
  • Loading branch information
robverschoor authored Oct 26, 2023
1 parent 7c0e468 commit 1083c88
Show file tree
Hide file tree
Showing 20 changed files with 1,368 additions and 22 deletions.
95 changes: 84 additions & 11 deletions contrib/babelfishpg_tsql/src/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ static struct cachedesc my_cacheinfo[] = {
-1,
1,
{
Anum_sysdatabaese_oid,
Anum_sysdatabases_oid,
0,
0,
0
Expand All @@ -125,7 +125,7 @@ static struct cachedesc my_cacheinfo[] = {
-1,
1,
{
Anum_sysdatabaese_name,
Anum_sysdatabases_name,
0,
0,
0
Expand Down Expand Up @@ -302,7 +302,7 @@ get_db_name(int16 dbid)
if (!HeapTupleIsValid(tuple))
return NULL;

name_datum = SysCacheGetAttr(SYSDATABASEOID, tuple, Anum_sysdatabaese_name, &isNull);
name_datum = SysCacheGetAttr(SYSDATABASEOID, tuple, Anum_sysdatabases_name, &isNull);
name = TextDatumGetCString(name_datum);
ReleaseSysCache(tuple);

Expand All @@ -326,7 +326,7 @@ get_one_user_db_name(void)
{
char *db_name;

Datum name = heap_getattr(tuple, Anum_sysdatabaese_name,
Datum name = heap_getattr(tuple, Anum_sysdatabases_name,
rel->rd_att, &is_null);

db_name = TextDatumGetCString(name);
Expand Down Expand Up @@ -433,7 +433,7 @@ babelfish_helpdb(PG_FUNCTION_ARGS)
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("The database '%s' does not exist. Supply a valid database name. To see available databases, use sys.databases.", dbname)));
ScanKeyInit(&scanKey,
Anum_sysdatabaese_name,
Anum_sysdatabases_name,
BTEqualStrategyNumber, F_TEXTEQ,
CStringGetTextDatum(dbname_lower));
scan = systable_beginscan(rel, sysdatabaese_idx_name_oid, true,
Expand Down Expand Up @@ -463,7 +463,7 @@ babelfish_helpdb(PG_FUNCTION_ARGS)

MemSet(nulls, 0, sizeof(nulls));

db_name_entry = TextDatumGetCString(heap_getattr(tuple, Anum_sysdatabaese_name,
db_name_entry = TextDatumGetCString(heap_getattr(tuple, Anum_sysdatabases_name,
RelationGetDescr(rel), &isNull));

values[0] = CStringGetTextDatum(db_name_entry);
Expand All @@ -481,7 +481,7 @@ babelfish_helpdb(PG_FUNCTION_ARGS)
else
values[3] = sysdb->dbid;

tmstmp = DatumGetTimestamp(heap_getattr(tuple, Anum_sysdatabaese_crdate,
tmstmp = DatumGetTimestamp(heap_getattr(tuple, Anum_sysdatabases_crdate,
RelationGetDescr(rel), &isNull));

tmstmp_str = OidOutputFunctionCall(datetime_output_func, tmstmp);
Expand Down Expand Up @@ -1592,7 +1592,7 @@ static void rename_procfunc_update_bbf_catalog(RenameStmt *stmt);
*****************************************/
RelData catalog_data[] =
{
{"babelfish_sysdatabases", InvalidOid, InvalidOid, true, InvalidOid, Anum_sysdatabaese_name, F_TEXTEQ},
{"babelfish_sysdatabases", InvalidOid, InvalidOid, true, InvalidOid, Anum_sysdatabases_name, F_TEXTEQ},
{"babelfish_namespace_ext", InvalidOid, InvalidOid, true, InvalidOid, Anum_namespace_ext_namespace, F_NAMEEQ},
{"babelfish_authid_login_ext", InvalidOid, InvalidOid, true, InvalidOid, Anum_bbf_authid_login_ext_rolname, F_NAMEEQ},
{"babelfish_authid_user_ext", InvalidOid, InvalidOid, true, InvalidOid, Anum_bbf_authid_user_ext_rolname, F_NAMEEQ},
Expand Down Expand Up @@ -2237,7 +2237,7 @@ init_catalog_data(void)
{
catalog_data[i].tbl_oid = sysdatabases_oid;
catalog_data[i].idx_oid = sysdatabaese_idx_name_oid;
catalog_data[i].atttype = get_atttype(sysdatabases_oid, Anum_sysdatabaese_name);
catalog_data[i].atttype = get_atttype(sysdatabases_oid, Anum_sysdatabases_name);
}
else if (strcmp(catalog_data[i].tblname, "babelfish_namespace_ext") == 0)
{
Expand Down Expand Up @@ -2430,7 +2430,7 @@ update_user_catalog_for_guest(PG_FUNCTION_ARGS)

while (HeapTupleIsValid(tuple))
{
Datum db_name_datum = heap_getattr(tuple, Anum_sysdatabaese_name,
Datum db_name_datum = heap_getattr(tuple, Anum_sysdatabases_name,
db_rel->rd_att, &is_null);
const char *db_name = TextDatumGetCString(db_name_datum);

Expand Down Expand Up @@ -3183,7 +3183,7 @@ update_user_catalog_for_guest_schema(PG_FUNCTION_ARGS)

while (HeapTupleIsValid(tuple))
{
Datum db_name_datum = heap_getattr(tuple, Anum_sysdatabaese_name,
Datum db_name_datum = heap_getattr(tuple, Anum_sysdatabases_name,
db_rel->rd_att, &is_null);
const char *db_name = TextDatumGetCString(db_name_datum);

Expand Down Expand Up @@ -3250,3 +3250,76 @@ alter_guest_schema_for_db (const char *dbname)
table_endscan(tblscan);
table_close(bbf_authid_user_ext_rel, RowExclusiveLock);
}

/*
* Update the owner of a database in the catalog.
*/
void
update_db_owner(const char *new_owner_name, const char *db_name)
{
volatile Relation sysdatabases_rel;
TupleDesc sysdatabases_rel_descr;
ScanKeyData key;
HeapTuple tuple, db_found;
TableScanDesc tblscan;

Datum values[SYSDATABASES_NUM_COLS];
bool nulls[SYSDATABASES_NUM_COLS];
bool replaces[SYSDATABASES_NUM_COLS];

/* Do not allow changes to system databases. */
/* Note: T-SQL allows changing ownership of msdb. */
if ( (strlen(db_name) == 6 && (strncmp(db_name, "master", 6) == 0)) ||
(strlen(db_name) == 6 && (strncmp(db_name, "tempdb", 6) == 0))
)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Cannot change the owner of the master, model, tempdb or distribution database.")));
}

/* Find the database */
sysdatabases_rel = table_open(sysdatabases_oid, RowExclusiveLock);
sysdatabases_rel_descr = RelationGetDescr(sysdatabases_rel);

ScanKeyInit(&key,
Anum_sysdatabases_name,
BTEqualStrategyNumber, F_TEXTEQ,
CStringGetTextDatum(db_name));

tblscan = table_beginscan_catalog(sysdatabases_rel, 1, &key);

db_found = heap_getnext(tblscan, ForwardScanDirection);

if (!db_found)
{
/* Database should have been verified to exist, but if not, exit politely */
table_close(sysdatabases_rel, RowExclusiveLock);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", db_name)));
}

/* Build a tuple */
MemSet(values, 0, sizeof(values));
MemSet(nulls, false, sizeof(nulls));
MemSet(replaces, false, sizeof(replaces));

/* Set up the new owner. */
values[Anum_sysdatabases_owner - 1] = CStringGetDatum(new_owner_name);
replaces[Anum_sysdatabases_owner - 1] = true;

tuple = heap_modify_tuple(db_found,
sysdatabases_rel_descr,
values,
nulls,
replaces);

/* Perform the actual catalog update. */
CatalogTupleUpdate(sysdatabases_rel, &tuple->t_self, tuple);

/* Cleanup. */
heap_freetuple(tuple);
table_endscan(tblscan);
table_close(sysdatabases_rel, RowExclusiveLock);
}
8 changes: 5 additions & 3 deletions contrib/babelfishpg_tsql/src/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ extern Oid sysdatabaese_idx_name_oid;

/* MUST comply with babelfish_sysdatabases table */
#define SYSDATABASES_NUM_COLS 8
#define Anum_sysdatabaese_oid 1
#define Anum_sysdatabaese_name 6
#define Anum_sysdatabaese_crdate 7
#define Anum_sysdatabases_oid 1
#define Anum_sysdatabases_owner 4
#define Anum_sysdatabases_name 6
#define Anum_sysdatabases_crdate 7

/* MUST comply with babelfish_sysdatabases table */
typedef struct FormData_sysdatabases
Expand Down Expand Up @@ -148,6 +149,7 @@ extern List *get_authid_user_ext_db_users(const char *db_name);
extern char *get_user_for_database(const char *db_name);
extern void alter_user_can_connect(bool is_grant, char *user_name, char *db_name);
extern bool guest_role_exists_for_db(const char *dbname);
extern void update_db_owner(const char *new_owner_name, const char *db_name);

/* MUST comply with babelfish_authid_user_ext table */
typedef struct FormData_authid_user_ext
Expand Down
1 change: 1 addition & 0 deletions contrib/babelfishpg_tsql/src/codegen.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ stmt_default_act(Walker_context *ctx, PLtsql_stmt *stmt)
case PLTSQL_STMT_THROW:
case PLTSQL_STMT_USEDB:
case PLTSQL_STMT_GRANTDB:
case PLTSQL_STMT_CHANGE_DBOWNER:
case PLTSQL_STMT_GRANTSCHEMA:
case PLTSQL_STMT_INSERT_BULK:
case PLTSQL_STMT_DBCC:
Expand Down
2 changes: 1 addition & 1 deletion contrib/babelfishpg_tsql/src/dbcmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ drop_all_dbs(PG_FUNCTION_ARGS)

while (HeapTupleIsValid(tuple) && i < DROP_DB_BATCH_SIZE)
{
Datum name = heap_getattr(tuple, Anum_sysdatabaese_name,
Datum name = heap_getattr(tuple, Anum_sysdatabases_name,
sysdatabase_rel->rd_att, &is_null);

dbnames[i] = TextDatumGetCString(name);
Expand Down
9 changes: 9 additions & 0 deletions contrib/babelfishpg_tsql/src/iterative_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,15 @@ dispatch_stmt(PLtsql_execstate *estate, PLtsql_stmt *stmt)
}
exec_stmt_grantdb(estate, (PLtsql_stmt_grantdb *) stmt);
break;
case PLTSQL_STMT_CHANGE_DBOWNER:
if (pltsql_explain_only)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Showing Estimated Execution Plan for ALTER AUTHORIZATION statement is not yet supported")));
}
exec_stmt_change_dbowner(estate, (PLtsql_stmt_change_dbowner *) stmt);
break;
case PLTSQL_STMT_GRANTSCHEMA:
if (pltsql_explain_only)
{
Expand Down
65 changes: 65 additions & 0 deletions contrib/babelfishpg_tsql/src/pl_exec-2.c
Original file line number Diff line number Diff line change
Expand Up @@ -3752,3 +3752,68 @@ exec_stmt_grantschema(PLtsql_execstate *estate, PLtsql_stmt_grantschema *stmt)
}
return PLTSQL_RC_OK;
}

/*
* ALTER AUTHORIZATION ON DATABASE::dbname TO loginname
*/
static int
exec_stmt_change_dbowner(PLtsql_execstate *estate, PLtsql_stmt_change_dbowner *stmt)
{
char *new_owner_is_user;

/* Verify target database exists. */
if (!DbidIsValid(get_db_id(stmt->db_name)))
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("Cannot find the database '%s', because it does not exist or you do not have permission.", stmt->db_name)));
}

/* Verify new owner exists as a login. */
if (get_role_oid(stmt->new_owner_name, true) == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("Cannot find the principal '%s', because it does not exist or you do not have permission.", stmt->new_owner_name)));
}

/* T-SQL allows granting ownership to yourself when you are owner already, even without having sysadmin role. */
if (get_role_oid(stmt->new_owner_name, true) == GetSessionUserId()) // Granting ownership to myself?
{
/* Is the current login already DB owner? */
if (get_role_oid(get_owner_of_db(stmt->db_name), true) == GetSessionUserId())
{
/* Current login is DB owner, so perform the update */
update_db_owner(stmt->new_owner_name, stmt->db_name);
return PLTSQL_RC_OK;
}
}

/*
* The executing login must have sysadmin role: even when the current session is the owner, but has no sysadmin role,
* T-SQL does not allow the owner to grant ownership to another login -- not even to 'sa'.
*/
if (!has_privs_of_role(GetSessionUserId(), get_role_oid("sysadmin", false)))
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("Cannot find the principal '%s', because it does not exist or you do not have permission.", stmt->new_owner_name)));
}

/* The new owner cannot be a user in the database already (but 'guest' user is fine). */
new_owner_is_user = get_authid_user_ext_physical_name(stmt->db_name, stmt->new_owner_name);
if (!new_owner_is_user)
{
// OK to proceed
}
else if (new_owner_is_user && pg_strcasecmp(new_owner_is_user, "guest") == 0)
{
// OK to proceed
}
else
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("The proposed new database owner is already a user or aliased in the database.")));
}

/* All validations done, perform the actual update */
update_db_owner(stmt->new_owner_name, stmt->db_name);
return PLTSQL_RC_OK;
}
13 changes: 13 additions & 0 deletions contrib/babelfishpg_tsql/src/pl_funcs-2.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ free_stmt2(PLtsql_stmt *stmt)
case PLTSQL_STMT_USEDB:
case PLTSQL_STMT_INSERT_BULK:
case PLTSQL_STMT_GRANTDB:
case PLTSQL_STMT_CHANGE_DBOWNER:
case PLTSQL_STMT_GRANTSCHEMA:
case PLTSQL_STMT_SET_EXPLAIN_MODE:
{
Expand Down Expand Up @@ -541,6 +542,7 @@ void dump_stmt_raiserror(PLtsql_stmt_raiserror *stmt_raiserror);
void dump_stmt_throw(PLtsql_stmt_throw *stmt_throw);
void dump_stmt_usedb(PLtsql_stmt_usedb *stmt_usedb);
void dump_stmt_grantdb(PLtsql_stmt_grantdb *stmt_grantdb);
void dump_stmt_change_dbowner(PLtsql_stmt_change_dbowner *stmt_change_dbowner);
void dump_stmt_insert_bulk(PLtsql_stmt_insert_bulk *stmt_insert_bulk);
void dump_stmt_try_catch(PLtsql_stmt_try_catch *stmt_try_catch);
void dump_stmt_query_set(PLtsql_stmt_query_set *query_set);
Expand Down Expand Up @@ -679,6 +681,12 @@ dump_stmt_grantdb(PLtsql_stmt_grantdb *stmt_grantdb)
resetStringInfo(&grantees_names);
}

void
dump_stmt_change_dbowner(PLtsql_stmt_change_dbowner *stmt_change_dbowner)
{
printf("ALTER AUTHORIZATION ON DATABASE::%s TO %s\n", stmt_change_dbowner->db_name, stmt_change_dbowner->new_owner_name);
}

void
dump_stmt_insert_bulk(PLtsql_stmt_insert_bulk *stmt_insert_bulk)
{
Expand Down Expand Up @@ -828,6 +836,11 @@ dump_stmt2(PLtsql_stmt *stmt)
dump_stmt_grantdb((PLtsql_stmt_grantdb *) stmt);
break;
}
case PLTSQL_STMT_CHANGE_DBOWNER:
{
dump_stmt_change_dbowner((PLtsql_stmt_change_dbowner *) stmt);
break;
}
case PLTSQL_STMT_INSERT_BULK:
{
dump_stmt_insert_bulk((PLtsql_stmt_insert_bulk *) stmt);
Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/pl_funcs.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ pltsql_stmt_typename(PLtsql_stmt *stmt)
case PLTSQL_STMT_GRANTDB:
return ((PLtsql_stmt_grantdb *) stmt)->is_grant ?
"GRANT CONNECT TO" : "REVOKE CONNECT FROM";
case PLTSQL_STMT_CHANGE_DBOWNER:
return "ALTER AUTHORIZATION ON DATABASE::";
/* TSQL-only executable node */
case PLTSQL_STMT_SAVE_CTX:
return "SAVE_CONTEXT";
Expand Down
12 changes: 12 additions & 0 deletions contrib/babelfishpg_tsql/src/pltsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ typedef enum PLtsql_stmt_type
PLTSQL_STMT_RESTORE_CTX_PARTIAL,
PLTSQL_STMT_INSERT_BULK,
PLTSQL_STMT_GRANTDB,
PLTSQL_STMT_CHANGE_DBOWNER,
PLTSQL_STMT_DBCC,
PLTSQL_STMT_GRANTSCHEMA
} PLtsql_stmt_type;
Expand Down Expand Up @@ -1036,6 +1037,17 @@ typedef struct PLtsql_stmt_grantdb
List *grantees; /* list of users */
} PLtsql_stmt_grantdb;

/*
* ALTER AUTHORIZATION ON DATABASE::<dbname> TO <login>
*/
typedef struct PLtsql_stmt_change_dbowner
{
PLtsql_stmt_type cmd_type;
int lineno;
char *db_name;
char *new_owner_name; /* Login name for new owner */
} PLtsql_stmt_change_dbowner;

/*
* Grant on schema stmt
*/
Expand Down
1 change: 1 addition & 0 deletions contrib/babelfishpg_tsql/src/pltsql_instr.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ typedef enum PgTsqlInstrMetricType
INSTR_UNSUPPORTED_TSQL_VARYING,
INSTR_UNSUPPORTED_TSQL_UNKNOWN_DDL,
INSTR_UNSUPPORTED_TSQL_NOT_IMPLEMENTED_SYSTEM_PROCEDURE,
INSTR_UNSUPPORTED_TSQL_ALTER_AUTHORIZATION,
INSTR_TSQL_TIMESTAMP_DATATYPE,
INSTR_TSQL_ROWVERSION_DATATYPE,
INSTR_TSQL_HIERARCHYID_DATATYPE,
Expand Down
Loading

0 comments on commit 1083c88

Please sign in to comment.