Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

README improvements + link_test improvements (hello, world example); also resolves Flow-IPC/ipc_core#24. #91

Merged
merged 10 commits into from
Mar 13, 2024
Merged
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,12 @@ merely a "black box" of capabilities. E.g., for advanced users:

## What's next?

If the [example](#how-does-flow-ipc-help) and/or [promises](#so-flow-ipc-is-for-sending-cap-n-proto-messages) above
If the [example](#how-does-flow-ipc-help) and/or [promises](#so-flow-ipc-is-for-transmitting-capn-proto-messages) above
have piqued your interest:

A little [complete example](https://github.com/Flow-IPC/ipc_shm/tree/main/test/basic/link_test) transmits
(with zero-copy) a structured message containing the string `Hello, world!` and the number `42`.

In the Manual, the [API Overview / Synopsis](https://flow-ipc.github.io/doc/flow-ipc/versions/main/generated/html_public/api_overview.html)
summarizes (with code snippets) what is available in Flow-IPC.

Expand Down
2 changes: 1 addition & 1 deletion ipc_transport_structured
37 changes: 35 additions & 2 deletions test/suite/perf_demo/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ const ipc::session::Client_app::Master_set CLI_APPS
{
CLI_NAME,
/* The ipc::session security model is such that the binary must be invoked *exactly* using the
* command listed here. In *nix land at least this is how that is likely to look.
* (In a production scenario this would be a canonical (absolute, etc.) path.) */
* command listed here. In *nix land at least this is how that is likely to look.
* (In a production scenario this would be a canonical (absolute, etc.) path.) */
fs::path(".") / (S_EXEC_PREFIX + CLI_NAME + S_EXEC_PRE_POSTFIX + S_EXEC_POSTFIX),
::geteuid(), ::getegid()
}
Expand All @@ -79,6 +79,39 @@ void ensure_run_env(const char* argv0, bool srv_else_cli)
}
}

void setup_logging(std::optional<flow::log::Simple_ostream_logger>* std_logger,
std::optional<flow::log::Async_file_logger>* log_logger,
int argc, char const * const * argv, bool srv_else_cli)
{
using flow::util::String_view;
using flow::util::ostream_op_string;
using flow::log::Config;
using flow::log::Sev;
using flow::Flow_log_component;

// `static`s below because must exist throughout the logger's existence; this is an easy way in our little app.

// Console logger setup.
static Config std_log_config;
std_log_config.init_component_to_union_idx_mapping<Flow_log_component>
(1000, Config::standard_component_payload_enum_sparse_length<Flow_log_component>());
std_log_config.init_component_to_union_idx_mapping<ipc::Log_component>
(2000, Config::standard_component_payload_enum_sparse_length<ipc::Log_component>());
std_log_config.init_component_names<Flow_log_component>(flow::S_FLOW_LOG_COMPONENT_NAME_MAP, false, "flow-");
std_log_config.init_component_names<ipc::Log_component>(ipc::S_IPC_LOG_COMPONENT_NAME_MAP, false, "ipc-");
std_logger->emplace(&std_log_config);
FLOW_LOG_SET_CONTEXT(&(**std_logger), Flow_log_component::S_UNCAT);

// This is separate: the IPC/Flow logging will go into this file.
const auto LOG_FILE = ostream_op_string(S_EXEC_PREFIX, srv_else_cli ? SRV_NAME : CLI_NAME, ".log");
const size_t ARG_IDX = srv_else_cli ? 2 : 1;
const auto log_file = (size_t(argc) > ARG_IDX) ? String_view(argv[ARG_IDX]) : String_view(LOG_FILE);
FLOW_LOG_INFO("Opening log file [" << log_file << "] for IPC/Flow logs only.");
static auto log_config = std_log_config;
log_config.configure_default_verbosity(Sev::S_INFO, true);
log_logger->emplace(nullptr, &log_config, log_file, false /* No rotation; we're no serious business. */);
}

void ev_wait(Asio_handle* hndl_of_interest,
bool ev_of_interest_snd_else_rcv, ipc::util::sync_io::Task_ptr&& on_active_ev_func)
{
Expand Down
5 changes: 5 additions & 0 deletions test/suite/perf_demo/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <flow/log/async_file_logger.hpp>
#include <boost/filesystem/path.hpp>
#include <string>
#include <optional>

namespace fs = boost::filesystem;

Expand Down Expand Up @@ -62,6 +63,10 @@ using Blob_mutable = ipc::util::Blob_mutable;

// Invoke from main() from either application to ensure it's being run directly from the expected CWD.
void ensure_run_env(const char* argv0, bool srv_else_cli);
// Invoke from main() to set up console and file logging.
void setup_logging(std::optional<flow::log::Simple_ostream_logger>* std_logger,
std::optional<flow::log::Async_file_logger>* log_logger,
int argc, char const * const * argv, bool srv_else_cli);

void ev_wait(Asio_handle* hndl_of_interest,
bool ev_of_interest_snd_else_rcv, ipc::util::sync_io::Task_ptr&& on_active_ev_func);
42 changes: 14 additions & 28 deletions test/suite/perf_demo/main_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,46 +45,32 @@ int main(int argc, char const * const * argv)
using Session = Client_session;
using flow::log::Simple_ostream_logger;
using flow::log::Async_file_logger;
using flow::log::Config;
using flow::log::Sev;
using flow::Flow_log_component;
using flow::util::String_view;
using flow::util::ceil_div;
using boost::promise;
using boost::chrono::microseconds;
using boost::chrono::round;
using std::exception;
using std::optional;

constexpr String_view LOG_FILE = "perf_demo_cli.log";
constexpr int BAD_EXIT = 1;

// Set up logging.
Config std_log_config;
std_log_config.init_component_to_union_idx_mapping<Flow_log_component>
(1000, Config::standard_component_payload_enum_sparse_length<Flow_log_component>());
std_log_config.init_component_to_union_idx_mapping<ipc::Log_component>
(2000, Config::standard_component_payload_enum_sparse_length<ipc::Log_component>());
std_log_config.init_component_names<Flow_log_component>(flow::S_FLOW_LOG_COMPONENT_NAME_MAP, false, "flow-");
std_log_config.init_component_names<ipc::Log_component>(ipc::S_IPC_LOG_COMPONENT_NAME_MAP, false, "ipc-");
Simple_ostream_logger std_logger(&std_log_config);
FLOW_LOG_SET_CONTEXT(&std_logger, Flow_log_component::S_UNCAT);
// This is separate: the IPC/Flow logging will go into this file.
const auto log_file = (argc >= 2) ? String_view(argv[1]) : LOG_FILE;
FLOW_LOG_INFO("Opening log file [" << log_file << "] for IPC/Flow logs only.");
Config log_config = std_log_config;
log_config.configure_default_verbosity(Sev::S_INFO, true);
Async_file_logger log_logger(nullptr, &log_config, log_file, false);
/* Set up logging within this function. We could easily just use `cout` and `cerr` instead, but this
* Flow stuff will give us time stamps and such for free, so why not? Normally, one derives from
* Log_context to do this very trivially, but we just have the one function, main(), so far so: */
optional<Simple_ostream_logger> std_logger;
optional<Async_file_logger> log_logger;
setup_logging(&std_logger, &log_logger, argc, argv, false);
FLOW_LOG_SET_CONTEXT(&(*std_logger), Flow_log_component::S_UNCAT);

#if JEM_ELSE_CLASSIC
ipc::session::shm::arena_lend::Borrower_shm_pool_collection_repository_singleton::get_instance()
.set_logger(&log_logger);
.set_logger(&(*log_logger));
#endif

try
{
ensure_run_env(argv[0], false);

Session session(&log_logger,
Session session(&(*log_logger),
CLI_APPS.find(CLI_NAME)->second,
SRV_APPS.find(SRV_NAME)->second, [](const Error_code&) {});

Expand All @@ -98,11 +84,11 @@ int main(int argc, char const * const * argv)
assert(chans.size() == 2); // Server shall offer us 2 channels. (We could also ask for some above, but we won't.)

auto& chan_raw = chans[0]; // Binary channel for raw-ish tests.
Channel_struc chan_struc(&log_logger, std::move(chans[1]), // Structured channel: SHM-backed underneath.
Channel_struc chan_struc(&(*log_logger), std::move(chans[1]), // Structured channel: SHM-backed underneath.
ipc::transport::struc::Channel_base::S_SERIALIZE_VIA_SESSION_SHM, &session);

run_capnp_over_raw(&std_logger, &chan_raw); // Benchmark 1. capnp data transmission without Flow-IPC zero-copy.
run_capnp_zero_cpy(&std_logger, &chan_struc); // Benchmark 2. Same but with it.
run_capnp_over_raw(&(*std_logger), &chan_raw); // Benchmark 1. capnp data transmission without Flow-IPC zero-copy.
run_capnp_zero_cpy(&(*std_logger), &chan_struc); // Benchmark 2. Same but with it.

/* They already printed detailed timing info; now let's summarize the total results. As you can see it
* just prints b1's RTT, b2's RTT, and the ratio; while reminding how much data was transmitted.
Expand Down Expand Up @@ -142,7 +128,7 @@ int main(int argc, char const * const * argv)
FLOW_LOG_WARNING("Caught exception: [" << exc.what() << "].");
FLOW_LOG_WARNING("(Perhaps you did not execute session-server executable in parallel, or "
"you executed one or both of us oddly?)");
return BAD_EXIT;
return 1;
}

return 0;
Expand Down
68 changes: 22 additions & 46 deletions test/suite/perf_demo/main_srv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,43 +82,29 @@ int main(int argc, char const * const * argv)
{
using flow::log::Simple_ostream_logger;
using flow::log::Async_file_logger;
using flow::log::Config;
using flow::log::Sev;
using flow::Flow_log_component;
using flow::util::String_view;
using flow::util::ceil_div;
using boost::promise;
using boost::lexical_cast;
using std::exception;
using std::optional;

constexpr String_view LOG_FILE = "perf_demo_srv.log";
constexpr float TOTAL_SZ_MI = 1 * 1000;
constexpr int BAD_EXIT = 1;

// Set up logging.
Config std_log_config;
std_log_config.init_component_to_union_idx_mapping<Flow_log_component>
(1000, Config::standard_component_payload_enum_sparse_length<Flow_log_component>());
std_log_config.init_component_to_union_idx_mapping<ipc::Log_component>
(2000, Config::standard_component_payload_enum_sparse_length<ipc::Log_component>());
std_log_config.init_component_names<Flow_log_component>(flow::S_FLOW_LOG_COMPONENT_NAME_MAP, false, "flow-");
std_log_config.init_component_names<ipc::Log_component>(ipc::S_IPC_LOG_COMPONENT_NAME_MAP, false, "ipc-");
Simple_ostream_logger std_logger(&std_log_config);
FLOW_LOG_SET_CONTEXT(&std_logger, Flow_log_component::S_UNCAT);
FLOW_LOG_INFO("FYI -- Usage: " << argv[0] << " [msg payload size in MiB] [log file]");
FLOW_LOG_INFO("FYI -- Defaults: " << TOTAL_SZ_MI << " " << LOG_FILE);
// This is separate: the IPC/Flow logging will go into this file.
const auto log_file = (argc >= 3) ? String_view(argv[2]) : LOG_FILE;
FLOW_LOG_INFO("Opening log file [" << log_file << "] for IPC/Flow logs only.");
Config log_config = std_log_config;
log_config.configure_default_verbosity(Sev::S_INFO, true);
Async_file_logger log_logger(nullptr, &log_config, log_file, false);

/* Set up logging within this function. We could easily just use `cout` and `cerr` instead, but this
* Flow stuff will give us time stamps and such for free, so why not? Normally, one derives from
* Log_context to do this very trivially, but we just have the one function, main(), so far so: */
optional<Simple_ostream_logger> std_logger;
optional<Async_file_logger> log_logger;
setup_logging(&std_logger, &log_logger, argc, argv, true);
FLOW_LOG_SET_CONTEXT(&(*std_logger), Flow_log_component::S_UNCAT);

#if JEM_ELSE_CLASSIC
/* Instructed to do so by ipc::session::shm::arena_lend public docs (short version: this is basically a global,
* and it would not be cool for ipc::session non-global objects to impose their individual loggers on it). */
ipc::session::shm::arena_lend::Borrower_shm_pool_collection_repository_singleton::get_instance()
.set_logger(&log_logger);
.set_logger(&(*log_logger));
#endif

try
Expand Down Expand Up @@ -184,43 +170,33 @@ int main(int argc, char const * const * argv)
/* Accept one session. Use the async-I/O API, as perf for this part really doesn't matter to anyone ever,
* and we're not timing it anyway, and it doesn't affect what happens after. We don't start a thread; just
* use promise/future pattern to wait until Session_server is ready with success or failure. */
Session_server srv(&log_logger, SRV_APPS.find(SRV_NAME)->second, CLI_APPS);
Session_server srv(&(*log_logger), SRV_APPS.find(SRV_NAME)->second, CLI_APPS);
FLOW_LOG_INFO("Session-server started. You can now invoke session-client executable from same CWD; "
"it will open session with some channel(s).");

Session session;
promise<void> accepted_promise;
bool ok = false;
promise<Error_code> accepted_promise;
Session_server::Channels chans;
srv.async_accept(&session, &chans, nullptr, nullptr,
[](auto&&...) -> size_t { return 2; }, // 2 init-channels to open.
[](auto&&...) {},
[&](const Error_code& err_code)
{
if (err_code)
{
FLOW_LOG_WARNING("Error is totally unexpected. Error: [" << err_code << "] [" << err_code.message() << "].");
}
else
{
FLOW_LOG_INFO("Session accepted: [" << session << "].");
ok = true;
}
// Either way though:
accepted_promise.set_value();
accepted_promise.set_value(err_code);
});
accepted_promise.get_future().wait();
if (!ok)
const auto err_code = accepted_promise.get_future().get();
if (err_code)
{
return BAD_EXIT;
throw Runtime_error(err_code, "totally unexpected error while accepting");
}
// else
FLOW_LOG_INFO("Session accepted: [" << session << "].");

/* Ignore session errors (see disclaimer comment at top of for general justification).
* Basically we know it'll be, if anything, just the client disconnecting from us when it's done; and by that
* point we'll be shutting down anyway. And any transmission error will be detected along the channel of
* transmission. As a not-serious-production-app, no need for this stuff. */
session.init_handlers([](const Error_code&) {});
session.init_handlers([](auto&&...) {});
// Session in PEER state (opened fully); so channels are ready too.

/* For now there are just these two channels. (See above where we specified `return 2`.)
Expand All @@ -234,18 +210,18 @@ int main(int argc, char const * const * argv)
auto& chan_raw = chans[0];

// And this one we immediately upgrade to a Flow-IPC transport::struc::Channel.
Channel_struc chan_struc(&log_logger, std::move(chans[1]), // Structured channel: SHM-backed underneath.
Channel_struc chan_struc(&(*log_logger), std::move(chans[1]), // Structured channel: SHM-backed underneath.
ipc::transport::struc::Channel_base::S_SERIALIZE_VIA_SESSION_SHM, &session);

run_capnp_over_raw(&std_logger, &chan_raw); // Benchmark 1. capnp data transmission without Flow-IPC zero-copy.
run_capnp_zero_copy(&std_logger, &chan_struc, &session); // Benchmark 2. Same but with it.
run_capnp_over_raw(&(*std_logger), &chan_raw); // Benchmark 1. capnp data transmission without Flow-IPC zero-copy.
run_capnp_zero_copy(&(*std_logger), &chan_struc, &session); // Benchmark 2. Same but with it.

FLOW_LOG_INFO("Exiting.");
} // try
catch (const exception& exc)
{
FLOW_LOG_WARNING("Caught exception: [" << exc.what() << "].");
return BAD_EXIT;
return 1;
}

return 0;
Expand Down
Loading