Skip to content

Commit

Permalink
Move BufferedActions impl to ADIOS2File.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Nov 20, 2023
1 parent 16c7e5c commit 90e3ccc
Show file tree
Hide file tree
Showing 7 changed files with 1,499 additions and 1,451 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ set(IO_SOURCE
src/IO/JSON/JSONIOHandlerImpl.cpp
src/IO/JSON/JSONFilePosition.cpp
src/IO/ADIOS/ADIOS2IOHandler.cpp
src/IO/ADIOS/ADIOS2File.cpp
src/IO/ADIOS/ADIOS2Auxiliary.cpp
src/IO/InvalidatableFile.cpp)

Expand Down
24 changes: 24 additions & 0 deletions include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,37 @@ namespace adios_defs
Disk_Override
};

using FlushTarget = adios_defs::FlushTarget;
FlushTarget flushTargetFromString(std::string const &str);

enum class UseGroupTable
{
Yes,
No
};
} // namespace adios_defs

/*
* The following strings are used during parsing of the JSON configuration
* string for the ADIOS2 backend.
*/
namespace ADIOS2Defaults
{
using const_str = char const *const;
constexpr const_str str_engine = "engine";
constexpr const_str str_type = "type";
constexpr const_str str_params = "parameters";
constexpr const_str str_usesteps = "usesteps";
constexpr const_str str_flushtarget = "preferred_flush_target";
constexpr const_str str_usesstepsAttribute = "__openPMD_internal/useSteps";
constexpr const_str str_adios2Schema =
"__openPMD_internal/openPMD2_adios2_schema";
constexpr const_str str_isBoolean = "__is_boolean__";
constexpr const_str str_activeTablePrefix = "__openPMD_groups";
constexpr const_str str_groupBasedWarning =
"__openPMD_internal/warning_bugprone_groupbased_encoding";
} // namespace ADIOS2Defaults

#if openPMD_HAVE_ADIOS2
namespace detail
{
Expand Down
65 changes: 61 additions & 4 deletions include/openPMD/IO/ADIOS/ADIOS2File.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/IO/IOTask.hpp"

#include <functional>
#include <string>

#include "openPMD/IO/InvalidatableFile.hpp"
Expand Down Expand Up @@ -70,6 +71,19 @@ struct BufferedGet : BufferedAction
void run(BufferedActions &) override;
};

struct DatasetReader
{
template <typename T>
static void call(
ADIOS2IOHandlerImpl *impl,
BufferedGet &bp,
adios2::IO &IO,
adios2::Engine &engine,
std::string const &fileName);

static constexpr char const *errorMsg = "ADIOS2: readDataset()";
};

struct BufferedPut : BufferedAction
{
std::string name;
Expand All @@ -78,6 +92,15 @@ struct BufferedPut : BufferedAction
void run(BufferedActions &) override;
};

struct WriteDataset
{
template <typename T>
static void call(BufferedActions &ba, BufferedPut &bp);

template <int n, typename... Params>
static void call(Params &&...);
};

struct BufferedUniquePtrPut
{
std::string name;
Expand Down Expand Up @@ -221,10 +244,18 @@ class BufferedActions
adios2::Engine &getEngine();

template <typename BA>
void enqueue(BA &&ba);
void enqueue(BA &&ba)
{
enqueue<BA>(std::forward<BA>(ba), m_buffer);
}

template <typename BA>
void enqueue(BA &&ba, decltype(m_buffer) &);
void enqueue(BA &&ba, decltype(m_buffer) &buffer)
{
using BA_ = typename std::remove_reference<BA>::type;
buffer.emplace_back(
std::unique_ptr<BufferedAction>(new BA_(std::forward<BA>(ba))));
}

template <typename... Args>
void flush(Args &&...args);
Expand Down Expand Up @@ -262,10 +293,10 @@ class BufferedActions
* @param flushUnconditionally Whether to run the functor even if no
* deferred IO tasks had been queued.
*/
template <typename F>
void flush_impl(
ADIOS2FlushParams flushParams,
F &&performPutsGets,
std::function<void(BufferedActions &, adios2::Engine &)> const
&performPutGets,
bool writeLatePuts,
bool flushUnconditionally);

Expand Down Expand Up @@ -434,4 +465,30 @@ class BufferedActions
void configure_IO_Read();
void configure_IO_Write();
};

template <typename... Args>
void BufferedActions::flush(Args &&...args)
{
try
{
flush_impl(std::forward<Args>(args)...);
}
catch (error::ReadError const &)
{
/*
* We need to take actions out of the buffer, since an exception
* should reset everything from the current IOHandler->flush() call.
* However, we cannot simply clear the buffer, since tasks may have
* been enqueued to ADIOS2 already and we cannot undo that.
* So, we need to keep the memory alive for the benefit of ADIOS2.
* Luckily, we have m_alreadyEnqueued for exactly that purpose.
*/
for (auto &task : m_buffer)
{
m_alreadyEnqueued.emplace_back(std::move(task));
}
m_buffer.clear();
throw;
}
}
} // namespace openPMD::detail
107 changes: 63 additions & 44 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
#pragma once

#include "openPMD/Error.hpp"
#include "openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp"
#include "openPMD/IO/ADIOS/ADIOS2FilePosition.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
Expand All @@ -29,9 +30,11 @@
#include "openPMD/IO/IOTask.hpp"
#include "openPMD/IO/InvalidatableFile.hpp"
#include "openPMD/IterationEncoding.hpp"
#include "openPMD/ThrowError.hpp"
#include "openPMD/auxiliary/JSON_internal.hpp"
#include "openPMD/backend/Writable.hpp"
#include "openPMD/config.hpp"
#include <stdexcept>

#if openPMD_HAVE_ADIOS2
#include <adios2.h>
Expand Down Expand Up @@ -406,7 +409,66 @@ class ADIOS2IOHandlerImpl
Offset const &offset,
Extent const &extent,
adios2::IO &IO,
std::string const &var);
std::string const &varName)
{
{
auto requiredType = adios2::GetType<T>();
auto actualType = IO.VariableType(varName);

if (requiredType != actualType)
{
std::stringstream errorMessage;
errorMessage << "Trying to access a dataset with wrong type "
"(trying to access dataset with type "
<< determineDatatype<T>() << ", but has type "
<< detail::fromADIOS2Type(actualType, false)
<< ")";
throw error::ReadError(
error::AffectedObject::Dataset,
error::Reason::UnexpectedContent,
"ADIOS2",
errorMessage.str());
};
}
adios2::Variable<T> var = IO.InquireVariable<T>(varName);
if (!var.operator bool())
{

throw std::runtime_error(
"[ADIOS2] Internal error: Failed opening ADIOS2 variable.");
}
// TODO leave this check to ADIOS?
adios2::Dims shape = var.Shape();
auto actualDim = shape.size();
{
auto requiredDim = extent.size();
if (requiredDim != actualDim)
{
throw error::ReadError(
error::AffectedObject::Dataset,
error::Reason::UnexpectedContent,
"ADIOS2",
"Trying to access a dataset with wrong dimensionality "
"(trying to access dataset with dimensionality " +
std::to_string(requiredDim) +
", but has dimensionality " +
std::to_string(actualDim) + ")");
}
}
for (unsigned int i = 0; i < actualDim; i++)
{
if (offset[i] + extent[i] > shape[i])
{
throw std::runtime_error(
"[ADIOS2] Dataset access out of bounds.");
}
}

var.SetSelection(
{adios2::Dims(offset.begin(), offset.end()),
adios2::Dims(extent.begin(), extent.end())});
return var;
}

struct
{
Expand All @@ -415,27 +477,6 @@ class ADIOS2IOHandlerImpl
} printedWarningsAlready;
}; // ADIOS2IOHandlerImpl

/*
* The following strings are used during parsing of the JSON configuration
* string for the ADIOS2 backend.
*/
namespace ADIOS2Defaults
{
using const_str = char const *const;
constexpr const_str str_engine = "engine";
constexpr const_str str_type = "type";
constexpr const_str str_params = "parameters";
constexpr const_str str_usesteps = "usesteps";
constexpr const_str str_flushtarget = "preferred_flush_target";
constexpr const_str str_usesstepsAttribute = "__openPMD_internal/useSteps";
constexpr const_str str_adios2Schema =
"__openPMD_internal/openPMD2_adios2_schema";
constexpr const_str str_isBoolean = "__is_boolean__";
constexpr const_str str_activeTablePrefix = "__openPMD_groups";
constexpr const_str str_groupBasedWarning =
"__openPMD_internal/warning_bugprone_groupbased_encoding";
} // namespace ADIOS2Defaults

namespace detail
{
// Helper structs for calls to the switchType function
Expand All @@ -445,19 +486,6 @@ namespace detail
std::is_same_v<T, std::complex<long double>> ||
std::is_same_v<T, std::vector<std::complex<long double>>>;

struct DatasetReader
{
template <typename T>
static void call(
ADIOS2IOHandlerImpl *impl,
BufferedGet &bp,
adios2::IO &IO,
adios2::Engine &engine,
std::string const &fileName);

static constexpr char const *errorMsg = "ADIOS2: readDataset()";
};

struct AttributeReader
{
template <typename T>
Expand Down Expand Up @@ -495,15 +523,6 @@ namespace detail
static constexpr char const *errorMsg = "ADIOS2: openDataset()";
};

struct WriteDataset
{
template <typename T>
static void call(BufferedActions &ba, BufferedPut &bp);

template <int n, typename... Params>
static void call(Params &&...);
};

struct VariableDefiner
{
/**
Expand Down
31 changes: 31 additions & 0 deletions src/IO/ADIOS/ADIOS2Auxiliary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,37 @@

#include <iostream>

namespace openPMD::adios_defs
{
FlushTarget flushTargetFromString(std::string const &str)
{
if (str == "buffer")
{
return FlushTarget::Buffer;
}
else if (str == "disk")
{
return FlushTarget::Disk;
}
else if (str == "buffer_override")
{
return FlushTarget::Buffer_Override;
}
else if (str == "disk_override")
{
return FlushTarget::Disk_Override;
}
else
{
throw error::BackendConfigSchema(
{"adios2", "engine", ADIOS2Defaults::str_flushtarget},
"Flush target must be either 'disk' or 'buffer', but "
"was " +
str + ".");
}
}
} // namespace openPMD::adios_defs

namespace openPMD::detail
{
template <typename T>
Expand Down
Loading

0 comments on commit 90e3ccc

Please sign in to comment.