diff --git a/src/libs/antares/CMakeLists.txt b/src/libs/antares/CMakeLists.txt index 91c83bd02f..66cf4ec145 100644 --- a/src/libs/antares/CMakeLists.txt +++ b/src/libs/antares/CMakeLists.txt @@ -9,7 +9,7 @@ add_subdirectory(object) add_subdirectory(array) add_subdirectory(correlation) - +add_subdirectory(concurrency) add_subdirectory(logs) add_subdirectory(jit) diff --git a/src/libs/antares/concurrency/CMakeLists.txt b/src/libs/antares/concurrency/CMakeLists.txt new file mode 100644 index 0000000000..ca8eb73c0e --- /dev/null +++ b/src/libs/antares/concurrency/CMakeLists.txt @@ -0,0 +1,9 @@ + + +add_library(concurrency) +add_library(Antares::concurrency ALIAS concurrency) + +target_sources(concurrency PRIVATE concurrency.cpp) +target_include_directories(concurrency PUBLIC include) + +target_link_libraries(concurrency yuni-static-core) \ No newline at end of file diff --git a/src/libs/antares/concurrency/concurrency.cpp b/src/libs/antares/concurrency/concurrency.cpp new file mode 100644 index 0000000000..51f683372a --- /dev/null +++ b/src/libs/antares/concurrency/concurrency.cpp @@ -0,0 +1,83 @@ +/* +** Copyright 2007-2023 RTE +** Authors: Antares_Simulator Team +** +** This file is part of Antares_Simulator. +** +** Antares_Simulator is free software: you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation, either version 3 of the License, or +** (at your option) any later version. +** +** There are special exceptions to the terms and conditions of the +** license as they are applied to this software. View the full text of +** the exceptions in file COPYING.txt in the directory of this software +** distribution +** +** Antares_Simulator is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with Antares_Simulator. If not, see . +** +** SPDX-License-Identifier: licenceRef-GPL3_WITH_RTE-Exceptions +*/ +#include +#include "yuni/job/job.h" +#include "antares/concurrency/concurrency.h" + +namespace Antares::Concurrency +{ + +namespace { + +/*! +* Just wraps an arbitrary task as a yuni job, and allows to retrieve the corresponding future. +*/ +class PackagedJob : public Yuni::Job::IJob { +public: + PackagedJob(const Task& task) : task_(task) {} + + TaskFuture getFuture() { + return task_.get_future(); + } + +protected: + void onExecute() override { + task_(); + } + +private: + std::packaged_task task_; +}; + +} + +std::future AddTask(Yuni::Job::QueueService& threadPool, + const Task& task, + Yuni::Job::Priority priority) { + auto job = std::make_unique(task); + auto future = job->getFuture(); + threadPool.add(job.release(), priority); + return future; +} + +void FutureSet::add(TaskFuture&& f) { + std::lock_guard lock(mutex_); + futures_.push_back(std::move(f)); +} + +void FutureSet::join() { + std::vector toBeJoined; + { + std::lock_guard lock(mutex_); + std::swap(futures_, toBeJoined); + } + for (auto& f: toBeJoined) { + f.get(); + } +} + +} diff --git a/src/libs/antares/concurrency/include/antares/concurrency/concurrency.h b/src/libs/antares/concurrency/include/antares/concurrency/concurrency.h new file mode 100644 index 0000000000..8813f3eeb1 --- /dev/null +++ b/src/libs/antares/concurrency/include/antares/concurrency/concurrency.h @@ -0,0 +1,88 @@ +/* +** Copyright 2007-2023 RTE +** Authors: Antares_Simulator Team +** +** This file is part of Antares_Simulator. +** +** Antares_Simulator is free software: you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation, either version 3 of the License, or +** (at your option) any later version. +** +** There are special exceptions to the terms and conditions of the +** license as they are applied to this software. View the full text of +** the exceptions in file COPYING.txt in the directory of this software +** distribution +** +** Antares_Simulator is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with Antares_Simulator. If not, see . +** +** SPDX-License-Identifier: licenceRef-GPL3_WITH_RTE-Exceptions +*/ +#ifndef ANTARES_CONCURRENCY_H +#define ANTARES_CONCURRENCY_H + +#include +#include "yuni/job/queue/service.h" + +namespace Antares::Concurrency +{ + +using Task = std::function; +using TaskFuture = std::future; + +/*! + * \brief Queues the provided function and returns the corresponding std::future. + * + * This allows to handle exceptions occuring in the underlying task, + * as opposite to Yuni::Job::QueueService::add which swallows them. + */ +[[nodiscard]] TaskFuture AddTask(Yuni::Job::QueueService& threadPool, + const Task& task, + Yuni::Job::Priority priority = Yuni::Job::priorityDefault); + +/*! + * \brief Utility class to gather futures to wait for. + */ +class FutureSet +{ +public: + FutureSet() = default; + ~FutureSet() = default; + + FutureSet(const FutureSet&) = delete; + FutureSet& operator=(const FutureSet&) = delete; + FutureSet(FutureSet&&) = delete; + FutureSet& operator=(FutureSet&&) = delete; + + /*! + * \brief Adds one future to be monitored by this set. + * + * Note: the provided future will be left in "moved from" state. + */ + void add(TaskFuture&& f); + + /*! + * \brief Waits for completion of all added futures. + * + * If one of the future ends on exception, re-throws the first encountered exception. + * Note that futures cannot be added while some thread is waiting for completion. + * + * Joining also resets the list of tasks to wait for. + */ + void join(); + +private: + std::mutex mutex_; + std::vector futures_; +}; + +} + + +#endif //ANTARES_CONCURRENCY_H diff --git a/src/libs/antares/writer/CMakeLists.txt b/src/libs/antares/writer/CMakeLists.txt index ef7129cc51..1f468f5151 100644 --- a/src/libs/antares/writer/CMakeLists.txt +++ b/src/libs/antares/writer/CMakeLists.txt @@ -32,12 +32,13 @@ target_include_directories(result_writer target_link_libraries(result_writer PUBLIC Antares::benchmarking - PRIVATE yuni-static-core + PRIVATE MINIZIP::minizip logs inifile io + Antares::concurrency ) diff --git a/src/libs/antares/writer/antares/writer/i_writer.h b/src/libs/antares/writer/antares/writer/i_writer.h index 0256ccc292..be579165e5 100644 --- a/src/libs/antares/writer/antares/writer/i_writer.h +++ b/src/libs/antares/writer/antares/writer/i_writer.h @@ -4,11 +4,22 @@ #include #include +#include namespace Antares { namespace Solver { + +/*! + * A generic I/O exception that may be thrown by writer operations. + */ +class IOError : public std::runtime_error +{ +public: + using std::runtime_error::runtime_error; +}; + class IResultWriter { public: @@ -16,6 +27,12 @@ class IResultWriter virtual void addEntryFromBuffer(const std::string& entryPath, Yuni::Clob& entryContent) = 0; virtual void addEntryFromBuffer(const std::string& entryPath, std::string& entryContent) = 0; virtual void addEntryFromFile(const std::string& entryPath, const std::string& filePath) = 0; + + /*! + * Waits for completion of every write operation previously appended. + * An IOError may be raised if any of those fails. + */ + virtual void flush() = 0; virtual bool needsTheJobQueue() const = 0; virtual void finalize(bool verbose) = 0; }; @@ -24,6 +41,7 @@ class NullResultWriter: public Solver::IResultWriter { void addEntryFromBuffer(const std::string &, Yuni::Clob &) override; void addEntryFromBuffer(const std::string &, std::string &) override; void addEntryFromFile(const std::string &, const std::string &) override; + void flush() override; bool needsTheJobQueue() const override; void finalize(bool ) override; }; diff --git a/src/libs/antares/writer/ensure_queue_started.cpp b/src/libs/antares/writer/ensure_queue_started.cpp index 18618a5f96..8c77b524e7 100644 --- a/src/libs/antares/writer/ensure_queue_started.cpp +++ b/src/libs/antares/writer/ensure_queue_started.cpp @@ -15,6 +15,7 @@ EnsureQueueStartedIfNeeded::EnsureQueueStartedIfNeeded( qs->start(); } } + EnsureQueueStartedIfNeeded::~EnsureQueueStartedIfNeeded() { if (startQueue) diff --git a/src/libs/antares/writer/immediate_file_writer.cpp b/src/libs/antares/writer/immediate_file_writer.cpp index ed0b521302..8d3ff5e016 100644 --- a/src/libs/antares/writer/immediate_file_writer.cpp +++ b/src/libs/antares/writer/immediate_file_writer.cpp @@ -112,6 +112,9 @@ void ImmediateFileResultWriter::addEntryFromFile(const std::string& entryPath, } } +void ImmediateFileResultWriter::flush() +{} + bool ImmediateFileResultWriter::needsTheJobQueue() const { return false; @@ -128,6 +131,9 @@ void NullResultWriter::addEntryFromBuffer(const std::string&, std::string&) {} void NullResultWriter::addEntryFromFile(const std::string&, const std::string&) {} +void NullResultWriter::flush() +{} + bool NullResultWriter::needsTheJobQueue() const { return false; diff --git a/src/libs/antares/writer/private/immediate_file_writer.h b/src/libs/antares/writer/private/immediate_file_writer.h index fad9422bab..91f45719e5 100644 --- a/src/libs/antares/writer/private/immediate_file_writer.h +++ b/src/libs/antares/writer/private/immediate_file_writer.h @@ -15,6 +15,7 @@ class ImmediateFileResultWriter : public IResultWriter void addEntryFromBuffer(const std::string& entryPath, Yuni::Clob& entryContent) override; void addEntryFromBuffer(const std::string& entryPath, std::string& entryContent) override; void addEntryFromFile(const std::string& entryPath, const std::string& filePath) override; + void flush() override; bool needsTheJobQueue() const override; void finalize(bool verbose) override; diff --git a/src/libs/antares/writer/private/zip_writer.h b/src/libs/antares/writer/private/zip_writer.h index 734d15eced..4b3f076742 100644 --- a/src/libs/antares/writer/private/zip_writer.h +++ b/src/libs/antares/writer/private/zip_writer.h @@ -4,11 +4,11 @@ #include #include -#include #include #include "antares/writer/i_writer.h" #include +#include "antares/concurrency/concurrency.h" namespace Antares::Solver @@ -20,15 +20,23 @@ enum class ZipState }; class ZipWriter; + +/*! + * In charge of writing one entry into the underlying zip. + * May be used as a function object. + */ template -class ZipWriteJob final : public Yuni::Job::IJob +class ZipWriteJob { public: ZipWriteJob(ZipWriter& writer, std::string entryPath, ContentT& content, Benchmarking::IDurationCollector& duration_collector); - virtual void onExecute() override; + void writeEntry(); + void operator()() { + writeEntry(); + } private: // Pointer to Zip handle @@ -55,9 +63,9 @@ class ZipWriter : public IResultWriter void addEntryFromBuffer(const std::string& entryPath, Yuni::Clob& entryContent) override; void addEntryFromBuffer(const std::string& entryPath, std::string& entryContent) override; void addEntryFromFile(const std::string& entryPath, const std::string& filePath) override; + void flush() override; bool needsTheJobQueue() const override; void finalize(bool verbose) override; - friend class ZipWriteJob; friend class ZipWriteJob; @@ -75,6 +83,8 @@ class ZipWriter : public IResultWriter // Benchmarking. Passed to jobs Benchmarking::IDurationCollector& pDurationCollector; + Concurrency::FutureSet pendingTasks_; + private: template void addEntryFromBufferHelper(const std::string& entryPath, ContentType& entryContent); diff --git a/src/libs/antares/writer/private/zip_writer.hxx b/src/libs/antares/writer/private/zip_writer.hxx index 945251457b..a933f94a5b 100644 --- a/src/libs/antares/writer/private/zip_writer.hxx +++ b/src/libs/antares/writer/private/zip_writer.hxx @@ -1,5 +1,6 @@ #pragma once +#include "antares/concurrency/concurrency.h" #include "ensure_queue_started.h" namespace Antares::Solver @@ -11,8 +12,9 @@ void ZipWriter::addEntryFromBufferHelper(const std::string& entryPath, ContentTy return; EnsureQueueStartedIfNeeded ensureQueue(this, pQueueService); - pQueueService->add( - new ZipWriteJob(*this, entryPath, entryContent, pDurationCollector), - Yuni::Job::priorityLow); + pendingTasks_.add(Concurrency::AddTask(*pQueueService, + ZipWriteJob(*this, entryPath, entryContent, pDurationCollector), + Yuni::Job::priorityLow)); } + } // namespace Antares::Solver diff --git a/src/libs/antares/writer/zip_writer.cpp b/src/libs/antares/writer/zip_writer.cpp index 51c33b5562..383dd9828d 100644 --- a/src/libs/antares/writer/zip_writer.cpp +++ b/src/libs/antares/writer/zip_writer.cpp @@ -16,9 +16,22 @@ extern "C" #include // std::time #include +#include namespace Antares::Solver { + +namespace +{ + +void logErrorAndThrow(const std::string& errorMessage) +{ + logs.error() << errorMessage; + throw IOError(errorMessage); +} + +} + // Class ZipWriteJob template ZipWriteJob::ZipWriteJob(ZipWriter& writer, @@ -46,7 +59,7 @@ static std::unique_ptr createInfo(const std::string& entryPath) } template -void ZipWriteJob::onExecute() +void ZipWriteJob::writeEntry() { // Don't write data if finalize() has been called if (pState != ZipState::can_receive_data) @@ -62,13 +75,14 @@ void ZipWriteJob::onExecute() Benchmarking::Timer timer_write; if (int32_t ret = mz_zip_writer_entry_open(pZipHandle, file_info.get()); ret != MZ_OK) - logs.error() << "Error opening entry " << pEntryPath << " (" << ret << ")"; - + { + logErrorAndThrow("Error opening entry " + pEntryPath + " (" + std::to_string(ret) + ")"); + } int32_t bw = mz_zip_writer_entry_write(pZipHandle, pContent.data(), pContent.size()); if (static_cast(bw) != pContent.size()) { - logs.error() << "Error writing entry " << pEntryPath << "(written = " << bw - << ", size = " << pContent.size() << ")"; + logErrorAndThrow("Error writing entry " + pEntryPath + "(written = " + std::to_string(bw) + + ", size = " + std::to_string(pContent.size()) + ")"); } timer_write.stop(); @@ -87,7 +101,7 @@ ZipWriter::ZipWriter(std::shared_ptr qs, mz_zip_writer_create(&pZipHandle); if (int32_t ret = mz_zip_writer_open_file(pZipHandle, pArchivePath.c_str(), 0, 0); ret != MZ_OK) { - logs.error() << "Error opening zip file " << pArchivePath << " (" << ret << ")"; + logErrorAndThrow("Error opening zip file " + pArchivePath + " (" + std::to_string(ret) + ")"); } // TODO : make level of compression configurable mz_zip_writer_set_compress_level(pZipHandle, MZ_COMPRESS_LEVEL_FAST); @@ -129,16 +143,16 @@ void ZipWriter::addEntryFromFile(const std::string& entryPath, const std::string addEntryFromBufferHelper(entryPath, buffer); break; case errNotFound: - logs.error() << filePath << ": file does not exist"; + logErrorAndThrow(filePath + ": file does not exist"); break; - case errReadFailed: - logs.error() << "Read failed '" << filePath << "'"; + case errReadFailed: + logErrorAndThrow("Read failed '" + filePath + "'"); break; case errMemoryLimit: - logs.error() << "Size limit hit for file '" << filePath << "'"; + logErrorAndThrow("Size limit hit for file '" + filePath + "'"); break; default: - logs.error() << "Unhandled error"; + logErrorAndThrow("Unhandled error"); break; } } @@ -150,6 +164,9 @@ bool ZipWriter::needsTheJobQueue() const void ZipWriter::finalize(bool verbose) { + //wait for completion of pending writing tasks + flush(); + // Prevent new jobs from being submitted pState = ZipState::blocking; @@ -169,4 +186,10 @@ void ZipWriter::finalize(bool verbose) if (verbose) logs.notice() << "Done"; } + +void ZipWriter::flush() +{ + pendingTasks_.join(); +} + } // namespace Antares::Solver diff --git a/src/solver/application.cpp b/src/solver/application.cpp index 143d0fe240..3ef40b7ad5 100644 --- a/src/solver/application.cpp +++ b/src/solver/application.cpp @@ -281,8 +281,11 @@ void Application::processCaption(const Yuni::String& caption) void Application::prepareWriter(Antares::Data::Study& study, Benchmarking::IDurationCollector& duration_collector) { + ioQueueService = std::make_shared(); + ioQueueService->maximumThreadCount(1); + ioQueueService->start(); resultWriter = resultWriterFactory( - study.parameters.resultFormat, study.folderOutput, study.pQueueService, duration_collector); + study.parameters.resultFormat, study.folderOutput, ioQueueService, duration_collector); } void Application::readDataForTheStudy(Data::StudyLoadOptions& options) diff --git a/src/solver/application.h b/src/solver/application.h index 76f75de59a..7740087a73 100644 --- a/src/solver/application.h +++ b/src/solver/application.h @@ -94,6 +94,7 @@ class Application : public Yuni::IEventObserver ioQueueService; IResultWriter::Ptr resultWriter = nullptr; void prepareWriter(Antares::Data::Study& study, diff --git a/src/solver/main/CMakeLists.txt b/src/solver/main/CMakeLists.txt index 239702e682..592a0a87e7 100644 --- a/src/solver/main/CMakeLists.txt +++ b/src/solver/main/CMakeLists.txt @@ -12,10 +12,12 @@ add_library(antares-solver-main-adequacy ${SRC_SOLVER_MAIN_ADEQUACY}) target_link_libraries(antares-solver-main-adequacy PRIVATE Antares::infoCollection + antares-solver-simulation ) add_library(antares-solver-main-economy ${SRC_SOLVER_MAIN_ECONOMY}) target_link_libraries(antares-solver-main-economy PRIVATE Antares::infoCollection + antares-solver-simulation ) diff --git a/src/solver/simulation/CMakeLists.txt b/src/solver/simulation/CMakeLists.txt index ab20e592d2..1e95d30615 100644 --- a/src/solver/simulation/CMakeLists.txt +++ b/src/solver/simulation/CMakeLists.txt @@ -62,4 +62,5 @@ target_link_libraries(antares-solver-simulation PRIVATE yuni-static-core Antares::study Antares::result_writer + Antares::concurrency ) diff --git a/src/solver/simulation/solver.hxx b/src/solver/simulation/solver.hxx index a46781556f..9b9e268077 100644 --- a/src/solver/simulation/solver.hxx +++ b/src/solver/simulation/solver.hxx @@ -45,11 +45,13 @@ #include #include +#include "antares/concurrency/concurrency.h" + namespace Antares::Solver::Simulation { template -class yearJob final : public Yuni::Job::IJob +class yearJob { public: yearJob(ISimulation* simulation, @@ -133,7 +135,8 @@ private: } } - virtual void onExecute() override +public: + void operator()() { Progression::Task progression(study, y, Solver::Progression::sectYear); @@ -941,13 +944,7 @@ void ISimulation::loopThroughYears(uint firstYear, allocateMemoryForRandomNumbers(randomForParallelYears); // Number of threads to perform the jobs waiting in the queue - { - int numThreads = pNbMaxPerformedYearsInParallel; - // If the result writer uses the job queue, add one more thread for it - if (pResultWriter.needsTheJobQueue()) - numThreads++; - pQueueService->maximumThreadCount(numThreads); - } + pQueueService->maximumThreadCount(pNbMaxPerformedYearsInParallel); // Loop over sets of parallel years std::vector::iterator set_it; @@ -964,6 +961,7 @@ void ISimulation::loopThroughYears(uint firstYear, std::vector::iterator year_it; bool yearPerformed = false; + Concurrency::FutureSet results; for (year_it = set_it->yearsIndices.begin(); year_it != set_it->yearsIndices.end(); ++year_it) { @@ -983,21 +981,20 @@ void ISimulation::loopThroughYears(uint firstYear, // have to be rerun (meaning : they must be run once). if(!set_it->yearFailed[y]) // continue; - pQueueService->add( - new yearJob(this, - y, - set_it->yearFailed, - set_it->isFirstPerformedYearOfASet, - pFirstSetParallelWithAPerformedYearWasRun, - numSpace, - randomForParallelYears, - performCalculations, - study, - state, - pYearByYear, - pDurationCollector, - pResultWriter)); - + Concurrency::Task task = yearJob(this, + y, + set_it->yearFailed, + set_it->isFirstPerformedYearOfASet, + pFirstSetParallelWithAPerformedYearWasRun, + numSpace, + randomForParallelYears, + performCalculations, + study, + state, + pYearByYear, + pDurationCollector, + pResultWriter); + results.add(Concurrency::AddTask(*pQueueService, task)); } // End loop over years of the current set of parallel years logPerformedYearsInAset(*set_it); @@ -1006,6 +1003,8 @@ void ISimulation::loopThroughYears(uint firstYear, pQueueService->wait(Yuni::qseIdle); pQueueService->stop(); + results.join(); + pResultWriter.flush(); // At this point, the first set of parallel year(s) was run with at least one year performed if (!pFirstSetParallelWithAPerformedYearWasRun && yearPerformed) diff --git a/src/tests/end-to-end/simple_study/simple-study.cpp b/src/tests/end-to-end/simple_study/simple-study.cpp index ee2a3a94cd..d200c32bc8 100644 --- a/src/tests/end-to-end/simple_study/simple-study.cpp +++ b/src/tests/end-to-end/simple_study/simple-study.cpp @@ -140,4 +140,25 @@ BOOST_AUTO_TEST_CASE(two_mc_years_with_different_weight__two_ts) BOOST_TEST(output.overallCost(area).hour(0) == averageLoad * clusterCost, tt::tolerance(0.001)); } + +BOOST_AUTO_TEST_SUITE_END() + +BOOST_AUTO_TEST_SUITE(error_cases) + +BOOST_AUTO_TEST_CASE(error_on_wrong_hydro_data) +{ + StudyBuilder builder; + builder.simulationBetweenDays(0, 7); + builder.setNumberMCyears(1); + Area& area = *builder.addAreaToStudy("A"); + PartHydro& hydro = area.hydro; + TimeSeriesConfigurer(hydro.series->storage) + .setColumnCount(1) + .fillColumnWith(0, -1.0); //Negative inflow will cause a consistency error with mingen + + auto simulation = builder.simulation; + simulation->create(); + BOOST_CHECK_THROW(simulation->run(), Antares::FatalError); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/src/tests/src/libs/antares/CMakeLists.txt b/src/tests/src/libs/antares/CMakeLists.txt index 63137627ed..a6771db392 100644 --- a/src/tests/src/libs/antares/CMakeLists.txt +++ b/src/tests/src/libs/antares/CMakeLists.txt @@ -1,3 +1,5 @@ +add_subdirectory(concurrency) +add_subdirectory(writer) add_subdirectory(study) set(src_libs_antares "${CMAKE_SOURCE_DIR}/libs/antares") diff --git a/src/tests/src/libs/antares/concurrency/CMakeLists.txt b/src/tests/src/libs/antares/concurrency/CMakeLists.txt new file mode 100644 index 0000000000..71cc0484c4 --- /dev/null +++ b/src/tests/src/libs/antares/concurrency/CMakeLists.txt @@ -0,0 +1,11 @@ +add_executable(test-concurrency) + +target_sources(test-concurrency PRIVATE test_concurrency.cpp) + +target_link_libraries(test-concurrency + PRIVATE + Boost::unit_test_framework + Antares::concurrency +) + +add_test(NAME concurrency COMMAND test-concurrency) diff --git a/src/tests/src/libs/antares/concurrency/test_concurrency.cpp b/src/tests/src/libs/antares/concurrency/test_concurrency.cpp new file mode 100644 index 0000000000..2e9cffbd1a --- /dev/null +++ b/src/tests/src/libs/antares/concurrency/test_concurrency.cpp @@ -0,0 +1,99 @@ +/* +** Copyright 2007-2023 RTE +** Authors: Antares_Simulator Team +** +** This file is part of Antares_Simulator. +** +** Antares_Simulator is free software: you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation, either version 3 of the License, or +** (at your option) any later version. +** +** There are special exceptions to the terms and conditions of the +** license as they are applied to this software. View the full text of +** the exceptions in file COPYING.txt in the directory of this software +** distribution +** +** Antares_Simulator is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with Antares_Simulator. If not, see . +** +** SPDX-License-Identifier: licenceRef-GPL3_WITH_RTE-Exceptions +*/ +#define BOOST_TEST_MODULE test-concurrency tests +#define BOOST_TEST_DYN_LINK +#include +#include + +#include "antares/concurrency/concurrency.h" + +using namespace Yuni::Job; +using namespace Antares::Concurrency; + +std::unique_ptr createThreadPool(int size) +{ + auto threadPool = std::make_unique(); + threadPool->maximumThreadCount(size); + threadPool->start(); + return threadPool; +} + +BOOST_AUTO_TEST_CASE(test_no_error) +{ + auto threadPool = createThreadPool(1); + int counter = 0; + Task incrementCounter = [&counter]() { + counter++; + }; + TaskFuture future = AddTask(*threadPool, incrementCounter); + future.get(); + BOOST_CHECK(counter == 1); +} + + +template +Task failingTask() { + return []() { + throw Exc(); + }; +} + +class TestException {}; + +BOOST_AUTO_TEST_CASE(test_throw) +{ + auto threadPool = createThreadPool(1); + TaskFuture future = AddTask(*threadPool, failingTask()); + BOOST_CHECK_THROW(future.get(), TestException); +} + +BOOST_AUTO_TEST_CASE(test_future_set) +{ + auto threadPool = createThreadPool(4); + int counter = 0; + Task incrementCounter = [&counter]() { + counter++; + }; + FutureSet futures; + for (int i = 0; i < 10; i++) { + futures.add(AddTask(*threadPool, incrementCounter)); + } + futures.join(); + BOOST_CHECK(counter == 10); +} + +template +class TestExceptionN {}; + +BOOST_AUTO_TEST_CASE(test_future_set_rethrows_first_submitted) +{ + auto threadPool = createThreadPool(2); + FutureSet futures; + futures.add(AddTask(*threadPool, failingTask>())); + futures.add(AddTask(*threadPool, failingTask>())); + BOOST_CHECK_THROW(futures.join(), TestExceptionN<1>); +} diff --git a/src/tests/src/libs/antares/writer/CMakeLists.txt b/src/tests/src/libs/antares/writer/CMakeLists.txt new file mode 100644 index 0000000000..5d0e479419 --- /dev/null +++ b/src/tests/src/libs/antares/writer/CMakeLists.txt @@ -0,0 +1,13 @@ +add_executable(test-writer) + +target_sources(test-writer PRIVATE test_zip_writer.cpp) + +target_link_libraries(test-writer + PRIVATE + Boost::unit_test_framework + Antares::result_writer + test_utils_unit + MINIZIP::minizip +) + +add_test(NAME writer COMMAND test-writer) diff --git a/src/tests/src/libs/antares/writer/test_zip_writer.cpp b/src/tests/src/libs/antares/writer/test_zip_writer.cpp new file mode 100644 index 0000000000..474b7c6c4a --- /dev/null +++ b/src/tests/src/libs/antares/writer/test_zip_writer.cpp @@ -0,0 +1,129 @@ +/* +** Copyright 2007-2023 RTE +** Authors: Antares_Simulator Team +** +** This file is part of Antares_Simulator. +** +** Antares_Simulator is free software: you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation, either version 3 of the License, or +** (at your option) any later version. +** +** There are special exceptions to the terms and conditions of the +** license as they are applied to this software. View the full text of +** the exceptions in file COPYING.txt in the directory of this software +** distribution +** +** Antares_Simulator is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with Antares_Simulator. If not, see . +** +** SPDX-License-Identifier: licenceRef-GPL3_WITH_RTE-Exceptions +*/ +#define BOOST_TEST_MODULE test-writer tests +#define BOOST_TEST_DYN_LINK +#include +#include + +#include "yuni/job/queue/service.h" +#include "antares/writer/i_writer.h" +#include "antares/writer/writer_factory.h" +#include "antares/benchmarking/DurationCollector.h" +#include "utils.h" + +extern "C" +{ +#include +#include +#include +#include +} + +using namespace Yuni::Job; +using Benchmarking::NullDurationCollector; +using Benchmarking::IDurationCollector; +using Antares::Solver::IResultWriter; + +// Handles lifetime of necessary objects +struct TestContext +{ + std::shared_ptr threadPool; + std::unique_ptr durationCollector; + std::shared_ptr writer; +}; + +std::shared_ptr createThreadPool(int size) +{ + auto threadPool = std::make_shared(); + threadPool->maximumThreadCount(size); + threadPool->start(); + return threadPool; +} + +std::string removeExtension(const std::string& name, const std::string& ext) +{ + int length = name.size(); + if (name.size() > ext.size() && name.substr(length - ext.size()) == ext) { + return name.substr(0, length - ext.size()); + } + return name; +} + + +TestContext createContext(const std::filesystem::path zipPath, int threadCount) +{ + auto threadPool = createThreadPool(threadCount); + std::unique_ptr durationCollector = std::make_unique(); + std::string archiveName = zipPath.string(); + auto writer = Antares::Solver::resultWriterFactory( + Antares::Data::zipArchive, + removeExtension(zipPath.string(), ".zip"), + threadPool, + *durationCollector + ); + return { + threadPool, + std::move(durationCollector), + writer + }; +} + +using ZipReaderHandle = void*; + +void checkZipContent(ZipReaderHandle handle, const std::string& path, const std::string& expectedContent) +{ + BOOST_CHECK(mz_zip_reader_locate_entry(handle, path.c_str(), 0) == MZ_OK); + BOOST_CHECK(mz_zip_reader_entry_open(handle) == MZ_OK); + char buffer[4096]; + int bytesRead = mz_zip_reader_entry_read(handle, buffer, sizeof(buffer)); + std::string stringRead(buffer, bytesRead); + BOOST_CHECK(stringRead == expectedContent); + mz_zip_reader_entry_close(handle); +} + +BOOST_AUTO_TEST_CASE(test_zip) +{ + // Writer some content to test.zip, possibly from 2 threads + auto working_tmp_dir = CREATE_TMP_DIR_BASED_ON_TEST_NAME(); + auto zipPath = working_tmp_dir / "test.zip"; + auto context = createContext(zipPath, 2); + std::string content1 = "test-content1"; + std::string content2 = "test-content2"; + context.writer->addEntryFromBuffer("test-path", content1); + context.writer->addEntryFromBuffer("test-second-path", content2); + context.writer->flush(); + context.writer->finalize(true); + + // Check content is correct + ZipReaderHandle readerHandle; + mz_zip_reader_create(&readerHandle); + std::string zipPathStr = zipPath.string(); + BOOST_CHECK(mz_zip_reader_open_file(readerHandle, zipPathStr.c_str()) == MZ_OK); + checkZipContent(readerHandle, "test-path", "test-content1"); + checkZipContent(readerHandle, "test-second-path", "test-content2"); + mz_zip_reader_close(readerHandle); +}