diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e10285b..656cfea 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -87,6 +87,9 @@ set(ProtoFiles ${CMAKE_CURRENT_SOURCE_DIR}/tateyama/proto/kvs/transaction.proto ${CMAKE_CURRENT_SOURCE_DIR}/tateyama/proto/kvs/data.proto ${CMAKE_CURRENT_SOURCE_DIR}/tateyama/proto/diagnostics.proto + ${CMAKE_CURRENT_SOURCE_DIR}/tateyama/proto/session/request.proto + ${CMAKE_CURRENT_SOURCE_DIR}/tateyama/proto/session/response.proto + ${CMAKE_CURRENT_SOURCE_DIR}/tateyama/proto/session/diagnostic.proto ) # By default, PROTOBUF_GENERATE_CPP generates file path for .pb.cc as if they are in the same directory. diff --git a/src/tateyama/datastore/backup.cpp b/src/tateyama/datastore/backup.cpp index 7bd172d..d6693e4 100644 --- a/src/tateyama/datastore/backup.cpp +++ b/src/tateyama/datastore/backup.cpp @@ -23,7 +23,6 @@ #include -#include #include #include "tateyama/configuration/bootstrap_configuration.h" @@ -94,32 +93,6 @@ static bool prompt(std::string_view msg) return rtnv; } -static std::string database_name() { - if (auto conf = tateyama::api::configuration::create_configuration(FLAGS_conf, tateyama::configuration::default_property_for_bootstrap()); conf != nullptr) { - auto endpoint_config = conf->get_section("ipc_endpoint"); - if (endpoint_config == nullptr) { - std::cerr << "cannot find ipc_endpoint section in the configuration" << std::endl; - exit(tgctl::return_code::err); - } - auto database_name_opt = endpoint_config->get("database_name"); - if (!database_name_opt) { - std::cerr << "cannot find database_name at the section in the configuration" << std::endl; - exit(tgctl::return_code::err); - } - return database_name_opt.value(); - } - std::cerr << "error in create_configuration" << std::endl; - exit(2); -} - -static std::string digest() { - auto bst_conf = configuration::bootstrap_configuration::create_bootstrap_configuration(FLAGS_conf); - if (bst_conf.valid()) { - return bst_conf.digest(); - } - return {}; -} - tgctl::return_code tgctl_backup_create(const std::string& path_to_backup) { std::unique_ptr monitor_output{}; @@ -131,7 +104,7 @@ tgctl::return_code tgctl_backup_create(const std::string& path_to_backup) { auto rtnv = tgctl::return_code::ok; authentication::auth_options(); try { - auto transport = std::make_unique(database_name(), digest(), tateyama::framework::service_id_datastore); + auto transport = std::make_unique(tateyama::framework::service_id_datastore); ::tateyama::proto::datastore::request::Request requestBegin{}; auto backup_begin = requestBegin.mutable_backup_begin(); if (!FLAGS_label.empty()) { @@ -240,7 +213,7 @@ tgctl::return_code tgctl_backup_estimate() { authentication::auth_options(); try { - auto transport = std::make_unique(database_name(), digest(), tateyama::framework::service_id_datastore); + auto transport = std::make_unique(tateyama::framework::service_id_datastore); ::tateyama::proto::datastore::request::Request request{}; request.mutable_backup_estimate(); auto response = transport->send<::tateyama::proto::datastore::response::BackupEstimate>(request); @@ -268,7 +241,7 @@ tgctl::return_code tgctl_backup_estimate() { } } } catch (std::runtime_error &e) { - std::cerr << "could not connect to database with name " << database_name() << std::endl; + std::cerr << "could not connect to database with name " << tateyama::bootstrap::wire::transport::database_name() << std::endl; } rtnv = tgctl::return_code::err; @@ -302,7 +275,7 @@ tgctl::return_code tgctl_restore_backup(const std::string& path_to_backup) { authentication::auth_options(); try { - auto transport = std::make_unique(database_name(), digest(), tateyama::framework::service_id_datastore); + auto transport = std::make_unique(tateyama::framework::service_id_datastore); ::tateyama::proto::datastore::request::Request request{}; auto restore_begin = request.mutable_restore_begin(); restore_begin->set_backup_directory(path_to_backup); @@ -334,7 +307,7 @@ tgctl::return_code tgctl_restore_backup(const std::string& path_to_backup) { } } } catch (std::runtime_error &e) { - std::cerr << "could not connect to database with name " << database_name() << std::endl; + std::cerr << "could not connect to database with name " << tateyama::bootstrap::wire::transport::database_name() << std::endl; } rtnv = tgctl::return_code::err; @@ -380,7 +353,7 @@ tgctl::return_code tgctl_restore_backup_use_file_list(const std::string& path_to std::cerr << "option --nokeep_backup is ignored when --use-file-list is specified" << std::endl; } - auto transport = std::make_unique(database_name(), digest(), tateyama::framework::service_id_datastore); + auto transport = std::make_unique(tateyama::framework::service_id_datastore); ::tateyama::proto::datastore::request::Request request{}; auto restore_begin = request.mutable_restore_begin(); auto entries = restore_begin->mutable_entries(); @@ -421,7 +394,7 @@ tgctl::return_code tgctl_restore_backup_use_file_list(const std::string& path_to } } } catch (std::runtime_error &e) { - std::cerr << "could not connect to database with name " << database_name() << std::endl; + std::cerr << "could not connect to database with name " << tateyama::bootstrap::wire::transport::database_name() << std::endl; } rtnv = tgctl::return_code::err; @@ -443,7 +416,7 @@ tgctl::return_code tgctl_restore_tag(const std::string& tag_name) { authentication::auth_options(); try { - auto transport = std::make_unique(database_name(), digest(), tateyama::framework::service_id_datastore); + auto transport = std::make_unique(tateyama::framework::service_id_datastore); ::tateyama::proto::datastore::request::Request request{}; auto restore_begin = request.mutable_restore_begin(); @@ -472,7 +445,7 @@ tgctl::return_code tgctl_restore_tag(const std::string& tag_name) { } } } catch (std::runtime_error &e) { - std::cerr << "could not connect to database with name " << database_name() << std::endl; + std::cerr << "could not connect to database with name " << tateyama::bootstrap::wire::transport::database_name() << std::endl; } rtnv = tgctl::return_code::err; diff --git a/src/tateyama/proto/diagnostics.proto b/src/tateyama/proto/diagnostics.proto index fd81526..e82ca2d 100644 --- a/src/tateyama/proto/diagnostics.proto +++ b/src/tateyama/proto/diagnostics.proto @@ -57,6 +57,9 @@ enum Code { // request header or payload is not valid. INVALID_REQUEST = 41; + + // operation was canceled by user or system. + OPERATION_CANCELED = 42; } // diagnostic record. diff --git a/src/tateyama/proto/session/diagnostic.proto b/src/tateyama/proto/session/diagnostic.proto new file mode 100644 index 0000000..4f5be40 --- /dev/null +++ b/src/tateyama/proto/session/diagnostic.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +package tateyama.proto.session.diagnostic; + +// the error code. +enum ErrorCode { + // the error code is not set. + ERROR_CODE_NOT_SPECIFIED = 0; + + // the unknown error was occurred. + UNKNOWN = 1; + + // the target session is not found. + SESSION_NOT_FOUND = 2; + + // the session label is not unique within the target sessions. + SESSION_AMBIGUOUS = 3; + + // the operation was not permitted. + OPERATION_NOT_PERMITTED = 4; + + // the target session variable is not declared. + SESSION_VARIABLE_NOT_DECLARED = 5; + + // the setting value is invalid for the target variable. + SESSION_VARIABLE_INVALID_VALUE = 6; +} + +// the error information of the session control operations. +message Error { + // the error code. + ErrorCode error_code = 1; + + // the error message (optional). + string message = 2; + + // the error message code (optional). + string message_code = 3; + + // the arguments for the message code (optional). + repeated string message_arguments = 4; +} diff --git a/src/tateyama/proto/session/request.proto b/src/tateyama/proto/session/request.proto new file mode 100644 index 0000000..f96cf3e --- /dev/null +++ b/src/tateyama/proto/session/request.proto @@ -0,0 +1,98 @@ +syntax = "proto3"; + +package tateyama.proto.session.request; + +// the request message to session pseudo service. +message Request { + // service message version (major) + uint64 service_message_version_major = 1; + + // service message version (minor) + uint64 service_message_version_minor = 2; + + // reserved for system use + reserved 3 to 10; + + // the request command. + oneof command { + // obtains session information. + SessionGet session_get = 11; + + // obtains list of active sessions. + SessionList session_list = 12; + + // request shutdown to the session. + SessionShutdown session_shutdown = 13; + + // set session variables. + SessionSetVariable session_set_variable = 14; + + // retrieve session variables. + SessionGetVariable session_get_variable= 15; + } + reserved 16 to 99; +} + + +// obtains session information. +message SessionGet { + + // the target session specifier - ID or label. + string session_specifier = 1; +} + + +// obtains list of active sessions. +message SessionList { + // no special properties. +} + + +// represents kind of shutdown request. +enum SessionShutdownType { + + // the shutdown request type is not set (default behavior). + SESSION_SHUTDOWN_TYPE_NOT_SET = 0; + + // denies new requests, and shutdown the session after the all running requests were finished. + GRACEFUL = 1; + + // denies new requests, tells cancellation to the running requests, + // and then shutdown the session after the all requests were finished or cancelled. + FORCEFUL = 2; +} + +// request shutdown to the session. +message SessionShutdown { + + // the target session specifier - ID or label. + string session_specifier = 1; + + // the shutdown request type. + SessionShutdownType request_type = 2; +} + + +// set session variables. +message SessionSetVariable { + + // the target session specifier - ID or label. + string session_specifier = 1; + + // the target variable name, case insensitive. + string name = 2; + + // the text represented value to set. + string value = 3; +} + + +// retrieve session variables. +message SessionGetVariable { + + // the target session specifier - ID or label. + string session_specifier = 1; + + // the target variable name, case insensitive. + string name = 2; +} diff --git a/src/tateyama/proto/session/response.proto b/src/tateyama/proto/session/response.proto new file mode 100644 index 0000000..a60eee9 --- /dev/null +++ b/src/tateyama/proto/session/response.proto @@ -0,0 +1,133 @@ +syntax = "proto3"; + +package tateyama.proto.session.response; + +import "tateyama/proto/session/diagnostic.proto"; + +// the running session information. +message SessionInfo { + // the session ID, starting with ":". + string session_id = 1; + + // the session label (may be empty). + string label = 2; + + // the application name (may be empty). + string application = 3; + + // the session user name (may be empty). + string user = 4; + + // the session starting time (millisecond offset from 1970-01-01T00:00:00.000Z). + sfixed64 start_at = 5; + + // the session connection type (e.g. "ipc" or "tcp"). + string connection_type = 6; + + // the remote host information (e.g. TCP: remote host, IPC: PID). + string connection_info = 7; +} + +// the results of request.SessionGet. +message SessionGet { + // the successful message. + message Success { + // the obtained session information. + SessionInfo entry = 1; + } + + // the response body. + oneof result { + // request is successfully completed. + Success success = 1; + + // request was failed by error. + diagnostic.Error error = 2; + } +} + +// the results of request.SessionList. +message SessionList { + // the successful message. + message Success { + // the obtained list of sessions. + repeated SessionInfo entries = 1; + } + + // the response body. + oneof result { + // request is successfully completed. + Success success = 1; + + // request was failed by error. + diagnostic.Error error = 2; + } +} + +// the results of request.SessionShutdown. +message SessionShutdown { + // the successful message. + message Success {} + + // the response body. + oneof result { + // the shutdown was successfully requested. + Success success = 1; + + // request was failed by error. + diagnostic.Error error = 2; + } +} + +// the results of request.SessionSetVariable. +message SessionSetVariable { + // the successful message. + message Success {} + + // the response body. + oneof result { + // the session variable was successfully set. + Success success = 1; + + // request was failed by error. + diagnostic.Error error = 2; + } +} + +// the results of request.SessionGetVariable. +message SessionGetVariable { + // the successful message. + message Success { + + // the canonical variable name. + string name = 1; + + // the description of the variable. + string description = 2; + + // the resulting value, or NOT_SET if the value is empty. + oneof value { + + // the boolean value. + bool bool_value = 3; + + // the signed integer value. + sint64 signed_integer_value = 4; + + // the unsigned integer value. + uint64 unsigned_integer_value = 5; + + // the character string value. + string string_value = 6; + } + } + + // the response body. + oneof result { + // the session variable was successfully set. + Success success = 1; + + // request was failed by error. + diagnostic.Error error = 2; + } +} diff --git a/src/tateyama/session/session.cpp b/src/tateyama/session/session.cpp new file mode 100644 index 0000000..4d4cdad --- /dev/null +++ b/src/tateyama/session/session.cpp @@ -0,0 +1,334 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +#include "tateyama/authentication/authentication.h" +#include +#include +#include + +#include +#include +#include "session.h" + +DECLARE_string(monitor); +DECLARE_bool(force); + +namespace tateyama::session { + +static std::string to_timepoint_string(std::uint64_t msu) { + auto ms = static_cast(msu); + std::timespec ts{ms / 1000, (ms % 1000) * 1000000}; + + auto* when = std::localtime(&ts.tv_sec); + constexpr int array_size = 32; + std::array buf{}; + if (std::strftime(buf.data(), array_size - 1, "%Y-%m-%d %H:%M:%S", when) == 0) { + return {}; + } + std::array output{}; + const int msec = static_cast(ts.tv_nsec) / 1000000; + auto len = snprintf(output.data(), array_size, "%s.%03d %s", buf.data(), msec, when->tm_zone); // NOLINT(cppcoreguidelines-pro-type-vararg,hicpp-vararg) + std::string rv{output.data(), static_cast(len)}; + return rv; +} + +tgctl::return_code session_list() { + std::unique_ptr monitor_output{}; + + if (!FLAGS_monitor.empty()) { + monitor_output = std::make_unique(FLAGS_monitor); + monitor_output->start(); + } + + auto rtnv = tgctl::return_code::ok; + authentication::auth_options(); + try { + auto transport = std::make_unique(tateyama::framework::service_id_session); + ::tateyama::proto::session::request::Request request{}; + (void) request.mutable_session_list(); + auto response_opt = transport->send<::tateyama::proto::session::response::SessionList>(request); + request.clear_session_list(); + + if (response_opt) { + auto response = response_opt.value(); + switch(response.result_case()) { + case ::tateyama::proto::session::response::SessionList::ResultCase::kSuccess: + break; + case ::tateyama::proto::session::response::SessionList::ResultCase::kError: + std::cerr << "SessionList error: " << response.error().message() << std::endl; + rtnv = tgctl::return_code::err; + break; + default: + std::cerr << "SessionList result_case() error: " << std::endl; + rtnv = tgctl::return_code::err; + } + + if (rtnv == tgctl::return_code::ok) { + auto session_list = response.success().entries(); + + std::size_t id_max{2}; + std::size_t label_max{5}; + std::size_t application_max{11}; + std::size_t user_max{4}; + std::size_t start_max{5}; + std::size_t type_max{4}; + std::size_t remote_max{6}; + for( auto& e : session_list ) { + if (id_max < e.session_id().length()) { + id_max = e.session_id().length(); + } + if (label_max < e.label().length()) { + label_max = e.label().length(); + } + if (application_max < e.application().length()) { + application_max = e.application().length(); + } + if (user_max < e.user().length()) { + user_max = e.user().length(); + } + auto ts = to_timepoint_string(e.start_at()); + if (start_max < ts.length()) { + start_max = ts.length(); + } + if (type_max < e.connection_type().length()) { + type_max = e.connection_type().length(); + } + if (remote_max < e.connection_info().length()) { + remote_max = e.connection_info().length(); + } + } + + id_max += 2; + label_max += 2; + application_max += 2; + user_max += 2; + start_max += 2; + type_max += 2; + remote_max += 2; + + std::cout << std::left; + std::cout << std::setw(static_cast(id_max)) << "id"; + std::cout << std::setw(static_cast(label_max)) << "label"; + std::cout << std::setw(static_cast(application_max)) << "application"; + std::cout << std::setw(static_cast(user_max)) << "user"; + std::cout << std::setw(static_cast(start_max)) << "start"; + std::cout << std::setw(static_cast(type_max)) << "type"; + std::cout << std::setw(static_cast(remote_max)) << "remote"; + std::cout << std::endl; + + for( auto& e : session_list ) { + std::cout << std::setw(static_cast(id_max)) << e.session_id(); + std::cout << std::setw(static_cast(label_max)) << e.label(); + std::cout << std::setw(static_cast(application_max)) << e.application(); + std::cout << std::setw(static_cast(user_max)) << e.user(); + std::cout << std::setw(static_cast(start_max)) << to_timepoint_string(e.start_at()); + std::cout << std::setw(static_cast(type_max)) << e.connection_type(); + std::cout << std::setw(static_cast(remote_max)) << e.connection_info(); + std::cout << std::endl; + if (monitor_output) { + monitor_output->session_info(e.session_id(), e.label(), e.application(), e.user(), to_timepoint_string(e.start_at()), e.connection_type(), e.connection_info()); + } + } + + if (monitor_output) { + monitor_output->finish(true); + } + return rtnv; + } + } + } catch (std::runtime_error &ex) { + std::cerr << "could not connect to database with name " << tateyama::bootstrap::wire::transport::database_name() << std::endl; + } + rtnv = tgctl::return_code::err; + + if (monitor_output) { + monitor_output->finish(false); + } + return rtnv; +} + +tgctl::return_code session_show(std::string_view session_ref) { + std::unique_ptr monitor_output{}; + + if (!FLAGS_monitor.empty()) { + monitor_output = std::make_unique(FLAGS_monitor); + monitor_output->start(); + } + + auto rtnv = tgctl::return_code::ok; + authentication::auth_options(); + try { + auto transport = std::make_unique(tateyama::framework::service_id_session); + ::tateyama::proto::session::request::Request request{}; + auto* command = request.mutable_session_get(); + command->set_session_specifier(std::string(session_ref)); + auto response_opt = transport->send<::tateyama::proto::session::response::SessionGet>(request); + request.clear_session_get(); + + if (response_opt) { + auto response = response_opt.value(); + switch(response.result_case()) { + case ::tateyama::proto::session::response::SessionGet::ResultCase::kSuccess: + break; + case ::tateyama::proto::session::response::SessionGet::ResultCase::kError: + std::cerr << "SessionShow error: " << response.error().message() << std::endl; + rtnv = tgctl::return_code::err; + break; + default: + std::cerr << "SessionShow result_case() error: " << std::endl; + rtnv = tgctl::return_code::err; + } + + if (rtnv == tgctl::return_code::ok) { + auto e = response.success().entry(); + + std::cout << std::left; + std::cout << std::setw(static_cast(13)) << "id" << e.session_id() << std::endl; + std::cout << std::setw(static_cast(13)) << "application" << e.application() << std::endl; + std::cout << std::setw(static_cast(13)) << "label" << e.label() << std::endl; + std::cout << std::setw(static_cast(13)) << "user" << e.user() << std::endl; + std::cout << std::setw(static_cast(13)) << "start" << to_timepoint_string(e.start_at()) << std::endl; + std::cout << std::setw(static_cast(13)) << "type" << e.connection_type() << std::endl; + std::cout << std::setw(static_cast(13)) << "remote" << e.connection_info() << std::endl; + if (monitor_output) { + monitor_output->session_info(e.session_id(), e.label(), e.application(), e.user(), to_timepoint_string(e.start_at()), e.connection_type(), e.connection_info()); + monitor_output->finish(true); + } + return rtnv; + } + } + } catch (std::runtime_error &ex) { + std::cerr << "could not connect to database with name " << tateyama::bootstrap::wire::transport::database_name() << std::endl; + } + rtnv = tgctl::return_code::err; + + if (monitor_output) { + monitor_output->finish(false); + } + return rtnv; +} + +tgctl::return_code session_kill(std::string_view session_ref) { + std::unique_ptr monitor_output{}; + + if (!FLAGS_monitor.empty()) { + monitor_output = std::make_unique(FLAGS_monitor); + monitor_output->start(); + } + + auto rtnv = tgctl::return_code::ok; + authentication::auth_options(); + try { + auto transport = std::make_unique(tateyama::framework::service_id_session); + ::tateyama::proto::session::request::Request request{}; + auto* command = request.mutable_session_shutdown(); + command->set_session_specifier(std::string(session_ref)); + command->set_request_type(FLAGS_force ? + ::tateyama::proto::session::request::SessionShutdownType::FORCEFUL : + ::tateyama::proto::session::request::SessionShutdownType::GRACEFUL); + auto response_opt = transport->send<::tateyama::proto::session::response::SessionShutdown>(request); + request.clear_session_shutdown(); + + if (response_opt) { + auto response = response_opt.value(); + switch(response.result_case()) { + case ::tateyama::proto::session::response::SessionShutdown::ResultCase::kSuccess: + break; + case ::tateyama::proto::session::response::SessionShutdown::ResultCase::kError: + std::cerr << "SessionShutdown error: " << response.error().message() << std::endl; + rtnv = tgctl::return_code::err; + break; + default: + std::cerr << "SessionShutdown result_case() error: " << std::endl; + rtnv = tgctl::return_code::err; + } + + if (rtnv == tgctl::return_code::ok) { + if (monitor_output) { + monitor_output->finish(true); + } + return rtnv; + } + } + } catch (std::runtime_error &ex) { + std::cerr << "could not connect to database with name " << tateyama::bootstrap::wire::transport::database_name() << std::endl; + } + rtnv = tgctl::return_code::err; + + if (monitor_output) { + monitor_output->finish(false); + } + return rtnv; +} + +tgctl::return_code session_swtch(std::string_view session_ref, std::string_view set_key, std::string_view set_value) { + std::unique_ptr monitor_output{}; + + if (!FLAGS_monitor.empty()) { + monitor_output = std::make_unique(FLAGS_monitor); + monitor_output->start(); + } + + auto rtnv = tgctl::return_code::ok; + authentication::auth_options(); + try { + auto transport = std::make_unique(tateyama::framework::service_id_session); + ::tateyama::proto::session::request::Request request{}; + auto* command = request.mutable_session_set_variable(); + command->set_session_specifier(std::string(session_ref)); + command->set_name(std::string(set_key)); + command->set_value(std::string(set_value)); + auto response_opt = transport->send<::tateyama::proto::session::response::SessionSetVariable>(request); + request.clear_session_set_variable(); + + if (response_opt) { + auto response = response_opt.value(); + switch(response.result_case()) { + case ::tateyama::proto::session::response::SessionSetVariable::ResultCase::kSuccess: + break; + case ::tateyama::proto::session::response::SessionSetVariable::ResultCase::kError: + std::cerr << "SessionSetVariable error: " << response.error().message() << std::endl; + rtnv = tgctl::return_code::err; + break; + default: + std::cerr << "SessionSetVariable result_case() error: " << std::endl; + rtnv = tgctl::return_code::err; + } + + if (rtnv == tgctl::return_code::ok) { + if (monitor_output) { + monitor_output->finish(true); + } + return rtnv; + } + } + } catch (std::runtime_error &ex) { + std::cerr << "could not connect to database with name " << tateyama::bootstrap::wire::transport::database_name() << std::endl; + } + rtnv = tgctl::return_code::err; + + if (monitor_output) { + monitor_output->finish(false); + } + return rtnv; +} + +} // tateyama::bootstrap::backup diff --git a/src/tateyama/session/session.h b/src/tateyama/session/session.h index f935e3d..df14001 100644 --- a/src/tateyama/session/session.h +++ b/src/tateyama/session/session.h @@ -23,19 +23,9 @@ namespace tateyama::session { - tgctl::return_code list(); - tgctl::return_code show(std::string_view session_ref); - tgctl::return_code kill(const std::vector&& sesstion_refs); - tgctl::return_code swtch(std::string_view session_ref, std::string_view set_key, std::string_view set_value); - - struct session_list_entry { - std::string id; - std::string label; - std::string application; - std::string user; - std::string start; - std::string type; - std::string remote; - }; + tgctl::return_code session_list(); + tgctl::return_code session_show(std::string_view session_ref); + tgctl::return_code session_kill(std::string_view session_ref); + tgctl::return_code session_swtch(std::string_view session_ref, std::string_view set_key, std::string_view set_value); } // tateyama::session diff --git a/src/tateyama/session/session_mock.cpp b/src/tateyama/session/session_mock.cpp deleted file mode 100644 index ff94810..0000000 --- a/src/tateyama/session/session_mock.cpp +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright 2022-2024 Project Tsurugi. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include - -#include "tateyama/monitor/monitor.h" -#include "session.h" - -DECLARE_string(monitor); // NOLINT - -namespace tateyama::session { - -const std::vector sessoin_list = { // NOLINT as it is used for mock purpose only - { "1", "belayer-dump-1", "belayer", "arakawa", "2022-06-20T12:34:56Z", "ipc", "6502" }, - { "2", "example-1", "tgsql", "arakawa", "2022-06-20T12:34:50Z", "tcp", "192.168.1.23:10000" }, - { "3", "example-2", "tgsql", "kurosawa", "2022-06-20T12:34:53Z", "tcp", "192.168.1.78:10000" }, - { "4", "", "", "admin", "2022-06-20T12:34:57Z", "ipc", "32816" }, - { "cafebabe", "load-1", "tgload", "kambayashi", "2022-06-10-T01:23:45", "tcp", "192.168.1.24:32768" } -}; - -tgctl::return_code list() { - std::unique_ptr monitor_output{}; - if (!FLAGS_monitor.empty()) { - monitor_output = std::make_unique(FLAGS_monitor); - monitor_output->start(); - } - - std::size_t id_max{2}; - std::size_t label_max{5}; - std::size_t application_max{11}; - std::size_t user_max{4}; - std::size_t start_max{5}; - std::size_t type_max{4}; - std::size_t remote_max{6}; - for( auto& e : sessoin_list ) { - if (id_max < e.id.length()) { - id_max = e.id.length(); - } - if (label_max < e.label.length()) { - label_max = e.label.length(); - } - if (application_max < e.application.length()) { - application_max = e.application.length(); - } - if (user_max < e.user.length()) { - user_max = e.user.length(); - } - if (start_max < e.start.length()) { - start_max = e.start.length(); - } - if (type_max < e.type.length()) { - type_max = e.type.length(); - } - if (remote_max < e.remote.length()) { - remote_max = e.remote.length(); - } - } - id_max += 2; - label_max += 2; - application_max += 2; - user_max += 2; - start_max += 2; - type_max += 2; - remote_max += 2; - - std::cout << std::left; - std::cout << std::setw(static_cast(id_max + 1)) << "id"; - std::cout << std::setw(static_cast(label_max)) << "label"; - std::cout << std::setw(static_cast(application_max)) << "application"; - std::cout << std::setw(static_cast(user_max)) << "user"; - std::cout << std::setw(static_cast(start_max)) << "start"; - std::cout << std::setw(static_cast(type_max)) << "type"; - std::cout << std::setw(static_cast(remote_max)) << "remote"; - std::cout << std::endl; - - for( auto& e : sessoin_list ) { - std::cout << ":" << std::setw(static_cast(id_max)) << e.id; - std::cout << std::setw(static_cast(label_max)) << e.label; - std::cout << std::setw(static_cast(application_max)) << e.application; - std::cout << std::setw(static_cast(user_max)) << e.user; - std::cout << std::setw(static_cast(start_max)) << e.start; - std::cout << std::setw(static_cast(type_max)) << e.type; - std::cout << std::setw(static_cast(remote_max)) << e.remote; - std::cout << std::endl; - if (monitor_output) { - monitor_output->session_info(e.id, e.label, e.application, e.user, e.start, e.type, e.remote); - } - } - if (monitor_output) { - monitor_output->finish(true); - } - return tateyama::tgctl::return_code::ok; -} - -tgctl::return_code show(std::string_view session_ref) { - std::unique_ptr monitor_output{}; - if (!FLAGS_monitor.empty()) { - monitor_output = std::make_unique(FLAGS_monitor); - monitor_output->start(); - } - - if (session_ref.empty()) { - return tateyama::tgctl::return_code::err; - } - bool comp_id = (session_ref.at(0) == ':'); - for( auto& e : sessoin_list ) { - if(comp_id) { - if (e.id != session_ref.substr(1)) { - continue; - } - } else { - if (e.label != session_ref) { - continue; - } - } - std::cout << std::left; - std::cout << std::setw(static_cast(13)) << "id" << e.id << std::endl; - std::cout << std::setw(static_cast(13)) << "application" << e.application << std::endl; - std::cout << std::setw(static_cast(13)) << "label" << e.label << std::endl; - std::cout << std::setw(static_cast(13)) << "user" << e.user << std::endl; - std::cout << std::setw(static_cast(13)) << "start" << e.start << std::endl; - std::cout << std::setw(static_cast(13)) << "type" << e.type << std::endl; - std::cout << std::setw(static_cast(13)) << "remote" << e.remote << std::endl; - if (monitor_output) { - monitor_output->session_info(e.id, e.label, e.application, e.user, e.start, e.type, e.remote); - monitor_output->finish(true); - } - return tateyama::tgctl::return_code::ok; - } - if (monitor_output) { - monitor_output->finish(false); - } - return tateyama::tgctl::return_code::err; -} - -tgctl::return_code kill(const std::vector&& sesstion_refs) { - std::unique_ptr monitor_output{}; - if (!FLAGS_monitor.empty()) { - monitor_output = std::make_unique(FLAGS_monitor); - monitor_output->start(); - } - - tateyama::tgctl::return_code rv{tateyama::tgctl::return_code::err}; - - for (auto& session_ref : sesstion_refs) { - if (session_ref.empty()) { - return tateyama::tgctl::return_code::err; - } - bool comp_id = (session_ref.at(0) == ':'); - for( auto& e : sessoin_list ) { - if(comp_id) { - if (e.id != session_ref.substr(1)) { - continue; - } - } else { - if (e.label != session_ref) { - continue; - } - } - std::cout << std::left - << std::setw(static_cast(6)) << "kill" - << std::setw(static_cast(1)) << ":" - << std::setw(static_cast(e.id.length())) << e.id - << std::endl; - rv = tateyama::tgctl::return_code::ok; - } - } - if (monitor_output) { - monitor_output->finish(rv == tateyama::tgctl::return_code::ok); - } - return rv; -} - -tgctl::return_code swtch(std::string_view session_ref, std::string_view set_key, std::string_view set_value) { - std::unique_ptr monitor_output{}; - if (!FLAGS_monitor.empty()) { - monitor_output = std::make_unique(FLAGS_monitor); - monitor_output->start(); - } - - bool comp_id = (session_ref.at(0) == ':'); - for( auto& e : sessoin_list ) { - if(comp_id) { - if (e.id != session_ref.substr(1)) { - continue; - } - } else { - if (e.label != session_ref) { - continue; - } - } - std::cout << std::left - << std::setw(static_cast(8)) << "set" - << std::setw(static_cast(1)) << ":" - << std::setw(static_cast(e.id.length())) << e.id - << "[" << set_key << "] to " << set_value - << std::endl; - if (monitor_output) { - monitor_output->finish(true); - } - return tateyama::tgctl::return_code::ok; - } - if (monitor_output) { - monitor_output->finish(false); - } - return tateyama::tgctl::return_code::err; -} - -} // tateyama::session diff --git a/src/tateyama/tgctl/main.cpp b/src/tateyama/tgctl/main.cpp index 9011d82..a86f9ce 100644 --- a/src/tateyama/tgctl/main.cpp +++ b/src/tateyama/tgctl/main.cpp @@ -163,29 +163,32 @@ int tgctl_main(const std::vector& args) { //NOLINT(readability-func return tateyama::tgctl::return_code::err; } if (args.at(2) == "list") { - return tateyama::session::list(); + return tateyama::session::session_list(); } if (args.at(2) == "show") { if (args.size() < 3) { std::cerr << "need to specify session-ref" << std::endl; return tateyama::tgctl::return_code::err; } - return tateyama::session::show(args.at(3)); + return tateyama::session::session_show(args.at(3)); } if (args.at(2) == "kill") { if (args.size() < 3) { std::cerr << "need to specify session-ref(s)" << std::endl; return tateyama::tgctl::return_code::err; } - return tateyama::session::kill(std::vector(args.begin() + 3, args.begin() + static_cast(args.size()))); + // return tateyama::session::session_kill(std::vector(args.begin() + 3, args.begin() + static_cast(args.size()))); // FIXME confirm specification + return tateyama::session::session_kill(args.at(3)); } if (args.at(2) == "set") { if (args.size() < 5) { std::cerr << "need to specify session-ref, set-key, and set-value" << std::endl; return tateyama::tgctl::return_code::err; } - return tateyama::session::swtch(args.at(3), args.at(4), args.at(5)); + return tateyama::session::session_swtch(args.at(3), args.at(4), args.at(5)); } + std::cerr << "unknown session sub command '" << args.at(2) << "'" << std::endl; + return tateyama::tgctl::return_code::err; } if (args.at(1) == "dbstats") { if (args.at(2) == "list") { diff --git a/src/tateyama/transport/client_wire.h b/src/tateyama/transport/client_wire.h index 706ff9c..7083672 100644 --- a/src/tateyama/transport/client_wire.h +++ b/src/tateyama/transport/client_wire.h @@ -93,14 +93,14 @@ class session_wire_container public: wire_container() = default; wire_container(unidirectional_message_wire* wire, char* bip_buffer) noexcept : wire_(wire), bip_buffer_(bip_buffer) {}; - message_header peep(bool wait = false) { - return wire_->peep(bip_buffer_, wait); + message_header peep() { + return wire_->peep(bip_buffer_); } void write(const std::string& data, message_header::index_type index) { wire_->write(bip_buffer_, data.data(), message_header(index, data.length())); } void disconnect() { - wire_->write(bip_buffer_, nullptr, message_header(message_header::termination_request, 0)); + wire_->terminate(); } private: @@ -153,7 +153,7 @@ class session_wire_container } ~session_wire_container() = default; - + void close() { request_wire_.disconnect(); } diff --git a/src/tateyama/transport/transport.h b/src/tateyama/transport/transport.h index a008fc0..8a67cbf 100644 --- a/src/tateyama/transport/transport.h +++ b/src/tateyama/transport/transport.h @@ -21,6 +21,9 @@ #include #include +#include + +#include #include #include #include @@ -29,10 +32,14 @@ #include #include #include +#include +#include #include "tateyama/server/status_info.h" #include "client_wire.h" +DECLARE_string(conf); // NOLINT + namespace tateyama::bootstrap::wire { constexpr static std::size_t HEADER_MESSAGE_VERSION_MAJOR = 0; @@ -41,17 +48,19 @@ constexpr static std::size_t DATASTORE_MESSAGE_VERSION_MAJOR = 0; constexpr static std::size_t DATASTORE_MESSAGE_VERSION_MINOR = 0; constexpr static std::size_t ENDPOINT_MESSAGE_VERSION_MAJOR = 0; constexpr static std::size_t ENDPOINT_MESSAGE_VERSION_MINOR = 0; +constexpr static std::size_t SESSION_MESSAGE_VERSION_MAJOR = 0; +constexpr static std::size_t SESSION_MESSAGE_VERSION_MINOR = 0; class transport { public: transport() = delete; - transport(std::string_view name, std::string_view digest, tateyama::framework::component::id_type type) : - wire_(tateyama::common::wire::session_wire_container(tateyama::common::wire::connection_container(name).connect())) { + explicit transport(tateyama::framework::component::id_type type) : + wire_(tateyama::common::wire::session_wire_container(tateyama::common::wire::connection_container(database_name()).connect())) { header_.set_service_message_version_major(HEADER_MESSAGE_VERSION_MAJOR); header_.set_service_message_version_minor(HEADER_MESSAGE_VERSION_MINOR); header_.set_service_id(type); - status_info_ = std::make_unique(std::string(digest)); + status_info_ = std::make_unique(digest()); auto handshake_response = handshake(); if (!handshake_response) { throw std::runtime_error("handshake error"); @@ -61,6 +70,22 @@ class transport { } } + ~transport() { + try { + if (!closed_) { + close(); + } + } catch (std::exception &ex) { + std::cerr << ex.what() << std::endl; + } + } + + transport(transport const& other) = delete; + transport& operator=(transport const& other) = delete; + transport(transport&& other) noexcept = delete; + transport& operator=(transport&& other) noexcept = delete; + + // for datastore template std::optional send(::tateyama::proto::datastore::request::Request& request) { auto& response_wire = wire_.get_response_wire(); @@ -107,6 +132,53 @@ class transport { return response; } + // for session + template + std::optional send(::tateyama::proto::session::request::Request& request) { + auto& response_wire = wire_.get_response_wire(); + + std::stringstream sst{}; + if(auto res = tateyama::utils::SerializeDelimitedToOstream(header_, std::addressof(sst)); ! res) { + return std::nullopt; + } + request.set_service_message_version_major(SESSION_MESSAGE_VERSION_MAJOR); + request.set_service_message_version_minor(SESSION_MESSAGE_VERSION_MINOR); + if(auto res = tateyama::utils::SerializeDelimitedToOstream(request, std::addressof(sst)); ! res) { + return std::nullopt; + } + wire_.write(sst.str()); + + while (true) { + try { + response_wire.await(); + break; + } catch (std::runtime_error &e) { + if (status_info_->alive()) { + continue; + } + std::cerr << e.what() << std::endl; + return std::nullopt; + } + } + std::string res_message{}; + res_message.resize(response_wire.get_length()); + response_wire.read(res_message.data()); + ::tateyama::proto::framework::response::Header header{}; + google::protobuf::io::ArrayInputStream ins{res_message.data(), static_cast(res_message.length())}; + if(auto res = tateyama::utils::ParseDelimitedFromZeroCopyStream(std::addressof(header), std::addressof(ins), nullptr); ! res) { + return std::nullopt; + } + std::string_view payload{}; + if (auto res = tateyama::utils::GetDelimitedBodyFromZeroCopyStream(std::addressof(ins), nullptr, payload); ! res) { + return std::nullopt; + } + T response{}; + if(auto res = response.ParseFromArray(payload.data(), payload.length()); ! res) { + return std::nullopt; + } + return response; + } + // implement handshake template std::optional send(::tateyama::proto::endpoint::request::Request& request) { @@ -161,12 +233,35 @@ class transport { void close() { wire_.close(); + closed_ = true; + } + + static std::string database_name() { + auto conf = configuration::bootstrap_configuration::create_bootstrap_configuration(FLAGS_conf).get_configuration(); + auto endpoint_config = conf->get_section("ipc_endpoint"); + if (endpoint_config == nullptr) { + throw std::runtime_error("cannot find ipc_endpoint section in the configuration"); + } + auto database_name_opt = endpoint_config->get("database_name"); + if (!database_name_opt) { + throw std::runtime_error("cannot find database_name at the section in the configuration"); + } + return database_name_opt.value(); } private: tateyama::common::wire::session_wire_container wire_; tateyama::proto::framework::request::Header header_{}; std::unique_ptr status_info_{}; + bool closed_{}; + + std::string digest() { + auto bst_conf = configuration::bootstrap_configuration::create_bootstrap_configuration(FLAGS_conf); + if (bst_conf.valid()) { + return bst_conf.digest(); + } + return {}; + } std::optional handshake() { tateyama::proto::endpoint::request::ClientInformation information{}; diff --git a/src/tateyama/transport/wire.h b/src/tateyama/transport/wire.h index 149721b..296e153 100644 --- a/src/tateyama/transport/wire.h +++ b/src/tateyama/transport/wire.h @@ -42,7 +42,7 @@ class message_header { public: using length_type = std::uint32_t; using index_type = std::uint16_t; - static constexpr index_type termination_request = 0xffff; + static constexpr index_type null_request = 0xffff; static constexpr std::size_t size = sizeof(length_type) + sizeof(index_type); @@ -224,40 +224,6 @@ class simple_wire } } - /** - * @brief write response message in the response wire, which is used by endpoint IF - */ - void write(char* base, const char* from, T header) { - std::size_t length = header.get_length() + T::size; - auto msg_length = min(length, capacity_); - if (msg_length > room()) { wait_to_write(msg_length); } - write_in_buffer(base, buffer_address(base, pushed_.load()), header.get_buffer(), T::size); - if (msg_length > T::size) { - write_in_buffer(base, buffer_address(base, pushed_.load() + T::size), from, msg_length - T::size); - } - pushed_.fetch_add(msg_length); - length -= msg_length; - from += (msg_length - T::size); // NOLINT - std::atomic_thread_fence(std::memory_order_acq_rel); - if (wait_for_read_) { - boost::interprocess::scoped_lock lock(m_mutex_); - c_empty_.notify_one(); - } - while (length > 0) { - msg_length = min(length, capacity_); - if (msg_length > room()) { wait_to_write(msg_length); } - write_in_buffer(base, buffer_address(base, pushed_.load()), from, msg_length); - pushed_.fetch_add(msg_length); - length -= msg_length; - from += msg_length; // NOLINT - std::atomic_thread_fence(std::memory_order_acq_rel); - if (wait_for_read_) { - boost::interprocess::scoped_lock lock(m_mutex_); - c_empty_.notify_one(); - } - } - } - /** * @brief dispose the message in the queue at read_point that has completed read and is no longer needed * used by endpoint IF @@ -298,11 +264,43 @@ class simple_wire [[nodiscard]] const char* read_address(const char* base, std::size_t offset) const { return base + index(poped_.load() + offset); } //NOLINT [[nodiscard]] const char* read_address(const char* base) const { return base + index(poped_.load()); } //NOLINT - void wait_to_write(std::size_t length) { + void write(char* base, const char* from, T header, std::atomic_bool& closed) { + std::size_t length = header.get_length() + T::size; + auto msg_length = min(length, capacity_); + if (msg_length > room() && !closed.load()) { wait_to_write(msg_length, closed); } + if (closed.load()) { return; } + write_in_buffer(base, buffer_address(base, pushed_.load()), header.get_buffer(), T::size); + if (msg_length > T::size) { + write_in_buffer(base, buffer_address(base, pushed_.load() + T::size), from, msg_length - T::size); + } + pushed_.fetch_add(msg_length); + length -= msg_length; + from += (msg_length - T::size); // NOLINT + std::atomic_thread_fence(std::memory_order_acq_rel); + if (wait_for_read_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_empty_.notify_one(); + } + while (length > 0) { + msg_length = min(length, capacity_); + if (msg_length > room() && !closed.load()) { wait_to_write(msg_length, closed); } + if (closed.load()) { return; } + write_in_buffer(base, buffer_address(base, pushed_.load()), from, msg_length); + pushed_.fetch_add(msg_length); + length -= msg_length; + from += msg_length; // NOLINT + std::atomic_thread_fence(std::memory_order_acq_rel); + if (wait_for_read_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_empty_.notify_one(); + } + } + } + void wait_to_write(std::size_t length, std::atomic_bool& closed) { boost::interprocess::scoped_lock lock(m_mutex_); wait_for_write_ = true; std::atomic_thread_fence(std::memory_order_acq_rel); - c_full_.wait(lock, [this, length](){ return room() >= length; }); + c_full_.wait(lock, [this, length, &closed](){ return room() >= length || closed.load(); }); wait_for_write_ = false; } void write_in_buffer(char *base, char* top, const char* from, std::size_t length) noexcept { @@ -372,35 +370,77 @@ inline static std::int64_t n_cap(std::int64_t timeout) { // for request class unidirectional_message_wire : public simple_wire { + constexpr static std::size_t watch_interval = 2; public: unidirectional_message_wire(boost::interprocess::managed_shared_memory* managed_shm_ptr, std::size_t capacity) : simple_wire(managed_shm_ptr, capacity) {} /** - * @brief peep the current header. + * @brief wait a request message arives and peep the current header. + * @returnm the essage_header if request message has been received, + * otherwise, say timeout or termination requested, dummy request message whose length is 0 and index is message_header::null_request. */ - message_header peep(const char* base, bool wait_flag = false) { + message_header peep(const char* base) { while (true) { - if(stored() >= message_header::size || termination_requested_.load()) { - break; + if(stored() >= message_header::size) { + copy_header(base); + return header_received_; } - if (wait_flag) { - boost::interprocess::scoped_lock lock(m_mutex_); - wait_for_read_ = true; - std::atomic_thread_fence(std::memory_order_acq_rel); - c_empty_.wait(lock, [this](){ return (stored() >= message_header::size) || termination_requested_.load(); }); + if (termination_requested_.load() || onetime_notification_.load()) { + onetime_notification_.store(false); + return {message_header::null_request, 0}; + } + boost::interprocess::scoped_lock lock(m_mutex_); + wait_for_read_ = true; + std::atomic_thread_fence(std::memory_order_acq_rel); + if (!c_empty_.timed_wait(lock, + boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(watch_interval * 1000 * 1000))), + [this](){ return (stored() >= message_header::size) || termination_requested_.load() || onetime_notification_.load(); })) { wait_for_read_ = false; - } else { - if (stored() < message_header::size) { return {}; } + header_received_ = message_header(message_header::null_request, 0); + return header_received_; } + wait_for_read_ = false; } - if (!termination_requested_.load()) { - copy_header(base); - } else { - header_received_ = message_header(message_header::termination_request, 0); + } + /** + * @brief check if an termination request has been made + * @retrun true if terminate request has been made + */ + [[nodiscard]] bool terminate_requested() { + return termination_requested_.load(); + } + /** + * @brief wake up the worker immediately. + */ + void notify() { + onetime_notification_.store(true); + std::atomic_thread_fence(std::memory_order_acq_rel); + if (wait_for_read_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_empty_.notify_one(); + } + } + /** + * @brief close the request wire, used by the server. + */ + void close() { + closed_.store(true); + std::atomic_thread_fence(std::memory_order_acq_rel); + if (wait_for_write_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_full_.notify_one(); } - return header_received_; } + /** + * @brief write request message + * @param base the base address of the request wire + * @param from the request message to be written in the request wire + * @param header the header of the request message + */ + void write(char* base, const char* from, message_header header) { + simple_wire::write(base, from, header, closed_); + } /** * @brief wake up the worker thread waiting for request arrival, supposed to be used in server termination. */ @@ -412,16 +452,11 @@ class unidirectional_message_wire : public simple_wire { c_empty_.notify_one(); } } - /** - * @brief check if an termination request has been made - * @retrun true if terminate request has been made - */ - [[nodiscard]] bool terminate_requested() { - return termination_requested_.load(); - } private: std::atomic_bool termination_requested_{}; + std::atomic_bool onetime_notification_{}; + std::atomic_bool closed_{}; }; @@ -480,18 +515,44 @@ class unidirectional_response_wire : public simple_wire { [[nodiscard]] response_header::msg_type get_type() const { return header_received_.get_type(); } - + /** + * @brief close the response wire, used by the client. + */ void close() { closed_.store(true); std::atomic_thread_fence(std::memory_order_acq_rel); - if (wait_for_read_) { + if (wait_for_write_) { boost::interprocess::scoped_lock lock(m_mutex_); c_empty_.notify_one(); } } + /** + * @brief check the session has been shut down + * @return true if the session has been shut down + */ + [[nodiscard]] bool check_shutdown() const noexcept { + return shutdown_.load(); + } + + /** + * @brief write response message + * @param base the base address of the response wire + * @param from the response message to be written in the response wire + * @param header the header of the response message + */ + void write(char* base, const char* from, response_header header) { + simple_wire::write(base, from, header, closed_); + } + /** + * @brief notify client of the client of the shutdown + */ + void notify_shutdown() noexcept { + shutdown_.store(true); + } private: std::atomic_bool closed_{}; + std::atomic_bool shutdown_{}; }; @@ -897,18 +958,22 @@ class status_provider { status_provider(boost::interprocess::managed_shared_memory* managed_shm_ptr, std::string_view file) : mutex_file_(file, managed_shm_ptr->get_segment_manager()) { } - [[nodiscard]] bool is_alive() { - int fd = open(mutex_file_.c_str(), O_WRONLY); // NOLINT + [[nodiscard]] std::string is_alive() { + int fd = open(mutex_file_.c_str(), O_RDONLY); // NOLINT if (fd < 0) { - return false; + std::stringstream ss{}; + ss << "cannot open the lock file (" << mutex_file_.c_str() << ")"; + return ss.str(); } if (flock(fd, LOCK_EX | LOCK_NB) == 0) { // NOLINT flock(fd, LOCK_UN); close(fd); - return false; + std::stringstream ss{}; + ss << "the lock file (" << mutex_file_.c_str() << ") is not locked, possibly due to server crash"; + return ss.str(); } close(fd); - return true; + return {}; } private: @@ -923,6 +988,7 @@ class connection_queue constexpr static const char* name = "connection_queue"; class index_queue { + constexpr static std::size_t watch_interval = 5; using long_allocator = boost::interprocess::allocator; public: @@ -953,10 +1019,12 @@ class connection_queue } } } - void wait(std::atomic_bool& terminate) { + [[nodiscard]] bool wait(std::atomic_bool& terminate) { boost::interprocess::scoped_lock lock(mutex_); std::atomic_thread_fence(std::memory_order_acq_rel); - condition_.wait(lock, [this, &terminate](){ return (pushed_.load() > poped_.load()) || terminate.load(); }); + return condition_.timed_wait(lock, + boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(watch_interval * 1000 * 1000))), + [this, &terminate](){ return (pushed_.load() > poped_.load()) || terminate.load(); }); } [[nodiscard]] std::size_t pop() { return queue_.at(index(poped_.fetch_add(1))); @@ -1065,10 +1133,11 @@ class connection_queue bool check(std::size_t rid) { return v_requested_.at(rid).check(); } - std::size_t listen() { - q_requested_.wait(terminate_); - return ++session_id_; + if (q_requested_.wait(terminate_)) { + return ++session_id_; + } + return 0; } std::size_t accept(std::size_t session_id) { std::size_t sid = q_requested_.pop(); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e769805..e9941d4 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -18,8 +18,10 @@ target_link_libraries(${test_target} PRIVATE glog::glog PRIVATE gflags::gflags PRIVATE rt + PRIVATE Boost::thread PRIVATE Boost::filesystem PRIVATE ${tateyama_engine} + PRIVATE protobuf::libprotobuf ) function (add_test_executable source_file) @@ -39,6 +41,7 @@ file(GLOB SRCS "tateyama/tgctl/*_test.cpp" "tateyama/configuration/*_test.cpp" "tateyama/version/*_test.cpp" + "tateyama/session/*_test.cpp" ) foreach(file ${SRCS}) diff --git a/test/tateyama/monitor/session_mock_test.cpp b/test/tateyama/monitor/session_mock_test.cpp deleted file mode 100644 index a9c32fc..0000000 --- a/test/tateyama/monitor/session_mock_test.cpp +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 2022-2022 tsurugi project. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include "test_root.h" - -namespace tateyama::testing { - -class session_mock_test : public ::testing::Test { -public: - virtual void SetUp() { - helper_ = std::make_unique("session_mock_test", 20006); - helper_->set_up(); - } - - virtual void TearDown() { - helper_->tear_down(); - } - -protected: - std::unique_ptr helper_{}; - -private: - std::string name_{}; -}; - -TEST_F(session_mock_test, list) { - std::string command; - - command = "tgctl session list --conf "; - command += helper_->conf_file_path(); - command += " --monitor "; - command += helper_->abs_path("test/list.log"); - std::cout << command << std::endl; - if (system(command.c_str()) != 0) { - std::cerr << "cannot tgctl session list" << std::endl; - FAIL(); - } - EXPECT_TRUE(validate_json(helper_->abs_path("test/list.log"))); -} - -TEST_F(session_mock_test, show_by_id) { - std::string command; - - command = "tgctl session show :cafebabe --conf "; - command += helper_->conf_file_path(); - command += " --monitor "; - command += helper_->abs_path("test/show_by_id.log"); - std::cout << command << std::endl; - if (system(command.c_str()) != 0) { - std::cerr << "cannot tgctl session show" << std::endl; - FAIL(); - } - EXPECT_TRUE(validate_json(helper_->abs_path("test/show_by_id.log"))); -} - -TEST_F(session_mock_test, show_by_label) { - std::string command; - - command = "tgctl session show example-1 --conf "; - command += helper_->conf_file_path(); - command += " --monitor "; - command += helper_->abs_path("test/show_by_label.log"); - std::cout << command << std::endl; - if (system(command.c_str()) != 0) { - std::cerr << "cannot tgctl session show" << std::endl; - FAIL(); - } - EXPECT_TRUE(validate_json(helper_->abs_path("test/show_by_label.log"))); -} - -TEST_F(session_mock_test, kill) { - std::string command; - - command = "tgctl session kill example-1 --conf "; - command += helper_->conf_file_path(); - command += " --monitor "; - command += helper_->abs_path("test/kill.log"); - std::cout << command << std::endl; - if (system(command.c_str()) != 0) { - std::cerr << "cannot tgctl session kill" << std::endl; - FAIL(); - } - EXPECT_TRUE(validate_json(helper_->abs_path("test/kill.log"))); -} - -TEST_F(session_mock_test, swtch) { - std::string command; - - command = "tgctl session set example-1 key label --conf "; - command += helper_->conf_file_path(); - command += " --monitor "; - command += helper_->abs_path("test/set.log"); - std::cout << command << std::endl; - if (system(command.c_str()) != 0) { - std::cerr << "cannot tgctl session set" << std::endl; - FAIL(); - } - EXPECT_TRUE(validate_json(helper_->abs_path("test/set.log"))); -} - -} // namespace tateyama::testing diff --git a/test/tateyama/session/session_test.cpp b/test/tateyama/session/session_test.cpp new file mode 100644 index 0000000..a162503 --- /dev/null +++ b/test/tateyama/session/session_test.cpp @@ -0,0 +1,283 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "test_root.h" + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include "tateyama/configuration/bootstrap_configuration.h" +#include "tateyama/test_utils/server_mock.h" + +namespace tateyama::test_utils { + +template<> +inline void server_mock::request_message(tateyama::proto::session::request::SessionGet& rq) { + tateyama::proto::session::request::Request r{}; + auto s = current_request(); + EXPECT_TRUE(r.ParseFromString(s)); + EXPECT_EQ(r.command_case(), tateyama::proto::session::request::Request::CommandCase::kSessionGet); + rq = r.session_get(); +} + +template<> +inline void server_mock::request_message(tateyama::proto::session::request::SessionList& rq) { + tateyama::proto::session::request::Request r{}; + auto s = current_request(); + EXPECT_TRUE(r.ParseFromString(s)); + EXPECT_EQ(r.command_case(), tateyama::proto::session::request::Request::CommandCase::kSessionList); + rq = r.session_list(); +} + +template<> +inline void server_mock::request_message(tateyama::proto::session::request::SessionShutdown& rq) { + tateyama::proto::session::request::Request r{}; + auto s = current_request(); + EXPECT_TRUE(r.ParseFromString(s)); + EXPECT_EQ(r.command_case(), tateyama::proto::session::request::Request::CommandCase::kSessionShutdown); + rq = r.session_shutdown(); +} + +template<> +inline void server_mock::request_message(tateyama::proto::session::request::SessionSetVariable& rq) { + tateyama::proto::session::request::Request r{}; + auto s = current_request(); + EXPECT_TRUE(r.ParseFromString(s)); + EXPECT_EQ(r.command_case(), tateyama::proto::session::request::Request::CommandCase::kSessionSetVariable); + rq = r.session_set_variable(); +} + + +} // tateyama::test_utils + + +namespace tateyama::session { + +class session_test : public ::testing::Test { +public: + virtual void SetUp() { + helper_ = std::make_unique("session_test", 20201); + helper_->set_up(); + auto bst_conf = tateyama::configuration::bootstrap_configuration::create_bootstrap_configuration(helper_->conf_file_path()); + server_mock_ = std::make_unique("session_test", bst_conf.digest(), sync_); + sync_.wait(); + } + + virtual void TearDown() { + helper_->tear_down(); + } + +protected: + std::unique_ptr helper_{}; + std::unique_ptr server_mock_{}; + boost::barrier sync_{2}; + + std::string read_pipe(FILE* fp) { + std::stringstream ss{}; + int c{}; + while ((c = std::fgetc(fp)) != EOF) { + ss << static_cast(c); + } + return ss.str(); + } + + std::string to_timepoint_string(std::uint64_t msu) { + auto ms = static_cast(msu); + std::timespec ts{ms / 1000, (ms % 1000) * 1000000}; + + auto* when = std::localtime(&ts.tv_sec); + constexpr int array_size = 32; + std::array buf{}; + std::strftime(buf.data(), array_size - 1, "%Y-%m-%d %H:%M:%S", when); + std::array output{}; + const int msec = ts.tv_nsec / 1000000; + auto len = snprintf(output.data(), array_size, "%s.%03d %s", buf.data(), msec, when->tm_zone); + std::string rv{output.data(), static_cast(len)}; + return rv; + } +}; + +TEST_F(session_test, session_list) { + std::string command; + FILE *fp; + + auto now_stamp = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + { + tateyama::proto::session::response::SessionList session_list{}; + auto* success = session_list.mutable_success(); + auto *entry = success->add_entries(); + entry->set_session_id(":123456"); + entry->set_label("test_label"); + entry->set_application("session_test"); + entry->set_user("test_user"); + entry->set_start_at(now_stamp); + entry->set_connection_type(std::string("IPC")); + entry->set_connection_info(std::to_string(getpid())); + server_mock_->push_response(session_list.SerializeAsString()); + } + + command = "tgctl session list --conf "; + command += helper_->conf_file_path(); + std::cout << command << std::endl; + if((fp = popen(command.c_str(), "r")) == nullptr){ + std::cerr << "cannot tgctl session list" << std::endl; + } + auto result = read_pipe(fp); +// std::cout << result << std::flush; + EXPECT_NE(std::string::npos, result.find(":123456")); + EXPECT_NE(std::string::npos, result.find("test_label")); + EXPECT_NE(std::string::npos, result.find("session_test")); + EXPECT_NE(std::string::npos, result.find("test_user")); + EXPECT_NE(std::string::npos, result.find(to_timepoint_string(now_stamp))); + EXPECT_NE(std::string::npos, result.find("IPC")); + EXPECT_NE(std::string::npos, result.find(std::to_string(getpid()))); + + EXPECT_EQ(tateyama::framework::service_id_session, server_mock_->component_id()); + tateyama::proto::session::request::SessionList rq{}; + server_mock_->request_message(rq); +} + +TEST_F(session_test, session_show) { + std::string command; + FILE *fp; + + auto now_stamp = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + { + tateyama::proto::session::response::SessionGet session_get{}; + auto* success = session_get.mutable_success(); + auto *entry = success->mutable_entry(); + entry->set_session_id("123456"); + entry->set_label("test_label"); + entry->set_application("session_test"); + entry->set_user("test_user"); + entry->set_start_at(now_stamp); + entry->set_connection_type(std::string("IPC")); + entry->set_connection_info(std::to_string(getpid())); + server_mock_->push_response(session_get.SerializeAsString()); + } + + command = "tgctl session show :123456 --conf "; + command += helper_->conf_file_path(); + std::cout << command << std::endl; + if((fp = popen(command.c_str(), "r")) == nullptr){ + std::cerr << "cannot tgctl session get" << std::endl; + } + auto result = read_pipe(fp); +// std::cout << result << std::flush; + EXPECT_NE(std::string::npos, result.find("123456")); + EXPECT_NE(std::string::npos, result.find("test_label")); + EXPECT_NE(std::string::npos, result.find("session_test")); + EXPECT_NE(std::string::npos, result.find("test_user")); + EXPECT_NE(std::string::npos, result.find(to_timepoint_string(now_stamp))); + EXPECT_NE(std::string::npos, result.find("IPC")); + EXPECT_NE(std::string::npos, result.find(std::to_string(getpid()))); + + EXPECT_EQ(tateyama::framework::service_id_session, server_mock_->component_id()); + tateyama::proto::session::request::SessionGet rq{}; + server_mock_->request_message(rq); + EXPECT_EQ(":123456", rq.session_specifier()); +} + +TEST_F(session_test, session_kill_graceful) { + std::string command; + FILE *fp; + + { + tateyama::proto::session::response::SessionShutdown session_sd{}; + auto* success = session_sd.mutable_success(); + server_mock_->push_response(session_sd.SerializeAsString()); + } + + command = "tgctl session kill :123456 --conf "; + command += helper_->conf_file_path(); + std::cout << command << std::endl; + if((fp = popen(command.c_str(), "r")) == nullptr){ + std::cerr << "cannot tgctl session get" << std::endl; + } + auto result = read_pipe(fp); + std::cout << result << std::flush; + + EXPECT_EQ(tateyama::framework::service_id_session, server_mock_->component_id()); + tateyama::proto::session::request::SessionShutdown rq{}; + server_mock_->request_message(rq); + EXPECT_EQ(":123456", rq.session_specifier()); + EXPECT_EQ(tateyama::proto::session::request::SessionShutdownType::GRACEFUL, rq.request_type()); +} + +TEST_F(session_test, session_kill_forceful) { + std::string command; + FILE *fp; + + { + tateyama::proto::session::response::SessionShutdown session_sd{}; + auto* success = session_sd.mutable_success(); + server_mock_->push_response(session_sd.SerializeAsString()); + } + + command = "tgctl session kill :123456 --conf "; + command += helper_->conf_file_path(); + command += " --force"; + std::cout << command << std::endl; + if((fp = popen(command.c_str(), "r")) == nullptr){ + std::cerr << "cannot tgctl session get" << std::endl; + } + auto result = read_pipe(fp); + std::cout << result << std::flush; + + EXPECT_EQ(tateyama::framework::service_id_session, server_mock_->component_id()); + tateyama::proto::session::request::SessionShutdown rq{}; + server_mock_->request_message(rq); + EXPECT_EQ(":123456", rq.session_specifier()); + EXPECT_EQ(tateyama::proto::session::request::SessionShutdownType::FORCEFUL, rq.request_type()); +} + +TEST_F(session_test, session_set) { + std::string command; + FILE *fp; + + { + tateyama::proto::session::response::SessionSetVariable session_set{}; + auto* success = session_set.mutable_success(); + server_mock_->push_response(session_set.SerializeAsString()); + } + + command = "tgctl session set :123456 test_variable test_value --conf "; + command += helper_->conf_file_path(); + std::cout << command << std::endl; + if((fp = popen(command.c_str(), "r")) == nullptr){ + std::cerr << "cannot tgctl session get" << std::endl; + } + auto result = read_pipe(fp); + std::cout << result << std::flush; + + EXPECT_EQ(tateyama::framework::service_id_session, server_mock_->component_id()); + tateyama::proto::session::request::SessionSetVariable rq{}; + server_mock_->request_message(rq); + EXPECT_EQ(":123456", rq.session_specifier()); + EXPECT_EQ("test_variable", rq.name()); + EXPECT_EQ("test_value", rq.value()); +} + +} // namespace tateyama::session diff --git a/test/tateyama/test_utils/endpoint.h b/test/tateyama/test_utils/endpoint.h new file mode 100644 index 0000000..77f7756 --- /dev/null +++ b/test/tateyama/test_utils/endpoint.h @@ -0,0 +1,225 @@ +/* + * Copyright 2023-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include +#include +#include +#include "server_wires_mock.h" +#include "endpoint_proto_utils.h" + +namespace tateyama::test_utils { + +class endpoint_response { +public: + endpoint_response(std::string_view response, tateyama::proto::framework::response::Header_PayloadType type) : type_(type), response_(response) { + } + std::string_view get_response() const { + return response_; + } + tateyama::proto::framework::response:: Header_PayloadType get_type() const { + return type_; + } +private: + tateyama::proto::framework::response:: Header_PayloadType type_; + std::string response_; +}; + +class endpoint { + constexpr static tateyama::common::wire::response_header::msg_type RESPONSE_BODY = 1; + constexpr static tateyama::common::wire::response_header::msg_type RESPONSE_BODYHEAD = 2; + constexpr static tateyama::common::wire::response_header::msg_type RESPONSE_HANDSHAKE = 98; + constexpr static tateyama::common::wire::response_header::msg_type RESPONSE_HELLO = 99; + +public: + class worker { + public: + worker(std::size_t session_id, std::unique_ptr wire, std::function clean_up, std::queue& responses) + : session_id_(session_id), wire_(std::move(wire)), clean_up_(std::move(clean_up)), responses_(responses), thread_(std::thread(std::ref(*this))) { + } + ~worker() { + if (thread_.joinable()) { + thread_.join(); + } + } + void operator()() { + while(true) { + auto h = wire_->get_request_wire().peep(); + auto index = h.get_idx(); + if (h.get_length() == 0 && index == tateyama::common::wire::message_header::null_request) { break; } + std::string message; + message.resize(h.get_length()); + wire_->get_request_wire().read(message.data()); + + ::tateyama::proto::framework::request::Header req_header{}; + google::protobuf::io::ArrayInputStream in{message.data(), static_cast(message.length())}; + if(auto res = tateyama::utils::ParseDelimitedFromZeroCopyStream(std::addressof(req_header), std::addressof(in), nullptr); ! res) { + throw std::runtime_error("error parsing request message"); + } + std::stringstream ss{}; + if (component_id_ = req_header.service_id(); component_id_ == tateyama::framework::service_id_endpoint_broker) { + ::tateyama::proto::framework::response::Header header{}; + if(auto res = tateyama::utils::SerializeDelimitedToOstream(header, std::addressof(ss)); ! res) { + throw std::runtime_error("error formatting response message"); + } + tateyama::proto::endpoint::response::Handshake rp{}; + auto rs = rp.mutable_success(); + rs->set_session_id(1); // session id is dummy, as this is a test + auto body = rp.SerializeAsString(); + if(auto res = tateyama::utils::PutDelimitedBodyToOstream(body, std::addressof(ss)); ! res) { + throw std::runtime_error("error formatting response message"); + } + rp.clear_success(); + auto reply_message = ss.str(); + wire_->get_response_wire().write(reply_message.data(), tateyama::common::wire::response_header(index, reply_message.length(), RESPONSE_HANDSHAKE)); + continue; + } + { + std::string_view payload{}; + if (auto res = tateyama::utils::GetDelimitedBodyFromZeroCopyStream(std::addressof(in), nullptr, payload); ! res) { + throw std::runtime_error("error reading payload"); + } + current_request_ = payload; + if (responses_.empty()) { + throw std::runtime_error("response queue is empty"); + } + auto reply = responses_.front(); + responses_.pop(); + std::stringstream ss{}; + ::tateyama::proto::framework::response::Header header{}; + header.set_payload_type(reply.get_type()); + header.set_session_id(session_id_); + if(auto res = tateyama::utils::SerializeDelimitedToOstream(header, std::addressof(ss)); ! res) { + throw std::runtime_error("error formatting response message"); + } + if(auto res = tateyama::utils::PutDelimitedBodyToOstream(reply.get_response(), std::addressof(ss)); ! res) { + throw std::runtime_error("error formatting response message"); + } + auto reply_message = ss.str(); + wire_->get_response_wire().write(reply_message.data(), tateyama::common::wire::response_header(index, reply_message.length(), RESPONSE_BODY)); + } + } + clean_up_(); + } + const tateyama::framework::component::id_type component_id() const { + return component_id_; + } + const std::string& current_request() const { + return current_request_; + } + + private: + std::size_t session_id_; + std::unique_ptr wire_; + std::function clean_up_; + std::queue& responses_; + std::thread thread_; + + std::string current_request_; + tateyama::framework::component::id_type component_id_{}; + }; + + endpoint(const std::string& name, const std::string& digest, boost::barrier& sync) + : name_(name), digest_(digest), container_(std::make_unique(name_, 1)), thread_(std::thread(std::ref(*this))), sync_(sync) { + } + ~endpoint() { + if (thread_.joinable()) { + thread_.join(); + } + } + worker* get_worker() { + if (!worker_) { + std::unique_lock lk(mutex_); + condition_.wait(lk, [&]{ + return worker_.get() != nullptr; + }); + } + return worker_.get(); + } + void operator()() { + auto& connection_queue = container_->get_connection_queue(); + + if (!notified_) { + sync_.wait(); + notified_ = true; + } + while(true) { + auto session_id = connection_queue.listen(); + if (connection_queue.is_terminated()) { + connection_queue.confirm_terminated(); + break; + } + std::string session_name = name_; + session_name += "-"; + session_name += std::to_string(session_id); + auto wire = std::make_unique(session_name, std::string("tsurugidb-") + digest_ + ".stat"); + std::size_t index = connection_queue.accept(session_id); + try { + std::unique_lock lk(mutex_); + worker_ = std::make_unique(session_id, std::move(wire), [&connection_queue, index](){ connection_queue.disconnect(index); }, responses_); + condition_.notify_all(); + } catch (std::exception& ex) { + LOG(ERROR) << ex.what(); + break; + } + } + } + void push_response(std::string_view response, tateyama::proto::framework::response::Header_PayloadType type) { + responses_.emplace(response, type); + } + const tateyama::framework::component::id_type component_id() const { + if (worker_) { + return worker_->component_id(); + } + throw std::runtime_error("no active worker thread"); + } + const std::string& current_request() const { + if (worker_) { + return worker_->current_request(); + } + throw std::runtime_error("no active worker thread"); + } + void terminate() { + container_->get_connection_queue().request_terminate(); + } + +private: + std::string name_; + std::string digest_; + std::unique_ptr container_; + std::thread thread_; + boost::barrier& sync_; + std::unique_ptr worker_{}; + std::mutex mutex_{}; + std::condition_variable condition_{}; + std::queue responses_{}; + bool notified_{false}; +}; + +} // namespace tateyama::test_utils diff --git a/test/tateyama/test_utils/endpoint_proto_utils.h b/test/tateyama/test_utils/endpoint_proto_utils.h new file mode 100644 index 0000000..effbfde --- /dev/null +++ b/test/tateyama/test_utils/endpoint_proto_utils.h @@ -0,0 +1,63 @@ +/* + * Copyright 2019-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include + +#include "tateyama/framework/component_ids.h" +#include +#include +#include + +namespace tateyama::test_utils { + +struct parse_result { + std::string_view payload_{}; + std::size_t service_id_{}; + std::size_t session_id_{}; +}; + +inline bool parse_header(std::string_view input, parse_result& result) { + result = {}; + ::tateyama::proto::framework::request::Header hdr{}; + google::protobuf::io::ArrayInputStream in{input.data(), static_cast(input.size())}; + if(auto res = utils::ParseDelimitedFromZeroCopyStream(std::addressof(hdr), std::addressof(in), nullptr); ! res) { + return false; + } + result.session_id_ = hdr.session_id(); + result.service_id_ = hdr.service_id(); + return utils::GetDelimitedBodyFromZeroCopyStream(std::addressof(in), nullptr, result.payload_); +} + +struct header_content { + std::size_t session_id_{}; +}; + +inline bool append_response_header(std::stringstream& ss, std::string_view body, header_content input) { + ::tateyama::proto::framework::response::Header hdr{}; + hdr.set_session_id(input.session_id_); + if(auto res = utils::SerializeDelimitedToOstream(hdr, std::addressof(ss)); ! res) { + return false; + } + if(auto res = utils::PutDelimitedBodyToOstream(body, std::addressof(ss)); ! res) { + return false; + } + return true; +} + +} // tateyama::endpoint::ipc diff --git a/test/tateyama/test_utils/server_mock.h b/test/tateyama/test_utils/server_mock.h new file mode 100644 index 0000000..083b056 --- /dev/null +++ b/test/tateyama/test_utils/server_mock.h @@ -0,0 +1,69 @@ +/* + * Copyright 2019-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include + +#include "endpoint.h" +#include "status_mock.h" + +namespace tateyama::test_utils { + +class server_mock { +public: + server_mock(const std::string& name, const std::string& digest, boost::barrier& sync) : + name_(name), endpoint_(name_, digest, sync), status_(std::make_unique(name, digest)) { + } + ~server_mock() { + endpoint_.terminate(); + remove_shm(); + } + + void push_response(std::string_view response, tateyama::proto::framework::response::Header_PayloadType type = tateyama::proto::framework::response::Header_PayloadType_SERVICE_RESULT) { + endpoint_.push_response(response, type); + } + + template + void request_message(T& reply) = delete; + template + void request_message(T& reply, std::size_t) = delete; + + const tateyama::framework::component::id_type component_id() const { + return endpoint_.component_id(); + } + const std::string& current_request() const { + return endpoint_.current_request(); + } + +private: + std::string name_; + endpoint endpoint_; + std::unique_ptr status_; + + void remove_shm() { + std::string cmd = "if [ -f /dev/shm/" + name_ + " ]; then rm -f /dev/shm/" + name_ + "*; fi"; + if (system(cmd.c_str())) { + throw std::runtime_error("error in clearing shared memory file"); + } + } +}; + +} // namespace tateyama::testing diff --git a/test/tateyama/test_utils/server_wires_mock.h b/test/tateyama/test_utils/server_wires_mock.h new file mode 100644 index 0000000..9be4b5b --- /dev/null +++ b/test/tateyama/test_utils/server_wires_mock.h @@ -0,0 +1,239 @@ +/* + * Copyright 2018-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "tateyama/transport/wire.h" + +namespace tateyama::test_utils { + +class server_wire_container_mock +{ + static constexpr std::size_t request_buffer_size = (1<<12); // 4K bytes NOLINT + static constexpr std::size_t response_buffer_size = (1<<13); // 8K bytes NOLINT + static constexpr std::size_t writer_count = 32; + static constexpr std::size_t data_channel_overhead = 7700; // by experiment NOLINT + static constexpr std::size_t total_overhead = (1<<14); // 16K bytes by experiment NOLINT + +public: + class request_wire_container_mock { + public: + request_wire_container_mock() = default; + ~request_wire_container_mock() = default; + request_wire_container_mock(request_wire_container_mock const&) = delete; + request_wire_container_mock(request_wire_container_mock&&) = delete; + request_wire_container_mock& operator = (request_wire_container_mock const&) = delete; + request_wire_container_mock& operator = (request_wire_container_mock&&) = delete; + + void initialize(tateyama::common::wire::unidirectional_message_wire* wire, char* bip_buffer) { + wire_ = wire; + bip_buffer_ = bip_buffer; + } + tateyama::common::wire::message_header peep() { + return wire_->peep(bip_buffer_); + } + std::string_view payload() { + return wire_->payload(bip_buffer_); + } + void read(char* to) { + wire_->read(to, bip_buffer_); + } + std::size_t read_point() { return wire_->read_point(); } + void dispose() { wire_->dispose(); } + void notify() { wire_->notify(); } + [[nodiscard]] bool terminate_requested() const { return wire_->terminate_requested(); } + + // for mainly client, except for terminate request from server + void write(const char* from, const std::size_t len, tateyama::common::wire::message_header::index_type index) { + wire_->write(bip_buffer_, from, tateyama::common::wire::message_header(index, len)); + } + + private: + tateyama::common::wire::unidirectional_message_wire* wire_{}; + char* bip_buffer_{}; + }; + + class response_wire_container_mock { + public: + response_wire_container_mock() = default; + ~response_wire_container_mock() = default; + response_wire_container_mock(response_wire_container_mock const&) = delete; + response_wire_container_mock(response_wire_container_mock&&) = delete; + response_wire_container_mock& operator = (response_wire_container_mock const&) = delete; + response_wire_container_mock& operator = (response_wire_container_mock&&) = delete; + + void initialize(tateyama::common::wire::unidirectional_response_wire* wire, char* bip_buffer) { + wire_ = wire; + bip_buffer_ = bip_buffer; + } + void write(const char* from, tateyama::common::wire::response_header header) { + std::lock_guard lock(mtx_); + wire_->write(bip_buffer_, from, header); + } + void notify_shutdown() { + wire_->notify_shutdown(); + } + + // for client + tateyama::common::wire::response_header await() { + return wire_->await(bip_buffer_); + } + [[nodiscard]] tateyama::common::wire::response_header::length_type get_length() const { + return wire_->get_length(); + } + [[nodiscard]] tateyama::common::wire::response_header::index_type get_idx() const { + return wire_->get_idx(); + } + [[nodiscard]] tateyama::common::wire::response_header::msg_type get_type() const { + return wire_->get_type(); + } + void read(char* to) { + wire_->read(to, bip_buffer_); + } + void close() { + wire_->close(); + } + + private: + tateyama::common::wire::unidirectional_response_wire* wire_{}; + char* bip_buffer_{}; + std::mutex mtx_{}; + }; + + server_wire_container_mock(std::string_view name, std::string_view mutex_file) : name_(name) { + boost::interprocess::shared_memory_object::remove(name_.c_str()); + try { + boost::interprocess::permissions unrestricted_permissions; + unrestricted_permissions.set_unrestricted(); + + std::size_t shm_size = request_buffer_size + response_buffer_size + total_overhead; + managed_shared_memory_ = + std::make_unique(boost::interprocess::create_only, name_.c_str(), shm_size, nullptr, unrestricted_permissions); + auto req_wire = managed_shared_memory_->construct(tateyama::common::wire::request_wire_name)(managed_shared_memory_.get(), request_buffer_size); + auto res_wire = managed_shared_memory_->construct(tateyama::common::wire::response_wire_name)(managed_shared_memory_.get(), response_buffer_size); + status_provider_ = managed_shared_memory_->construct(tateyama::common::wire::status_provider_name)(managed_shared_memory_.get(), mutex_file); + + request_wire_.initialize(req_wire, req_wire->get_bip_address(managed_shared_memory_.get())); + response_wire_.initialize(res_wire, res_wire->get_bip_address(managed_shared_memory_.get())); + } catch(const boost::interprocess::interprocess_exception& ex) { + LOG_LP(ERROR) << ex.what() << " on server_wire_container_mock::server_wire_container_mock()"; + throw std::runtime_error(ex.what()); + } catch (std::runtime_error &ex) { + LOG_LP(ERROR) << "running out of boost managed shared memory"; + throw ex; + } + } + + /** + * @brief Copy and move constructers are deleted. + */ + server_wire_container_mock(server_wire_container_mock const&) = delete; + server_wire_container_mock(server_wire_container_mock&&) = delete; + server_wire_container_mock& operator = (server_wire_container_mock const&) = delete; + server_wire_container_mock& operator = (server_wire_container_mock&&) = delete; + + ~server_wire_container_mock() { + boost::interprocess::shared_memory_object::remove(name_.c_str()); + } + + request_wire_container_mock& get_request_wire() { return request_wire_; } + response_wire_container_mock& get_response_wire() { return response_wire_; } + + void notify_shutdown() { + request_wire_.notify(); + response_wire_.notify_shutdown(); + } + + [[nodiscard]] bool terminate_requested() const { + return request_wire_.terminate_requested(); + } + +private: + std::string name_; + std::unique_ptr managed_shared_memory_{}; + request_wire_container_mock request_wire_{}; + response_wire_container_mock response_wire_{}; + tateyama::common::wire::status_provider* status_provider_{}; + std::mutex mtx_shm_{}; + + std::size_t datachannel_buffer_size_; +}; + +class connection_container +{ +public: + explicit connection_container(std::string_view name, std::size_t n) : name_(name) { + boost::interprocess::shared_memory_object::remove(name_.c_str()); + try { + boost::interprocess::permissions unrestricted_permissions; + unrestricted_permissions.set_unrestricted(); + + managed_shared_memory_ = + std::make_unique(boost::interprocess::create_only, name_.c_str(), request_queue_size(n), nullptr, unrestricted_permissions); + managed_shared_memory_->destroy(tateyama::common::wire::connection_queue::name); + connection_queue_ = managed_shared_memory_->construct(tateyama::common::wire::connection_queue::name)(n, managed_shared_memory_->get_segment_manager()); + } + catch(const boost::interprocess::interprocess_exception& ex) { + using namespace std::literals::string_view_literals; + + std::stringstream ss{}; + ss << "cannot create a database connection outlet named "sv << name << ", probably another server is running using the same database name"sv; + throw std::runtime_error(ss.str()); + } + } + + /** + * @brief Copy and move constructers are deleted. + */ + connection_container(connection_container const&) = delete; + connection_container(connection_container&&) = delete; + connection_container& operator = (connection_container const&) = delete; + connection_container& operator = (connection_container&&) = delete; + + ~connection_container() { + boost::interprocess::shared_memory_object::remove(name_.c_str()); + } + tateyama::common::wire::connection_queue& get_connection_queue() { + return *connection_queue_; + } + +private: + std::string name_; + std::unique_ptr managed_shared_memory_{}; + tateyama::common::wire::connection_queue* connection_queue_; + + static constexpr std::size_t initial_size = 720; // obtained by experiment + static constexpr std::size_t per_size = 112; // obtained by experiment + std::size_t request_queue_size(std::size_t n) { + std::size_t size = initial_size + (n * per_size); // exact size + size += initial_size / 2; // a little bit of leeway + return ((size / 4096) + 1) * 4096; // round up to the page size + } +}; + +}; // namespace tateyama::common::wire diff --git a/test/tateyama/test_utils/status_mock.h b/test/tateyama/test_utils/status_mock.h new file mode 100644 index 0000000..87864b5 --- /dev/null +++ b/test/tateyama/test_utils/status_mock.h @@ -0,0 +1,58 @@ +/* + * Copyright 2019-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +#include + +namespace tateyama::test_utils { + +class status_mock { + static constexpr std::string_view file_prefix = "tsurugidb-"; + static constexpr std::size_t shm_size = 4096; +public: + status_mock(const std::string& name, const std::string& digest) { + status_file_name_ = file_prefix; + status_file_name_ += digest; + status_file_name_ += ".stat"; + try { + segment_ = std::make_unique(boost::interprocess::create_only, status_file_name_.c_str(), shm_size); + resource_status_memory_ = std::make_unique(*segment_); + resource_status_memory_->set_pid(); + resource_status_memory_->set_database_name(name); + } catch(const boost::interprocess::interprocess_exception& ex) { + std::stringstream ss{}; + ss << "could not create shared memory to inform tsurugidb status (cause; '" + << ex.what() + << "'), review the shared memory settings."; + throw std::runtime_error(ss.str()); + } + } + ~status_mock() { + boost::interprocess::shared_memory_object::remove(status_file_name_.c_str()); + } + +private: + std::string status_file_name_{}; + std::unique_ptr segment_{}; + std::unique_ptr resource_status_memory_{}; +}; + +} // namespace tateyama::testing