Skip to content

Commit

Permalink
Rewrite a lot of stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Capital-Asterisk committed Aug 5, 2024
1 parent b6bd827 commit 0899c8b
Show file tree
Hide file tree
Showing 42 changed files with 2,132 additions and 1,249 deletions.
744 changes: 744 additions & 0 deletions src/osp/framework/builder.h

Large diffs are not rendered by default.

116 changes: 77 additions & 39 deletions src/osp/tasks/top_execute.cpp → src/osp/framework/executor.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* Open Space Program
* Copyright © 2019-2022 Open Space Program Project
* Copyright © 2019-2024 Open Space Program Project
*
* MIT License
*
Expand All @@ -22,55 +22,93 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include "top_execute.h"
#include "top_worker.h"
#include "execute.h"
#include "executor.h"

#include <Corrade/Containers/ArrayViewStl.h>

#include <entt/core/any.hpp>
#include <spdlog/fmt/ostr.h>

#include <algorithm>
#include <iomanip>
#include <vector>

namespace osp
namespace osp::fw
{


void SingleThreadedExecutor::load(Framework& rFramework)
{
m_graph = osp::make_exec_graph(rFramework.tasks);
exec_conform(rFramework.tasks, m_execContext);
m_execContext.doLogging = m_log != nullptr;
}

void SingleThreadedExecutor::run(Framework& rAppTasks, PipelineId pipeline)
{
exec_request_run(m_execContext, pipeline);
}

void top_run_blocking(Tasks const& tasks, TaskGraph const& graph, TopTaskDataVec_t& rTaskData, ArrayView<entt::any> topData, ExecContext& rExec, WorkerContext worker)
void SingleThreadedExecutor::signal(Framework& rAppTasks, PipelineId pipeline)
{
std::vector<entt::any> topDataRefs;
exec_signal(m_execContext, pipeline);
}

// Run until there's no tasks left to run
while (true)
void SingleThreadedExecutor::wait(Framework& rAppTasks)
{
if (m_log != nullptr)
{
auto const runTasksLeft = rExec.tasksQueuedRun.size();
//auto const blockedTasksLeft = rExec.tasksQueuedBlocked.size();
m_log->info("\n>>>>>>>>>> Previous State Changes\n{}\n>>>>>>>>>> Current State\n{}\n",
WriteLog {rAppTasks.tasks, rAppTasks.taskImpl, m_graph, m_execContext},
WriteState{rAppTasks.tasks, rAppTasks.taskImpl, m_graph, m_execContext} );
m_execContext.logMsg.clear();
}

if (runTasksLeft != 0)
{
TaskId const task = rExec.tasksQueuedRun[0];
TopTask &rTopTask = rTaskData[task];
exec_update(rAppTasks.tasks, m_graph, m_execContext);
run_blocking(rAppTasks.tasks, m_graph, rAppTasks.taskImpl, rAppTasks.data, m_execContext);

if (m_log != nullptr)
{
m_log->info("\n>>>>>>>>>> New State Changes\n{}",
WriteLog{rAppTasks.tasks, rAppTasks.taskImpl, m_graph, m_execContext} );
m_execContext.logMsg.clear();
}
}

bool SingleThreadedExecutor::is_running(Framework const& appTasks)
{
return m_execContext.hasRequestRun || (m_execContext.pipelinesRunning != 0);
}

topDataRefs.clear();
topDataRefs.reserve(rTopTask.m_dataUsed.size());
for (TopDataId const dataId : rTopTask.m_dataUsed)
{
topDataRefs.push_back((dataId != lgrn::id_null<TopDataId>())
? topData[dataId].as_ref()
: entt::any{});
}

bool const shouldRun = (rTopTask.m_func != nullptr);
void SingleThreadedExecutor::run_blocking(
Tasks const &tasks,
TaskGraph const &graph,
KeyedVec<TaskId, TaskImpl> &rTaskImpl,
KeyedVec<DataId, entt::any> &fwData,
ExecContext &rExec,
WorkerContext worker)
{
std::vector<entt::any> argumentRefs;

// Task function is called here
TaskActions const status = shouldRun ? rTopTask.m_func(worker, topDataRefs) : TaskActions{};
while (rExec.tasksQueuedRun.size() != 0)
{
TaskId const willRunId = rExec.tasksQueuedRun[0];
TaskImpl &rWillRunImpl = rTaskImpl[willRunId];

complete_task(tasks, graph, rExec, task, status);
if (rWillRunImpl.func != nullptr)
{
argumentRefs.clear();
argumentRefs.reserve(rWillRunImpl.args.size());
for (DataId const dataId : rWillRunImpl.args)
{
argumentRefs.push_back(dataId.has_value() ? fwData[dataId].as_ref() : entt::any{});
}

TaskActions const status = rWillRunImpl.func(worker, argumentRefs);
complete_task(tasks, graph, rExec, willRunId, status);
}
else
{
break;
// Allow tasks to not have a function.
complete_task(tasks, graph, rExec, willRunId, {});
}

exec_update(tasks, graph, rExec);
Expand All @@ -94,7 +132,7 @@ static void write_task_requirements(std::ostream &rStream, Tasks const& tasks, T
}


std::ostream& operator<<(std::ostream& rStream, TopExecWriteState const& write)
std::ostream& operator<<(std::ostream& rStream, SingleThreadedExecutor::WriteState const& write)
{
auto const& [tasks, taskData, graph, exec] = write;

Expand Down Expand Up @@ -144,7 +182,7 @@ std::ostream& operator<<(std::ostream& rStream, TopExecWriteState const& write)

std::size_t charsUsed = 7; // "PL###" + ": "

if (info.stageType != lgrn::id_null<PipelineInfo::stage_type_t>())
if (info.stageType != lgrn::id_null<PipelineInfo::StageTypeId>())
{
auto const stageNames = ArrayView<std::string_view const>{PipelineInfo::sm_stageNames[info.stageType]};

Expand Down Expand Up @@ -198,17 +236,17 @@ std::ostream& operator<<(std::ostream& rStream, TopExecWriteState const& write)

for (auto const [task, block] : exec.tasksQueuedBlocked.each())
{
rStream << "Task Blocked: " << "TASK" << TaskInt(task) << " - " << taskData[task].m_debugName << "\n";
rStream << "Task Blocked: " << "TASK" << TaskInt(task) << " - " << taskData[task].debugName << "\n";

write_task_requirements(rStream, tasks, graph, exec, task);
}

return rStream;
}

std::ostream& operator<<(std::ostream& rStream, TopExecWriteLog const& write)
std::ostream& operator<<(std::ostream& rStream, SingleThreadedExecutor::WriteLog const& write)
{
auto const& [tasks, taskData, graph, exec] = write;
auto const& [tasks, rTaskImpl, graph, exec] = write;

auto const stage_name = [&tasks=tasks] (PipelineId pl, StageId stg) -> std::string_view
{
Expand All @@ -224,7 +262,7 @@ std::ostream& operator<<(std::ostream& rStream, TopExecWriteLog const& write)
}
};

auto const visitMsg = [&rStream, &tasks=tasks, &taskData=taskData, &graph=graph, &stage_name] (auto&& msg)
auto const visitMsg = [&rStream, &tasks=tasks, &rTaskImpl=rTaskImpl, &graph=graph, &stage_name] (auto&& msg)
{
using MSG_T = std::decay_t<decltype(msg)>;
if constexpr (std::is_same_v<MSG_T, ExecContext::UpdateStart>)
Expand Down Expand Up @@ -270,7 +308,7 @@ std::ostream& operator<<(std::ostream& rStream, TopExecWriteLog const& write)
rStream << " Enqueue " << (msg.blocked ? "Blocked" : "Run")
<< " on PL" << std::setw(3) << std::left << PipelineInt(msg.pipeline)
<< "(" << stage_name(msg.pipeline, msg.stage) << ")"
<< " TASK" << TaskInt(msg.task) << " - " << taskData[msg.task].m_debugName << "\n";
<< " TASK" << TaskInt(msg.task) << " - " << rTaskImpl[msg.task].debugName << "\n";
}
else if constexpr (std::is_same_v<MSG_T, ExecContext::EnqueueTaskReq>)
{
Expand All @@ -284,7 +322,7 @@ std::ostream& operator<<(std::ostream& rStream, TopExecWriteLog const& write)
}
else if constexpr (std::is_same_v<MSG_T, ExecContext::CompleteTask>)
{
rStream << "Complete TASK" << TaskInt(msg.task) << " - " << taskData[msg.task].m_debugName << "\n";
rStream << "Complete TASK" << TaskInt(msg.task) << " - " << rTaskImpl[msg.task].debugName << "\n";
}
else if constexpr (std::is_same_v<MSG_T, ExecContext::ExternalRunRequest>)
{
Expand Down
93 changes: 93 additions & 0 deletions src/osp/framework/executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Open Space Program
* Copyright © 2019-2024 Open Space Program Project
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#pragma once

#include "framework.h"

#include "../tasks/execute.h"

#include <spdlog/logger.h>

namespace osp::fw
{


class SingleThreadedExecutor final : public IExecutor
{
public:

struct WriteState
{
Tasks const &tasks;
KeyedVec<TaskId, TaskImpl> &rTaskImpl;
TaskGraph const &graph;
ExecContext const &exec;

friend std::ostream& operator<<(std::ostream& rStream, WriteState const& write);
};

struct WriteLog
{
Tasks const &tasks;
KeyedVec<TaskId, TaskImpl> const &rTaskImpl;
TaskGraph const &graph;
ExecContext const &exec;

friend std::ostream& operator<<(std::ostream& rStream, WriteLog const& write);
};

void load(Framework& rAppTasks) override;

void run(Framework& rAppTasks, PipelineId pipeline) override;

void signal(Framework& rAppTasks, PipelineId pipeline) override;

void wait(Framework& rAppTasks) override;

bool is_running(Framework const& rAppTasks) override;

std::shared_ptr<spdlog::logger> m_log;

private:

static void run_blocking(
Tasks const &tasks,
TaskGraph const &graph,
KeyedVec<TaskId, TaskImpl> &rTaskImpl,
KeyedVec<DataId, entt::any> &fwData,
ExecContext &rExec,
WorkerContext worker = {} );


ExecContext m_execContext;
TaskGraph m_graph;


};




} // namespace testapp
Loading

0 comments on commit 0899c8b

Please sign in to comment.