Robj/memtable race fix (#574)
* Memtable Generation Bugfix

Fixes a bug where memtable_maybe_rotate_and_get_insert_lock would
speculatively increment the memtable generation even when the next
memtable was not yet ready. Concurrent lookup threads would then
attempt to access that not-yet-ready memtable, resulting in errors.

The fix requires insert threads to wait until the next memtable is
ready before finalizing the current one (a condensed sketch of the
corrected flow follows below).

* Abstract the memtable and trunk root-addr locking APIs
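
For orientation, a heavily condensed sketch of the corrected rotation
path is given below. This is an illustration only, not the code from
this commit: the function name rotate_sketch is made up, and the real
memtable_maybe_rotate_and_begin_insert in src/memtable.c also retries
with exponential backoff and performs the claim/finalize/process steps
shown in the diff.

/* Illustrative sketch only; not the actual commit code. */
platform_status
rotate_sketch(memtable_context *ctxt, uint64 *generation)
{
   memtable_begin_insert(ctxt); /* shared insert lock */

   uint64    cur_gen = ctxt->generation;
   memtable *cur_mt  = &ctxt->mt[cur_gen % ctxt->cfg.max_memtables];

   if (memtable_is_full(&ctxt->cfg, cur_mt)) {
      /* The fix: before finalizing cur_mt and bumping ctxt->generation,
       * check that the NEXT memtable is ready.  The old code incremented
       * the generation speculatively, so lookups could reach a memtable
       * that was not yet usable. */
      memtable *next_mt = &ctxt->mt[(cur_gen + 1) % ctxt->cfg.max_memtables];
      if (next_mt->state != MEMTABLE_STATE_READY) {
         memtable_end_insert(ctxt);
         return STATUS_BUSY; /* caller backs off and retries */
      }
      /* ... claim the rotation, finalize cur_mt, increment the
       *     generation, then process the retired memtable ... */
   }

   *generation = cur_gen;
   return STATUS_OK;
}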

---------

Co-authored-by: Alex Conway <[email protected]>
rtjohnso and ajhconway authored Apr 30, 2023
1 parent 8c639a0 commit 6a2348c
Showing 6 changed files with 180 additions and 82 deletions.
115 changes: 75 additions & 40 deletions src/memtable.c
@@ -43,20 +43,20 @@ memtable_process(memtable_context *ctxt, uint64 generation)
ctxt->process(ctxt->process_ctxt, generation);
}

void
memtable_get_insert_lock(memtable_context *ctxt)
static inline void
memtable_begin_insert(memtable_context *ctxt)
{
platform_batch_rwlock_get(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
}

void
memtable_unget_insert_lock(memtable_context *ctxt)
memtable_end_insert(memtable_context *ctxt)
{
platform_batch_rwlock_unget(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
}

bool
memtable_try_lock_insert_lock(memtable_context *ctxt)
static inline bool
memtable_try_begin_insert_rotation(memtable_context *ctxt)
{
if (!platform_batch_rwlock_try_claim(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX))
{
@@ -66,87 +66,120 @@ memtable_try_lock_insert_lock(ctxt)
return TRUE;
}

void
memtable_lock_insert_lock(memtable_context *ctxt)
static inline void
memtable_end_insert_rotation(memtable_context *ctxt)
{
platform_batch_rwlock_unlock(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
platform_batch_rwlock_unclaim(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
}

static inline void
memtable_begin_raw_rotation(memtable_context *ctxt)
{
platform_batch_rwlock_claim(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
platform_batch_rwlock_get(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
platform_batch_rwlock_claim_loop(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
platform_batch_rwlock_lock(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
}

void
memtable_unlock_insert_lock(memtable_context *ctxt)
static inline void
memtable_end_raw_rotation(memtable_context *ctxt)
{
platform_batch_rwlock_unclaim(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
platform_batch_rwlock_unlock(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
platform_batch_rwlock_full_unlock(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
}

void
memtable_get_lookup_lock(memtable_context *ctxt)
memtable_begin_lookup(memtable_context *ctxt)
{
platform_batch_rwlock_get(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
}

void
memtable_unget_lookup_lock(memtable_context *ctxt)
memtable_end_lookup(memtable_context *ctxt)
{
platform_batch_rwlock_unget(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
}

void
memtable_lock_lookup_lock(memtable_context *ctxt)
memtable_block_lookups(memtable_context *ctxt)
{
platform_batch_rwlock_claim(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
platform_batch_rwlock_get(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
platform_batch_rwlock_claim_loop(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
platform_batch_rwlock_lock(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
}

void
memtable_unlock_lookup_lock(memtable_context *ctxt)
memtable_unblock_lookups(memtable_context *ctxt)
{
platform_batch_rwlock_unclaim(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
platform_batch_rwlock_unlock(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
platform_batch_rwlock_full_unlock(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
}


platform_status
memtable_maybe_rotate_and_get_insert_lock(memtable_context *ctxt,
uint64 *generation)
memtable_maybe_rotate_and_begin_insert(memtable_context *ctxt,
uint64 *generation)
{
uint64 wait = 100;
while (TRUE) {
memtable_get_insert_lock(ctxt);
*generation = ctxt->generation;
uint64 mt_no = *generation % ctxt->cfg.max_memtables;
memtable *mt = &ctxt->mt[mt_no];
if (mt->state != MEMTABLE_STATE_READY) {
memtable_begin_insert(ctxt);
uint64 current_generation = ctxt->generation;
uint64 current_mt_no = current_generation % ctxt->cfg.max_memtables;
memtable *current_mt = &ctxt->mt[current_mt_no];
if (current_mt->state != MEMTABLE_STATE_READY) {
// The next memtable is not ready yet, back off and wait.
memtable_unget_insert_lock(ctxt);
memtable_end_insert(ctxt);
platform_sleep_ns(wait);
wait = wait > 2048 ? wait : 2 * wait;
continue;
}
wait = 100;

if (memtable_is_full(&ctxt->cfg, &ctxt->mt[mt_no])) {
// If the current memtable is full, try to retire it.
memtable_unget_insert_lock(ctxt);
if (memtable_try_lock_insert_lock(ctxt)) {
if (memtable_is_full(&ctxt->cfg, current_mt)) {
// If the current memtable is full, try to retire it

uint64 next_generation = current_generation + 1;
uint64 next_mt_no = next_generation % ctxt->cfg.max_memtables;
memtable *next_mt = &ctxt->mt[next_mt_no];
if (next_mt->state != MEMTABLE_STATE_READY) {
memtable_end_insert(ctxt);
return STATUS_BUSY;
}

if (memtable_try_begin_insert_rotation(ctxt)) {
// We successfully got the lock, so we do the finalization
memtable_transition(
mt, MEMTABLE_STATE_READY, MEMTABLE_STATE_FINALIZED);
current_mt, MEMTABLE_STATE_READY, MEMTABLE_STATE_FINALIZED);

// Safe to increment non-atomically because we have a lock on
// the insert lock
uint64 process_generation = ctxt->generation++;
ctxt->generation++;
platform_assert(ctxt->generation - ctxt->generation_retired
<= ctxt->cfg.max_memtables,
"ctxt->generation: %lu, "
"ctxt->generation_retired: %lu, "
"current_generation: %lu\n",
ctxt->generation,
ctxt->generation_retired,
current_generation);
platform_assert(current_generation + 1 == ctxt->generation,
"ctxt->generation: %lu, "
"ctxt->generation_retired: %lu, "
"current_generation: %lu\n",
ctxt->generation,
ctxt->generation_retired,
current_generation);

memtable_mark_empty(ctxt);
memtable_unlock_insert_lock(ctxt);
memtable_process(ctxt, process_generation);
memtable_end_insert_rotation(ctxt);
memtable_end_insert(ctxt);
memtable_process(ctxt, current_generation);
} else {
memtable_end_insert(ctxt);
platform_sleep_ns(wait);
wait = wait > 2048 ? wait : 2 * wait;
}
continue;
}
*generation = ctxt->generation;
*generation = current_generation;
return STATUS_OK;
}
}
@@ -232,17 +265,19 @@ memtable_dec_ref_maybe_recycle(memtable_context *ctxt, memtable *mt)
uint64
memtable_force_finalize(memtable_context *ctxt)
{
memtable_lock_insert_lock(ctxt);
memtable_begin_raw_rotation(ctxt);

uint64 generation = ctxt->generation;
uint64 mt_no = generation % ctxt->cfg.max_memtables;
memtable *mt = &ctxt->mt[mt_no];
memtable_transition(mt, MEMTABLE_STATE_READY, MEMTABLE_STATE_FINALIZED);
uint64 process_generation = ctxt->generation++;
uint64 current_generation = ctxt->generation++;
platform_assert(ctxt->generation - ctxt->generation_retired
<= ctxt->cfg.max_memtables);
memtable_mark_empty(ctxt);

memtable_unlock_insert_lock(ctxt);
return process_generation;
memtable_end_raw_rotation(ctxt);
return current_generation;
}

void
16 changes: 8 additions & 8 deletions src/memtable.h
@@ -127,7 +127,7 @@ typedef struct memtable_context {
platform_mutex incorporation_mutex;
volatile uint64 generation_to_incorporate;

// Protected by the MEMTABLE_INSERT_LOCK_IDX'th lock of rwlock. Must hold
// Protected by the MEMTABLE_LOOKUP_LOCK_IDX'th lock of rwlock. Must hold
// read lock to read and write lock to modify.
volatile uint64 generation_retired;

@@ -140,23 +140,23 @@
} memtable_context;

platform_status
memtable_maybe_rotate_and_get_insert_lock(memtable_context *ctxt,
uint64 *generation);
memtable_maybe_rotate_and_begin_insert(memtable_context *ctxt,
uint64 *generation);

void
memtable_unget_insert_lock(memtable_context *ctxt);
memtable_end_insert(memtable_context *ctxt);

void
memtable_get_lookup_lock(memtable_context *ctxt);
memtable_begin_lookup(memtable_context *ctxt);

void
memtable_unget_lookup_lock(memtable_context *ctxt);
memtable_end_lookup(memtable_context *ctxt);

void
memtable_lock_lookup_lock(memtable_context *ctxt);
memtable_block_lookups(memtable_context *ctxt);

void
memtable_unlock_lookup_lock(memtable_context *ctxt);
memtable_unblock_lookups(memtable_context *ctxt);

platform_status
memtable_insert(memtable_context *ctxt,
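
To make the lookup side concrete, here is a small usage sketch of the
lookup API declared above. It is illustrative and not part of this
diff; lookup_sketch is a made-up caller.

/* Illustrative sketch only; not part of this diff. */
void
lookup_sketch(memtable_context *ctxt)
{
   /* Lookup threads hold the lookup lock in shared mode while reading. */
   memtable_begin_lookup(ctxt);

   /* ... search the live memtables here; code that must exclude
    * lookups (for example while retiring a memtable) takes this same
    * lock exclusively via memtable_block_lookups() ... */

   memtable_end_lookup(ctxt);
}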
22 changes: 19 additions & 3 deletions src/platform_linux/platform.c
@@ -275,41 +275,57 @@ platform_batch_rwlock_unlock(platform_batch_rwlock *lock, uint64 lock_idx)
__sync_lock_release(&lock->write_lock[lock_idx].lock);
}

void
platform_batch_rwlock_full_unlock(platform_batch_rwlock *lock, uint64 lock_idx)
{
platform_batch_rwlock_unlock(lock, lock_idx);
platform_batch_rwlock_unclaim(lock, lock_idx);
platform_batch_rwlock_unget(lock, lock_idx);
}

/*
*-----------------------------------------------------------------------------
* try_claim/claim/unlock
*
* A claim blocks all other claimants (and therefore all other writelocks,
* because writelocks are required to hold a claim during the writelock).
*
* Cannot hold a get (read lock)
*
* Must hold a get (read lock)
* try_claim returns whether the claim succeeded
*-----------------------------------------------------------------------------
*/

bool
platform_batch_rwlock_try_claim(platform_batch_rwlock *lock, uint64 lock_idx)
{
threadid tid = platform_get_tid();
debug_assert(lock->read_counter[tid][lock_idx]);
if (__sync_lock_test_and_set(&lock->write_lock[lock_idx].claim, 1)) {
return FALSE;
}
debug_only uint8 old_counter =
__sync_fetch_and_sub(&lock->read_counter[tid][lock_idx], 1);
debug_assert(0 < old_counter);
return TRUE;
}

void
platform_batch_rwlock_claim(platform_batch_rwlock *lock, uint64 lock_idx)
platform_batch_rwlock_claim_loop(platform_batch_rwlock *lock, uint64 lock_idx)
{
uint64 wait = 1;
while (!platform_batch_rwlock_try_claim(lock, lock_idx)) {
platform_batch_rwlock_unget(lock, lock_idx);
platform_sleep_ns(wait);
wait = wait > 2048 ? wait : 2 * wait;
platform_batch_rwlock_get(lock, lock_idx);
}
}

void
platform_batch_rwlock_unclaim(platform_batch_rwlock *lock, uint64 lock_idx)
{
threadid tid = platform_get_tid();
__sync_fetch_and_add(&lock->read_counter[tid][lock_idx], 1);
__sync_lock_release(&lock->write_lock[lock_idx].claim);
}

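
As a usage note (not part of this diff), the exclusive-lock path
composes the primitives above in the order required by the state
machine documented in platform_types.h; memtable_block_lookups() and
memtable_unblock_lookups() in src/memtable.c follow this pattern. The
function name below is made up for illustration.

/* Illustrative caller-side pattern; not part of this diff. */
void
exclusive_section_sketch(platform_batch_rwlock *lock, uint64 lock_idx)
{
   platform_batch_rwlock_get(lock, lock_idx);        /* shared lock */
   platform_batch_rwlock_claim_loop(lock, lock_idx); /* claim; may drop and
                                                      * re-take the shared
                                                      * lock while waiting */
   platform_batch_rwlock_lock(lock, lock_idx);       /* exclusive lock */

   /* ... modify state protected by lock_idx ... */

   /* unlock + unclaim + unget in a single call */
   platform_batch_rwlock_full_unlock(lock, lock_idx);
}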
41 changes: 36 additions & 5 deletions src/platform_linux/platform_types.h
@@ -96,29 +96,60 @@ _Static_assert(sizeof(platform_batch_rwlock)
== PLATFORM_CACHELINE_SIZE * (MAX_THREADS / 2 + 1),
"Missized platform_batch_rwlock\n");


/*
* The state machine for a thread interacting with a batch_rwlock is:
*
*             get                    claim                lock
* unlocked <-------> read-locked <----------> claimed <--------> write-locked
*            unget                 unclaim              unlock
*
* Note that try_claim() may fail, in which case the state of the lock
* is unchanged, i.e. the caller still holds a read lock.
*/


void
platform_batch_rwlock_init(platform_batch_rwlock *lock);

/* no lock -> shared lock */
void
platform_batch_rwlock_lock(platform_batch_rwlock *lock, uint64 lock_idx);
platform_batch_rwlock_get(platform_batch_rwlock *lock, uint64 lock_idx);

/* shared lock -> no lock */
void
platform_batch_rwlock_unlock(platform_batch_rwlock *lock, uint64 lock_idx);
platform_batch_rwlock_unget(platform_batch_rwlock *lock, uint64 lock_idx);

/*
* shared-lock -> claim (may fail)
*
* Callers still hold a shared lock after a failed claim attempt.
* Callers _must_ release their shared lock after a failed claim attempt.
*/
bool
platform_batch_rwlock_try_claim(platform_batch_rwlock *lock, uint64 lock_idx);

/* shared-lock -> claim, BUT(!) may temporarily release the shared-lock in the
* process. */
void
platform_batch_rwlock_claim(platform_batch_rwlock *lock, uint64 lock_idx);
platform_batch_rwlock_claim_loop(platform_batch_rwlock *lock, uint64 lock_idx);

/* claim -> shared lock */
void
platform_batch_rwlock_unclaim(platform_batch_rwlock *lock, uint64 lock_idx);

/* claim -> exclusive lock */
void
platform_batch_rwlock_get(platform_batch_rwlock *lock, uint64 lock_idx);
platform_batch_rwlock_lock(platform_batch_rwlock *lock, uint64 lock_idx);

/* exclusive lock -> claim */
void
platform_batch_rwlock_unget(platform_batch_rwlock *lock, uint64 lock_idx);
platform_batch_rwlock_unlock(platform_batch_rwlock *lock, uint64 lock_idx);

/* exclusive-lock -> unlocked */
void
platform_batch_rwlock_full_unlock(platform_batch_rwlock *lock, uint64 lock_idx);


// Buffer handle
typedef struct {
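
Finally, a sketch (not part of this diff) of the failure path called
out in the state-machine comment above: a failed try_claim leaves the
caller still holding its shared lock, so the caller must drop that
lock itself before backing off; this is exactly what
memtable_maybe_rotate_and_begin_insert does when
memtable_try_begin_insert_rotation fails. try_upgrade_sketch is a
made-up name.

/* Illustrative sketch only; not part of this diff. */
bool
try_upgrade_sketch(platform_batch_rwlock *lock, uint64 lock_idx)
{
   platform_batch_rwlock_get(lock, lock_idx); /* unlocked -> read-locked */

   if (!platform_batch_rwlock_try_claim(lock, lock_idx)) {
      /* Claim failed: the lock state is unchanged, so we still hold the
       * shared lock and must release it ourselves before retrying. */
      platform_batch_rwlock_unget(lock, lock_idx);
      return FALSE;
   }

   /* Claimed; escalate and release as in the previous sketch. */
   platform_batch_rwlock_lock(lock, lock_idx);
   platform_batch_rwlock_full_unlock(lock, lock_idx);
   return TRUE;
}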