Skip to content

Commit

Permalink
Refactor internal/principal key LWLocks
Browse files Browse the repository at this point in the history
Before this commit, several flaws existed in the lock protection of the
Internal and Principal keys. Such as double locking, insufficient
locking while rotating the principal key and internal key retrieval
from the disk. We also had two locks guarding the same actions, which
in 90% of cases were locked and unlocked together. This commit tries to
address all these issues.

For: https://perconadev.atlassian.net/browse/PG-858
  • Loading branch information
dAdAbird committed Sep 25, 2024
1 parent 6d4f7e5 commit 5a38a6c
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 136 deletions.
65 changes: 30 additions & 35 deletions src/access/pg_tde_tdemap.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,13 @@ pg_tde_create_key_map_entry(const RelFileLocator *newrlocator)
RelKeyData *enc_rel_key_data;
TDEPrincipalKey *principal_key;
XLogRelKey xlrec;
LWLock *lock_pk = tde_lwlock_enc_keys();

principal_key = GetPrincipalKey(newrlocator->dbOid, newrlocator->spcOid);
LWLockAcquire(lock_pk, LW_EXCLUSIVE);
principal_key = GetPrincipalKey(newrlocator->dbOid, newrlocator->spcOid, LW_EXCLUSIVE);
if (principal_key == NULL)
{
LWLockRelease(lock_pk);
ereport(ERROR,
(errmsg("failed to retrieve principal key")));

Expand All @@ -158,6 +161,7 @@ pg_tde_create_key_map_entry(const RelFileLocator *newrlocator)

if (!RAND_bytes(int_key.key, INTERNAL_KEY_LEN))
{
LWLockRelease(lock_pk);
ereport(FATAL,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not generate internal key for relation \"%s\": %s",
Expand All @@ -184,6 +188,7 @@ pg_tde_create_key_map_entry(const RelFileLocator *newrlocator)
* Add the encyrpted key to the key map data file structure.
*/
pg_tde_write_key_map_entry(newrlocator, enc_rel_key_data, &principal_key->keyInfo);
LWLockRelease(lock_pk);
pfree(enc_rel_key_data);
return rel_key_data;
}
Expand Down Expand Up @@ -486,13 +491,12 @@ pg_tde_write_one_keydata(int fd, int32 key_index, RelKeyData *enc_rel_key_data)
* The keydata function will then write the encrypted key on the desired
* location.
*
* The map file must be updated while holding an exclusive lock.
* The caller must hold an exclusive lock tde_lwlock_enc_keys.
*/
void
pg_tde_write_key_map_entry(const RelFileLocator *rlocator, RelKeyData *enc_rel_key_data, TDEPrincipalKeyInfo *principal_key_info)
{
int32 key_index = 0;
LWLock *lock_files = tde_lwlock_mk_files();
char db_map_path[MAXPGPATH] = {0};
char db_keydata_path[MAXPGPATH] = {0};

Expand All @@ -502,12 +506,10 @@ pg_tde_write_key_map_entry(const RelFileLocator *rlocator, RelKeyData *enc_rel_k
pg_tde_set_db_file_paths(rlocator, db_map_path, db_keydata_path);

/* Create the map entry and then add the encrypted key to the data file */
LWLockAcquire(lock_files, LW_EXCLUSIVE);
key_index = pg_tde_write_map_entry(rlocator, db_map_path, principal_key_info);

/* Add the encrypted key to the data file. */
pg_tde_write_keydata(db_keydata_path, principal_key_info, key_index, enc_rel_key_data);
LWLockRelease(lock_files);
}

/*
Expand All @@ -519,7 +521,7 @@ pg_tde_delete_key_map_entry(const RelFileLocator *rlocator)
{
int32 key_index = 0;
off_t offset = 0;
LWLock *lock_files = tde_lwlock_mk_files();
LWLock *lock_files = tde_lwlock_enc_keys();
char db_map_path[MAXPGPATH] = {0};
char db_keydata_path[MAXPGPATH] = {0};

Expand Down Expand Up @@ -563,7 +565,7 @@ void
pg_tde_free_key_map_entry(const RelFileLocator *rlocator, off_t offset)
{
int32 key_index = 0;
LWLock *lock_files = tde_lwlock_mk_files();
LWLock *lock_files = tde_lwlock_enc_keys();
char db_map_path[MAXPGPATH] = {0};
char db_keydata_path[MAXPGPATH] = {0};

Expand Down Expand Up @@ -649,8 +651,6 @@ pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_p
off_t keydata_size;
XLogPrincipalKeyRotate *xlrec;
off_t xlrec_size;
LWLock *lock_files = tde_lwlock_mk_files();
LWLock *lock_cache = tde_lwlock_mk_cache();
char db_map_path[MAXPGPATH] = {0};
char db_keydata_path[MAXPGPATH] = {0};

Expand All @@ -665,9 +665,6 @@ pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_p
strncpy(m_path[OLD_PRINCIPAL_KEY], db_map_path, MAXPGPATH);
strncpy(k_path[OLD_PRINCIPAL_KEY], db_keydata_path, MAXPGPATH);

LWLockAcquire(lock_files, LW_EXCLUSIVE);
LWLockAcquire(lock_cache, LW_EXCLUSIVE);

/* Open both files in read only mode. We don't need to track the current position of the keydata file. We always use the key index */
m_fd[OLD_PRINCIPAL_KEY] = pg_tde_open_file(m_path[OLD_PRINCIPAL_KEY], &principal_key->keyInfo, false, O_RDONLY, &is_new_file, &curr_pos[OLD_PRINCIPAL_KEY]);
k_fd[OLD_PRINCIPAL_KEY] = pg_tde_open_file(k_path[OLD_PRINCIPAL_KEY], &principal_key->keyInfo, false, O_RDONLY, &is_new_file, &read_pos_tmp);
Expand Down Expand Up @@ -744,9 +741,6 @@ pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_p
finalize_key_rotation(m_path[OLD_PRINCIPAL_KEY], k_path[OLD_PRINCIPAL_KEY],
m_path[NEW_PRINCIPAL_KEY], k_path[NEW_PRINCIPAL_KEY]);

LWLockRelease(lock_cache);
LWLockRelease(lock_files);

/* Free up the palloc'ed data */
pfree(xlrec);

Expand All @@ -771,8 +765,6 @@ pg_tde_write_map_keydata_files(off_t map_size, char *m_file_data, off_t keydata_
bool is_new_file;
off_t curr_pos = 0;
off_t read_pos_tmp = 0;
LWLock *lock_files = tde_lwlock_mk_files();
LWLock *lock_cache = tde_lwlock_mk_cache();
char db_map_path[MAXPGPATH] = {0};
char db_keydata_path[MAXPGPATH] = {0};
bool is_err = false;
Expand All @@ -787,9 +779,6 @@ pg_tde_write_map_keydata_files(off_t map_size, char *m_file_data, off_t keydata_
0},
db_map_path, db_keydata_path);

LWLockAcquire(lock_files, LW_EXCLUSIVE);
LWLockAcquire(lock_cache, LW_EXCLUSIVE);

/* Initialize the new files and set the names */
m_fd_new = keyrotation_init_file(&fheader->principal_key_info, m_path_new, db_map_path, &is_new_file, &curr_pos);
k_fd_new = keyrotation_init_file(&fheader->principal_key_info, k_path_new, db_keydata_path, &is_new_file, &read_pos_tmp);
Expand Down Expand Up @@ -839,9 +828,6 @@ pg_tde_write_map_keydata_files(off_t map_size, char *m_file_data, off_t keydata_
if (!is_err)
finalize_key_rotation(db_map_path, db_keydata_path, m_path_new, k_path_new);

LWLockRelease(lock_cache);
LWLockRelease(lock_files);

return !is_err;
}

Expand All @@ -859,19 +845,30 @@ pg_tde_get_key_from_file(const RelFileLocator *rlocator)
RelKeyData *rel_key_data;
RelKeyData *enc_rel_key_data;
off_t offset = 0;
LWLock *lock_files = tde_lwlock_mk_files();
LWLock *lock_pk = tde_lwlock_enc_keys();
char db_map_path[MAXPGPATH] = {0};
char db_keydata_path[MAXPGPATH] = {0};

Assert(rlocator);

LWLockAcquire(lock_files, LW_SHARED);

/* Get/generate a principal key, create the key for relation and get the encrypted key with bytes to write */
principal_key = GetPrincipalKey(rlocator->dbOid, rlocator->spcOid);
/*
* Get/generate a principal key, create the key for relation and get the
* encrypted key with bytes to write
*
* We should hold the lock until the internal key is loaded to be sure the
* retrieved key was encrypted with the obtained principal key. Otherwise,
* the next may happen:
* - GetPrincipalKey returns key "PKey_1".
* - Some other process rotates the Principal key and re-encrypt an
* Internal key with "PKey_2".
* - We read the Internal key and decrypt it with "PKey_1" (that's what
* we've got). As the result we return an invalid Internal key.
*/
LWLockAcquire(lock_pk, LW_SHARED);
principal_key = GetPrincipalKey(rlocator->dbOid, rlocator->spcOid, LW_SHARED);
if (principal_key == NULL)
{
LWLockRelease(lock_files);
LWLockRelease(lock_pk);
ereport(ERROR,
(errmsg("failed to retrieve principal key")));
}
Expand All @@ -884,12 +881,12 @@ pg_tde_get_key_from_file(const RelFileLocator *rlocator)

if (key_index == -1)
{
LWLockRelease(lock_files);
LWLockRelease(lock_pk);
return NULL;
}

enc_rel_key_data = pg_tde_read_keydata(db_keydata_path, key_index, principal_key);
LWLockRelease(lock_files);
LWLockRelease(lock_pk);

rel_key_data = tde_decrypt_rel_key(principal_key, enc_rel_key_data, rlocator);

Expand Down Expand Up @@ -1000,6 +997,7 @@ pg_tde_process_map_entry(const RelFileLocator *rlocator, char *db_map_path, off_

/*
* Open the file and read the required key data from file and return encrypted key.
* The caller should hold
*/
static RelKeyData *
pg_tde_read_keydata(char *db_keydata_path, int32 key_index, TDEPrincipalKey *principal_key)
Expand All @@ -1008,18 +1006,15 @@ pg_tde_read_keydata(char *db_keydata_path, int32 key_index, TDEPrincipalKey *pri
RelKeyData *enc_rel_key_data;
off_t read_pos = 0;
bool is_new_file;
LWLock *lock_files = tde_lwlock_mk_files();

/* Open and validate file for basic correctness. */
LWLockAcquire(lock_files, LW_SHARED);
fd = pg_tde_open_file(db_keydata_path, &principal_key->keyInfo, false, O_RDONLY, &is_new_file, &read_pos);

/* Read the encrypted key from file */
enc_rel_key_data = pg_tde_read_one_keydata(fd, key_index, principal_key);

/* Let's close the file. */
close(fd);
LWLockRelease(lock_files);

return enc_rel_key_data;
}
Expand Down Expand Up @@ -1241,7 +1236,7 @@ pg_tde_read_one_keydata(int keydata_fd, int32 key_index, TDEPrincipalKey *princi
* a LW_SHARED or higher lock on files before calling this function.
*/
TDEPrincipalKeyInfo *
pg_tde_get_principal_key(Oid dbOid, Oid spcOid)
pg_tde_get_principal_key_info(Oid dbOid, Oid spcOid)
{
int fd = -1;
TDEFileHeader fheader;
Expand Down
6 changes: 6 additions & 0 deletions src/access/pg_tde_xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ tdeheap_rmgr_redo(XLogReaderState *record)
{
XLogRelKey *xlrec = (XLogRelKey *) XLogRecGetData(record);

LWLockAcquire(tde_lwlock_enc_keys(), LW_EXCLUSIVE);
pg_tde_write_key_map_entry(&xlrec->rlocator, &xlrec->relKey, NULL);
LWLockRelease(tde_lwlock_enc_keys());
}
else if (info == XLOG_TDE_ADD_PRINCIPAL_KEY)
{
TDEPrincipalKeyInfo *mkey = (TDEPrincipalKeyInfo *) XLogRecGetData(record);

LWLockAcquire(tde_lwlock_enc_keys(), LW_EXCLUSIVE);
save_principal_key_info(mkey);
LWLockRelease(tde_lwlock_enc_keys());
}
else if (info == XLOG_TDE_EXTENSION_INSTALL_KEY)
{
Expand All @@ -64,7 +68,9 @@ tdeheap_rmgr_redo(XLogReaderState *record)
{
XLogPrincipalKeyRotate *xlrec = (XLogPrincipalKeyRotate *) XLogRecGetData(record);

LWLockAcquire(tde_lwlock_enc_keys(), LW_EXCLUSIVE);
xl_tde_perform_rotate_key(xlrec);
LWLockRelease(tde_lwlock_enc_keys());
}
else
{
Expand Down
Loading

0 comments on commit 5a38a6c

Please sign in to comment.