Skip to content

Commit

Permalink
Merge pull request #216 from leobago/ReleaseCodeRefacturing
Browse files Browse the repository at this point in the history
refactoring for release
  • Loading branch information
leobago authored Sep 28, 2018
2 parents 55d464d + 9b322e0 commit 7bce275
Show file tree
Hide file tree
Showing 14 changed files with 227 additions and 203 deletions.
11 changes: 6 additions & 5 deletions src/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ int FTI_BitFlip(int datasetID)
/*-------------------------------------------------------------------------*/
int FTI_Checkpoint(int id, int level)
{

char str[FTI_BUFS]; //For console output

if (FTI_Exec.initSCES == 0) {
Expand Down Expand Up @@ -1031,7 +1032,7 @@ int FTI_Checkpoint(int id, int level)
double t0 = MPI_Wtime(); //Start time
if (FTI_Exec.wasLastOffline == 1) { // Block until previous checkpoint is done (Async. work)
int lastLevel;
MPI_Recv(&lastLevel, 1, MPI_INT, FTI_Topo.headRank, FTI_Conf.ckptTag, FTI_Exec.globalComm, MPI_STATUS_IGNORE);
MPI_Recv(&lastLevel, 1, MPI_INT, FTI_Topo.headRank, FTI_Conf.generalTag, FTI_Exec.globalComm, MPI_STATUS_IGNORE);
if (lastLevel != FTI_NSCS) { //Head sends level of checkpoint if post-processing succeed, FTI_NSCS Otherwise
FTI_Exec.lastCkptLvel = lastLevel; //Store last successful post-processing checkpoint level
sprintf(str, "LastCkptLvel received from head: %d", lastLevel);
Expand Down Expand Up @@ -1092,9 +1093,9 @@ int FTI_Checkpoint(int id, int level)
} else {
strncpy(headInfo->ckptFile, FTI_Exec.meta[0].ckptFile, FTI_BUFS);
}
MPI_Send(headInfo, 1, FTIFF_MpiTypes[FTIFF_HEAD_INFO], FTI_Topo.headRank, FTI_Conf.ckptTag, FTI_Exec.globalComm);
MPI_Send(FTI_Exec.meta[0].varID, headInfo->nbVar, MPI_INT, FTI_Topo.headRank, FTI_Conf.ckptTag, FTI_Exec.globalComm);
MPI_Send(FTI_Exec.meta[0].varSize, headInfo->nbVar, MPI_LONG, FTI_Topo.headRank, FTI_Conf.ckptTag, FTI_Exec.globalComm);
MPI_Send(headInfo, 1, FTIFF_MpiTypes[FTIFF_HEAD_INFO], FTI_Topo.headRank, FTI_Conf.generalTag, FTI_Exec.globalComm);
MPI_Send(FTI_Exec.meta[0].varID, headInfo->nbVar, MPI_INT, FTI_Topo.headRank, FTI_Conf.generalTag, FTI_Exec.globalComm);
MPI_Send(FTI_Exec.meta[0].varSize, headInfo->nbVar, MPI_LONG, FTI_Topo.headRank, FTI_Conf.generalTag, FTI_Exec.globalComm);
free(headInfo);
}

Expand Down Expand Up @@ -1375,7 +1376,7 @@ int FTI_Finalize()
// If there is remaining work to do for last checkpoint
if (FTI_Exec.wasLastOffline == 1) {
int lastLevel;
MPI_Recv(&lastLevel, 1, MPI_INT, FTI_Topo.headRank, FTI_Conf.ckptTag, FTI_Exec.globalComm, MPI_STATUS_IGNORE);
MPI_Recv(&lastLevel, 1, MPI_INT, FTI_Topo.headRank, FTI_Conf.generalTag, FTI_Exec.globalComm, MPI_STATUS_IGNORE);
if (lastLevel != FTI_NSCS) { //Head sends level of checkpoint if post-processing succeed, FTI_NSCS Otherwise
FTI_Exec.lastCkptLvel = lastLevel;
}
Expand Down
16 changes: 8 additions & 8 deletions src/checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ int FTI_WriteCkpt(FTIT_configuration* FTI_Conf, FTIT_execution* FTI_Exec,
}
switch (FTI_Conf->ioMode) {
case FTI_IO_FTIFF:

res = FTI_Try(FTIFF_WriteFTIFF(FTI_Conf, FTI_Exec, FTI_Topo, FTI_Ckpt, FTI_Data), "write checkpoint using FTI-FF.");
break;
#ifdef ENABLE_HDF5 //If HDF5 is installed
Expand Down Expand Up @@ -390,17 +391,16 @@ int FTI_Listen(FTIT_configuration* FTI_Conf, FTIT_execution* FTI_Exec,

FTI_Print("Head waits for message...", FTI_DBUG);

MPI_Iprobe( MPI_ANY_SOURCE, FTI_Conf->finalTag, FTI_Exec->globalComm, &finalize_flag, &finalize_status );
if ( FTI_Conf->stagingEnabled ) {
MPI_Iprobe( MPI_ANY_SOURCE, FTI_Conf->stageTag, FTI_Exec->nodeComm, &stage_flag, &stage_status );
}
MPI_Iprobe( MPI_ANY_SOURCE, FTI_Conf->ckptTag, FTI_Exec->globalComm, &ckpt_flag, &ckpt_status );
MPI_Iprobe( MPI_ANY_SOURCE, FTI_Conf->finalTag, FTI_Exec->globalComm, &finalize_flag, &finalize_status );

if( ckpt_flag ) {

// head will process the whole checkpoint
// (treated second due to priority)
FTI_HandleCkptRequest( FTI_Conf, FTI_Exec, FTI_Topo, FTI_Ckpt );
FTI_HandleCkptRequest( FTI_Conf, FTI_Exec, FTI_Topo, FTI_Ckpt );
ckpt_flag = 0;
continue;

Expand Down Expand Up @@ -467,7 +467,7 @@ int FTI_Listen(FTIT_configuration* FTI_Conf, FTIT_execution* FTI_Exec,
/*-------------------------------------------------------------------------*/
int FTI_HandleCkptRequest(FTIT_configuration* FTI_Conf, FTIT_execution* FTI_Exec,
FTIT_topology* FTI_Topo, FTIT_checkpoint* FTI_Ckpt)
{
{
char str[FTI_BUFS]; //For console output
int flags[7]; //Increment index if get corresponding value from application process
//(index (1 - 4): checkpoint level; index 5: stops head; index 6: reject checkpoint)
Expand Down Expand Up @@ -503,15 +503,15 @@ int FTI_HandleCkptRequest(FTIT_configuration* FTI_Conf, FTIT_execution* FTI_Exec
int k;
for (i = 0; i < FTI_Topo->nbApprocs; i++) { // Iterate on the application processes in the node
k = i+1;
MPI_Recv(&(headInfo[i]), 1, FTIFF_MpiTypes[FTIFF_HEAD_INFO], FTI_Topo->body[i], FTI_Conf->ckptTag, FTI_Exec->globalComm, MPI_STATUS_IGNORE);
MPI_Recv(&(headInfo[i]), 1, FTIFF_MpiTypes[FTIFF_HEAD_INFO], FTI_Topo->body[i], FTI_Conf->generalTag, FTI_Exec->globalComm, MPI_STATUS_IGNORE);
FTI_Exec->meta[0].exists[k] = headInfo[i].exists;
FTI_Exec->meta[0].nbVar[k] = headInfo[i].nbVar;
FTI_Exec->meta[0].maxFs[k] = headInfo[i].maxFs;
FTI_Exec->meta[0].fs[k] = headInfo[i].fs;
FTI_Exec->meta[0].pfs[k] = headInfo[i].pfs;
isDcpCnt += headInfo[i].isDcp;
MPI_Recv(&(FTI_Exec->meta[0].varID[k * FTI_BUFS]), headInfo[i].nbVar, MPI_INT, FTI_Topo->body[i], FTI_Conf->ckptTag, FTI_Exec->globalComm, MPI_STATUS_IGNORE);
MPI_Recv(&(FTI_Exec->meta[0].varSize[k * FTI_BUFS]), headInfo[i].nbVar, MPI_LONG, FTI_Topo->body[i], FTI_Conf->ckptTag, FTI_Exec->globalComm, MPI_STATUS_IGNORE);
MPI_Recv(&(FTI_Exec->meta[0].varID[k * FTI_BUFS]), headInfo[i].nbVar, MPI_INT, FTI_Topo->body[i], FTI_Conf->generalTag, FTI_Exec->globalComm, MPI_STATUS_IGNORE);
MPI_Recv(&(FTI_Exec->meta[0].varSize[k * FTI_BUFS]), headInfo[i].nbVar, MPI_LONG, FTI_Topo->body[i], FTI_Conf->generalTag, FTI_Exec->globalComm, MPI_STATUS_IGNORE);
strncpy(&(FTI_Exec->meta[0].ckptFile[k * FTI_BUFS]), headInfo[i].ckptFile , FTI_BUFS);
sscanf(&(FTI_Exec->meta[0].ckptFile[k * FTI_BUFS]), "Ckpt%d", &FTI_Exec->ckptID);
}
Expand Down Expand Up @@ -551,7 +551,7 @@ int FTI_HandleCkptRequest(FTIT_configuration* FTI_Conf, FTIT_execution* FTI_Exec
res = FTI_NSCS;
}
for (i = 0; i < FTI_Topo->nbApprocs; i++) { // Send msg. to avoid checkpoint collision
MPI_Send(&res, 1, MPI_INT, FTI_Topo->body[i], FTI_Conf->ckptTag, FTI_Exec->globalComm);
MPI_Send(&res, 1, MPI_INT, FTI_Topo->body[i], FTI_Conf->generalTag, FTI_Exec->globalComm);
}
return FTI_SCES;
}
Expand Down
Loading

0 comments on commit 7bce275

Please sign in to comment.