Skip to content

Commit

Permalink
Fix DynTypesParticipant::on_type_discovery_ callback to register type…
Browse files Browse the repository at this point in the history
…_name-type_id tuple and some other minor changes

Signed-off-by: Lucia Echevarria <[email protected]>
  • Loading branch information
LuciaEchevarria99 committed May 17, 2024
1 parent 31ea252 commit e08275f
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
#include <string>
#include <thread>

#include <fastrtps/utils/DBQueue.h>
#include <cpp_utils/ReturnCode.hpp>
#include <cpp_utils/queue/DBQueue.hpp>

#include <ddspipe_core/types/dds/Endpoint.hpp>
#include <ddspipe_core/types/dds/Guid.hpp>
#include <cpp_utils/ReturnCode.hpp>
#include <ddspipe_core/types/topic/dds/DistributedTopic.hpp>

namespace eprosima {
Expand Down Expand Up @@ -263,7 +263,7 @@ class DiscoveryDatabase
mutable std::mutex callbacks_mutex_;

//! Queue storing database operations to be performed in a dedicated thread
fastrtps::DBQueue<std::tuple<DatabaseOperation, types::Endpoint>> entities_to_process_;
utils::event::DBQueue<std::tuple<DatabaseOperation, types::Endpoint>> entities_to_process_;

//! Handle of thread dedicated to performing database operations
std::thread queue_processing_thread_;
Expand Down
2 changes: 1 addition & 1 deletion ddspipe_core/include/ddspipe_core/interface/IReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <functional>

#include <fastrtps/utils/TimedMutex.hpp>
#include <fastdds/utils/TimedMutex.hpp>

#include <cpp_utils/ReturnCode.hpp>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

#pragma once

#include <tuple>

#include <fastcdr/cdr/fixed_size_string.hpp>

#include <fastdds/dds/xtypes/dynamic_types/DynamicType.hpp>
#include <fastdds/dds/xtypes/type_representation/TypeObject.hpp>

Expand All @@ -38,6 +42,8 @@ struct DynamicTypeData : public core::IRoutingData

fastdds::dds::DynamicType::_ref_type dynamic_type{nullptr};

std::tuple<fastcdr::string_255, fastdds::dds::xtypes::TypeIdentifier> type_ids_tuple{};

// fastdds::dds::xtypes::TypeInformation type_information{};
};

Expand Down
12 changes: 6 additions & 6 deletions ddspipe_core/src/cpp/dynamic/DiscoveryDatabase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ void DiscoveryDatabase::queue_processing_thread_routine_() noexcept
lock,
[&]
{
return !entities_to_process_.BothEmpty() || exit_.load();
return !entities_to_process_.both_empty() || exit_.load();
});

if (exit_.load())
Expand All @@ -326,17 +326,17 @@ void DiscoveryDatabase::push_item_to_queue_(
{
{
std::lock_guard<std::mutex> lock(entities_to_process_cv_mutex_);
entities_to_process_.Push(item);
entities_to_process_.push(item);
}
entities_to_process_cv_.notify_one();
}

void DiscoveryDatabase::process_queue_() noexcept
{
entities_to_process_.Swap();
while (!entities_to_process_.Empty())
entities_to_process_.swap();
while (!entities_to_process_.empty())
{
std::tuple<DatabaseOperation, Endpoint> queue_item = entities_to_process_.Front();
std::tuple<DatabaseOperation, Endpoint> queue_item = entities_to_process_.front();
DatabaseOperation db_operation = std::get<0>(queue_item);
Endpoint entity = std::get<1>(queue_item);
try
Expand All @@ -359,7 +359,7 @@ void DiscoveryDatabase::process_queue_() noexcept
logDevError(DDSPIPE_DISCOVERY_DATABASE,
"Error processing database operations queue:" << e.what() << ".");
}
entities_to_process_.Pop();
entities_to_process_.pop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ class DynTypesParticipant : public rtps::SimpleParticipant, public eprosima::fas
void on_type_discovery_(
fastdds::dds::DomainParticipant* participant,
const fastdds::dds::xtypes::TypeInformation& type_info,
const std::string& type_name);
const fastcdr::string_255& type_name);

void internal_notify_type_object_(
fastdds::dds::DynamicType::_ref_type dynamic_type);
fastdds::dds::DynamicType::_ref_type dynamic_type,
const std::tuple<fastcdr::string_255, fastdds::dds::xtypes::TypeIdentifier>& type_name_and_id);

void initialize_internal_dds_participant_();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#pragma once

#include <tuple>

#include <fastcdr/cdr/fixed_size_string.hpp>

#include <fastdds/dds/xtypes/dynamic_types/DynamicType.hpp>

#include <ddspipe_core/types/data/RtpsPayloadData.hpp>
Expand All @@ -35,7 +39,8 @@ class ISchemaHandler
public:

virtual void add_schema(
const fastdds::dds::DynamicType::_ref_type& dynamic_type) = 0;
const fastdds::dds::DynamicType::_ref_type& dynamic_type,
const std::tuple<fastcdr::string_255, fastdds::dds::xtypes::TypeIdentifier>& type_ids_tuple) = 0;

virtual void add_data(
const core::types::DdsTopic& topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

#include <fastdds/rtps/rtps_fwd.h>
#include <fastrtps/rtps/attributes/HistoryAttributes.h>
#include <fastrtps/attributes/TopicAttributes.h>
#include <fastdds/rtps/attributes/TopicAttributes.h>
#include <fastrtps/qos/ReaderQos.h>
#include <fastrtps/rtps/history/ReaderHistory.h>
#include <fastrtps/rtps/attributes/ReaderAttributes.h>
#include <fastrtps/rtps/reader/RTPSReader.h>
#include <fastrtps/rtps/reader/ReaderListener.h>
#include <fastrtps/utils/TimedMutex.hpp>
#include <fastdds/utils/TimedMutex.hpp>

#include <ddspipe_core/types/dds/Guid.hpp>
#include <ddspipe_core/types/topic/dds/DdsTopic.hpp>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <fastdds/rtps/rtps_fwd.h>
#include <fastrtps/rtps/attributes/HistoryAttributes.h>
#include <fastrtps/attributes/TopicAttributes.h>
#include <fastdds/rtps/attributes/TopicAttributes.h>
#include <fastrtps/qos/WriterQos.h>
#include <fastrtps/rtps/history/WriterHistory.h>
#include <fastrtps/rtps/attributes/WriterAttributes.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <fastdds/rtps/rtps_fwd.h>
#include <fastrtps/rtps/attributes/HistoryAttributes.h>
#include <fastrtps/attributes/TopicAttributes.h>
#include <fastdds/rtps/attributes/TopicAttributes.h>
#include <fastrtps/qos/WriterQos.h>
#include <fastrtps/rtps/history/WriterHistory.h>
#include <fastrtps/rtps/attributes/WriterAttributes.h>
Expand Down
21 changes: 12 additions & 9 deletions ddspipe_participants/src/cpp/participant/dds/XmlParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#include <memory>

#include <fastdds/dds/core/ReturnCode.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantExtendedQos.hpp>
// #include <fastrtps/xmlparser/XMLProfileManager.h>

#include <cpp_utils/Log.hpp>
Expand All @@ -39,15 +41,16 @@ XmlParticipant::XmlParticipant(
: CommonParticipant(participant_configuration, payload_pool, discovery_database)
, xml_specific_configuration_(*reinterpret_cast<XmlParticipantConfiguration*>(configuration_.get()))
{
// // Replace the configuration's domain with the XML's domainId
// eprosima::fastrtps::ParticipantAttributes attr;

// if (xml_specific_configuration_.participant_profile.is_set() &&
// XMLProfileManager::fillParticipantAttributes(xml_specific_configuration_.participant_profile
// .get_value(), attr) == XMLP_ret::XML_OK)
// {
// configuration_->domain = attr.domainId;
// }
if (xml_specific_configuration_.participant_profile.is_set())
{
fastdds::dds::DomainParticipantExtendedQos extended_qos;
if (fastdds::dds::RETCODE_OK == fastdds::dds::DomainParticipantFactory::get_instance()->get_participant_extended_qos_from_profile(
xml_specific_configuration_.participant_profile.get_value(),
extended_qos))
{
configuration_->domain = extended_qos.domainId();
}
}
}

std::shared_ptr<core::IWriter> XmlParticipant::create_writer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

#include <memory>

#include <tuple>
#include <unordered_set>

#include <cpp_utils/Log.hpp>

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/dds/xtypes/dynamic_types/DynamicType.hpp>
#include <fastdds/dds/xtypes/dynamic_types/DynamicTypeBuilder.hpp>
#include <fastdds/dds/xtypes/dynamic_types/DynamicTypeBuilderFactory.hpp>
Expand Down Expand Up @@ -102,7 +106,8 @@ void DynTypesParticipant::on_data_reader_discovery(
{
// Get type information
const auto type_info = info.info.type_information().type_information;
on_type_discovery_(participant, type_info, info.info.typeName().to_string());
const auto type_name = info.info.typeName();
on_type_discovery_(participant, type_info, type_name);
}

void DynTypesParticipant::on_data_writer_discovery(
Expand All @@ -112,59 +117,74 @@ void DynTypesParticipant::on_data_writer_discovery(
{
// Get type information
const auto type_info = info.info.type_information().type_information;
on_type_discovery_(participant, type_info, info.info.typeName().to_string());
const auto type_name = info.info.typeName();
on_type_discovery_(participant, type_info, type_name);
}

void DynTypesParticipant::on_type_discovery_(
fastdds::dds::DomainParticipant* participant,
const fastdds::dds::xtypes::TypeInformation& type_info,
const std::string& type_name)
const fastcdr::string_255& type_name)
{
// Get type information
auto type_identifier = type_info.complete().typeid_with_size().type_id();
fastdds::dds::xtypes::TypeObject dyn_type_object;
if (fastdds::dds::RETCODE_OK != fastdds::dds::DomainParticipantFactory::get_instance()->type_object_registry().get_type_object(
type_info.complete().typeid_with_size().type_id(),
type_identifier,
dyn_type_object))
{
// Error
return;
}

fastdds::dds::xtypes::TypeIdentifierSeq type_id_seq;
std::unordered_set<fastdds::dds::xtypes::TypeIdentfierWithSize> type_dependencies;
if (fastdds::dds::RETCODE_OK != fastdds::dds::DomainParticipantFactory::get_instance()->type_object_registry().get_type_dependencies(
type_id_seq,
type_dependencies))
{
return;
}

// // Napa: reregister as local
// if (fastdds::dds::RETCODE_OK != fastdds::dds::DomainParticipantFactory::get_instance()->type_object_registry().register_type_object(
// type_name,
// dyn_type_object.complete()))
// {
// // Error
// return;
// }

// Register discovered type
// TODO
fastdds::dds::DynamicType::_ref_type dyn_type = fastdds::dds::DynamicTypeBuilderFactory::get_instance()->create_type_w_type_object(
dyn_type_object)->build();

// Request type object through TypeLookup if not present in factory, or if type building failed
dyn_type_object)->build();
if (!dyn_type)
{
//TODO: Return logWarning
}
else
{
internal_notify_type_object_(dyn_type);
fastdds::dds::TypeSupport dyn_type_support(new fastdds::dds::DynamicPubSubType(dyn_type));
dyn_type_support.register_type(participant);

// Save type_identifier and its associated tyme_name. NOTE: We assume each type_name corresponds to only one type_identifier
auto type_name_and_id = std::make_tuple(type_name, type_identifier);
internal_notify_type_object_(dyn_type, type_name_and_id);
}

// // Register its dependencies
// for (const auto& type_id : type_dependencies)
// {
// // Get type object
// fastdds::dds::xtypes::TypeObject dyn_type_object;
// if (fastdds::dds::RETCODE_OK != fastdds::dds::DomainParticipantFactory::get_instance()->type_object_registry().get_type_object(
// type_id.type_id(),
// dyn_type_object))
// {
// // Error
// return;
// }

// // Register discovered type
// fastdds::dds::DynamicType::_ref_type dyn_type = fastdds::dds::DynamicTypeBuilderFactory::get_instance()->create_type_w_type_object(
// dyn_type_object)->build();

// // Request type object through TypeLookup if not present in factory, or if type building failed
// if (!dyn_type)
// {
// //TODO: Return logWarning
// }
// else
// {
// internal_notify_type_object_(dyn_type);
// }
// }
}

void DynTypesParticipant::internal_notify_type_object_(
fastdds::dds::DynamicType::_ref_type dynamic_type)
fastdds::dds::DynamicType::_ref_type dynamic_type,
const std::tuple<fastcdr::string_255, fastdds::dds::xtypes::TypeIdentifier>& type_name_and_id)
{
logInfo(DDSPIPE_DYNTYPES_PARTICIPANT,
"Participant " << this->id() << " discovered type object " << dynamic_type->get_name());
Expand All @@ -174,7 +194,7 @@ void DynTypesParticipant::internal_notify_type_object_(
// Create data containing Dynamic Type
auto data = std::make_unique<DynamicTypeData>();
data->dynamic_type = dynamic_type; // TODO: add constructor with param
// data->type_information = type_information;
data->type_ids_tuple = type_name_and_id;

// Insert new data in internal reader queue
type_object_reader_->simulate_data_reception(std::move(data));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ utils::ReturnCode TypeObjectWriter::write_nts_(
// Add schema
try
{
schema_handler_->add_schema(dynamic_type_data.dynamic_type);
schema_handler_->add_schema(dynamic_type_data.dynamic_type, dynamic_type_data.type_ids_tuple);
}
catch (const utils::Exception& e)
{
Expand Down

0 comments on commit e08275f

Please sign in to comment.