diff --git a/tools/fmt_clippy_doc.sh b/tools/fmt_clippy_doc.sh old mode 100644 new mode 100755 diff --git a/up-subscription/src/subscription_manager.rs b/up-subscription/src/subscription_manager.rs index 6289d69..3ba1669 100644 --- a/up-subscription/src/subscription_manager.rs +++ b/up-subscription/src/subscription_manager.rs @@ -27,7 +27,7 @@ use up_rust::{ RESOURCE_ID_SUBSCRIBE, RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID, USUBSCRIPTION_VERSION_MAJOR, }, - UCode, UPriority, UStatus, UUri, UUID, + UCode, UPriority, UStatus, UUri, }; use crate::{helpers, usubscription::UP_REMOTE_TTL}; @@ -148,22 +148,21 @@ pub(crate) async fn handle_message( respond_to, } => { // Add new subscriber to topic subscription tracker (create new entries as necessary) - topic_subscribers + let is_new = topic_subscribers .entry(topic.clone()) .or_default() .insert(subscriber); - // This really should unwrap() ok, as we just inserted an entry above - let subscribers_count = - topic_subscribers.get(&topic).map(|e| e.len()).unwrap_or(0); - let mut state = TopicState::SUBSCRIBED; // everything in topic_subscribers is considered SUBSCRIBED by default if topic.is_remote_authority(&own_uri) { - state = TopicState::SUBSCRIBE_PENDING; // for remote_topics, we explicitly track state due to the _PENDING scenarios - remote_topics.entry(topic.clone()).or_insert(state); + // for remote_topics, we explicitly track state due to the _PENDING scenarios + state = *remote_topics + .get(&topic) + .unwrap_or(&TopicState::SUBSCRIBE_PENDING); - if subscribers_count == 1 { + remote_topics.entry(topic.clone()).or_insert(state); + if is_new { // this is the first subscriber to this (remote) topic, so perform remote subscription let own_uri_clone = own_uri.clone(); let up_client_clone = up_client.clone(); @@ -454,12 +453,7 @@ async fn remote_subscribe( let subscription_response: SubscriptionResponse = up_client .invoke_proto_method( make_remote_subscribe_uuri(&subscription_request.topic), - CallOptions::for_rpc_request( - UP_REMOTE_TTL, - Some(UUID::new()), - None, - Some(UPriority::UPRIORITY_CS2), - ), + CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)), subscription_request, ) .await @@ -507,12 +501,7 @@ async fn remote_unsubscribe( let unsubscribe_response: UStatus = up_client .invoke_proto_method( make_remote_unsubscribe_uuri(&unsubscribe_request.topic), - CallOptions::for_rpc_request( - UP_REMOTE_TTL, - Some(UUID::new()), - None, - Some(UPriority::UPRIORITY_CS2), - ), + CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)), unsubscribe_request, ) .await @@ -612,13 +601,8 @@ mod tests { let expected_topic = test_lib::helpers::remote_topic1_uri(); let expected_method = make_remote_subscribe_uuri(&expected_topic); let expected_subscriber = test_lib::helpers::local_usubscription_service_uri(); - - let expected_options = CallOptions::for_rpc_request( - UP_REMOTE_TTL, - Some(UUID::new()), - None, - Some(UPriority::UPRIORITY_CS2), - ); + let expected_options = + CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)); let expected_request = SubscriptionRequest { topic: Some(expected_topic.clone()).into(), subscriber: Some(SubscriberInfo { @@ -675,12 +659,8 @@ mod tests { let expected_method = make_remote_unsubscribe_uuri(&expected_topic); let expected_subscriber = test_lib::helpers::local_usubscription_service_uri(); - let expected_options = CallOptions::for_rpc_request( - UP_REMOTE_TTL, - Some(UUID::new()), - None, - Some(UPriority::UPRIORITY_CS2), - ); + let expected_options = + CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)); let expected_request = UnsubscribeRequest { topic: Some(expected_topic.clone()).into(), subscriber: Some(SubscriberInfo { @@ -732,6 +712,7 @@ mod tests { resource_id: RESOURCE_ID_SUBSCRIBE as u32, ..Default::default() }; + let remote_method = make_remote_subscribe_uuri(&test_lib::helpers::remote_topic1_uri()); assert_eq!(expected_uri, remote_method); diff --git a/up-subscription/src/tests/subscription_manager_tests.rs b/up-subscription/src/tests/subscription_manager_tests.rs index 581403a..103473d 100644 --- a/up-subscription/src/tests/subscription_manager_tests.rs +++ b/up-subscription/src/tests/subscription_manager_tests.rs @@ -27,7 +27,7 @@ mod tests { }; use up_rust::{ communication::{CallOptions, UPayload}, - UCode, UPriority, UStatus, UUri, UUID, + UCode, UPriority, UStatus, UUri, }; use crate::configuration::DEFAULT_COMMAND_BUFFER_SIZE; @@ -281,12 +281,8 @@ mod tests { .into(), ..Default::default() }; - let remote_call_options = CallOptions::for_rpc_request( - UP_REMOTE_TTL, - Some(UUID::new()), - None, - Some(UPriority::UPRIORITY_CS2), - ); + let remote_call_options = + CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)); let command_sender = CommandSender::new_with_client_options::( remote_method, @@ -349,12 +345,8 @@ mod tests { .into(), ..Default::default() }; - let remote_call_options = CallOptions::for_rpc_request( - UP_REMOTE_TTL, - Some(UUID::new()), - None, - Some(UPriority::UPRIORITY_CS2), - ); + let remote_call_options = + CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)); let command_sender = CommandSender::new_with_client_options::( remote_method, @@ -509,12 +501,8 @@ mod tests { code: UCode::OK.into(), ..Default::default() }; - let remote_call_options = CallOptions::for_rpc_request( - UP_REMOTE_TTL, - Some(UUID::new()), - None, - Some(UPriority::UPRIORITY_CS2), - ); + let remote_call_options = + CallOptions::for_rpc_request(UP_REMOTE_TTL, None, None, Some(UPriority::UPRIORITY_CS4)); let command_sender = CommandSender::new_with_client_options::( remote_method, remote_call_options, diff --git a/up-subscription/src/tests/usubscription_tests.rs b/up-subscription/src/tests/usubscription_tests.rs index 812fb5d..289fb33 100644 --- a/up-subscription/src/tests/usubscription_tests.rs +++ b/up-subscription/src/tests/usubscription_tests.rs @@ -25,7 +25,7 @@ mod tests { UnsubscribeRequest, UnsubscribeResponse, RESOURCE_ID_SUBSCRIBE, RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID, USUBSCRIPTION_VERSION_MAJOR, }, - UCode, UPriority, UUri, UUID, + UCode, UPriority, UUri, }; use crate::{ @@ -79,9 +79,9 @@ mod tests { }; let expected_call_options = CallOptions::for_rpc_request( crate::UP_REMOTE_TTL, - Some(UUID::new()), None, - Some(UPriority::UPRIORITY_CS2), + None, + Some(UPriority::UPRIORITY_CS4), ); let remote_subscription_request = SubscriptionRequest { topic: Some(topic.clone()).into(), @@ -199,9 +199,9 @@ mod tests { }; let expected_call_options = CallOptions::for_rpc_request( crate::UP_REMOTE_TTL, - Some(UUID::new()), None, - Some(UPriority::UPRIORITY_CS2), + None, + Some(UPriority::UPRIORITY_CS4), ); let remote_unsubscribe_request = UnsubscribeRequest { topic: Some(topic.clone()).into(),