diff --git a/.gitignore b/.gitignore index 08e1f90..328d279 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -/target +target # These are backup files generated by rustfmt **/*.rs.bk diff --git a/examples/real-time-ris-live-websocket.rs b/examples/real-time-ris-live-websocket.rs index 6bff997..36b59c4 100644 --- a/examples/real-time-ris-live-websocket.rs +++ b/examples/real-time-ris-live-websocket.rs @@ -1,5 +1,5 @@ use bgpkit_parser::parse_ris_live_message; -use serde_json::json; +use bgpkit_parser::rislive::messages::{RisLiveClientMessage, RisSubscribe}; use tungstenite::{connect, Message}; const RIS_LIVE_URL: &str = "ws://ris-live.ripe.net/v1/ws/?client=rust-bgpkit-parser"; @@ -13,9 +13,8 @@ fn main() { connect(RIS_LIVE_URL).expect("Can't connect to RIS Live websocket server"); // subscribe to messages from one collector - // let msg = json!({"type": "ris_subscribe", "data": {"host": "rrc21"}}).to_string(); - let msg = json!({"type": "ris_subscribe", "data": null}).to_string(); - socket.send(Message::Text(msg)).unwrap(); + let msg = RisSubscribe::new().host("rrc21"); + socket.send(Message::Text(msg.to_json_string())).unwrap(); loop { let msg = socket.read().expect("Error reading message").to_string(); diff --git a/src/parser/rislive/messages/client/mod.rs b/src/parser/rislive/messages/client/mod.rs new file mode 100644 index 0000000..89c90ad --- /dev/null +++ b/src/parser/rislive/messages/client/mod.rs @@ -0,0 +1,28 @@ +//! This module contains the RIS-live client message definitions. +//! +//! Official manual available at + +pub mod ping; +pub mod request_rrc_list; +pub mod ris_subscribe; +pub mod ris_unsubscribe; + +pub trait RisLiveClientMessage: Serialize { + fn msg_type(&self) -> &'static str; + + fn to_json_string(&self) -> String { + serde_json::to_string(&json!({ + "type": self.msg_type(), + "data": self + })) + .unwrap() + } +} + +use serde::Serialize; +use serde_json::json; + +pub use ping::Ping; +pub use request_rrc_list::RequestRrcList; +pub use ris_subscribe::RisSubscribe; +pub use ris_unsubscribe::RisUnsubscribe; diff --git a/src/parser/rislive/messages/client/ping.rs b/src/parser/rislive/messages/client/ping.rs new file mode 100644 index 0000000..2d0e3b2 --- /dev/null +++ b/src/parser/rislive/messages/client/ping.rs @@ -0,0 +1,34 @@ +use crate::rislive::messages::RisLiveClientMessage; +use serde::Serialize; + +#[derive(Debug)] +pub struct Ping {} + +impl Serialize for Ping { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_none() + } +} + +impl RisLiveClientMessage for Ping { + fn msg_type(&self) -> &'static str { + "ping" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::{json, Value}; + + #[test] + fn test_to_json_string() { + let ping_msg = Ping {}; + let json_str = ping_msg.to_json_string(); + let value: Value = serde_json::from_str(&json_str).unwrap(); + assert_eq!(value, json!({"type": "ping", "data": null})); + } +} diff --git a/src/parser/rislive/messages/client/request_rrc_list.rs b/src/parser/rislive/messages/client/request_rrc_list.rs new file mode 100644 index 0000000..0500aa7 --- /dev/null +++ b/src/parser/rislive/messages/client/request_rrc_list.rs @@ -0,0 +1,40 @@ +use crate::rislive::messages::RisLiveClientMessage; +use serde::Serialize; + +#[derive(Default, Debug)] +pub struct RequestRrcList { + data: Vec, +} + +impl Serialize for RequestRrcList { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.data.serialize(serializer) + } +} + +impl RequestRrcList { + pub fn new(rrc_list: Vec) -> Self { + Self { data: rrc_list } + } +} + +impl RisLiveClientMessage for RequestRrcList { + fn msg_type(&self) -> &'static str { + "request_rrc_list" + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_request_rrc_list() { + let rrc_list = RequestRrcList::new(vec!["rrc00".to_string(), "rrc01".to_string()]); + + println!("{}", rrc_list.to_json_string()); + } +} diff --git a/src/parser/rislive/messages/client/ris_subscribe.rs b/src/parser/rislive/messages/client/ris_subscribe.rs new file mode 100644 index 0000000..1e59f4d --- /dev/null +++ b/src/parser/rislive/messages/client/ris_subscribe.rs @@ -0,0 +1,206 @@ +use crate::rislive::messages::RisLiveClientMessage; +use ipnet::IpNet; +use serde::Serialize; +use std::net::IpAddr; + +#[derive(Debug, Serialize)] +#[allow(clippy::upper_case_acronyms)] +pub enum RisSubscribeType { + UPDATE, + OPEN, + NOTIFICATION, + KEEPALIVE, + RIS_PEER_STATE, +} + +#[derive(Debug, Serialize)] +pub struct RisSubscribeSocketOptions { + /// Include a Base64-encoded version of the original binary BGP message as `raw` for all subscriptions + /// + /// *Default: false* + #[serde(rename = "includeRaw")] + pub include_raw: Option, + + /// Send a `ris_subscribe_ok` message for all succesful subscriptions + /// + /// *Default: false* + pub acknowledge: Option, +} + +#[derive(Default, Debug, Serialize)] +pub struct RisSubscribe { + /// Only include messages collected by a particular RRC + #[serde(skip_serializing_if = "Option::is_none")] + pub host: Option, + + /// Only include messages of a given BGP or RIS type + #[serde(rename = "type")] + #[serde(skip_serializing_if = "Option::is_none")] + pub data_type: Option, + + /// Only include messages containing a given key + /// + /// Examples: + /// * "announcements" + /// * "withdrawals" + #[serde(skip_serializing_if = "Option::is_none")] + pub require: Option, + + /// Only include messages sent by the given BGP peer + #[serde(skip_serializing_if = "Option::is_none")] + pub peer: Option, + + /// ASN or pattern to match against the AS PATH attribute + /// + /// Any of: + /// * ASN (integer) + /// * AS path pattern (string) + /// + /// Comma-separated pattern describing all or part of the AS path. + /// Can optionally begin with ^ to match the first item of the path (the last traversed ASN), + /// and/or end with $ to match the last item of the path + /// (the originating ASN). + /// + /// The entire pattern can be prefixed with ! to invert the match. + /// AS_SETs should be written as JSON arrays of integers in ascending order and with no spaces. + /// Note: this is not a regular expression. + /// + /// Examples: + /// * "789$" + /// * "^123,456,789,[789,10111]$" + /// * "!6666$" + /// * "!^3333" + #[serde(skip_serializing_if = "Option::is_none")] + pub path: Option, + + /// Filter UPDATE messages by prefixes in announcements or withdrawals + /// + /// Any of: + /// * IPv4 or IPv6 CIDR prefix (string) + /// * Array of CIDR prefixes (array) + /// + /// For the purposes of subsequent `ris_unsubscribe` messages, + /// each prefix results in a separate subscription that can be stopped independently + /// + /// Array items: + /// * IPv4 or IPv6 CIDR prefix (string) + #[serde(skip_serializing_if = "Option::is_none")] + pub prefix: Option, + + /// Match prefixes that are more specific (part of) `prefix` + /// + /// *Default: true* + #[serde(rename = "moreSpecific")] + #[serde(skip_serializing_if = "Option::is_none")] + pub more_specific: Option, + + /// Match prefixes that are less specific (contain) `prefix` + /// + /// *Default: false* + #[serde(rename = "lessSpecific")] + #[serde(skip_serializing_if = "Option::is_none")] + pub less_specific: Option, + + /// Options that apply to all subscriptions over the current WebSocket. + /// If a new subscription contains `socketOptions` it will override those from previous subscriptions + #[serde(rename = "socketOptions")] + #[serde(skip_serializing_if = "Option::is_none")] + pub socket_options: Option, +} + +impl RisSubscribe { + pub fn new() -> Self { + Default::default() + } + + pub fn host(mut self, host: &str) -> Self { + self.host = Some(host.to_string()); + self + } + + pub fn data_type(mut self, data_type: RisSubscribeType) -> Self { + self.data_type = Some(data_type); + self + } + + pub fn require(mut self, require: &str) -> Self { + self.require = Some(require.to_string()); + self + } + + pub fn peer(mut self, peer: IpAddr) -> Self { + self.peer = Some(peer); + self + } + + pub fn path(mut self, path: &str) -> Self { + self.path = Some(path.to_string()); + self + } + + pub fn prefix(mut self, prefix: IpNet) -> Self { + self.prefix = Some(prefix); + self + } + + pub fn more_specific(mut self, more_specific: bool) -> Self { + self.more_specific = Some(more_specific); + self + } + + pub fn less_specific(mut self, less_specific: bool) -> Self { + self.less_specific = Some(less_specific); + self + } + + pub fn include_raw(mut self, include_raw: bool) -> Self { + match self.socket_options.as_mut() { + None => { + self.socket_options = Some(RisSubscribeSocketOptions { + include_raw: Some(include_raw), + acknowledge: None, + }); + } + Some(o) => { + o.include_raw = Some(include_raw); + } + } + self + } + + pub fn acknowledge(mut self, acknowledge: bool) -> Self { + match self.socket_options.as_mut() { + None => { + self.socket_options = Some(RisSubscribeSocketOptions { + include_raw: None, + acknowledge: Some(acknowledge), + }); + } + Some(o) => { + o.acknowledge = Some(acknowledge); + } + } + self + } +} + +impl RisLiveClientMessage for RisSubscribe { + fn msg_type(&self) -> &'static str { + "ris_subscribe" + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new() { + let ris_subscribe = RisSubscribe::new() + .host("rrc00") + .data_type(RisSubscribeType::UPDATE); + assert_eq!(ris_subscribe.host, Some("rrc00".to_string())); + + println!("{}", ris_subscribe.to_json_string()); + } +} diff --git a/src/parser/rislive/messages/client/ris_unsubscribe.rs b/src/parser/rislive/messages/client/ris_unsubscribe.rs new file mode 100644 index 0000000..9c025c0 --- /dev/null +++ b/src/parser/rislive/messages/client/ris_unsubscribe.rs @@ -0,0 +1,39 @@ +use crate::rislive::messages::{RisLiveClientMessage, RisSubscribe}; +use serde::Serialize; + +#[derive(Default, Debug, Serialize)] +pub struct RisUnsubscribe { + #[serde(flatten)] + data: RisSubscribe, +} + +impl RisUnsubscribe { + pub fn new(subscribe: RisSubscribe) -> Self { + Self { data: subscribe } + } +} + +impl RisLiveClientMessage for RisUnsubscribe { + fn msg_type(&self) -> &'static str { + "ris_unsubscribe" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rislive::messages::client::ris_subscribe::RisSubscribeType; + + #[test] + fn test_subscribe() { + let subscribe = RisSubscribe::new() + .host("rrc00") + .data_type(RisSubscribeType::UPDATE) + .acknowledge(true); + let unsubscribe = RisUnsubscribe::new(subscribe); + + assert_eq!(unsubscribe.msg_type(), "ris_unsubscribe"); + + println!("{}", unsubscribe.to_json_string()); + } +} diff --git a/src/parser/rislive/messages/mod.rs b/src/parser/rislive/messages/mod.rs index 798d2bd..3d5ec60 100644 --- a/src/parser/rislive/messages/mod.rs +++ b/src/parser/rislive/messages/mod.rs @@ -1,20 +1,12 @@ #![allow(non_camel_case_types)] #![allow(non_snake_case)] -pub mod pong; -pub mod raw_bytes; -pub mod ris_error; -pub mod ris_message; -pub mod ris_rrc_list; -pub mod ris_subscribe_ok; +pub mod client; +pub mod server; use serde::{Deserialize, Serialize}; -pub use pong::Pong; -pub use ris_error::RisError; -pub use ris_message::RisMessage; -pub use ris_message::RisMessageEnum; -pub use ris_rrc_list::RisRrcList; -pub use ris_subscribe_ok::RisSubscribeOk; +pub use client::*; +pub use server::*; #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type", content = "data")] diff --git a/src/parser/rislive/messages/server/mod.rs b/src/parser/rislive/messages/server/mod.rs new file mode 100644 index 0000000..4e4def3 --- /dev/null +++ b/src/parser/rislive/messages/server/mod.rs @@ -0,0 +1,17 @@ +//! This module contains the RIS-live server message definitions. +//! +//! Official manual available at +pub mod pong; +pub mod raw_bytes; +pub mod ris_error; +pub mod ris_message; +pub mod ris_rrc_list; +pub mod ris_subscribe_ok; + +pub use pong::Pong; +pub use raw_bytes::parse_raw_bytes; +pub use ris_error::RisError; +pub use ris_message::RisMessage; +pub use ris_message::RisMessageEnum; +pub use ris_rrc_list::RisRrcList; +pub use ris_subscribe_ok::RisSubscribeOk; diff --git a/src/parser/rislive/messages/pong.rs b/src/parser/rislive/messages/server/pong.rs similarity index 100% rename from src/parser/rislive/messages/pong.rs rename to src/parser/rislive/messages/server/pong.rs diff --git a/src/parser/rislive/messages/raw_bytes.rs b/src/parser/rislive/messages/server/raw_bytes.rs similarity index 100% rename from src/parser/rislive/messages/raw_bytes.rs rename to src/parser/rislive/messages/server/raw_bytes.rs diff --git a/src/parser/rislive/messages/ris_error.rs b/src/parser/rislive/messages/server/ris_error.rs similarity index 100% rename from src/parser/rislive/messages/ris_error.rs rename to src/parser/rislive/messages/server/ris_error.rs diff --git a/src/parser/rislive/messages/ris_message.rs b/src/parser/rislive/messages/server/ris_message.rs similarity index 100% rename from src/parser/rislive/messages/ris_message.rs rename to src/parser/rislive/messages/server/ris_message.rs diff --git a/src/parser/rislive/messages/ris_rrc_list.rs b/src/parser/rislive/messages/server/ris_rrc_list.rs similarity index 100% rename from src/parser/rislive/messages/ris_rrc_list.rs rename to src/parser/rislive/messages/server/ris_rrc_list.rs diff --git a/src/parser/rislive/messages/ris_subscribe_ok.rs b/src/parser/rislive/messages/server/ris_subscribe_ok.rs similarity index 100% rename from src/parser/rislive/messages/ris_subscribe_ok.rs rename to src/parser/rislive/messages/server/ris_subscribe_ok.rs