Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 162 additions & 24 deletions src/DiscoveryServerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,43 @@ DiscoveryServerManager::DiscoveryServerManager(
tinyxml2::XMLElement* profiles = child->FirstChildElement(DSxmlparser::PROFILES);
if (profiles != nullptr)
{
<<<<<<< HEAD
loadProfiles(profiles);
=======
tinyxml2::XMLPrinter printer;
profiles->Accept(&printer);
std::string xmlString = R"(")" + std::string(printer.CStr()) + R"(")";
if (RETCODE_OK == DomainParticipantFactory::get_instance()->load_XML_profiles_string(xmlString.c_str(),
std::string(printer.CStr()).length()))
{
LOG_INFO("Profiles parsed successfully.");
}
else
{
LOG_ERROR("Error parsing profiles!");
}

// Store Topic Profiles
tinyxml2::XMLElement* topic_profile = profiles->FirstChildElement(s_sTopic.c_str());
while (nullptr != topic_profile)
{
const char* topic_profile_name = topic_profile->Attribute(DSxmlparser::PROFILE_NAME);
if (topic_profile_name != nullptr)
{
auto topic_desc_it = topic_description_profiles_map.find(topic_profile_name);
if (topic_desc_it == topic_description_profiles_map.end())
{
auto& topic_desc = topic_description_profiles_map[topic_profile_name];
if (!fill_topic_description_profile(topic_profile, topic_desc))
{
LOG_ERROR("Error parsing topic profiles");
break;
}
}
}
topic_profile = topic_profile->NextSiblingElement(s_sTopic.c_str());
}
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))
}

// Server processing requires a two pass analysis
Expand Down Expand Up @@ -698,18 +734,73 @@ Topic* DiscoveryServerManager::getParticipantTopicByName(
return returnTopic;
}

<<<<<<< HEAD
void DiscoveryServerManager::loadProfiles(
tinyxml2::XMLElement* profiles)
{
xmlparser::XMLP_ret ret = xmlparser::XMLProfileManager::loadXMLProfiles(*profiles);
=======
bool DiscoveryServerManager::fill_topic_description_profile(
tinyxml2::XMLElement* elem,
TopicDescriptionItem& topic_description)
{
/*
<xs:complexType name="topicDescriptionType">
<xs:all minOccurs="0">
<xs:element name="name" type="stringType" minOccurs="0"/>
<xs:element name="dataType" type="stringType" minOccurs="0"/>
</xs:all>
</xs:complexType>
*/
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))

if (ret == xmlparser::XMLP_ret::XML_OK)
{
<<<<<<< HEAD
LOG_INFO("Profiles parsed successfully.");
}
else
{
LOG_ERROR("Error parsing profiles!");
=======
name = p_aux0->Name();

if (strcmp(name, DSxmlparser::NAME) == 0)
{
// name - stringType
const char* text = p_aux0->GetText();
if (nullptr != text)
{
topic_description.name = text;
}
else
{
LOG_ERROR("<" << p_aux0->Value() << "> GetText XML_ERROR");
ret = false;
break;
}
}
else if (strcmp(name, DSxmlparser::DATA_TYPE) == 0)
{
// dataType - stringType
const char* text = p_aux0->GetText();
if (nullptr != text)
{
topic_description.type_name = text;
}
else
{
LOG_ERROR("<" << p_aux0->Value() << "> GetText XML_ERROR");
ret = false;
break;
}
}
else
{
LOG_ERROR("Invalid element found into 'topicDescriptionType'. Name: " << name);
ret = false;
}
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))
}
}

Expand Down Expand Up @@ -875,9 +966,7 @@ void DiscoveryServerManager::loadServer(
GuidPrefix_t& prefix = dpQOS.wire_protocol().prefix;
const char* cprefix = server->Attribute(DSxmlparser::PREFIX);

if (cprefix != nullptr &&
!(std::istringstream(cprefix) >> prefix) &&
(prefix == c_GuidPrefix_Unknown))
if (cprefix != nullptr && !(std::istringstream(cprefix) >> prefix) && (prefix == c_GuidPrefix_Unknown))
{
LOG_ERROR("Servers cannot have a framework provided prefix"); // at least for now
return;
Expand All @@ -886,8 +975,7 @@ void DiscoveryServerManager::loadServer(
GUID_t guid(prefix, c_EntityId_RTPSParticipant);

// Check if the guidPrefix is already in use (there is a mistake on config file)
if (enable_prefix_validation &&
servers.find(guid) != servers.end())
if (enable_prefix_validation && servers.find(guid) != servers.end())
{
LOG_ERROR("DiscoveryServerManager detected two servers sharing the same prefix " << prefix);
return;
Expand All @@ -896,8 +984,7 @@ void DiscoveryServerManager::loadServer(
// replace the DomainParticipantQOS builtin lists with the ones from server_locators (if present)
// note that a previous call to DiscoveryServerManager::MapServerInfo
serverLocator_map::mapped_type& lists = server_locators[guid];
if (!lists.first.empty() ||
!lists.second.empty())
if (!lists.first.empty() || !lists.second.empty())
{
// server elements take precedence over profile ones
// I copy them because other servers may need this values
Expand Down Expand Up @@ -954,7 +1041,12 @@ void DiscoveryServerManager::loadServer(
// We define the PDP as external (when moved to fast library it would be SERVER)
DiscoverySettings& b = dpQOS.wire_protocol().builtin.discovery_config;
(void)b;
<<<<<<< HEAD
assert(b.discoveryProtocol == SERVER || b.discoveryProtocol == BACKUP);
=======
assert(b.discoveryProtocol == eprosima::fastdds::rtps::DiscoveryProtocol::SERVER ||
b.discoveryProtocol == eprosima::fastdds::rtps::DiscoveryProtocol::BACKUP);
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))

// Create the participant or the associated events
DelayedParticipantCreation event(creation_time, std::move(dpQOS), &DiscoveryServerManager::addServer);
Expand Down Expand Up @@ -1043,8 +1135,7 @@ void DiscoveryServerManager::loadClient(
if (atts.rtps.builtin.discovery_config.discoveryProtocol != DiscoveryProtocol_t::CLIENT)
#endif // if FASTRTPS_VERSION_MAJOR >= 2
{
LOG_ERROR(
"DiscoveryServerManager::loadClient try to create a client with an incompatible profile: " <<
LOG_ERROR("DiscoveryServerManager::loadClient try to create a client with an incompatible profile: " <<
profile_name);
return;
}
Expand Down Expand Up @@ -1156,7 +1247,7 @@ void DiscoveryServerManager::loadClient(
if (!p4)
{
// try to find a descriptor matching the listener port setup
if (p4 = std::dynamic_pointer_cast<TCPv4TransportDescriptor>(sp))
if ((p4 = std::dynamic_pointer_cast<TCPv4TransportDescriptor>(sp)))
{
continue;
}
Expand Down Expand Up @@ -1316,8 +1407,8 @@ void DiscoveryServerManager::loadSimple(
if (dpQOS.wire_protocol().builtin.discovery_config.discoveryProtocol != DiscoveryProtocol_t::SIMPLE)
{
LOG_ERROR(
"DiscoveryServerManager::loadSimple try to create a simple participant with an incompatible profile: " <<
profile_name);
"DiscoveryServerManager::loadSimple try to create a simple participant with an incompatible profile: "
<< profile_name);
return;
}

Expand Down Expand Up @@ -1482,9 +1573,14 @@ void DiscoveryServerManager::loadSubscriber(
endpoint_profile = std::string(profile_name);
}

<<<<<<< HEAD
DelayedEndpointCreation<DataReader> event(creation_time, subatts.topic.getTopicName().to_string(),
subatts.topic.getTopicDataType().to_string(), topic_name, endpoint_profile, part_guid, pDE,
participant_creation_event);
=======
DelayedEndpointCreation<DataReader> event(creation_time, topic_description.name, topic_description.type_name,
topic_name, endpoint_profile, part_guid, pDE, participant_creation_event);
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))

if (creation_time == getTime())
{
Expand Down Expand Up @@ -1604,9 +1700,14 @@ void DiscoveryServerManager::loadPublisher(
endpoint_profile = std::string(profile_name);
}

<<<<<<< HEAD
DelayedEndpointCreation<DataWriter> event(creation_time, pubatts.topic.getTopicName().to_string(),
pubatts.topic.getTopicDataType().to_string(), topic_name, endpoint_profile, part_guid, pDE,
participant_creation_event);
=======
DelayedEndpointCreation<DataWriter> event(creation_time, topic_description.name, topic_description.type_name,
topic_name, endpoint_profile, part_guid, pDE, participant_creation_event);
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))

if (creation_time == getTime())
{
Expand Down Expand Up @@ -1713,7 +1814,20 @@ void DiscoveryServerManager::MapServerInfo(
// server GuidPrefix is either pass as an attribute (preferred to allow profile reuse)
// or inside the profile.
GuidPrefix_t prefix;
<<<<<<< HEAD
std::shared_ptr<ParticipantAttributes> atts;
=======

std::shared_ptr<DomainParticipantQos> pqos;
// I must load the prefix from the profile
// retrieve profile QOS
pqos = std::make_shared<DomainParticipantQos>();
if (RETCODE_OK != DomainParticipantFactory::get_instance()->get_participant_qos_from_profile(profile_name, *pqos))
{
LOG_ERROR("DiscoveryServerManager::loadServer couldn't load profile " << profile_name);
return;
}
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))

const char* cprefix = server->Attribute(DSxmlparser::PREFIX);

Expand Down Expand Up @@ -1803,8 +1917,13 @@ void DiscoveryServerManager::on_participant_discovery(
return;
}

<<<<<<< HEAD
LOG_INFO("Participant " << participant->get_qos().name().to_string() << " reports a participant "
<< info.info.m_participantName << " is " << info.status << ". Prefix " << partid);
=======
LOG_INFO("Participant " << participant->get_qos().name().to_string() << " reports a participant " <<
info.participant_name << " is " << status << ". Prefix " << partid);
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))

std::chrono::steady_clock::time_point callback_time = std::chrono::steady_clock::now();
{
Expand All @@ -1827,8 +1946,12 @@ void DiscoveryServerManager::on_participant_discovery(
{
case ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT:
{
<<<<<<< HEAD
state.AddParticipant(srcGuid, partid, info.info.m_participantName.to_string(), callback_time,
server);
=======
state.AddParticipant(srcGuid, srcName, partid, info.participant_name.to_string(), callback_time, server);
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))
break;
}
case ParticipantDiscoveryInfo::REMOVED_PARTICIPANT:
Expand Down Expand Up @@ -1874,8 +1997,7 @@ void DiscoveryServerManager::on_subscriber_discovery(
if (!no_callbacks)
{
// is one of ours?
if ((it = servers.find(partid)) != servers.end() ||
(it = clients.find(partid)) != clients.end() ||
if ((it = servers.find(partid)) != servers.end() || (it = clients.find(partid)) != clients.end() ||
(it = simples.find(partid)) != simples.end())
{
part_name = it->second->get_qos().name().to_string();
Expand Down Expand Up @@ -1915,9 +2037,15 @@ void DiscoveryServerManager::on_subscriber_discovery(
break;
}

<<<<<<< HEAD
LOG_INFO("Participant " << participant->get_qos().name().to_string() << " reports a subscriber of participant "
<< part_name << " is " << info.status << " with typename: " << info.info.typeName()
<< " topic: " << info.info.topicName() << " GUID: " << subsid);
=======
LOG_INFO("Participant " << participant->get_qos().name().to_string() << " reports a subscriber of participant " <<
part_name << " is " << reason << " with typename: " << info.type_name << " topic: " << info.topic_name <<
" GUID: " << subsid);
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))
}

void DiscoveryServerManager::on_publisher_discovery(
Expand Down Expand Up @@ -1952,8 +2080,7 @@ void DiscoveryServerManager::on_publisher_discovery(
// is one of ours?
participant_map::iterator it;

if ((it = servers.find(partid)) != servers.end() ||
(it = clients.find(partid)) != clients.end() ||
if ((it = servers.find(partid)) != servers.end() || (it = clients.find(partid)) != clients.end() ||
(it = simples.find(partid)) != simples.end())
{
part_name = it->second->get_qos().name().to_string();
Expand Down Expand Up @@ -1984,12 +2111,17 @@ void DiscoveryServerManager::on_publisher_discovery(
{
case DS::DISCOVERED_WRITER:

<<<<<<< HEAD
state.AddDataWriter(srcGuid,
partid,
pubsid,
info.info.typeName().to_string(),
info.info.topicName().to_string(),
callback_time);
=======
state.AddDataWriter(srcGuid, srcName, partid, pubsid, info.type_name.to_string(),
info.topic_name.to_string(), callback_time);
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))
break;
case DS::REMOVED_WRITER:

Expand All @@ -1999,9 +2131,15 @@ void DiscoveryServerManager::on_publisher_discovery(
break;
}

<<<<<<< HEAD
LOG_INFO("Participant " << participant->get_qos().name().to_string() << " reports a publisher of participant "
<< part_name << " is " << info.status << " with typename: " << info.info.typeName()
<< " topic: " << info.info.topicName() << " GUID: " << pubsid);
=======
LOG_INFO("Participant " << participant->get_qos().name().to_string() << " reports a publisher of participant " <<
part_name << " is " << reason << " with typename: " << info.type_name << " topic: " << info.topic_name <<
" GUID: " << pubsid);
>>>>>>> cb2addc (Fix warning on ubuntu noble (#133))
}

void DiscoveryServerManager::on_liveliness_changed(
Expand Down Expand Up @@ -2108,15 +2246,15 @@ Snapshot& DiscoveryServerManager::takeSnapshot(
temp.insert(clients.begin(), clients.end());
temp.insert(simples.begin(), simples.end());

std::function<bool(const participant_map::value_type&, const Snapshot::value_type&)> pred(
[](const participant_map::value_type& p1, const Snapshot::value_type& p2)
{
return p1.first == p2.endpoint_guid;
}
);
std::function<bool(const participant_map::value_type&,
const Snapshot::value_type&)> pred([](const participant_map::value_type& p1, const Snapshot::value_type& p2)
{
return p1.first == p2.endpoint_guid;
}
);

std::pair<participant_map::const_iterator, Snapshot::const_iterator> res =
std::mismatch(temp.cbegin(), temp.cend(), shot.cbegin(), shot.cend(), pred);
std::pair<participant_map::const_iterator, Snapshot::const_iterator> res = std::mismatch(temp.cbegin(), temp.cend(),
shot.cbegin(), shot.cend(), pred);

while (res.first != temp.end())
{
Expand Down