Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling of interruptions #503

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test_examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ jobs:
[ "$file" = "MHD_modelsipynb.py" ] ||
[ "$file" = "density_grid_samplingipynb.py" ] ||
[ "$file" = "lensing_crv4ipynb.py" ] ||
[ "$file" = "interrupt_candidateVectoripynb.py" ] ||
[ "$file" = "interrupt_sourceipynb.py" ] ||
[ "$file" = "lensing_mapsv4ipynb.py" ]; then
echo "skip file $file"
else
Expand Down
1 change: 1 addition & 0 deletions doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Contents
pages/acceleration.rst
pages/extending_crpropa.rst
pages/example_notebooks/propagation_comparison/Propagation_Comparison_CK_BP.ipynb
pages/interrupting-simulations.rst
pages/AdditionalResources.rst


Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions doc/pages/interrupting-simulations.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
Interrupting simulations on runtime
------------------------------------------------

CRPropa simulations can be interrupted on runtime with the `SIGTERM` or `SIGINT` signals.
If the user defines an output for the interruption (called `InterruptAction`) all candidates which are currently in the simulation will be passed to this output.
In the error stream the user will see a message denoting the number of candidates which have not been started yet.
If the simulation was run with a `candidateVector` as source, the indices of the candidates which have not been started yet will be printed or written to the file.
For a simulation with a source interface, a restart with the missing number of candidates will be sufficient to continue the simulation.

.. toctree::
:caption: Using a candidateVector as source
:maxdepth: 1

example_notebooks/interrupting_simulations/interrupt_candidateVector.ipynb

.. toctree::
:caption: Using a source interface
:maxdepth: 1

example_notebooks/interrupting_simulations/interrupt_source.ipynb


7 changes: 7 additions & 0 deletions include/crpropa/ModuleList.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "crpropa/Candidate.h"
#include "crpropa/Module.h"
#include "crpropa/Source.h"
#include "crpropa/module/Output.h"

#include <list>
#include <sstream>
Expand Down Expand Up @@ -47,9 +48,15 @@ class ModuleList: public Module {
iterator end();
const_iterator end() const;

void setInterruptAction(Output* action);
void dumpCandidate(Candidate* cand) const;

private:
module_list_t modules;
bool showProgress;
Output* interruptAction;
bool haveInterruptAction = false;
std::vector<int> notFinished; // list with not finished numbers of candidates
};

/**
Expand Down
22 changes: 18 additions & 4 deletions include/crpropa/module/Output.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,18 @@ namespace crpropa {
They can be easily customised by enabling/disabling specific columns.
*/
class Output: public Module {
protected:
double lengthScale, energyScale;
std::bitset<64> fields;

public:
struct Property
{
std::string name;
std::string comment;
Variant defaultValue;
};

protected:
double lengthScale, energyScale;
std::bitset<64> fields;

std::vector<Property> properties;

bool oneDimensional;
Expand Down Expand Up @@ -163,6 +165,18 @@ class Output: public Module {
size_t size() const;

void process(Candidate *) const;

/**
* write the indices of not started candidates into the output file.
* Used for interrupting the simulation
* @param indices list of not started indices
*/
virtual void dumpIndexList(std::vector<int> indices) {
std::cout << "indices:\t";
for (int i = 0; i < indices.size(); i++)
std::cout << indices[i] << ", ";
std::cout << "\n";
};
};

/** @}*/
Expand Down
2 changes: 2 additions & 0 deletions include/crpropa/module/TextOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class TextOutput: public Output {
*/
static void load(const std::string &filename, ParticleCollector *collector);
std::string getDescription() const;

void dumpIndexList(std::vector<int> indicies);
};
/** @}*/

Expand Down
7 changes: 5 additions & 2 deletions python/2_headers.i
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@
%feature("director") crpropa::AbstractCondition;
%include "crpropa/Module.h"

%template(OutputRefPtr) crpropa::ref_ptr<Output>;
%feature("director") crpropa::Output;
%ignore crpropa::Output::dumpIndexList(std::vector<int>);
%include "crpropa/module/Output.h"

%implicitconv crpropa::ref_ptr<crpropa::MagneticField>;
%template(MagneticFieldRefPtr) crpropa::ref_ptr<crpropa::MagneticField>;
%feature("director") crpropa::MagneticField;
Expand Down Expand Up @@ -394,8 +399,6 @@
}
}


%include "crpropa/module/Output.h"
%include "crpropa/module/DiffusionSDE.h"
%include "crpropa/module/TextOutput.h"
%include "crpropa/module/HDF5Output.h"
Expand Down
55 changes: 51 additions & 4 deletions src/ModuleList.cpp.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <algorithm>
#include <csignal>
#include <bits/stdc++.h>
#ifndef sighandler_t
typedef void (*sighandler_t)(int);
#endif
Expand Down Expand Up @@ -87,6 +88,10 @@ void ModuleList::run(Candidate* candidate, bool recursive, bool secondariesFirst
run(candidate->secondaries[i], recursive, secondariesFirst);
}
}

// dump candidae and secondaries if interrupted.
if (candidate->isActive() && (g_cancel_signal_flag != 0))
dumpCandidate(candidate);
}

void ModuleList::run(ref_ptr<Candidate> candidate, bool recursive, bool secondariesFirst) {
Expand Down Expand Up @@ -114,8 +119,11 @@ void ModuleList::run(const candidate_vector_t *candidates, bool recursive, bool

#pragma omp parallel for schedule(OMP_SCHEDULE)
for (size_t i = 0; i < count; i++) {
if (g_cancel_signal_flag != 0)
if (g_cancel_signal_flag != 0) {
#pragma omp critical(interrupt_write)
notFinished.push_back(i);
continue;
}

try {
run(candidates->operator[](i), recursive);
Expand All @@ -132,8 +140,18 @@ void ModuleList::run(const candidate_vector_t *candidates, bool recursive, bool
::signal(SIGINT, old_sigint_handler);
::signal(SIGTERM, old_sigterm_handler);
// Propagate signal to old handler.
if (g_cancel_signal_flag > 0)
if (g_cancel_signal_flag > 0) {
raise(g_cancel_signal_flag);
std::cerr << "############################################################################\n";
std::cerr << "# Interrupted CRPropa simulation \n";
std::cerr << "# A total of " << notFinished.size() << " candidates have not been started.\n";
std::cerr << "# the indicies of the vector haven been written to to output file. \n";
std::cerr << "############################################################################\n";

// dump list to output file
std::sort(notFinished.begin(), notFinished.end());
interruptAction->dumpIndexList(notFinished);
}
}

void ModuleList::run(SourceInterface *source, size_t count, bool recursive, bool secondariesFirst) {
Expand All @@ -156,8 +174,11 @@ void ModuleList::run(SourceInterface *source, size_t count, bool recursive, bool

#pragma omp parallel for schedule(OMP_SCHEDULE)
for (size_t i = 0; i < count; i++) {
if (g_cancel_signal_flag !=0)
if (g_cancel_signal_flag !=0) {
#pragma omp critical(interrupt_write)
notFinished.push_back(i);
continue;
}

ref_ptr<Candidate> candidate;

Expand Down Expand Up @@ -189,8 +210,13 @@ void ModuleList::run(SourceInterface *source, size_t count, bool recursive, bool
::signal(SIGINT, old_signal_handler);
::signal(SIGTERM, old_sigterm_handler);
// Propagate signal to old handler.
if (g_cancel_signal_flag > 0)
if (g_cancel_signal_flag > 0) {
raise(g_cancel_signal_flag);
std::cerr << "############################################################################\n";
std::cerr << "# Interrupted CRPropa simulation \n";
std::cerr << "# Number of not started candidates from source: " << notFinished.size() << "\n";
std::cerr << "############################################################################\n";
}
}

ModuleList::iterator ModuleList::begin() {
Expand Down Expand Up @@ -222,6 +248,27 @@ void ModuleList::showModules() const {
std::cout << getDescription();
}

void ModuleList::setInterruptAction(Output* action) {
interruptAction = action;
haveInterruptAction = true;
}

void ModuleList::dumpCandidate(Candidate *cand) const {
if (!haveInterruptAction)
return;

if (cand->isActive())
interruptAction->process(cand);
else
KISS_LOG_WARNING << "ModuleList::dumpCandidate is called with a non active candidate. This should not happen for the interrupt action. Please check candidate with serieal number "
<< cand->getSerialNumber() << std::endl;

for (int i = 0; i < cand->secondaries.size(); i++) {
if (cand->secondaries[i])
dumpCandidate(cand->secondaries[i]);
}
}

ModuleListRunner::ModuleListRunner(ModuleList *mlist) : mlist(mlist) {
}

Expand Down
14 changes: 14 additions & 0 deletions src/module/TextOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "kiss/string.h"

#include <sstream>
#include <cstdio>
#include <stdexcept>
#include <iostream>
Expand Down Expand Up @@ -378,4 +379,17 @@ void TextOutput::gzip() {
#endif
}

void TextOutput::dumpIndexList(std::vector<int> indices) {
#pragma omp critical(FileOutput)
{
std::stringstream ss;
ss << "#" << "\t";
for (int i = 0; i < indices.size(); i++)
ss << indices[i] << "\t";

const std::string cstr = ss.str();
out-> write(cstr.c_str(), cstr.length());
}
}

} // namespace crpropa
Loading