Skip to content

Commit

Permalink
[TDB-20] With checkpoint disabled, closing FT file still rotates header.
Browse files Browse the repository at this point in the history
         [Updated the comments and formats from the last PR.]

         This is an attempt based on @BohuTANG's patch
             percona#331

         in particular, the ctest mostly comes from his with minor updates.

         Summary:
             1) The issue is real.
             2) @bohu's patch works for mediating the issue but somewhat introduces
                a ft leak that the ft with ref=0 may be left until next open/close or
                checkpoint.
             3) If ft_close should be held off upon a backup, then it simply means
                a backup should hold a ref to the ft and responsible for closing ft up
                if it is the last one to hold it, like any other possible referencer.
                including the checkpoint, the txn and ft handlers.
             4) mtr test contributed by @BohuTANG still passes. (in a different PS PR).
  • Loading branch information
Jun committed Mar 29, 2018
1 parent af8783b commit 41f5270
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 85 deletions.
5 changes: 5 additions & 0 deletions ft/cachetable/cachetable-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ struct cachefile {
void (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function.
void (*note_pin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
void (*note_unpin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory

void (*note_pin_by_backup)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
void (*note_unpin_by_backup)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
BACKGROUND_JOB_MANAGER bjm;
};

Expand Down Expand Up @@ -413,6 +416,8 @@ class checkpointer {
void add_background_job();
void remove_background_job();
void end_checkpoint(void (*testcallback_f)(void*), void* testextra);
void begin_backup();
void end_backup();
TOKULOGGER get_logger();
// used during begin_checkpoint
void increment_num_txns();
Expand Down
37 changes: 36 additions & 1 deletion ft/cachetable/cachetable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2793,6 +2793,37 @@ void toku_cachetable_end_checkpoint(CHECKPOINTER cp, TOKULOGGER UU(logger),
cp->end_checkpoint(testcallback_f, testextra);
}

// in_backup begin

struct iterate_note_pin_backup {
static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
assert(cf->note_pin_by_backup);
cf->note_pin_by_backup(cf, cf->userdata);
return 0;
}
};

struct iterate_note_unpin_backup {
static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
assert(cf->note_unpin_by_backup);
cf->note_unpin_by_backup(cf, cf->userdata);
return 0;
}
};
void toku_cachetable_begin_backup(CACHETABLE ct)
{
ct->cf_list.read_lock();
ct->cf_list.m_active_fileid.iterate<void *, iterate_note_pin_backup::fn>(nullptr);
ct->cf_list.read_unlock();
}

void toku_cachetable_end_backup(CACHETABLE ct)
{
ct->cf_list.m_active_fileid.iterate<void *, iterate_note_unpin_backup::fn>(nullptr);
}

// in_backup end

TOKULOGGER toku_cachefile_logger (CACHEFILE cf) {
return cf->cachetable->cp.get_logger();
}
Expand Down Expand Up @@ -2909,7 +2940,9 @@ toku_cachefile_set_userdata (CACHEFILE cf,
void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
void (*note_pin_by_checkpoint)(CACHEFILE, void*),
void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
void (*note_unpin_by_checkpoint)(CACHEFILE, void*),
void (*note_pin_by_backup)(CACHEFILE, void*),
void (*note_unpin_by_backup)(CACHEFILE, void*)) {
cf->userdata = userdata;
cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
cf->close_userdata = close_userdata;
Expand All @@ -2919,6 +2952,8 @@ toku_cachefile_set_userdata (CACHEFILE cf,
cf->end_checkpoint_userdata = end_checkpoint_userdata;
cf->note_pin_by_checkpoint = note_pin_by_checkpoint;
cf->note_unpin_by_checkpoint = note_unpin_by_checkpoint;
cf->note_pin_by_backup = note_pin_by_backup;
cf->note_unpin_by_backup = note_unpin_by_backup;
}

void *toku_cachefile_get_userdata(CACHEFILE cf) {
Expand Down
6 changes: 5 additions & 1 deletion ft/cachetable/cachetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ void toku_cachetable_begin_checkpoint (CHECKPOINTER cp, struct tokulogger *logge
void toku_cachetable_end_checkpoint(CHECKPOINTER cp, struct tokulogger *logger,
void (*testcallback_f)(void*), void * testextra);

void toku_cachetable_begin_backup(CACHETABLE ct);
void toku_cachetable_end_backup(CACHETABLE ct);

// Shuts down checkpoint thread
// Requires no locks be held that are taken by the checkpoint function
Expand Down Expand Up @@ -285,7 +287,9 @@ void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
void (*note_pin_by_checkpoint)(CACHEFILE, void*),
void (*note_unpin_by_checkpoint)(CACHEFILE, void*));
void (*note_unpin_by_checkpoint)(CACHEFILE, void*),
void (*note_pin_by_backup)(CACHEFILE, void*),
void (*note_unpin_by_backup)(CACHEFILE, void*));
// Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata().
// Before starting a checkpoint, we call checkpoint_prepare_userdata().
// When the cachefile needs to be checkpointed, we call checkpoint_userdata().
Expand Down
4 changes: 3 additions & 1 deletion ft/ft-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ struct ft {
uint32_t num_txns;
// A checkpoint is running. If true, then keep this header around for checkpoint, like a transaction
bool pinned_by_checkpoint;

// Number of backups that are using this FT. If it is nonzero, keep this header around until backup
// is completd.
uint32_t num_backups;
// is this ft a blackhole? if so, all messages are dropped.
bool blackhole;

Expand Down
84 changes: 55 additions & 29 deletions ft/ft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,42 @@ static void unpin_by_checkpoint_callback(FT ft, void *extra) {
}

// maps to cf->note_unpin_by_checkpoint
//Must be protected by ydb lock.
//Called by end_checkpoint, which grabs ydb lock around note_unpin
// Must be protected by ydb lock.
// Called by end_checkpoint, which grabs ydb lock around note_unpin
static void ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
FT ft = (FT) header_v;
toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_checkpoint_callback, NULL);
}


// maps to cf->note_pin_by_backup
// Must be protected by ydb lock.
// Is only called by backup begin, which holds it
static void ft_note_pin_by_backup (CACHEFILE UU(cachefile), void *header_v) {
// Note: open_close lock is held by checkpoint begin
FT ft = (FT) header_v;
toku_ft_grab_reflock(ft);
assert(toku_ft_needed_unlocked(ft));
ft->num_backups++;
toku_ft_release_reflock(ft);
}

// Requires: the reflock is held.
static void unpin_by_backup_callback(FT ft, void *extra) {
invariant(extra == NULL);
invariant(ft->num_backups > 0);
ft->num_backups--;
}

// maps to cf->note_unpin_by_backup
// Must be protected by ydb lock.
// Called by end_backup, which grabs ydb lock around note_unpin
static void ft_note_unpin_by_backup (CACHEFILE UU(cachefile), void *header_v) {
FT ft = (FT) header_v;
toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_backup_callback, NULL);
}


//
// End of Functions that are callbacks to the cachefile
/////////////////////////////////////////////////////////////////////////
Expand All @@ -332,38 +361,33 @@ static void setup_initial_ft_root_node(FT ft, BLOCKNUM blocknum) {
}

static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
// fake, prevent unnecessary upgrade logic
ft->layout_version_read_from_disk = FT_LAYOUT_VERSION;
ft->checkpoint_header = NULL;
// fake, prevent unnecessary upgrade logic
ft->layout_version_read_from_disk = FT_LAYOUT_VERSION;
ft->checkpoint_header = NULL;

toku_list_init(&ft->live_ft_handles);
toku_list_init(&ft->live_ft_handles);

// intuitively, the comparator points to the FT's cmp descriptor
ft->cmp.create(options->compare_fun, &ft->cmp_descriptor, options->memcmp_magic);
ft->update_fun = options->update_fun;
// intuitively, the comparator points to the FT's cmp descriptor
ft->cmp.create(options->compare_fun, &ft->cmp_descriptor,
options->memcmp_magic);
ft->update_fun = options->update_fun;

if (ft->cf != NULL) {
assert(ft->cf == cf);
}
ft->cf = cf;
ft->in_memory_stats = ZEROSTATS;
if (ft->cf != NULL) {
assert(ft->cf == cf);
}
ft->cf = cf;
ft->in_memory_stats = ZEROSTATS;

setup_initial_ft_root_node(ft, ft->h->root_blocknum);
toku_cachefile_set_userdata(ft->cf,
ft,
ft_log_fassociate_during_checkpoint,
ft_close,
ft_free,
ft_checkpoint,
ft_begin_checkpoint,
ft_end_checkpoint,
ft_note_pin_by_checkpoint,
ft_note_unpin_by_checkpoint);
setup_initial_ft_root_node(ft, ft->h->root_blocknum);
toku_cachefile_set_userdata(
ft->cf, ft, ft_log_fassociate_during_checkpoint, ft_close, ft_free,
ft_checkpoint, ft_begin_checkpoint, ft_end_checkpoint,
ft_note_pin_by_checkpoint, ft_note_unpin_by_checkpoint,
ft_note_pin_by_backup, ft_note_unpin_by_backup);

ft->blocktable.verify_no_free_blocknums();
ft->blocktable.verify_no_free_blocknums();
}


static FT_HEADER
ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created)
{
Expand Down Expand Up @@ -455,7 +479,9 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE ft_handle, CACHEFILE cf, LSN
ft_begin_checkpoint,
ft_end_checkpoint,
ft_note_pin_by_checkpoint,
ft_note_unpin_by_checkpoint);
ft_note_unpin_by_checkpoint,
ft_note_pin_by_backup,
ft_note_unpin_by_backup);
*header = ft;
return 0;
}
Expand All @@ -475,7 +501,7 @@ static int
ft_get_reference_count(FT ft) {
uint32_t pinned_by_checkpoint = ft->pinned_by_checkpoint ? 1 : 0;
int num_handles = toku_list_num_elements_est(&ft->live_ft_handles);
return pinned_by_checkpoint + ft->num_txns + num_handles;
return pinned_by_checkpoint + ft->num_txns + ft-> num_backups + num_handles;
}

// a ft is needed in memory iff its reference count is non-zero
Expand Down
20 changes: 7 additions & 13 deletions ft/tests/cachetable-pin-checkpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,21 +358,15 @@ cachetable_test (void) {
toku_cachetable_create(&ct, test_limit, ZERO_LSN, nullptr);
const char *fname1 = TOKU_TEST_FILENAME;
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR | O_CREAT,
S_IRWXU | S_IRWXG | S_IRWXO);
assert(r == 0);

toku_cachefile_set_userdata(
f1,
NULL,
&dummy_log_fassociate,
&dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr,
&test_begin_checkpoint,
&dummy_end,
&dummy_note_pin,
&dummy_note_unpin
);

f1, NULL, &dummy_log_fassociate, &dummy_close_usr, &dummy_free_usr,
&dummy_chckpnt_usr, &test_begin_checkpoint, &dummy_end, &dummy_note_pin,
&dummy_note_unpin, &dummy_note_pin, &dummy_note_unpin);

toku_pthread_t time_tid;
toku_pthread_t checkpoint_tid;
toku_pthread_t move_tid[NUM_MOVER_THREADS];
Expand Down
16 changes: 6 additions & 10 deletions ft/tests/cachetable-put-checkpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,20 +487,16 @@ cachetable_test (void) {
toku_cachetable_create(&ct, test_limit, ZERO_LSN, nullptr);
const char *fname1 = TOKU_TEST_FILENAME;
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR | O_CREAT,
S_IRWXU | S_IRWXG | S_IRWXO);
assert(r == 0);

toku_cachefile_set_userdata(
f1,
NULL,
&dummy_log_fassociate,
&dummy_close_usr,
&dummy_free_usr,
f1, NULL, &dummy_log_fassociate, &dummy_close_usr, &dummy_free_usr,
&dummy_chckpnt_usr,
test_begin_checkpoint, // called in begin_checkpoint
&dummy_end,
&dummy_note_pin,
&dummy_note_unpin
);
&dummy_end, &dummy_note_pin, &dummy_note_unpin, &dummy_note_pin,
&dummy_note_unpin);

toku_pthread_t time_tid;
toku_pthread_t checkpoint_tid;
Expand Down
16 changes: 4 additions & 12 deletions ft/tests/cachetable-simple-close.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,10 @@ static void free_usr(CACHEFILE UU(cf), void* UU(p)) {
}

static void set_cf_userdata(CACHEFILE f1) {
toku_cachefile_set_userdata(
f1,
NULL,
&dummy_log_fassociate,
&close_usr,
&free_usr,
&dummy_chckpnt_usr,
&dummy_begin,
&dummy_end,
&dummy_note_pin,
&dummy_note_unpin
);
toku_cachefile_set_userdata(f1, NULL, &dummy_log_fassociate, &close_usr,
&free_usr, &dummy_chckpnt_usr, &dummy_begin,
&dummy_end, &dummy_note_pin, &dummy_note_unpin,
&dummy_note_pin, &dummy_note_unpin);
}

bool keep_me;
Expand Down
16 changes: 5 additions & 11 deletions ft/tests/cachetable-test.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,9 @@ static void dummy_note_unpin(CACHEFILE UU(cf), void* UU(p)) { }
static UU() void
create_dummy_functions(CACHEFILE cf)
{
void *ud = NULL;
toku_cachefile_set_userdata(cf,
ud,
&dummy_log_fassociate,
&dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr,
&dummy_begin,
&dummy_end,
&dummy_note_pin,
&dummy_note_unpin);
void *ud = NULL;
toku_cachefile_set_userdata(cf, ud, &dummy_log_fassociate, &dummy_close_usr,
&dummy_free_usr, &dummy_chckpnt_usr, &dummy_begin,
&dummy_end, &dummy_note_pin, &dummy_note_unpin,
&dummy_note_pin, &dummy_note_unpin);
};
12 changes: 7 additions & 5 deletions ft/tests/fifo-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,17 @@ test_enqueue(int n) {
if (i == 0) {
xids = toku_xids_get_root_xids();
} else {
int r = toku_xids_create_child(toku_xids_get_root_xids(), &xids, (TXNID)i);
assert_zero(r);
int r = toku_xids_create_child(toku_xids_get_root_xids(), &xids,
(TXNID)i);
assert_zero(r);
}
MSN msn = next_dummymsn();
if (startmsn.msn == ZERO_MSN.msn)
startmsn = msn;
enum ft_msg_type type = (enum ft_msg_type) i;
startmsn = msn;
enum ft_msg_type type = (enum ft_msg_type)i;
DBT k, v;
ft_msg msg(toku_fill_dbt(&k, thekey, thekeylen), toku_fill_dbt(&v, theval, thevallen), type, msn, xids);
ft_msg msg(toku_fill_dbt(&k, thekey, thekeylen),
toku_fill_dbt(&v, theval, thevallen), type, msn, xids);
msg_buffer.enqueue(msg, true, nullptr);
toku_xids_destroy(&xids);
toku_free(thekey);
Expand Down
Loading

0 comments on commit 41f5270

Please sign in to comment.