Skip to content

Commit

Permalink
Merge pull request #2773 from pnorbert/bp5-aggregation
Browse files Browse the repository at this point in the history
Bp5 aggregation
  • Loading branch information
pnorbert authored Jun 25, 2021
2 parents bb08498 + 37d6d25 commit 0a43c4e
Show file tree
Hide file tree
Showing 20 changed files with 355 additions and 111 deletions.
10 changes: 5 additions & 5 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ void BP3Writer::InitTransports()
// only consumers will interact with transport managers
std::vector<std::string> bpSubStreamNames;

if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
if (m_BP3Serializer.m_Aggregator.m_IsAggregator)
{
// Names passed to IO AddTransport option with key "Name"
const std::vector<std::string> transportsNames =
Expand All @@ -192,7 +192,7 @@ void BP3Writer::InitTransports()
m_BP3Serializer.m_Parameters.NodeLocal);
m_BP3Serializer.m_Profiler.Stop("mkdir");

if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
if (m_BP3Serializer.m_Aggregator.m_IsAggregator)
{
if (m_BP3Serializer.m_Parameters.AsyncTasks)
{
Expand Down Expand Up @@ -246,7 +246,7 @@ void BP3Writer::DoClose(const int transportIndex)

DoFlush(true, transportIndex);

if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
if (m_BP3Serializer.m_Aggregator.m_IsAggregator)
{
m_FileDataManager.CloseFiles(transportIndex);
}
Expand Down Expand Up @@ -396,7 +396,7 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
m_BP3Serializer.m_Aggregator.IExchangeAbsolutePosition(
m_BP3Serializer.m_Data, r);

if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
if (m_BP3Serializer.m_Aggregator.m_IsAggregator)
{
const format::Buffer &bufferSTL =
m_BP3Serializer.m_Aggregator.GetConsumerBuffer(
Expand Down Expand Up @@ -425,7 +425,7 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
m_BP3Serializer.AggregateCollectiveMetadata(
m_BP3Serializer.m_Aggregator.m_Comm, bufferSTL, false);

if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
if (m_BP3Serializer.m_Aggregator.m_IsAggregator)
{
m_FileDataManager.WriteFiles(bufferSTL.m_Buffer.data(),
bufferSTL.m_Position, transportIndex);
Expand Down
14 changes: 7 additions & 7 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void BP4Writer::InitTransports()
PathSeparator + m_Name;
}

if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
if (m_BP4Serializer.m_Aggregator.m_IsAggregator)
{
// Names passed to IO AddTransport option with key "Name"
const std::vector<std::string> transportsNames =
Expand Down Expand Up @@ -222,7 +222,7 @@ void BP4Writer::InitTransports()
}
m_BP4Serializer.m_Profiler.Stop("mkdir");

if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
if (m_BP4Serializer.m_Aggregator.m_IsAggregator)
{
if (m_BP4Serializer.m_Parameters.AsyncTasks)
{
Expand Down Expand Up @@ -333,7 +333,7 @@ void BP4Writer::InitBPBuffer()
static_cast<uint32_t>(lastStep);
m_BP4Serializer.m_MetadataSet.CurrentStep += lastStep;

if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
if (m_BP4Serializer.m_Aggregator.m_IsAggregator)
{
m_BP4Serializer.m_PreDataFileLength =
m_FileDataManager.GetFileSize(0);
Expand Down Expand Up @@ -361,7 +361,7 @@ void BP4Writer::InitBPBuffer()
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_MetadataIndex,
"Index Table", true);
}
if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
if (m_BP4Serializer.m_Aggregator.m_IsAggregator)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Data, "Data", false);
}
Expand Down Expand Up @@ -403,7 +403,7 @@ void BP4Writer::DoClose(const int transportIndex)

DoFlush(true, transportIndex);

if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
if (m_BP4Serializer.m_Aggregator.m_IsAggregator)
{
m_FileDataManager.CloseFiles(transportIndex);
// Delete files from temporary storage if draining was on
Expand Down Expand Up @@ -465,7 +465,7 @@ void BP4Writer::DoClose(const int transportIndex)
}
}

if (m_BP4Serializer.m_Aggregator.m_IsConsumer && m_DrainBB)
if (m_BP4Serializer.m_Aggregator.m_IsAggregator && m_DrainBB)
{
/* Signal the BB thread that no more work is coming */
m_FileDrainer.Finish();
Expand Down Expand Up @@ -740,7 +740,7 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
m_BP4Serializer.m_Aggregator.IExchangeAbsolutePosition(
m_BP4Serializer.m_Data, r);

if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
if (m_BP4Serializer.m_Aggregator.m_IsAggregator)
{
const format::Buffer &bufferSTL =
m_BP4Serializer.m_Aggregator.GetConsumerBuffer(
Expand Down
32 changes: 21 additions & 11 deletions source/adios2/engine/bp5/BP5Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,21 @@ std::string BP5Engine::GetBPSubStreamName(const std::string &name,
}

const std::string bpName = helper::RemoveTrailingSlash(name);

const size_t index = id;
// isReader ? id
// : m_Aggregator.m_IsActive ? m_Aggregator.m_SubStreamIndex : id;

/* the name of a data file starts with "data." */
const std::string bpRankName(bpName + PathSeparator + "data." +
std::to_string(index));
std::to_string(id));
return bpRankName;
}

std::vector<std::string>
BP5Engine::GetBPSubStreamNames(const std::vector<std::string> &names) const
noexcept
BP5Engine::GetBPSubStreamNames(const std::vector<std::string> &names,
size_t subFileIndex) const noexcept
{
std::vector<std::string> bpNames;
bpNames.reserve(names.size());

for (const auto &name : names)
{
bpNames.push_back(
GetBPSubStreamName(name, static_cast<unsigned int>(m_RankMPI)));
bpNames.push_back(GetBPSubStreamName(name, subFileIndex));
}
return bpNames;
}
Expand Down Expand Up @@ -167,6 +160,23 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)
return false;
};

auto lf_SetUIntParameter = [&](const std::string key,
unsigned int &parameter, unsigned int def) {
auto itKey = io.m_Parameters.find(key);
parameter = def;
if (itKey != io.m_Parameters.end())
{
unsigned long result = std::stoul(itKey->second);
if (result > std::numeric_limits<unsigned>::max())
{
result = std::numeric_limits<unsigned>::max();
}
parameter = static_cast<unsigned int>(result);
return true;
}
return false;
};

auto lf_SetStringParameter = [&](const std::string key,
std::string &parameter, const char *def) {
auto itKey = io.m_Parameters.find(key);
Expand Down
5 changes: 4 additions & 1 deletion source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class BP5Engine
static constexpr size_t m_VersionTagLength = 32;

std::vector<std::string>
GetBPSubStreamNames(const std::vector<std::string> &names) const noexcept;
GetBPSubStreamNames(const std::vector<std::string> &names,
size_t subFileIndex) const noexcept;

std::vector<std::string>
GetBPMetadataFileNames(const std::vector<std::string> &names) const
Expand Down Expand Up @@ -87,6 +88,8 @@ class BP5Engine
MACRO(NodeLocal, Bool, bool, false) \
MACRO(verbose, Int, int, 0) \
MACRO(CollectiveMetadata, Bool, bool, true) \
MACRO(NumAggregators, UInt, unsigned int, 999999999) \
MACRO(AsyncTasks, Bool, bool, true) \
MACRO(ReaderShortCircuitReads, Bool, bool, false)

struct BP5Params
Expand Down
13 changes: 7 additions & 6 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ StepStatus BP5Reader::BeginStep(StepMode mode, const float timeoutSeconds)
size_t pgstart = m_MetadataIndexTable[m_CurrentStep][0];
size_t Position = pgstart + sizeof(uint64_t); // skip total data size
size_t MDPosition = Position + 2 * sizeof(uint64_t) * m_WriterCount;
for (int i = 0; i < m_WriterCount; i++)
for (size_t i = 0; i < m_WriterCount; i++)
{
// variable metadata for timestep
size_t ThisMDSize = helper::ReadValue<uint64_t>(
Expand All @@ -104,7 +104,7 @@ StepStatus BP5Reader::BeginStep(StepMode mode, const float timeoutSeconds)
m_BP5Deserializer->InstallMetaData(ThisMD, ThisMDSize, i);
MDPosition += ThisMDSize;
}
for (int i = 0; i < m_WriterCount; i++)
for (size_t i = 0; i < m_WriterCount; i++)
{
// attribute metadata for timestep
size_t ThisADSize = helper::ReadValue<uint64_t>(
Expand Down Expand Up @@ -139,20 +139,21 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
char *Destination)
{
size_t DataStartPos = m_MetadataIndexTable[Timestep][2];
size_t SubfileNum = m_WriterToFileMap[WriterRank];
DataStartPos += WriterRank * sizeof(uint64_t);
size_t DataStart = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, DataStartPos, m_Minifooter.IsLittleEndian);
// check if subfile is already opened
if (m_DataFileManager.m_Transports.count(WriterRank) == 0)
if (m_DataFileManager.m_Transports.count(SubfileNum) == 0)
{
const std::string subFileName = GetBPSubStreamName(
m_Name, WriterRank, m_Minifooter.HasSubFiles, true);
m_Name, SubfileNum, m_Minifooter.HasSubFiles, true);

m_DataFileManager.OpenFileID(subFileName, WriterRank, Mode::Read,
m_DataFileManager.OpenFileID(subFileName, SubfileNum, Mode::Read,
{{"transport", "File"}}, false);
}
m_DataFileManager.ReadFile(Destination, Length, DataStart + StartOffset,
WriterRank);
SubfileNum);
}

void BP5Reader::PerformGets()
Expand Down
Loading

0 comments on commit 0a43c4e

Please sign in to comment.