Skip to content

Commit

Permalink
Fixes swallowed exceptions in computation thread (AntaresSimulatorTea…
Browse files Browse the repository at this point in the history
…m#1685)

* Fixes swallowed exceptions in computation thread

Exceptions are now transferred from the computation thread
to the main one using std::future mechanisms.
Only the first encountered exception is thrown in the main
thread.

Signed-off-by: Sylvain Leclerc <[email protected]>

* - transfer exceptions occuring in zip_writer too
- some cleanup
- an integration test to check behaviour in simulation

Signed-off-by: Sylvain Leclerc <[email protected]>

* Take into account review

Signed-off-by: Sylvain Leclerc <[email protected]>

* - Separate thread pool for zip IO
- Addition of a flush method for writers, to synchronize
- Addition of tests for zip writer

Signed-off-by: Sylvain Leclerc <[email protected]>

* Some comments

Signed-off-by: Sylvain Leclerc <[email protected]>

* Throw more exceptions from zip writer

Signed-off-by: Sylvain Leclerc <[email protected]>

* Fix headers

Signed-off-by: Sylvain Leclerc <[email protected]>

* Fix missing header (windows build)

Signed-off-by: Sylvain Leclerc <[email protected]>

* Take into account review

Signed-off-by: Sylvain Leclerc <[email protected]>

* Fix windows build again

Signed-off-by: Sylvain Leclerc <[email protected]>

---------

Signed-off-by: Sylvain Leclerc <[email protected]>
  • Loading branch information
sylvlecl authored Oct 12, 2023
1 parent cbce2c4 commit e33402c
Show file tree
Hide file tree
Showing 23 changed files with 568 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/libs/antares/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ add_subdirectory(object)

add_subdirectory(array)
add_subdirectory(correlation)

add_subdirectory(concurrency)

add_subdirectory(logs)
add_subdirectory(jit)
Expand Down
9 changes: 9 additions & 0 deletions src/libs/antares/concurrency/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@


add_library(concurrency)
add_library(Antares::concurrency ALIAS concurrency)

target_sources(concurrency PRIVATE concurrency.cpp)
target_include_directories(concurrency PUBLIC include)

target_link_libraries(concurrency yuni-static-core)
83 changes: 83 additions & 0 deletions src/libs/antares/concurrency/concurrency.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
** Copyright 2007-2023 RTE
** Authors: Antares_Simulator Team
**
** This file is part of Antares_Simulator.
**
** Antares_Simulator is free software: you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation, either version 3 of the License, or
** (at your option) any later version.
**
** There are special exceptions to the terms and conditions of the
** license as they are applied to this software. View the full text of
** the exceptions in file COPYING.txt in the directory of this software
** distribution
**
** Antares_Simulator is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with Antares_Simulator. If not, see <http://www.gnu.org/licenses/>.
**
** SPDX-License-Identifier: licenceRef-GPL3_WITH_RTE-Exceptions
*/
#include <memory>
#include "yuni/job/job.h"
#include "antares/concurrency/concurrency.h"

namespace Antares::Concurrency
{

namespace {

/*!
* Just wraps an arbitrary task as a yuni job, and allows to retrieve the corresponding future.
*/
class PackagedJob : public Yuni::Job::IJob {
public:
PackagedJob(const Task& task) : task_(task) {}

TaskFuture getFuture() {
return task_.get_future();
}

protected:
void onExecute() override {
task_();
}

private:
std::packaged_task<void()> task_;
};

}

std::future<void> AddTask(Yuni::Job::QueueService& threadPool,
const Task& task,
Yuni::Job::Priority priority) {
auto job = std::make_unique<PackagedJob>(task);
auto future = job->getFuture();
threadPool.add(job.release(), priority);
return future;
}

void FutureSet::add(TaskFuture&& f) {
std::lock_guard lock(mutex_);
futures_.push_back(std::move(f));
}

void FutureSet::join() {
std::vector<TaskFuture> toBeJoined;
{
std::lock_guard lock(mutex_);
std::swap(futures_, toBeJoined);
}
for (auto& f: toBeJoined) {
f.get();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
** Copyright 2007-2023 RTE
** Authors: Antares_Simulator Team
**
** This file is part of Antares_Simulator.
**
** Antares_Simulator is free software: you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation, either version 3 of the License, or
** (at your option) any later version.
**
** There are special exceptions to the terms and conditions of the
** license as they are applied to this software. View the full text of
** the exceptions in file COPYING.txt in the directory of this software
** distribution
**
** Antares_Simulator is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with Antares_Simulator. If not, see <http://www.gnu.org/licenses/>.
**
** SPDX-License-Identifier: licenceRef-GPL3_WITH_RTE-Exceptions
*/
#ifndef ANTARES_CONCURRENCY_H
#define ANTARES_CONCURRENCY_H

#include <future>
#include "yuni/job/queue/service.h"

namespace Antares::Concurrency
{

using Task = std::function<void()>;
using TaskFuture = std::future<void>;

/*!
* \brief Queues the provided function and returns the corresponding std::future.
*
* This allows to handle exceptions occuring in the underlying task,
* as opposite to Yuni::Job::QueueService::add which swallows them.
*/
[[nodiscard]] TaskFuture AddTask(Yuni::Job::QueueService& threadPool,
const Task& task,
Yuni::Job::Priority priority = Yuni::Job::priorityDefault);

/*!
* \brief Utility class to gather futures to wait for.
*/
class FutureSet
{
public:
FutureSet() = default;
~FutureSet() = default;

FutureSet(const FutureSet&) = delete;
FutureSet& operator=(const FutureSet&) = delete;
FutureSet(FutureSet&&) = delete;
FutureSet& operator=(FutureSet&&) = delete;

/*!
* \brief Adds one future to be monitored by this set.
*
* Note: the provided future will be left in "moved from" state.
*/
void add(TaskFuture&& f);

/*!
* \brief Waits for completion of all added futures.
*
* If one of the future ends on exception, re-throws the first encountered exception.
* Note that futures cannot be added while some thread is waiting for completion.
*
* Joining also resets the list of tasks to wait for.
*/
void join();

private:
std::mutex mutex_;
std::vector<TaskFuture> futures_;
};

}


#endif //ANTARES_CONCURRENCY_H
3 changes: 2 additions & 1 deletion src/libs/antares/writer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ target_include_directories(result_writer
target_link_libraries(result_writer
PUBLIC
Antares::benchmarking
PRIVATE
yuni-static-core
PRIVATE
MINIZIP::minizip
logs
inifile
io
Antares::concurrency
)


Expand Down
18 changes: 18 additions & 0 deletions src/libs/antares/writer/antares/writer/i_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,35 @@

#include <memory>
#include <string>
#include <stdexcept>

namespace Antares
{
namespace Solver
{

/*!
* A generic I/O exception that may be thrown by writer operations.
*/
class IOError : public std::runtime_error
{
public:
using std::runtime_error::runtime_error;
};

class IResultWriter
{
public:
using Ptr = std::shared_ptr<IResultWriter>;
virtual void addEntryFromBuffer(const std::string& entryPath, Yuni::Clob& entryContent) = 0;
virtual void addEntryFromBuffer(const std::string& entryPath, std::string& entryContent) = 0;
virtual void addEntryFromFile(const std::string& entryPath, const std::string& filePath) = 0;

/*!
* Waits for completion of every write operation previously appended.
* An IOError may be raised if any of those fails.
*/
virtual void flush() = 0;
virtual bool needsTheJobQueue() const = 0;
virtual void finalize(bool verbose) = 0;
};
Expand All @@ -24,6 +41,7 @@ class NullResultWriter: public Solver::IResultWriter {
void addEntryFromBuffer(const std::string &, Yuni::Clob &) override;
void addEntryFromBuffer(const std::string &, std::string &) override;
void addEntryFromFile(const std::string &, const std::string &) override;
void flush() override;
bool needsTheJobQueue() const override;
void finalize(bool ) override;
};
Expand Down
1 change: 1 addition & 0 deletions src/libs/antares/writer/ensure_queue_started.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ EnsureQueueStartedIfNeeded::EnsureQueueStartedIfNeeded(
qs->start();
}
}

EnsureQueueStartedIfNeeded::~EnsureQueueStartedIfNeeded()
{
if (startQueue)
Expand Down
6 changes: 6 additions & 0 deletions src/libs/antares/writer/immediate_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ void ImmediateFileResultWriter::addEntryFromFile(const std::string& entryPath,
}
}

void ImmediateFileResultWriter::flush()
{}

bool ImmediateFileResultWriter::needsTheJobQueue() const
{
return false;
Expand All @@ -128,6 +131,9 @@ void NullResultWriter::addEntryFromBuffer(const std::string&, std::string&)
{}
void NullResultWriter::addEntryFromFile(const std::string&, const std::string&)
{}
void NullResultWriter::flush()
{}

bool NullResultWriter::needsTheJobQueue() const
{
return false;
Expand Down
1 change: 1 addition & 0 deletions src/libs/antares/writer/private/immediate_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ImmediateFileResultWriter : public IResultWriter
void addEntryFromBuffer(const std::string& entryPath, Yuni::Clob& entryContent) override;
void addEntryFromBuffer(const std::string& entryPath, std::string& entryContent) override;
void addEntryFromFile(const std::string& entryPath, const std::string& filePath) override;
void flush() override;
bool needsTheJobQueue() const override;
void finalize(bool verbose) override;

Expand Down
18 changes: 14 additions & 4 deletions src/libs/antares/writer/private/zip_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
#include <string>

#include <yuni/job/queue/service.h>
#include <yuni/job/job.h>
#include <yuni/core/string.h>

#include "antares/writer/i_writer.h"
#include <antares/benchmarking/DurationCollector.h>
#include "antares/concurrency/concurrency.h"


namespace Antares::Solver
Expand All @@ -20,15 +20,23 @@ enum class ZipState
};

class ZipWriter;

/*!
* In charge of writing one entry into the underlying zip.
* May be used as a function object.
*/
template<class ContentT>
class ZipWriteJob final : public Yuni::Job::IJob
class ZipWriteJob
{
public:
ZipWriteJob(ZipWriter& writer,
std::string entryPath,
ContentT& content,
Benchmarking::IDurationCollector& duration_collector);
virtual void onExecute() override;
void writeEntry();
void operator()() {
writeEntry();
}

private:
// Pointer to Zip handle
Expand All @@ -55,9 +63,9 @@ class ZipWriter : public IResultWriter
void addEntryFromBuffer(const std::string& entryPath, Yuni::Clob& entryContent) override;
void addEntryFromBuffer(const std::string& entryPath, std::string& entryContent) override;
void addEntryFromFile(const std::string& entryPath, const std::string& filePath) override;
void flush() override;
bool needsTheJobQueue() const override;
void finalize(bool verbose) override;

friend class ZipWriteJob<Yuni::Clob>;
friend class ZipWriteJob<std::string>;

Expand All @@ -75,6 +83,8 @@ class ZipWriter : public IResultWriter
// Benchmarking. Passed to jobs
Benchmarking::IDurationCollector& pDurationCollector;

Concurrency::FutureSet pendingTasks_;

private:
template<class ContentType>
void addEntryFromBufferHelper(const std::string& entryPath, ContentType& entryContent);
Expand Down
8 changes: 5 additions & 3 deletions src/libs/antares/writer/private/zip_writer.hxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "antares/concurrency/concurrency.h"
#include "ensure_queue_started.h"

namespace Antares::Solver
Expand All @@ -11,8 +12,9 @@ void ZipWriter::addEntryFromBufferHelper(const std::string& entryPath, ContentTy
return;

EnsureQueueStartedIfNeeded ensureQueue(this, pQueueService);
pQueueService->add(
new ZipWriteJob<ContentType>(*this, entryPath, entryContent, pDurationCollector),
Yuni::Job::priorityLow);
pendingTasks_.add(Concurrency::AddTask(*pQueueService,
ZipWriteJob<ContentType>(*this, entryPath, entryContent, pDurationCollector),
Yuni::Job::priorityLow));
}

} // namespace Antares::Solver
Loading

0 comments on commit e33402c

Please sign in to comment.