diff --git a/src/paxos.c b/src/paxos.c index 1b1b5414..e9d8f612 100644 --- a/src/paxos.c +++ b/src/paxos.c @@ -41,7 +41,10 @@ struct proposal { struct learned { int ballot; + int instance_number; int number; + paxos_state_t state; + char *value; }; struct paxos_msghdr { @@ -51,6 +54,7 @@ struct paxos_msghdr { char piname[PAXOS_NAME_LEN+1]; int ballot_number; int proposer_id; + int instance_number; unsigned int extralen; unsigned int valuelen; }; @@ -60,6 +64,7 @@ struct proposer { int ballot; int open_number; int accepted_number; + int instance_number; int proposed; struct proposal *proposal; }; @@ -74,6 +79,7 @@ struct learner { int state; int learned_max; int learned_ballot; + struct learned latest_learned; struct learned learned[0]; }; @@ -105,7 +111,6 @@ struct learner_operations { struct paxos_instance *, void *, int); }; - struct paxos_space { char name[PAXOS_NAME_LEN+1]; @@ -143,7 +148,7 @@ static int have_quorum(struct paxos_space *ps, int member) if (ps->role[i] & ACCEPTOR) sum++; } - + log_debug("total acceptor %d, current %d", sum, member); if (member * 2 > sum) return 1; else @@ -192,6 +197,7 @@ static void proposer_prepare(struct paxos_instance *pi, int *round) hdr->state = htonl(PREPARING); hdr->from = htonl(pi->ps->p_op->get_myid()); hdr->proposer_id = hdr->from; + hdr->instance_number = htonl(pi->proposer->instance_number); strcpy(hdr->psname, pi->ps->name); strcpy(hdr->piname, pi->name); hdr->ballot_number = htonl(ballot); @@ -294,25 +300,35 @@ static void proposer_commit(struct paxos_space *ps, struct paxos_msghdr *hdr; pi_handle_t pih = (pi_handle_t)pi; void *extra; - int ballot; + int ballot, instance_number; log_debug("proposer commit ..."); - if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { + if (msglen != sizeof(struct paxos_msghdr) + ps->extralen + + ps->valuelen) { log_error("message length incorrect, " - "msglen: %d, msghdr len: %lu, extralen: %u", + "msglen: %d, msghdr len: %lu, extralen: %u, valuelen: %u", msglen, (long)sizeof(struct paxos_msghdr), - ps->extralen); + ps->extralen, ps->valuelen); return; } extra = (char *)msg + sizeof(struct paxos_msghdr); hdr = msg; - ballot = ntohl(hdr->ballot_number); - if (pi->proposer->ballot != ballot) { + ballot = ntohl(hdr->ballot_number); + instance_number = ntohl(hdr->instance_number); + if (pi->proposer->ballot != ballot) { log_debug("not the same ballot, proposer ballot: %d, " - "received ballot: %d", pi->proposer->ballot, ballot); - return; + "received ballot: %d", pi->proposer->ballot, + ballot); + return; + } + if (pi->proposer->instance_number != instance_number) { + log_debug("not the same instance number, " + "proposer instance number: %d, " + "received instance number: %d", + pi->proposer->instance_number, instance_number); + return; } pi->proposer->accepted_number++; @@ -341,6 +357,8 @@ static void acceptor_promise(struct paxos_space *ps, unsigned long to; pi_handle_t pih = (pi_handle_t)pi; void *extra; + int proposer_id = -1; + int ballot = -1; log_debug("acceptor promise ..."); if (pi->acceptor->state == RECOVERY) { @@ -358,18 +376,20 @@ static void acceptor_promise(struct paxos_space *ps, hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); - if (ntohl(hdr->ballot_number) < pi->acceptor->highest_promised) { + ballot = ntohl(hdr->ballot_number); + if (ballot < pi->acceptor->highest_promised) { log_debug("ballot number: %d, highest promised: %d", ntohl(hdr->ballot_number), pi->acceptor->highest_promised); return; } + proposer_id = ntohl(hdr->proposer_id); if (ps->p_op->promise - && ps->p_op->promise(pih, extra) < 0) + && ps->p_op->promise(pih, extra, proposer_id, ballot) < 0) return; - pi->acceptor->highest_promised = ntohl(hdr->ballot_number); + pi->acceptor->highest_promised = ballot; pi->acceptor->state = PROMISING; to = ntohl(hdr->from); hdr->from = htonl(ps->p_op->get_myid()); @@ -419,6 +439,8 @@ static void acceptor_accepted(struct paxos_space *ps, && ps->p_op->accepted(pih, extra, ballot, value) < 0) return; + if (ballot > pi->acceptor->highest_promised) + pi->acceptor->highest_promised = ballot; pi->acceptor->state = ACCEPTING; to = ntohl(hdr->from); hdr->from = htonl(myid); @@ -426,68 +448,152 @@ static void acceptor_accepted(struct paxos_space *ps, if (ps->p_op->broadcast) ps->p_op->broadcast(msg, sizeof(struct paxos_msghdr) - + ps->extralen); + + ps->extralen + + ps->valuelen); else { int i; for (i = 0; i < ps->number; i++) { if (ps->role[i] & LEARNER) ps->p_op->send(i, msg, sizeof(struct paxos_msghdr) - + ps->extralen); + + ps->extralen + + ps->valuelen); } if (!(ps->role[to] & LEARNER)) ps->p_op->send(to, msg, sizeof(struct paxos_msghdr) - + ps->extralen); + + ps->extralen + + ps->valuelen); } } +static void debug_learned(const struct learned *learned, + int i, + struct paxos_space *ps) +{ + log_debug("learned[%d](ballot=%d, " + "instance_number=%d, " + "number=%d, " + "state=%d)", + i, learned[i].ballot, + learned[i].instance_number, + learned[i].number, + learned[i].state); + if (ps != NULL) + ps->p_op->debug_value(learned[i].value); +} + +static void cp_learned(struct learned *to, struct learned *from, + unsigned int valuelen) +{ + to->ballot = from->ballot; + to->instance_number = from->instance_number; + to->number = from->number; + to->state = from->state; + memcpy(to->value, from->value, valuelen); +} +static void push_learned(struct paxos_instance *pi, size_t size) +{ + int i; + log_debug("start push_learned"); + for (i = size - 1; i > 0; i--) + cp_learned(&pi->learner->learned[i], + &pi->learner->learned[i - 1], + pi->ps->valuelen); + + pi->learner->learned[0].ballot = 0; + pi->learner->learned[0].instance_number = 0; + pi->learner->learned[0].number = 0; + pi->learner->learned[0].state = INIT; + + log_debug("exit push_learned"); +} + static void learner_response(struct paxos_space *ps, struct paxos_instance *pi, void *msg, int msglen) { struct paxos_msghdr *hdr; pi_handle_t pih = (pi_handle_t)pi; - void *extra; - int i, unused = 0, found = 0; - int ballot; + void *extra, *value; + int i, unused = 0, found = 0, current = 0; + int ballot, instance_number; log_debug("learner response ..."); - if (msglen != sizeof(struct paxos_msghdr) + ps->extralen) { + if (msglen != sizeof(struct paxos_msghdr) + ps->extralen + + ps->valuelen) { log_error("message length incorrect, " - "msglen: %d, msghdr len: %lu, extralen: %u", + "msglen: %d, msghdr len: %lu, extralen: %u, valuelen: %u", msglen, (long)sizeof(struct paxos_msghdr), - ps->extralen); + ps->extralen, ps->valuelen); return; } hdr = msg; extra = (char *)msg + sizeof(struct paxos_msghdr); ballot = ntohl(hdr->ballot_number); + instance_number = ntohl(hdr->instance_number); + + value = pi->acceptor->accepted_proposal->value; + memcpy(value, (char *)msg + sizeof(struct paxos_msghdr) + ps->extralen, + ps->valuelen); for (i = 0; i < ps->number; i++) { - if (!pi->learner->learned[i].ballot) { - unused = i; - break; - } - if (pi->learner->learned[i].ballot == ballot) { + if (pi->learner->learned[i].ballot == ballot + && pi->learner->learned[i].instance_number + == instance_number + && ps->p_op->equal_value( + pi->learner->learned[i].value, value) == 0) { + current = i; pi->learner->learned[i].number++; - if (pi->learner->learned[i].number - > pi->learner->learned_max) - pi->learner->learned_max - = pi->learner->learned[i].number; found = 1; + debug_learned(pi->learner->learned, i, ps); break; } } if (!found) { - pi->learner->learned[unused].ballot = ntohl(hdr->ballot_number); + log_debug("found a new accept message"); + current = unused; + push_learned(pi, ps->number); + pi->learner->learned[unused].ballot = + ntohl(hdr->ballot_number); + pi->learner->learned[unused].instance_number = + ntohl(hdr->instance_number); pi->learner->learned[unused].number = 1; + pi->learner->learned[unused].state = INIT; + memcpy(pi->learner->learned[unused].value, + value, ps->valuelen); + debug_learned(pi->learner->learned, unused, ps); } - if (!have_quorum(ps, pi->learner->learned_max)) + if (!have_quorum(ps, pi->learner->learned[current].number)) + return; + + if (pi->learner->learned[current].state == COMMITTED) return; + + if (pi->learner->latest_learned.ballot >= 0) { + if (pi->learner->latest_learned.ballot > + pi->learner->learned[current].ballot) { + log_debug("this commit is delayed than the latest commit."); + pi->learner->learned[current].state = COMMITTED; + return; + } else if (pi->learner->latest_learned.ballot == + pi->learner->learned[current].ballot && + pi->learner->latest_learned.instance_number > + pi->learner->learned[current].instance_number) { + log_debug("this commit is delayed than the latest commit."); + pi->learner->learned[current].state = COMMITTED; + return; + } + } + if (ps->p_op->learned) - ps->p_op->learned(pih, extra, ballot); + ps->p_op->learned(pih, extra, ballot, value); + + cp_learned(&pi->learner->latest_learned, + &pi->learner->learned[current], ps->valuelen); + + pi->learner->learned[current].state = COMMITTED; } const struct proposer_operations generic_proposer_operations = { @@ -609,6 +715,7 @@ pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio) } memset(proposer->proposal, 0, sizeof(struct proposal) + valuelen); + proposer->instance_number = INSTANCE_NUMBER_EMPTY; pi->proposer = proposer; } @@ -640,6 +747,7 @@ pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio) } if (ps->role[myid] & LEARNER) { + int i; learner = malloc(sizeof(struct learner) + ps->number * sizeof(struct learned)); if (!learner) { @@ -651,6 +759,25 @@ pi_handle_t paxos_instance_init(ps_handle_t handle, const void *name, int *prio) sizeof(struct learner) + ps->number * sizeof(struct learned)); learner->state = INIT; + for (i = 0; i < ps->number; i++) { + learner->learned[i].value = (char *)malloc(sizeof(char) + * valuelen); + if (learner->learned[i].value == NULL) { + log_error("no mem for learner"); + rv = -ENOMEM; + goto out_accepted_proposal; + } + memset(learner->learned[i].value, 0, + sizeof(char) * valuelen); + } + learner->latest_learned.ballot = -1; + learner->latest_learned.value = (char *)malloc(sizeof(char) + * valuelen); + if (learner->latest_learned.value == NULL) { + log_error("no mem for learner"); + rv = -ENOMEM; + goto out_accepted_proposal; + } pi->learner = learner; } @@ -700,6 +827,7 @@ int paxos_round_request(pi_handle_t handle, pi->proposer->open_number = 0; pi->proposer->accepted_number = 0; pi->proposer->proposed = 0; + pi->proposer->instance_number = 0; memcpy(pi->proposer->proposal->value, value, pi->ps->valuelen); pi->end = end_request; @@ -743,6 +871,7 @@ int paxos_propose(pi_handle_t handle, void *value, int round) struct paxos_instance *pi = (struct paxos_instance *)handle; struct paxos_msghdr *hdr; void *extra, *msg; + int len = sizeof(struct paxos_msghdr) + pi->ps->extralen + pi->ps->valuelen; @@ -763,12 +892,14 @@ int paxos_propose(pi_handle_t handle, void *value, int round) strcpy(pi->proposer->proposal->value, value); pi->proposer->accepted_number = 0; pi->round = round; + pi->proposer->instance_number++; memset(msg, 0, len); hdr = msg; hdr->state = htonl(PROPOSING); hdr->from = htonl(pi->ps->p_op->get_myid()); hdr->proposer_id = hdr->from; + hdr->instance_number = htonl(pi->proposer->instance_number); strcpy(hdr->psname, pi->ps->name); strcpy(hdr->piname, pi->name); hdr->ballot_number = htonl(pi->round); @@ -794,6 +925,24 @@ int paxos_propose(pi_handle_t handle, void *value, int round) return 0; } +int paxos_boost_round(pi_handle_t handle, + void *value, + int round, + void (*end_request) (pi_handle_t handle, + int round, + int result)) +{ + struct paxos_instance *pi = (struct paxos_instance *)handle; + + if (pi->proposer->instance_number == INSTANCE_NUMBER_EMPTY) { + log_debug("instance number is empty. run prepare."); + return paxos_round_request(handle, value, &round, + end_request); + } + + return -1; +} + int paxos_catchup(pi_handle_t handle) { struct paxos_instance *pi = (struct paxos_instance *)handle; diff --git a/src/paxos.h b/src/paxos.h index cff8c667..ec1f7398 100644 --- a/src/paxos.h +++ b/src/paxos.h @@ -25,23 +25,30 @@ #define ACCEPTOR 0x2 #define LEARNER 0x1 +#define INSTANCE_NUMBER_EMPTY -1 + typedef long ps_handle_t; typedef long pi_handle_t; + struct paxos_operations { int (*get_myid) (void); int (*send) (unsigned long id, void *value, int len); int (*broadcast) (void *value, int len); int (*catchup) (pi_handle_t handle); + int (*equal_value) (const void *value1, const void *value2); + void (*debug_value) (const void *value); int (*prepare) (pi_handle_t handle, void *extra); - int (*promise) (pi_handle_t handle, void *extra); + int (*promise) (pi_handle_t handle, void *extra, + int proposer_id, int round); int (*is_prepared) (pi_handle_t handle, void *extra); int (*propose) (pi_handle_t handle, void *extra, int round, void *value); int (*accepted) (pi_handle_t handle, void *extra, int round, void *value); int (*commit) (pi_handle_t handle, void *extra, int round); - int (*learned) (pi_handle_t handle, void *extra, int round); + int (*learned) (pi_handle_t handle, void *extra, + int round, void *value); }; int paxos_recvmsg(void *msg, int msglen); @@ -76,6 +83,13 @@ int paxos_catchup(pi_handle_t handle); int paxos_propose(pi_handle_t handle, void *value, int round); +int paxos_boost_round(pi_handle_t handle, + void *value, + int round, + void (*end_request) (pi_handle_t handle, + int round, + int result)); + int paxos_instance_exit(pi_handle_t handle); int paxos_space_exit(ps_handle_t handle); diff --git a/src/paxos_lease.c b/src/paxos_lease.c index b4f97477..e744f3dc 100644 --- a/src/paxos_lease.c +++ b/src/paxos_lease.c @@ -16,6 +16,7 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include #include #include #include @@ -38,10 +39,17 @@ #define LEASE_STARTED 0 #define LEASE_STOPPED 1 +typedef enum { + OTHER_LEASE = 0, + PROPOSER_LEASE, + NO_LEASE, + RESTRICT_LEASE +} lease_state_t; + struct paxos_lease_msghdr { int op; int clear; - int leased; + lease_state_t leased; }; struct paxos_lease_value { @@ -87,6 +95,44 @@ static struct paxos_operations *px_op = NULL; const struct paxos_lease_operations *p_l_op; ps_handle_t ps_handle = 0; +static void debug_value(const void *value) +{ + struct paxos_lease_value *plv = (struct paxos_lease_value *) value; + log_debug("plv(name=%s, owner=%d, expiry=%d)", + plv->name, + plv->owner, + plv->expiry); +} + +static int equal_value(const void *value1, const void *value2) +{ + struct paxos_lease_value *plv1 = (struct paxos_lease_value *) value1, + *plv2 = (struct paxos_lease_value *) value2; + if (plv1->expiry == plv2->expiry && + strcmp(plv1->name, plv2->name) == 0 && + plv1->owner == plv1->owner) + return 0; + return 1; +} + +static void debug_plr(const struct paxos_lease_result *plr) +{ + struct tm *expires = NULL; + struct timeval tv; + char expires_str[100] = "Fail"; + int rv; + tv.tv_sec = plr->expires; + expires = localtime(&tv.tv_sec); + rv = sprintf(expires_str, + "%04d/%02d/%02d %02d:%02d:%02d", + expires->tm_year + 1900, expires->tm_mon + 1, + expires->tm_mday, expires->tm_hour, + expires->tm_min, expires->tm_sec); + log_debug("paxos_lease_result->(name=%s, " + "owner=%d, expires=%s, ballot=%d)", + plr->name, plr->owner, expires_str, plr->ballot); +} + static int find_paxos_lease(pi_handle_t handle, struct paxos_lease **pl) { struct paxos_lease *lpl; @@ -132,6 +178,7 @@ static void renew_expires(unsigned long data) { struct paxos_lease *pl = (struct paxos_lease *)data; struct paxos_lease_value value; + int next_round; log_debug("renew expires ..."); @@ -144,6 +191,15 @@ static void renew_expires(unsigned long data) strncpy(value.name, pl->name, PAXOS_NAME_LEN + 1); value.owner = myid; value.expiry = pl->expiry; + if (!pl->end_lease) + pl->end_lease = p_l_op->end_acquire; + next_round = paxos_boost_round(pl->pih, &value, + pl->proposer.round, end_paxos_request); + if (next_round >= 0) { + if (next_round > pl->proposer.round) + pl->proposer.round = next_round; + return; + } paxos_propose(pl->pih, &value, pl->proposer.round); } @@ -347,14 +403,26 @@ static inline int start_lease_is_prepared(pi_handle_t handle __attribute__((unus void *header) { struct paxos_lease_msghdr *hdr = header; - + int leased = ntohl(hdr->leased); log_debug("enter start_lease_is_prepared"); - if (hdr->leased) { - log_debug("already leased"); + + switch (leased) { + case OTHER_LEASE: + log_debug("an other proposer already was leased"); return 0; - } else { - log_debug("not leased"); + case PROPOSER_LEASE: + log_debug("myself already was leased"); return 1; + case NO_LEASE: + log_debug("no one was leased"); + return 1; + case RESTRICT_LEASE: + log_debug("lease is restricted"); + return 0; + default: + /* this is not pass */ + log_error("this is not pass"); + return 0; } } @@ -386,7 +454,8 @@ static int lease_is_prepared(pi_handle_t handle, void *header) return ret; } -static int start_lease_promise(pi_handle_t handle, void *header) +static int start_lease_promise(pi_handle_t handle, void *header, + int proposer_id) { struct paxos_lease_msghdr *hdr = header; struct paxos_lease *pl; @@ -398,16 +467,20 @@ static int start_lease_promise(pi_handle_t handle, void *header) if (NOT_CLEAR_RELEASE == clear && LEASE_STOPPED == pl->release) { log_debug("could not be leased"); - hdr->leased = 1; + hdr->leased = htonl(RESTRICT_LEASE); } else if (-1 == pl->owner) { - log_debug("has not been leased"); - hdr->leased = 0; + log_debug("no one has been leased"); + hdr->leased = htonl(NO_LEASE); + } else if (pl->owner == proposer_id) { + log_debug("a proposer has been leased"); + hdr->leased = htonl(PROPOSER_LEASE); } else { - log_debug("has been leased"); - hdr->leased = 1; + log_debug("an other proposer has been leased"); + hdr->leased = htonl(OTHER_LEASE); } - if (hdr->leased == 1) { + if (hdr->leased == ntohl(RESTRICT_LEASE) + || hdr->leased == ntohl(OTHER_LEASE)) { log_error("the proposal collided"); return -1; } @@ -429,7 +502,8 @@ static int stop_lease_promise(pi_handle_t handle, return 0; } -static int lease_promise(pi_handle_t handle, void *header) +static int lease_promise(pi_handle_t handle, void *header, + int proposer_id, int round) { struct paxos_lease_msghdr *hdr = header; int ret = 0; @@ -437,9 +511,11 @@ static int lease_promise(pi_handle_t handle, void *header) log_debug("enter lease_promise"); assert(OP_START_LEASE == op || OP_STOP_LEASE == op); + + switch (op) { case OP_START_LEASE: - ret = start_lease_promise(handle, header); + ret = start_lease_promise(handle, header, proposer_id); break; case OP_STOP_LEASE: ret = stop_lease_promise(handle, header); @@ -546,6 +622,31 @@ static int lease_propose(pi_handle_t handle, void *extra, return ret; } +static void make_acceptor_timer(struct paxos_lease *pl) +{ + if (pl->acceptor.timer1 && pl->acceptor.timer2 != pl->acceptor.timer1) + del_timer(&pl->acceptor.timer1); + pl->acceptor.timer1 = add_timer(pl->expiry, (unsigned long)pl, + lease_expires); + pl->acceptor.expires = current_time() + pl->expiry; +} + +static int set_acceptor_plv(struct paxos_lease *pl, void *value) +{ + if (!pl->acceptor.plv) { + pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value)); + if (!pl->acceptor.plv) { + log_error("could not alloc mem for acceptor plv"); + return -ENOMEM; + } + } + memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value)); + + make_acceptor_timer(pl); + + return 0; +} + static int start_lease_accepted(pi_handle_t handle, void *extra, int round, void *value) { @@ -556,6 +657,16 @@ static int start_lease_accepted(pi_handle_t handle, void *extra, if (!find_paxos_lease(handle, &pl)) return -1; + assert(!(round < pl->acceptor.round)); + + if (round >= pl->acceptor.round) { + log_debug("acceptor can accept the current round, " + "current round: %d, acceptor round: %d", + round, pl->acceptor.round); + if (round > pl->acceptor.round) + log_debug("a promise message may be delay or lost"); + } + pl->acceptor.round = round; if (NOT_CLEAR_RELEASE == hdr->clear && LEASE_STOPPED == pl->release) { @@ -563,21 +674,6 @@ static int start_lease_accepted(pi_handle_t handle, void *extra, return -1; } - if (!pl->acceptor.plv) { - pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value)); - if (!pl->acceptor.plv) { - log_error("could not alloc mem for acceptor plv"); - return -ENOMEM; - } - } - memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value)); - - if (pl->acceptor.timer1 && pl->acceptor.timer2 != pl->acceptor.timer1) - del_timer(&pl->acceptor.timer1); - pl->acceptor.timer1 = add_timer(pl->expiry, (unsigned long)pl, - lease_expires); - pl->acceptor.expires = current_time() + pl->expiry; - log_debug("exit start_lease_accepted"); return 0; } @@ -592,15 +688,18 @@ static int stop_lease_accepted(pi_handle_t handle, if (!find_paxos_lease(handle, &pl)) return -1; - pl->acceptor.round = round; - if (!pl->acceptor.plv) { - pl->acceptor.plv = malloc(sizeof(struct paxos_lease_value)); - if (!pl->acceptor.plv) { - log_error("could not alloc mem for acceptor plv"); - return -ENOMEM; - } + assert(!(round < pl->acceptor.round)); + + if (round >= pl->acceptor.round) { + log_debug("acceptor can accept the current round, " + "current round: %d, acceptor round: %d", + round, pl->acceptor.round); + if (round > pl->acceptor.round) + log_debug("a promise message may be delay or lost"); } - memcpy(pl->acceptor.plv, value, sizeof(struct paxos_lease_value)); + + pl->acceptor.round = round; + log_debug("exit stop_lease_accepted"); return 0; } @@ -646,6 +745,9 @@ static int start_lease_commit(pi_handle_t handle, void *extra, int round) pl->release = LEASE_STARTED; pl->owner = pl->proposer.plv->owner; pl->expiry = pl->proposer.plv->expiry; + + make_acceptor_timer(pl); + if (pl->acceptor.timer2 != pl->acceptor.timer1) { if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); @@ -657,7 +759,7 @@ static int start_lease_commit(pi_handle_t handle, void *extra, int round) plr.expires = current_time() + pl->proposer.plv->expiry; plr.ballot = round; p_l_op->notify((pl_handle_t)pl, &plr); - + debug_plr(&plr); log_debug("exit start_lease_commit"); return 0; } @@ -696,6 +798,7 @@ static int stop_lease_commit(pi_handle_t handle, plr.ballot = round; plr.expires = 0; p_l_op->notify((pl_handle_t)pl, &plr); + debug_plr(&plr); log_debug("exit stop_lease_commit"); return 0; } @@ -721,7 +824,8 @@ static int lease_commit(pi_handle_t handle, void *extra, int round) return ret; } -static int start_lease_learned(pi_handle_t handle, void *extra, int round) +static int start_lease_learned(pi_handle_t handle, void *extra, + int round, void *value) { struct paxos_lease *pl; struct paxos_lease_result plr; @@ -731,18 +835,20 @@ static int start_lease_learned(pi_handle_t handle, void *extra, int round) return -1; if (round != pl->acceptor.round) { - log_error("current round is not the acceptor round, " + log_debug("current round is not the acceptor round, " "current round: %d, acceptor round: %d", round, pl->acceptor.round); - return -1; + if (round > pl->acceptor.round) + log_debug("a promise or propose message may be delay or lost"); } - if (!pl->acceptor.plv) + if (set_acceptor_plv(pl, value) < 0) return -1; pl->release = LEASE_STARTED; pl->owner = pl->acceptor.plv->owner; pl->expiry = pl->acceptor.plv->expiry; + if (pl->acceptor.timer2 != pl->acceptor.timer1) { if (pl->acceptor.timer2) del_timer(&pl->acceptor.timer2); @@ -754,13 +860,14 @@ static int start_lease_learned(pi_handle_t handle, void *extra, int round) plr.expires = current_time() + pl->acceptor.plv->expiry; plr.ballot = round; p_l_op->notify((pl_handle_t)pl, &plr); + debug_plr(&plr); log_debug("exit start_lease_learned"); return 0; } static int stop_lease_learned(pi_handle_t handle, void *extra __attribute__((unused)), - int round) + int round, void *value) { struct paxos_lease *pl; struct paxos_lease_result plr; @@ -770,13 +877,14 @@ static int stop_lease_learned(pi_handle_t handle, return -1; if (round != pl->acceptor.round) { - log_error("current round is not the acceptor round, " + log_debug("current round is not the acceptor round, " "current round: %d, acceptor round: %d", round, pl->acceptor.round); - return -1; + if (round > pl->acceptor.round) + log_debug("a promise or propose message may be delay or lost"); } - if (!pl->acceptor.plv) + if (set_acceptor_plv(pl, value) < 0) return -1; if (pl->acceptor.timer2) @@ -790,11 +898,13 @@ static int stop_lease_learned(pi_handle_t handle, plr.ballot = round; plr.expires = 0; p_l_op->notify((pl_handle_t)pl, &plr); + debug_plr(&plr); log_debug("exit stop_lease_learned"); return 0; } -static int lease_learned(pi_handle_t handle, void *extra, int round) +static int lease_learned(pi_handle_t handle, void *extra, + int round, void *value) { struct paxos_lease_msghdr *hdr = extra; int ret = 0; @@ -804,10 +914,10 @@ static int lease_learned(pi_handle_t handle, void *extra, int round) assert(OP_START_LEASE == op || OP_STOP_LEASE == op); switch (op) { case OP_START_LEASE: - ret = start_lease_learned(handle, extra, round); + ret = start_lease_learned(handle, extra, round, value); break; case OP_STOP_LEASE: - ret = stop_lease_learned(handle, extra, round); + ret = stop_lease_learned(handle, extra, round, value); break; } @@ -847,6 +957,8 @@ pl_handle_t paxos_lease_init(const void *name, px_op->send = pl_op->send; px_op->broadcast = pl_op->broadcast; px_op->catchup = lease_catchup; + px_op->equal_value = equal_value; + px_op->debug_value = debug_value; px_op->prepare = lease_prepare; px_op->is_prepared = lease_is_prepared; px_op->promise = lease_promise; diff --git a/src/paxos_lease.h b/src/paxos_lease.h index e541b0e0..c3f5e9b2 100644 --- a/src/paxos_lease.h +++ b/src/paxos_lease.h @@ -24,6 +24,7 @@ #define NOT_CLEAR_RELEASE 0 #define CLEAR_RELEASE 1 + typedef long pl_handle_t; struct paxos_lease_result { @@ -40,6 +41,7 @@ struct paxos_lease_operations { int (*catchup) (const void *name, int *owner, int *ballot, unsigned long long *expires); int (*notify) (pl_handle_t handle, struct paxos_lease_result *result); + void (*end_acquire) (pl_handle_t handle, int error); }; pl_handle_t paxos_lease_init(const void *name, diff --git a/src/ticket.c b/src/ticket.c index 2558ac8c..04d9420f 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -553,6 +553,7 @@ const struct paxos_lease_operations ticket_operations = { .broadcast = ticket_broadcast, .catchup = ticket_catchup, .notify = ticket_write, + .end_acquire = end_acquire, }; int setup_ticket(void)