Skip to content

Commit

Permalink
Store a message's key in SQL with v3.0
Browse files Browse the repository at this point in the history
Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate committed Jul 1, 2024
1 parent 8d371bc commit 8f4e4de
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <memory>
#include <string>

#include <nlohmann/json.hpp>

#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastdds/dds/xtypes/dynamic_types/DynamicType.hpp>

Expand Down Expand Up @@ -66,7 +68,7 @@ struct SqlMessage : public BaseMessage
* @param dynamic_type DynamicType of the message.
*/
void set_key(
fastdds::dds::DynamicType::_ref_type dynamic_type);
const fastdds::dds::DynamicType::_ref_type& dynamic_type);

// Writer GUID
ddspipe::core::types::Guid writer_guid;
Expand All @@ -79,6 +81,18 @@ struct SqlMessage : public BaseMessage

// String containing the JSON-serialized instance key
std::string key;

protected:

/**
* @brief Remove non-key values from the JSON.
*
* @param dynamic_type DynamicType of the message.
* @param key_json JSON object containing the key values.
*/
void remove_nonkey_values(
const fastdds::dds::DynamicType::_ref_type& dynamic_type,
nlohmann::json& key_json);
};

} /* namespace participants */
Expand Down
235 changes: 75 additions & 160 deletions ddsrecorder_participants/src/cpp/recorder/message/SqlMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,18 @@


#include <map>
#include <string>

#include <nlohmann/json.hpp>

#include <fastdds/dds/xtypes/dynamic_types/DynamicDataFactory.hpp>
#include <fastdds/dds/xtypes/dynamic_types/DynamicPubSubType.hpp>
#include <fastdds/dds/xtypes/dynamic_types/DynamicTypeMember.hpp>
#include <fastdds/dds/xtypes/dynamic_types/MemberDescriptor.hpp>
#include <fastdds/dds/xtypes/utils.hpp>

#include <cpp_utils/Log.hpp>

#include <ddsrecorder_participants/recorder/message/SqlMessage.hpp>


Expand All @@ -43,167 +52,73 @@ SqlMessage::SqlMessage(
}

void SqlMessage::set_key(
fastdds::dds::DynamicType::_ref_type dynamic_type)
const fastdds::dds::DynamicType::_ref_type& dynamic_type)
{
// Deserialize the payload
fastdds::dds::DynamicPubSubType pub_sub_type(dynamic_type);
auto dynamic_data = fastdds::dds::DynamicDataFactory::get_instance()->create_data(dynamic_type);

pub_sub_type.deserialize(&payload, &dynamic_data);

// Clear non-key values to free-up unnecessary space
dynamic_data->clear_nonkey_values();

// Serialize the key members into a JSON
const auto ret = fastdds::dds::json_serialize(
dynamic_data, key, fastdds::dds::DynamicDataJsonFormat::EPROSIMA);

if (ret != fastdds::dds::RETCODE_OK)
{
logWarning(SQL_MESSAGE, "Failed to serialize key members into JSON");
}

nlohmann::json key_json = nlohmann::json::parse(key);

// Remove non-key values
remove_nonkey_values(dynamic_type, key_json);

// Serialize the JSON back into a string
key = key_json.dump();
}

void SqlMessage::remove_nonkey_values(
const fastdds::dds::DynamicType::_ref_type& dynamic_type,
nlohmann::json& key_json)
{
// // Deserialize the payload
// fastdds::dds::xtypes::DynamicPubSubType pub_sub_type(dynamic_type);
// fastdds::dds::xtypes::DynamicData_ptr dynamic_data;
// dynamic_data = fastdds::dds::xtypes::DynamicDataFactory::get_instance()->create_data(dynamic_type);

// pub_sub_type.deserialize(&payload, dynamic_data.get());

// // Clear non-key values to free-up unnecessary space
// // dynamic_data->clear_nonkey_values();

// // Serialize the key members into a JSON
// nlohmann::json key_json;

// std::map<fastdds::dds::xtypes::MemberId, fastdds::dds::xtypes::DynamicTypeMember*> members_map;
// dynamic_type->get_all_members(members_map);

// for (const auto& member : members_map)
// {
// if (!member.second->key_annotation())
// {
// // The member is not a key
// continue;
// }

// const auto descriptor = member.second->get_descriptor();

// if (descriptor == nullptr)
// {
// // The member has no descriptor
// continue;
// }

// switch (descriptor->get_kind())
// {
// case fastdds::dds::xtypes::TK_BOOLEAN:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_bool_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_BYTE:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_byte_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_INT16:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_int16_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_INT32:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_int32_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_INT64:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_int64_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_UINT16:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_uint16_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_UINT32:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_uint32_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_UINT64:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_uint64_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_FLOAT32:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_float32_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_FLOAT64:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_float64_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_FLOAT128:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_float128_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_CHAR8:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_char8_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_CHAR16:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_char16_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_STRING8:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_string_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_STRING16:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_wstring_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_ALIAS:
// {
// // TODO
// break;
// }
// case fastdds::dds::xtypes::TK_ENUM:
// {
// key_json[descriptor->get_name()] = dynamic_data->get_enum_value(descriptor->get_id());
// break;
// }
// case fastdds::dds::xtypes::TK_BITMASK:
// {
// // TODO
// break;
// }
// case fastdds::dds::xtypes::TK_ANNOTATION:
// {
// // TODO
// break;
// }
// case fastdds::dds::xtypes::TK_STRUCTURE:
// {
// // TODO
// break;
// }
// case fastdds::dds::xtypes::TK_UNION:
// {
// // TODO
// break;
// }
// case fastdds::dds::xtypes::TK_BITSET:
// {
// // TODO
// break;
// }
// case fastdds::dds::xtypes::TK_SEQUENCE:
// {
// // TODO
// break;
// }
// case fastdds::dds::xtypes::TK_ARRAY:
// {
// // TODO
// break;
// }
// }
// }

// // Dump the JSON into a string
// key = key_json.dump();
fastdds::dds::DynamicTypeMembersById members_by_id;

if (dynamic_type->get_all_members(members_by_id) != fastdds::dds::RETCODE_OK)
{
logWarning(DDSRECORDER_DYNTYPES_KEY, "Failed to get all members");
return;
}

for (const auto& member_by_id : members_by_id)
{
const auto member = member_by_id.second;

fastdds::dds::MemberDescriptor::_ref_type member_descriptor{
fastdds::dds::traits<fastdds::dds::MemberDescriptor>::make_shared()};

if (member->get_descriptor(member_descriptor) != fastdds::dds::RETCODE_OK)
{
logWarning(DDSRECORDER_DYNTYPES_KEY, "Failed to get member descriptor");
continue;
}

const auto member_name = static_cast<std::string>(member_descriptor->name());

if (member_descriptor->is_key())
{
// Recursively remove non-key values from nested types
remove_nonkey_values(member_descriptor->type(), key_json[member_name]);
}
else
{
// Remove non-key value
key_json.erase(member_name);
}
}
}

} /* namespace participants */
Expand Down

0 comments on commit 8f4e4de

Please sign in to comment.