Skip to content

Commit

Permalink
Fix remote subscribe (eclipse-uprotocol#8)
Browse files Browse the repository at this point in the history
* Fix UUID and message priority (RPC must at least use CS4)
* Changed other rpc calls also to CS4 and a correct UUID
* Return connection state if a subscriber is already there and handle same client connecting twice
* Fix new remote subscriptions always returning SUBSCRIBE_PENDING, regardless of actual state
  • Loading branch information
jjj-vtm authored Aug 27, 2024
1 parent 886846a commit 905fddb
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 58 deletions.
Empty file modified tools/fmt_clippy_doc.sh
100644 → 100755
Empty file.
49 changes: 15 additions & 34 deletions up-subscription/src/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use up_rust::{
RESOURCE_ID_SUBSCRIBE, RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID,
USUBSCRIPTION_VERSION_MAJOR,
},
UCode, UPriority, UStatus, UUri, UUID,
UCode, UPriority, UStatus, UUri,
};

use crate::{helpers, usubscription::UP_REMOTE_TTL};
Expand Down Expand Up @@ -148,22 +148,21 @@ pub(crate) async fn handle_message(
respond_to,
} => {
// Add new subscriber to topic subscription tracker (create new entries as necessary)
topic_subscribers
let is_new = topic_subscribers
.entry(topic.clone())
.or_default()
.insert(subscriber);

// This really should unwrap() ok, as we just inserted an entry above
let subscribers_count =
topic_subscribers.get(&topic).map(|e| e.len()).unwrap_or(0);

let mut state = TopicState::SUBSCRIBED; // everything in topic_subscribers is considered SUBSCRIBED by default

if topic.is_remote_authority(&own_uri) {
state = TopicState::SUBSCRIBE_PENDING; // for remote_topics, we explicitly track state due to the _PENDING scenarios
remote_topics.entry(topic.clone()).or_insert(state);
// for remote_topics, we explicitly track state due to the _PENDING scenarios
state = *remote_topics
.get(&topic)
.unwrap_or(&TopicState::SUBSCRIBE_PENDING);

if subscribers_count == 1 {
remote_topics.entry(topic.clone()).or_insert(state);
if is_new {
// this is the first subscriber to this (remote) topic, so perform remote subscription
let own_uri_clone = own_uri.clone();
let up_client_clone = up_client.clone();
Expand Down Expand Up @@ -454,12 +453,7 @@ async fn remote_subscribe(
let subscription_response: SubscriptionResponse = up_client
.invoke_proto_method(
make_remote_subscribe_uuri(&subscription_request.topic),
CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
),
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)),
subscription_request,
)
.await
Expand Down Expand Up @@ -507,12 +501,7 @@ async fn remote_unsubscribe(
let unsubscribe_response: UStatus = up_client
.invoke_proto_method(
make_remote_unsubscribe_uuri(&unsubscribe_request.topic),
CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
),
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)),
unsubscribe_request,
)
.await
Expand Down Expand Up @@ -612,13 +601,8 @@ mod tests {
let expected_topic = test_lib::helpers::remote_topic1_uri();
let expected_method = make_remote_subscribe_uuri(&expected_topic);
let expected_subscriber = test_lib::helpers::local_usubscription_service_uri();

let expected_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let expected_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let expected_request = SubscriptionRequest {
topic: Some(expected_topic.clone()).into(),
subscriber: Some(SubscriberInfo {
Expand Down Expand Up @@ -675,12 +659,8 @@ mod tests {
let expected_method = make_remote_unsubscribe_uuri(&expected_topic);
let expected_subscriber = test_lib::helpers::local_usubscription_service_uri();

let expected_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let expected_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let expected_request = UnsubscribeRequest {
topic: Some(expected_topic.clone()).into(),
subscriber: Some(SubscriberInfo {
Expand Down Expand Up @@ -732,6 +712,7 @@ mod tests {
resource_id: RESOURCE_ID_SUBSCRIBE as u32,
..Default::default()
};

let remote_method = make_remote_subscribe_uuri(&test_lib::helpers::remote_topic1_uri());

assert_eq!(expected_uri, remote_method);
Expand Down
26 changes: 7 additions & 19 deletions up-subscription/src/tests/subscription_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mod tests {
};
use up_rust::{
communication::{CallOptions, UPayload},
UCode, UPriority, UStatus, UUri, UUID,
UCode, UPriority, UStatus, UUri,
};

use crate::configuration::DEFAULT_COMMAND_BUFFER_SIZE;
Expand Down Expand Up @@ -281,12 +281,8 @@ mod tests {
.into(),
..Default::default()
};
let remote_call_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let remote_call_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let command_sender =
CommandSender::new_with_client_options::<SubscriptionRequest, SubscriptionResponse>(
remote_method,
Expand Down Expand Up @@ -349,12 +345,8 @@ mod tests {
.into(),
..Default::default()
};
let remote_call_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let remote_call_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let command_sender =
CommandSender::new_with_client_options::<SubscriptionRequest, SubscriptionResponse>(
remote_method,
Expand Down Expand Up @@ -509,12 +501,8 @@ mod tests {
code: UCode::OK.into(),
..Default::default()
};
let remote_call_options = CallOptions::for_rpc_request(
UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
);
let remote_call_options =
CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4));
let command_sender = CommandSender::new_with_client_options::<UnsubscribeRequest, UStatus>(
remote_method,
remote_call_options,
Expand Down
10 changes: 5 additions & 5 deletions up-subscription/src/tests/usubscription_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod tests {
UnsubscribeRequest, UnsubscribeResponse, RESOURCE_ID_SUBSCRIBE,
RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID, USUBSCRIPTION_VERSION_MAJOR,
},
UCode, UPriority, UUri, UUID,
UCode, UPriority, UUri,
};

use crate::{
Expand Down Expand Up @@ -79,9 +79,9 @@ mod tests {
};
let expected_call_options = CallOptions::for_rpc_request(
crate::UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
None,
Some(UPriority::UPRIORITY_CS4),
);
let remote_subscription_request = SubscriptionRequest {
topic: Some(topic.clone()).into(),
Expand Down Expand Up @@ -199,9 +199,9 @@ mod tests {
};
let expected_call_options = CallOptions::for_rpc_request(
crate::UP_REMOTE_TTL,
Some(UUID::new()),
None,
Some(UPriority::UPRIORITY_CS2),
None,
Some(UPriority::UPRIORITY_CS4),
);
let remote_unsubscribe_request = UnsubscribeRequest {
topic: Some(topic.clone()).into(),
Expand Down

0 comments on commit 905fddb

Please sign in to comment.