-
Notifications
You must be signed in to change notification settings - Fork 102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add support for reverse binding between PUB and SUB #149
Conversation
- add forwarder_device.rs example
Basically we need to implement 2 things:
|
- remove SubSocket dependency on GenericSocketBackend - add HashSet to store unique subscriptions
Couple of questions:
|
Another issue I see now is that peer_connected is not async. It means that you want be able to send messages to pub socket and await them. I see 2 possible options:
|
- move subs HashSet to SubSocketBackend and wrap it in Mutex - patch peer_connected and send all subscriptions to the peer on_connect - convert peer_connected to async function and update all structs that depend on it
Mostly LGTM. I would suggest minor refactoring + maybe run a fmt/clippy to make sure that code looks pretty |
- add create_subs_message associated function no `SubSocketBackend` - refactor `peer_connected` to avoid extra copy on iter of `Mutex<HashSet>` - use send_queue from `FramedIo` instead of getting a peer object - run `cargo clippy --fix`
Overall LGTM. Could you please fix formatting issues reported by CI so we can get it merged? |
Sure I will take care of formatting and push: I have two questions in the mean time:
let subs = self.subs.lock();
let subs_msgs: Vec<ZmqMessage> = subs.iter().map(
|x| SubSocketBackend::create_subs_message(
&x, SubBackendMsgType::SUBSCRIBE)).collect();
drop(subs);
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
println!("Start client");
let mut socket = zeromq::PubSocket::new();
socket.connect("tcp://127.0.0.1:30001").await.expect("Failed to connect");
sleep(Duration::from_millis(100)).await;
let mut msg = zeromq::ZmqMessage::from("test");
msg.prepend(&"topic".into());
loop {
println!("Send ...");
println!("{:?}", socket.send(msg.clone()).await.unwrap());
sleep(Duration::from_millis(1000)).await
}
Ok(())
}
|
So in this case I would say that compiler rejects correct code.
|
- `cargo fmt --all -- --check`
@Alexei-Kornienko, let me know if you are fine with the function/variable naming conventions I used. |
|
||
for message in subs_msgs.iter() { | ||
send_queue | ||
.send(Message::Message(message.clone())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a minor comment. There is no need to clone message here. We are sending each message only once. You can use into_iter and send message by value
LGTM. |
In order to provide functionality required by following example #147 a patch to PUB and SUB sockets should be implemented.
Quote from dev on discord:
Todos
Any comments regarding possible solutions are appreciated.
Thanks.