Skip to content

Commit

Permalink
Merge pull request #91 from Flow-IPC/polish
Browse files Browse the repository at this point in the history
README improvements + link_test improvements (hello, world example); also resolves Flow-IPC/ipc_core#24.
  • Loading branch information
ygoldfeld authored Mar 13, 2024
2 parents 9153b14 + 74b1f32 commit 512ce51
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 82 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,12 @@ merely a "black box" of capabilities. E.g., for advanced users:

## What's next?

If the [example](#how-does-flow-ipc-help) and/or [promises](#so-flow-ipc-is-for-sending-cap-n-proto-messages) above
If the [example](#how-does-flow-ipc-help) and/or [promises](#so-flow-ipc-is-for-transmitting-capn-proto-messages) above
have piqued your interest:

A little [complete example](https://github.com/Flow-IPC/ipc_shm/tree/main/test/basic/link_test) transmits
(with zero-copy) a structured message containing the string `Hello, world!` and the number `42`.

In the Manual, the [API Overview / Synopsis](https://flow-ipc.github.io/doc/flow-ipc/versions/main/generated/html_public/api_overview.html)
summarizes (with code snippets) what is available in Flow-IPC.

Expand Down
2 changes: 1 addition & 1 deletion ipc_transport_structured
37 changes: 35 additions & 2 deletions test/suite/perf_demo/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ const ipc::session::Client_app::Master_set CLI_APPS
{
CLI_NAME,
/* The ipc::session security model is such that the binary must be invoked *exactly* using the
* command listed here. In *nix land at least this is how that is likely to look.
* (In a production scenario this would be a canonical (absolute, etc.) path.) */
* command listed here. In *nix land at least this is how that is likely to look.
* (In a production scenario this would be a canonical (absolute, etc.) path.) */
fs::path(".") / (S_EXEC_PREFIX + CLI_NAME + S_EXEC_PRE_POSTFIX + S_EXEC_POSTFIX),
::geteuid(), ::getegid()
}
Expand All @@ -79,6 +79,39 @@ void ensure_run_env(const char* argv0, bool srv_else_cli)
}
}

void setup_logging(std::optional<flow::log::Simple_ostream_logger>* std_logger,
std::optional<flow::log::Async_file_logger>* log_logger,
int argc, char const * const * argv, bool srv_else_cli)
{
using flow::util::String_view;
using flow::util::ostream_op_string;
using flow::log::Config;
using flow::log::Sev;
using flow::Flow_log_component;

// `static`s below because must exist throughout the logger's existence; this is an easy way in our little app.

// Console logger setup.
static Config std_log_config;
std_log_config.init_component_to_union_idx_mapping<Flow_log_component>
(1000, Config::standard_component_payload_enum_sparse_length<Flow_log_component>());
std_log_config.init_component_to_union_idx_mapping<ipc::Log_component>
(2000, Config::standard_component_payload_enum_sparse_length<ipc::Log_component>());
std_log_config.init_component_names<Flow_log_component>(flow::S_FLOW_LOG_COMPONENT_NAME_MAP, false, "flow-");
std_log_config.init_component_names<ipc::Log_component>(ipc::S_IPC_LOG_COMPONENT_NAME_MAP, false, "ipc-");
std_logger->emplace(&std_log_config);
FLOW_LOG_SET_CONTEXT(&(**std_logger), Flow_log_component::S_UNCAT);

// This is separate: the IPC/Flow logging will go into this file.
const auto LOG_FILE = ostream_op_string(S_EXEC_PREFIX, srv_else_cli ? SRV_NAME : CLI_NAME, ".log");
const size_t ARG_IDX = srv_else_cli ? 2 : 1;
const auto log_file = (size_t(argc) > ARG_IDX) ? String_view(argv[ARG_IDX]) : String_view(LOG_FILE);
FLOW_LOG_INFO("Opening log file [" << log_file << "] for IPC/Flow logs only.");
static auto log_config = std_log_config;
log_config.configure_default_verbosity(Sev::S_INFO, true);
log_logger->emplace(nullptr, &log_config, log_file, false /* No rotation; we're no serious business. */);
}

void ev_wait(Asio_handle* hndl_of_interest,
bool ev_of_interest_snd_else_rcv, ipc::util::sync_io::Task_ptr&& on_active_ev_func)
{
Expand Down
5 changes: 5 additions & 0 deletions test/suite/perf_demo/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <flow/log/async_file_logger.hpp>
#include <boost/filesystem/path.hpp>
#include <string>
#include <optional>

namespace fs = boost::filesystem;

Expand Down Expand Up @@ -62,6 +63,10 @@ using Blob_mutable = ipc::util::Blob_mutable;

// Invoke from main() from either application to ensure it's being run directly from the expected CWD.
void ensure_run_env(const char* argv0, bool srv_else_cli);
// Invoke from main() to set up console and file logging.
void setup_logging(std::optional<flow::log::Simple_ostream_logger>* std_logger,
std::optional<flow::log::Async_file_logger>* log_logger,
int argc, char const * const * argv, bool srv_else_cli);

void ev_wait(Asio_handle* hndl_of_interest,
bool ev_of_interest_snd_else_rcv, ipc::util::sync_io::Task_ptr&& on_active_ev_func);
42 changes: 14 additions & 28 deletions test/suite/perf_demo/main_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,46 +45,32 @@ int main(int argc, char const * const * argv)
using Session = Client_session;
using flow::log::Simple_ostream_logger;
using flow::log::Async_file_logger;
using flow::log::Config;
using flow::log::Sev;
using flow::Flow_log_component;
using flow::util::String_view;
using flow::util::ceil_div;
using boost::promise;
using boost::chrono::microseconds;
using boost::chrono::round;
using std::exception;
using std::optional;

constexpr String_view LOG_FILE = "perf_demo_cli.log";
constexpr int BAD_EXIT = 1;

// Set up logging.
Config std_log_config;
std_log_config.init_component_to_union_idx_mapping<Flow_log_component>
(1000, Config::standard_component_payload_enum_sparse_length<Flow_log_component>());
std_log_config.init_component_to_union_idx_mapping<ipc::Log_component>
(2000, Config::standard_component_payload_enum_sparse_length<ipc::Log_component>());
std_log_config.init_component_names<Flow_log_component>(flow::S_FLOW_LOG_COMPONENT_NAME_MAP, false, "flow-");
std_log_config.init_component_names<ipc::Log_component>(ipc::S_IPC_LOG_COMPONENT_NAME_MAP, false, "ipc-");
Simple_ostream_logger std_logger(&std_log_config);
FLOW_LOG_SET_CONTEXT(&std_logger, Flow_log_component::S_UNCAT);
// This is separate: the IPC/Flow logging will go into this file.
const auto log_file = (argc >= 2) ? String_view(argv[1]) : LOG_FILE;
FLOW_LOG_INFO("Opening log file [" << log_file << "] for IPC/Flow logs only.");
Config log_config = std_log_config;
log_config.configure_default_verbosity(Sev::S_INFO, true);
Async_file_logger log_logger(nullptr, &log_config, log_file, false);
/* Set up logging within this function. We could easily just use `cout` and `cerr` instead, but this
* Flow stuff will give us time stamps and such for free, so why not? Normally, one derives from
* Log_context to do this very trivially, but we just have the one function, main(), so far so: */
optional<Simple_ostream_logger> std_logger;
optional<Async_file_logger> log_logger;
setup_logging(&std_logger, &log_logger, argc, argv, false);
FLOW_LOG_SET_CONTEXT(&(*std_logger), Flow_log_component::S_UNCAT);

#if JEM_ELSE_CLASSIC
ipc::session::shm::arena_lend::Borrower_shm_pool_collection_repository_singleton::get_instance()
.set_logger(&log_logger);
.set_logger(&(*log_logger));
#endif

try
{
ensure_run_env(argv[0], false);

Session session(&log_logger,
Session session(&(*log_logger),
CLI_APPS.find(CLI_NAME)->second,
SRV_APPS.find(SRV_NAME)->second, [](const Error_code&) {});

Expand All @@ -98,11 +84,11 @@ int main(int argc, char const * const * argv)
assert(chans.size() == 2); // Server shall offer us 2 channels. (We could also ask for some above, but we won't.)

auto& chan_raw = chans[0]; // Binary channel for raw-ish tests.
Channel_struc chan_struc(&log_logger, std::move(chans[1]), // Structured channel: SHM-backed underneath.
Channel_struc chan_struc(&(*log_logger), std::move(chans[1]), // Structured channel: SHM-backed underneath.
ipc::transport::struc::Channel_base::S_SERIALIZE_VIA_SESSION_SHM, &session);

run_capnp_over_raw(&std_logger, &chan_raw); // Benchmark 1. capnp data transmission without Flow-IPC zero-copy.
run_capnp_zero_cpy(&std_logger, &chan_struc); // Benchmark 2. Same but with it.
run_capnp_over_raw(&(*std_logger), &chan_raw); // Benchmark 1. capnp data transmission without Flow-IPC zero-copy.
run_capnp_zero_cpy(&(*std_logger), &chan_struc); // Benchmark 2. Same but with it.

/* They already printed detailed timing info; now let's summarize the total results. As you can see it
* just prints b1's RTT, b2's RTT, and the ratio; while reminding how much data was transmitted.
Expand Down Expand Up @@ -142,7 +128,7 @@ int main(int argc, char const * const * argv)
FLOW_LOG_WARNING("Caught exception: [" << exc.what() << "].");
FLOW_LOG_WARNING("(Perhaps you did not execute session-server executable in parallel, or "
"you executed one or both of us oddly?)");
return BAD_EXIT;
return 1;
}

return 0;
Expand Down
68 changes: 22 additions & 46 deletions test/suite/perf_demo/main_srv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,43 +82,29 @@ int main(int argc, char const * const * argv)
{
using flow::log::Simple_ostream_logger;
using flow::log::Async_file_logger;
using flow::log::Config;
using flow::log::Sev;
using flow::Flow_log_component;
using flow::util::String_view;
using flow::util::ceil_div;
using boost::promise;
using boost::lexical_cast;
using std::exception;
using std::optional;

constexpr String_view LOG_FILE = "perf_demo_srv.log";
constexpr float TOTAL_SZ_MI = 1 * 1000;
constexpr int BAD_EXIT = 1;

// Set up logging.
Config std_log_config;
std_log_config.init_component_to_union_idx_mapping<Flow_log_component>
(1000, Config::standard_component_payload_enum_sparse_length<Flow_log_component>());
std_log_config.init_component_to_union_idx_mapping<ipc::Log_component>
(2000, Config::standard_component_payload_enum_sparse_length<ipc::Log_component>());
std_log_config.init_component_names<Flow_log_component>(flow::S_FLOW_LOG_COMPONENT_NAME_MAP, false, "flow-");
std_log_config.init_component_names<ipc::Log_component>(ipc::S_IPC_LOG_COMPONENT_NAME_MAP, false, "ipc-");
Simple_ostream_logger std_logger(&std_log_config);
FLOW_LOG_SET_CONTEXT(&std_logger, Flow_log_component::S_UNCAT);
FLOW_LOG_INFO("FYI -- Usage: " << argv[0] << " [msg payload size in MiB] [log file]");
FLOW_LOG_INFO("FYI -- Defaults: " << TOTAL_SZ_MI << " " << LOG_FILE);
// This is separate: the IPC/Flow logging will go into this file.
const auto log_file = (argc >= 3) ? String_view(argv[2]) : LOG_FILE;
FLOW_LOG_INFO("Opening log file [" << log_file << "] for IPC/Flow logs only.");
Config log_config = std_log_config;
log_config.configure_default_verbosity(Sev::S_INFO, true);
Async_file_logger log_logger(nullptr, &log_config, log_file, false);

/* Set up logging within this function. We could easily just use `cout` and `cerr` instead, but this
* Flow stuff will give us time stamps and such for free, so why not? Normally, one derives from
* Log_context to do this very trivially, but we just have the one function, main(), so far so: */
optional<Simple_ostream_logger> std_logger;
optional<Async_file_logger> log_logger;
setup_logging(&std_logger, &log_logger, argc, argv, true);
FLOW_LOG_SET_CONTEXT(&(*std_logger), Flow_log_component::S_UNCAT);

#if JEM_ELSE_CLASSIC
/* Instructed to do so by ipc::session::shm::arena_lend public docs (short version: this is basically a global,
* and it would not be cool for ipc::session non-global objects to impose their individual loggers on it). */
ipc::session::shm::arena_lend::Borrower_shm_pool_collection_repository_singleton::get_instance()
.set_logger(&log_logger);
.set_logger(&(*log_logger));
#endif

try
Expand Down Expand Up @@ -184,43 +170,33 @@ int main(int argc, char const * const * argv)
/* Accept one session. Use the async-I/O API, as perf for this part really doesn't matter to anyone ever,
* and we're not timing it anyway, and it doesn't affect what happens after. We don't start a thread; just
* use promise/future pattern to wait until Session_server is ready with success or failure. */
Session_server srv(&log_logger, SRV_APPS.find(SRV_NAME)->second, CLI_APPS);
Session_server srv(&(*log_logger), SRV_APPS.find(SRV_NAME)->second, CLI_APPS);
FLOW_LOG_INFO("Session-server started. You can now invoke session-client executable from same CWD; "
"it will open session with some channel(s).");

Session session;
promise<void> accepted_promise;
bool ok = false;
promise<Error_code> accepted_promise;
Session_server::Channels chans;
srv.async_accept(&session, &chans, nullptr, nullptr,
[](auto&&...) -> size_t { return 2; }, // 2 init-channels to open.
[](auto&&...) {},
[&](const Error_code& err_code)
{
if (err_code)
{
FLOW_LOG_WARNING("Error is totally unexpected. Error: [" << err_code << "] [" << err_code.message() << "].");
}
else
{
FLOW_LOG_INFO("Session accepted: [" << session << "].");
ok = true;
}
// Either way though:
accepted_promise.set_value();
accepted_promise.set_value(err_code);
});
accepted_promise.get_future().wait();
if (!ok)
const auto err_code = accepted_promise.get_future().get();
if (err_code)
{
return BAD_EXIT;
throw Runtime_error(err_code, "totally unexpected error while accepting");
}
// else
FLOW_LOG_INFO("Session accepted: [" << session << "].");

/* Ignore session errors (see disclaimer comment at top of for general justification).
* Basically we know it'll be, if anything, just the client disconnecting from us when it's done; and by that
* point we'll be shutting down anyway. And any transmission error will be detected along the channel of
* transmission. As a not-serious-production-app, no need for this stuff. */
session.init_handlers([](const Error_code&) {});
session.init_handlers([](auto&&...) {});
// Session in PEER state (opened fully); so channels are ready too.

/* For now there are just these two channels. (See above where we specified `return 2`.)
Expand All @@ -234,18 +210,18 @@ int main(int argc, char const * const * argv)
auto& chan_raw = chans[0];

// And this one we immediately upgrade to a Flow-IPC transport::struc::Channel.
Channel_struc chan_struc(&log_logger, std::move(chans[1]), // Structured channel: SHM-backed underneath.
Channel_struc chan_struc(&(*log_logger), std::move(chans[1]), // Structured channel: SHM-backed underneath.
ipc::transport::struc::Channel_base::S_SERIALIZE_VIA_SESSION_SHM, &session);

run_capnp_over_raw(&std_logger, &chan_raw); // Benchmark 1. capnp data transmission without Flow-IPC zero-copy.
run_capnp_zero_copy(&std_logger, &chan_struc, &session); // Benchmark 2. Same but with it.
run_capnp_over_raw(&(*std_logger), &chan_raw); // Benchmark 1. capnp data transmission without Flow-IPC zero-copy.
run_capnp_zero_copy(&(*std_logger), &chan_struc, &session); // Benchmark 2. Same but with it.

FLOW_LOG_INFO("Exiting.");
} // try
catch (const exception& exc)
{
FLOW_LOG_WARNING("Caught exception: [" << exc.what() << "].");
return BAD_EXIT;
return 1;
}

return 0;
Expand Down

0 comments on commit 512ce51

Please sign in to comment.