Skip to content

Commit

Permalink
Try eliminating DeferredParseAccess
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Dec 11, 2024
1 parent 3bbd8d0 commit 8db512f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 98 deletions.
29 changes: 2 additions & 27 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,13 @@ namespace internal
*/
enum class CloseStatus
{
ParseAccessDeferred, //!< The reader has not yet parsed this iteration
Open, //!< Iteration has not been closed
ClosedInFrontend, /*!< Iteration has been closed, but task has not yet
been propagated to the backend */
Closed, /*!< Iteration has been closed and task has been
propagated to the backend */
};

struct DeferredParseAccess
{
/**
* The group path within /data containing this iteration.
* Example: "1" for iteration 1, "" in variable-based iteration
* encoding.
*/
std::string path;
/**
* The iteration index as accessed by the user in series.iterations[i]
*/
uint64_t iteration = 0;
/**
* If this iteration is part of a Series with file-based layout.
* (Group- and variable-based parsing shares the same code logic.)
*/
bool fileBased = false;
bool beginStep = false;
};

class IterationData : public AttributableData
{
public:
Expand Down Expand Up @@ -104,11 +83,7 @@ namespace internal
*/
StepStatus m_stepStatus = StepStatus::NoStep;

/**
* Information on a parsing request that has not yet been executed.
* Otherwise empty.
*/
std::optional<DeferredParseAccess> m_deferredParseAccess{};
bool need_to_parse = false;
};
} // namespace internal
/** @brief Logical compilation of data from one snapshot (e.g. a single
Expand Down Expand Up @@ -280,7 +255,7 @@ class Iteration : public Attributable
void flushGroupBased(IterationIndex_t, internal::FlushParams const &);
void flushVariableBased(IterationIndex_t, internal::FlushParams const &);
void flush(internal::FlushParams const &);
void deferParseAccess(internal::DeferredParseAccess);
void deferParseAccess();
/*
* Control flow for runDeferredParseAccess(), readFileBased(),
* readGroupBased() and read_impl():
Expand Down
88 changes: 42 additions & 46 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "openPMD/Error.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/IO/IOTask.hpp"
#include "openPMD/IterationEncoding.hpp"
#include "openPMD/Series.hpp"
#include "openPMD/Streaming.hpp"
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
Expand All @@ -44,7 +45,6 @@
namespace openPMD
{
using internal::CloseStatus;
using internal::DeferredParseAccess;

Iteration::Iteration() : Attributable(NoInit())
{
Expand Down Expand Up @@ -102,7 +102,6 @@ Iteration &Iteration::close(bool _flush)
case CloseStatus::ClosedInFrontend:
it.m_closed = CloseStatus::ClosedInFrontend;
break;
case CloseStatus::ParseAccessDeferred:
case CloseStatus::Closed:
// just keep it like it is
// (this means that closing an iteration that has not been parsed
Expand Down Expand Up @@ -160,10 +159,10 @@ Iteration &Iteration::open()
// Ensure that files are accessed.
// If the close status was Closed, this will open it.
s.openIteration(begin->first, *this);
if (it.m_closed == CloseStatus::ParseAccessDeferred)
if (it.need_to_parse)
{
it.m_closed = CloseStatus::Open;
runDeferredParseAccess();
it.m_closed = CloseStatus::Open;
}
if (getStepStatus() == StepStatus::OutOfStep)
{
Expand All @@ -178,33 +177,27 @@ bool Iteration::closed() const
{
switch (get().m_closed)
{
case CloseStatus::ParseAccessDeferred:
case CloseStatus::Open:
// std::cout << "Open" << std::endl;
return false;
/*
* Temporarily closing a file is something that the openPMD API
* does for optimization purposes.
* Logically to the user, it is still open.
*/
case CloseStatus::ClosedInFrontend:
// std::cout << "ClosedInFrontend" << std::endl;
return true;
case CloseStatus::Closed:
// std::cout << "Closed" << std::endl;
return true;
}
throw std::runtime_error("Unreachable!");
}

bool Iteration::parsed() const
{
switch (get().m_closed)
{
case CloseStatus::ParseAccessDeferred:
return false;
case CloseStatus::Open:
case CloseStatus::ClosedInFrontend:
case CloseStatus::Closed:
return true;
}
throw std::runtime_error("Unreachable!");
return !get().need_to_parse;
}

bool Iteration::closedByWriter() const
Expand Down Expand Up @@ -401,21 +394,22 @@ void Iteration::flush(internal::FlushParams const &flushParams)
}
}

void Iteration::deferParseAccess(DeferredParseAccess dr)
void Iteration::deferParseAccess()
{
get().m_deferredParseAccess =
std::make_optional<DeferredParseAccess>(std::move(dr));
get().need_to_parse = true;
get().m_closed = CloseStatus::Closed;
}

void Iteration::reread(std::string const &path)
{
if (get().m_deferredParseAccess.has_value())
{
throw std::runtime_error(
"[Iteration] Internal control flow error: Trying to reread an "
"iteration that has not yet been read for its first time.");
}
read_impl(path);
throw std::runtime_error("Iteration::reread kapier ich ned");
// if (get().m_deferredParseAccess.has_value())
// {
// throw std::runtime_error(
// "[Iteration] Internal control flow error: Trying to reread an "
// "iteration that has not yet been read for its first time.");
// }
// read_impl(path);
}

void Iteration::readFileBased(
Expand All @@ -424,13 +418,13 @@ void Iteration::readFileBased(
std::string const &groupPath,
bool doBeginStep)
{
if (doBeginStep)
{
/*
* beginStep() must take care to open files
*/
beginStep(/* reread = */ false);
}
// if (doBeginStep)
// {
// /*
// * beginStep() must take care to open files
// */
// beginStep(/* reread = */ false);
// }
auto series = retrieveSeries();

series.readOneIterationFileBased(filePath);
Expand Down Expand Up @@ -914,11 +908,18 @@ Iteration::resetIteration()
auto parent = writable().parent;
auto old_iteration = *this;

auto s = retrieveSeries();
if (&s.indexOf(*this)->second != this)
{
throw std::runtime_error("AAAAAAAAAAAAAAAAAAAAAAAAAA");
}

// old value
auto old_attributable = std::move(this->m_attri);
res_t res{std::move(old_iteration), std::move(*old_attributable)};

this->setData(std::make_shared<internal::IterationData>());
Attributable::get().m_attributes = std::move(res.second->m_attributes);

static_cast<attr_t &>(*old_attributable) =
static_cast<attr_t const &>(*this->m_attri);
Expand Down Expand Up @@ -954,41 +955,36 @@ void Iteration::runDeferredParseAccess()
if (access::read(IOHandler()->m_frontendAccess))
{
auto &it = get();
if (!it.m_deferredParseAccess.has_value())
{
return;
}
auto const &deferred = it.m_deferredParseAccess.value();
auto s = retrieveSeries();
auto myself = s.indexOf(*this);
it.need_to_parse = false;

auto oldStatus = IOHandler()->m_seriesStatus;
IOHandler()->m_seriesStatus = internal::SeriesStatus::Parsing;
try
{
if (deferred.fileBased)
if (s.iterationEncoding() == IterationEncoding::fileBased)
{
auto const &filename =
retrieveSeries().get().m_iterationFilenames.at(
deferred.iteration);
s.get().m_iterationFilenames.at(myself->first);
readFileBased(
deferred.iteration,
myself->first,
filename,
deferred.path,
deferred.beginStep);
it->m_writable.ownKeyWithinParent,
written());
}
else
{
readGorVBased(deferred.path, deferred.beginStep);
readGorVBased(it->m_writable.ownKeyWithinParent, written());
}
}
catch (...)
{
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
IOHandler()->m_seriesStatus = oldStatus;
throw;
}
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
IOHandler()->m_seriesStatus = oldStatus;
}
}
Expand Down
27 changes: 11 additions & 16 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1566,8 +1566,6 @@ void Series::readFileBased(
std::optional<IterationIndex_t> read_only_this_single_iteration)
{
auto &series = get();
Parameter<Operation::OPEN_FILE> fOpen;
Parameter<Operation::READ_ATT> aRead;

// Tell the backend that we are parsing file-based iteration encoding.
// This especially means that READ_RANDOM_ACCESS will be used instead of
Expand Down Expand Up @@ -1604,7 +1602,7 @@ void Series::readFileBased(
Iteration &i = series.iterations[index];
series.m_iterationFilenames[index] =
cleanFilename(filename, series.m_filenameExtension).body;
i.deferParseAccess({std::to_string(index), index, true});
i.deferParseAccess();
});

if (series.iterations.empty())
Expand Down Expand Up @@ -1654,8 +1652,7 @@ void Series::readFileBased(
{
for (auto &iteration : series.iterations)
{
iteration.second.get().m_closed =
internal::CloseStatus::ParseAccessDeferred;
iteration.second.get().m_closed = internal::CloseStatus::Closed;
}
// open the first iteration, just to parse Series attributes
bool atLeastOneIterationSuccessful = false;
Expand Down Expand Up @@ -2055,8 +2052,7 @@ creating new iterations.
auto readSingleIteration =
[&series, &pOpen, this](
IterationIndex_t index,
std::string const &path,
bool beginStep) -> std::optional<error::ReadError> {
std::string const &path) -> std::optional<error::ReadError> {
if (series.iterations.contains(index))
{
// maybe re-read
Expand All @@ -2067,7 +2063,8 @@ creating new iterations.
{
return {};
}
if (i.get().m_closed != internal::CloseStatus::ParseAccessDeferred)
throw std::runtime_error("kapier ich ned");
if (!i.get().need_to_parse)
{
pOpen.path = path;
IOHandler()->enqueue(IOTask(&i, pOpen));
Expand All @@ -2081,7 +2078,7 @@ creating new iterations.
{
// parse for the first time, resp. delay the parsing process
Iteration &i = series.iterations[index];
i.deferParseAccess({path, index, false, beginStep});
i.deferParseAccess();
if (!series.m_parseLazily)
{
try
Expand All @@ -2100,7 +2097,7 @@ creating new iterations.
}
else
{
i.get().m_closed = internal::CloseStatus::ParseAccessDeferred;
i.get().m_closed = internal::CloseStatus::Closed;
}
}
return std::nullopt;
Expand Down Expand Up @@ -2128,7 +2125,7 @@ creating new iterations.
}
if (auto err = internal::withRWAccess(
IOHandler()->m_seriesStatus,
[&]() { return readSingleIteration(index, it, false); });
[&]() { return readSingleIteration(index, it); });
err)
{
std::cerr << "Cannot read iteration " << index
Expand Down Expand Up @@ -2188,7 +2185,7 @@ creating new iterations.
if (auto err = internal::withRWAccess(
IOHandler()->m_seriesStatus,
[&readSingleIteration, it]() {
return readSingleIteration(it, "", true);
return readSingleIteration(it, "");
});
err)
{
Expand Down Expand Up @@ -2630,7 +2627,7 @@ auto Series::openIterationIfDirty(IterationIndex_t index, Iteration &iteration)
* Check side conditions on accessing iterations, and if they are fulfilled,
* forward function params to openIteration().
*/
if (data.m_closed == internal::CloseStatus::ParseAccessDeferred)
if (data.need_to_parse)
{
return IterationOpened::RemainsClosed;
}
Expand Down Expand Up @@ -2721,7 +2718,6 @@ void Series::openIteration(IterationIndex_t index, Iteration &iteration)
case CL::Open:
iteration.get().m_closed = CL::Open;
break;
case CL::ParseAccessDeferred:
case CL::ClosedInFrontend:
// just keep it like it is
break;
Expand All @@ -2747,8 +2743,7 @@ void Series::openIteration(IterationIndex_t index, Iteration &iteration)
* before it is possible to open it.
*/
if (!iteration.written() &&
(IOHandler()->m_frontendAccess == Access::CREATE ||
oldStatus != internal::CloseStatus::ParseAccessDeferred))
(IOHandler()->m_frontendAccess == Access::CREATE))
{
// nothing to do, file will be opened by writing routines
break;
Expand Down
Loading

0 comments on commit 8db512f

Please sign in to comment.