Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use the async OPC UA client #41

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,21 @@ categories = ["command-line-utilities"]


[dependencies]
opcua = { version = "0.12.0", features = ["client", "console-logging"] }
# Explicitly pinning openssl >= 0.10.66 is required, as long as opcua is not 0.13.0, yet.
openssl = { version = "0.10.66" }
opcua = { git = "https://github.com/locka99/opcua.git", rev = "cf2d1bdd52b47273dcbaad439a5a1ae4a528c572", features = ["client", "console-logging"] }
std_msgs = { version = "4.2.4" }
sensor_msgs = { version = "4.2.4" }
builtin_interfaces = { version = "1.2.1" }
voraus_interfaces = { version = "0.1.0" }
rclrs = { version = "0.4.1" }
rosidl_runtime_rs = { version = "0.4.1" }
tokio = "1.38.0"

[dev-dependencies]
rclrs = { version = "0.4.1" }
std_msgs = { version = "4.2.4" }
sensor_msgs = { version = "4.2.4" }
builtin_interfaces = { version = "1.2.1" }
opcua = { version = "0.12.0", features = ["server", "console-logging"] }
opcua = { git = "https://github.com/locka99/opcua.git", rev = "cf2d1bdd52b47273dcbaad439a5a1ae4a528c572", features = ["server", "console-logging"] }
socket2 = "0.5.7"
tokio = "1.38.0"

Expand Down
6 changes: 1 addition & 5 deletions examples/opc_ua/rapid-clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ fn rapid_clock() {
let mut server =
Server::new(ServerConfig::load(&PathBuf::from("tests/resources/clock.conf")).unwrap());

let namespace = {
let address_space = server.address_space();
let mut address_space = address_space.write();
address_space.register_namespace("urn:rapid-clock").unwrap()
};
let namespace = 1;

add_timed_variable(&mut server, namespace);

Expand Down
21 changes: 14 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ mod ros_service_server;

use ros_publisher::{create_joint_state_msg, RosPublisher};

fn main() -> Result<(), RclrsError> {
#[tokio::main]
async fn main() -> Result<(), RclrsError> {
let context = Context::new(env::args()).unwrap();
let node = create_node(&context, "voraus_bridge_node")?;
let node_copy = Arc::clone(&node);
Expand All @@ -23,7 +24,7 @@ fn main() -> Result<(), RclrsError> {
opcua::console_logging::init();

let mut simple_subscriber = SimpleSubscriber::new("opc.tcp://127.0.0.1:4855");
let Ok(_connection_result) = simple_subscriber.connect() else {
let Ok(_connection_result) = simple_subscriber.connect().await else {
panic!("Connection could not be established, but is required.");
};

Expand All @@ -49,10 +50,16 @@ fn main() -> Result<(), RclrsError> {
.expect("Error while publishing.")
}
};
simple_subscriber
let handle = simple_subscriber.run().await;
let _ = simple_subscriber
.create_subscription(1, "100111", callback, 10)
.expect("ERROR: Got an error while subscribing to variables");
// Loops forever. The publish thread will call the callback with changes on the variables
simple_subscriber.run();
rclrs::spin(node_copy)
.await;

tokio::task::spawn(async move {
println!("Spinning ROS");
rclrs::spin(node_copy)
});
println!("Awaiting Handle");
handle.await.unwrap();
Ok(())
}
131 changes: 68 additions & 63 deletions src/simple_opc_ua_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,33 @@
use std::sync::Arc;

use opcua::client::prelude::{
Client, ClientBuilder, DataChangeCallback, IdentityToken, MonitoredItem, MonitoredItemService,
Session, SubscriptionService,
use opcua::client::{
Client, ClientBuilder, DataChangeCallback, IdentityToken, Session,
SessionEventLoop,
};
use opcua::crypto::SecurityPolicy;
use opcua::sync::RwLock;
use opcua::types::{
MessageSecurityMode, MonitoredItemCreateRequest, NodeId, StatusCode, TimestampsToReturn,
UserTokenPolicy, Variant,
MessageSecurityMode, MonitoredItemCreateRequest, NodeId,
StatusCode, TimestampsToReturn, UserTokenPolicy, Variant,
};

use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;

pub struct SimpleSubscriber {
endpoint: String,
session: Option<Arc<RwLock<Session>>>,
}

fn extract_value(item: &MonitoredItem) -> Variant {
let data_value = item.last_value();
data_value
.value
.clone()
.expect("No value found - check value bit in EncodingMask.")
session: Option<Arc<Session>>,
event_loop: Option<Arc<SessionEventLoop>>,
}

impl SimpleSubscriber {
pub fn new<S: Into<String>>(endpoint: S) -> Self {
Self {
endpoint: endpoint.into(),
session: None,
event_loop: None,
}
}

pub fn connect(&mut self) -> Result<(), &str> {
pub async fn connect(&mut self) -> Result<(), &str> {
let mut client: Client = ClientBuilder::new()
.application_name("Simple Subscriber")
.application_uri("urn:SimpleSubscriber")
Expand All @@ -42,24 +37,42 @@ impl SimpleSubscriber {
.session_retry_limit(5)
.client()
.unwrap();
match client.connect_to_endpoint(
(
self.endpoint.as_ref(),
SecurityPolicy::None.to_str(),
MessageSecurityMode::None,
UserTokenPolicy::anonymous(),
),
IdentityToken::Anonymous,
) {
Ok(session) => {
self.session = Some(session);
Ok(())

let (session, event_loop) = client
.new_session_from_endpoint(
(
self.endpoint.as_ref(),
SecurityPolicy::None.to_str(),
MessageSecurityMode::None,
UserTokenPolicy::anonymous(),
),
IdentityToken::Anonymous,
)
.await
.unwrap();
self.session = Some(Arc::clone(&session));
self.event_loop = Some(Arc::new(event_loop));
Ok(())
}

pub async fn run(&mut self) -> JoinHandle<StatusCode> {
if let Some(event_loop_arc) = self.event_loop.take() {
match Arc::try_unwrap(event_loop_arc) {
Ok(event_loop) => {
let handle = event_loop.spawn();
self.session.as_ref().unwrap().wait_for_connection().await;
handle
}
Err(_) => {
panic!("Event loop is still shared, cannot take ownership.");
}
}
Err(_) => Err("Could not connect to server."),
} else {
panic!("Event loop is not initialized.");
}
}

pub fn create_subscription<F>(
pub async fn create_subscription<F>(
&self,
namespace: u16,
node_id: &'static str,
Expand All @@ -76,52 +89,44 @@ impl SimpleSubscriber {
"Creating a subscription for ns={};{} to indirectly call the callback every {}ms.",
namespace, node_id, period_ms
);
let cloned_session_lock = self.session.clone().unwrap();
let session = cloned_session_lock.read();
// Creates a subscription with a data change callback
let publishing_interval_ms: f64 = 10.0;
let publishing_interval_ms = Duration::from_millis(10);
let lifetime_count: u32 = 10;
let max_keep_alive_count: u32 = 30;
let max_notifications_per_publish: u32 = 0;
let priority: u8 = 0;
let publishing_enabled: bool = true;

let subscription_id = session.create_subscription(
publishing_interval_ms,
lifetime_count,
max_keep_alive_count,
max_notifications_per_publish,
priority,
publishing_enabled,
DataChangeCallback::new(move |changed_monitored_items| {
println!("Data change from server:");
changed_monitored_items
.iter()
.for_each(|item| callback(extract_value(item)));
}),
)?;
let session = self.session.clone().unwrap();

let subscription_id = session
.create_subscription(
publishing_interval_ms,
lifetime_count,
max_keep_alive_count,
max_notifications_per_publish,
priority,
publishing_enabled,
DataChangeCallback::new(move |data_value, _changed_monitored_items| {
println!("Data change from server|");
callback(data_value.value.unwrap());
}),
)
.await
.unwrap();
println!("Created a subscription with id = {}", subscription_id);

// Create some monitored items
let items_to_create: Vec<MonitoredItemCreateRequest> = [node_id]
.iter()
.map(|v| NodeId::new(namespace, *v).into())
.collect();
let _ = session.create_monitored_items(
subscription_id,
TimestampsToReturn::Both,
&items_to_create,
)?;

Ok(())
}
let _ = session
.create_monitored_items(subscription_id, TimestampsToReturn::Both, items_to_create)
.await
.unwrap();

pub fn run(self) {
match self.session {
Some(session) => Session::run(session),
None => {
eprintln!("Could not run inexistent session.");
}
}
Ok(())
}
}
Loading