Skip to content

Commit

Permalink
Merge pull request #100 from malor/version-negotiation
Browse files Browse the repository at this point in the history
Relax ZMTP version requirements according to the spec
  • Loading branch information
Alexei-Kornienko authored Nov 28, 2020
2 parents 674ca9e + f4bd82a commit 106e10f
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 13 deletions.
4 changes: 3 additions & 1 deletion src/codec/greeting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use super::mechanism::ZmqMechanism;
use bytes::{Bytes, BytesMut};
use std::convert::TryFrom;

pub type ZmtpVersion = (u8, u8);

#[derive(Debug, Copy, Clone)]
pub struct ZmqGreeting {
pub version: (u8, u8),
pub version: ZmtpVersion,
pub mechanism: ZmqMechanism,
pub as_server: bool,
}
Expand Down
3 changes: 2 additions & 1 deletion src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ mod zmq_codec;
pub(crate) use command::{ZmqCommand, ZmqCommandName};
pub(crate) use error::{CodecError, CodecResult};
pub(crate) use framed::{FrameableRead, FrameableWrite, FramedIo, ZmqFramedRead, ZmqFramedWrite};
pub(crate) use greeting::ZmqGreeting;
pub(crate) use greeting::{ZmqGreeting, ZmtpVersion};
pub(crate) use mechanism::ZmqMechanism;
pub(crate) use zmq_codec::ZmqCodec;

use crate::message::ZmqMessage;
Expand Down
4 changes: 3 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::codec::{CodecError, Message};
use crate::codec::{CodecError, Message, ZmtpVersion};
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointError;
use crate::task_handle::TaskError;
Expand Down Expand Up @@ -41,6 +41,8 @@ pub enum ZmqError {
Other(&'static str),
#[error("No message received")]
NoMessage,
#[error("Unsupported ZMTP version")]
UnsupportedVersion(ZmtpVersion),
}

impl From<futures::channel::mpsc::TrySendError<Message>> for ZmqError {
Expand Down
102 changes: 92 additions & 10 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,44 @@ pub fn sockets_compatible(one: SocketType, another: SocketType) -> bool {
COMPATIBILITY_MATRIX[row_index * 11 + col_index] != 0
}

pub(crate) async fn greet_exchange(raw_socket: &mut FramedIo) -> ZmqResult<()> {
/// Given the result of the greetings exchange, determines the version of the
/// ZMTP protocol that should be used for communication with the peer according
/// to https://rfc.zeromq.org/spec/23/#version-negotiation.
fn negotiate_version(greeting: Option<CodecResult<Message>>) -> ZmqResult<ZmtpVersion> {
let my_version = ZmqGreeting::default().version;

match greeting {
Some(Ok(Message::Greeting(peer))) => {
if peer.version >= my_version {
// A peer MUST accept higher protocol versions as valid. That is,
// a ZMTP peer MUST accept protocol versions greater or equal to 3.0.
// This allows future implementations to safely interoperate with
// current implementations.
//
// A peer SHALL always use its own protocol (including framing)
// when talking to an equal or higher protocol peer.
Ok(my_version)
} else {
// A peer MAY downgrade its protocol to talk to a lower protocol peer.
//
// If a peer cannot downgrade its protocol to match its peer, it MUST
// close the connection.
// TODO: implement interoperability with older protocol versions
Err(ZmqError::UnsupportedVersion(peer.version))
}
}
_ => Err(ZmqError::Other("Failed Greeting exchange")),
}
}

pub(crate) async fn greet_exchange(raw_socket: &mut FramedIo) -> ZmqResult<ZmtpVersion> {
raw_socket
.write_half
.send(Message::Greeting(ZmqGreeting::default()))
.await?;

let greeting: Option<CodecResult<Message>> = raw_socket.read_half.next().await;

match greeting {
Some(Ok(Message::Greeting(greet))) => match greet.version {
(3, 0) => Ok(()),
_ => Err(ZmqError::Other("Unsupported protocol version")),
},
_ => Err(ZmqError::Other("Failed Greeting exchange")),
}
negotiate_version(greeting)
}

pub(crate) async fn ready_exchange(
Expand Down Expand Up @@ -163,7 +186,7 @@ pub(crate) async fn peer_connected(

#[cfg(test)]
pub(crate) mod tests {
use crate::{Endpoint, Host, Socket, ZmqResult};
use super::*;

pub async fn test_bind_to_unspecified_interface_helper(
any: std::net::IpAddr,
Expand Down Expand Up @@ -222,4 +245,63 @@ pub(crate) mod tests {

Ok(())
}

fn new_greeting(version: ZmtpVersion) -> CodecResult<Message> {
Ok(Message::Greeting(ZmqGreeting {
version: version,
mechanism: ZmqMechanism::PLAIN,
as_server: false,
}))
}

#[test]
fn negotiate_version_peer_is_using_the_same_version() {
// if both peers are using the same protocol version, negotiation is trivial
let peer_version = ZmqGreeting::default().version;
let expected = ZmqGreeting::default().version;
let actual = negotiate_version(Some(new_greeting(peer_version))).unwrap();
assert_eq!(actual, expected);
}

#[test]
fn negotiate_version_peer_is_using_a_newer_version() {
// if the other end is using a newer protocol version, they should adjust to us
let peer_version = (3, 1);
let expected = ZmqGreeting::default().version;
let actual = negotiate_version(Some(new_greeting(peer_version))).unwrap();
assert_eq!(actual, expected);
}

#[test]
fn negotiate_version_peer_is_using_an_older_version() {
// if the other end is using an older protocol version, we should adjust to them, but
// interoperability with older peers is not implemented at the moment, so we just give
// up immediately, which is allowed by the spec
let peer_version = (2, 1);
let actual = negotiate_version(Some(new_greeting(peer_version)));
match actual {
Err(ZmqError::UnsupportedVersion(version)) => assert_eq!(version, peer_version),
_ => panic!("Unexpected result"),
}
}

#[test]
fn negotiate_version_invalid_greeting() {
// could not read the greeting message
let actual = negotiate_version(None);
match actual {
Err(ZmqError::Other(_)) => {}
_ => panic!("Unexpected result"),
}

// unexpected message during greetings exchange
let message = Ok(Message::Message(ZmqMessage {
data: Bytes::from_static(b"hello"),
}));
let actual = negotiate_version(Some(message));
match actual {
Err(ZmqError::Other(_)) => {}
_ => panic!("Unexpected result"),
}
}
}

0 comments on commit 106e10f

Please sign in to comment.