Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved RIS Live module #185

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/target
target

# These are backup files generated by rustfmt
**/*.rs.bk
Expand Down
7 changes: 3 additions & 4 deletions examples/real-time-ris-live-websocket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bgpkit_parser::parse_ris_live_message;
use serde_json::json;
use bgpkit_parser::rislive::messages::{RisLiveClientMessage, RisSubscribe};
use tungstenite::{connect, Message};

const RIS_LIVE_URL: &str = "ws://ris-live.ripe.net/v1/ws/?client=rust-bgpkit-parser";
Expand All @@ -13,9 +13,8 @@ fn main() {
connect(RIS_LIVE_URL).expect("Can't connect to RIS Live websocket server");

// subscribe to messages from one collector
// let msg = json!({"type": "ris_subscribe", "data": {"host": "rrc21"}}).to_string();
let msg = json!({"type": "ris_subscribe", "data": null}).to_string();
socket.send(Message::Text(msg)).unwrap();
let msg = RisSubscribe::new().host("rrc21");
socket.send(Message::Text(msg.to_json_string())).unwrap();

loop {
let msg = socket.read().expect("Error reading message").to_string();
Expand Down
28 changes: 28 additions & 0 deletions src/parser/rislive/messages/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! This module contains the RIS-live client message definitions.
//!
//! Official manual available at <https://ris-live.ripe.net/manual/#client-messages>

pub mod ping;
pub mod request_rrc_list;
pub mod ris_subscribe;
pub mod ris_unsubscribe;

pub trait RisLiveClientMessage: Serialize {
fn msg_type(&self) -> &'static str;

fn to_json_string(&self) -> String {
serde_json::to_string(&json!({
"type": self.msg_type(),
"data": self
}))
.unwrap()
}
}

use serde::Serialize;
use serde_json::json;

pub use ping::Ping;
pub use request_rrc_list::RequestRrcList;
pub use ris_subscribe::RisSubscribe;
pub use ris_unsubscribe::RisUnsubscribe;
34 changes: 34 additions & 0 deletions src/parser/rislive/messages/client/ping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::rislive::messages::RisLiveClientMessage;
use serde::Serialize;

#[derive(Debug)]
pub struct Ping {}

impl Serialize for Ping {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_none()
}
}

impl RisLiveClientMessage for Ping {
fn msg_type(&self) -> &'static str {
"ping"
}
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::{json, Value};

#[test]
fn test_to_json_string() {
let ping_msg = Ping {};
let json_str = ping_msg.to_json_string();
let value: Value = serde_json::from_str(&json_str).unwrap();
assert_eq!(value, json!({"type": "ping", "data": null}));
}
}
40 changes: 40 additions & 0 deletions src/parser/rislive/messages/client/request_rrc_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::rislive::messages::RisLiveClientMessage;
use serde::Serialize;

#[derive(Default, Debug)]
pub struct RequestRrcList {
data: Vec<String>,
}

impl Serialize for RequestRrcList {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.data.serialize(serializer)
}
}

impl RequestRrcList {
pub fn new(rrc_list: Vec<String>) -> Self {
Self { data: rrc_list }
}
}

impl RisLiveClientMessage for RequestRrcList {
fn msg_type(&self) -> &'static str {
"request_rrc_list"
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_request_rrc_list() {
let rrc_list = RequestRrcList::new(vec!["rrc00".to_string(), "rrc01".to_string()]);

println!("{}", rrc_list.to_json_string());
}
}
206 changes: 206 additions & 0 deletions src/parser/rislive/messages/client/ris_subscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
use crate::rislive::messages::RisLiveClientMessage;
use ipnet::IpNet;
use serde::Serialize;
use std::net::IpAddr;

#[derive(Debug, Serialize)]
#[allow(clippy::upper_case_acronyms)]
pub enum RisSubscribeType {
UPDATE,
OPEN,
NOTIFICATION,
KEEPALIVE,
RIS_PEER_STATE,
}

#[derive(Debug, Serialize)]
pub struct RisSubscribeSocketOptions {
/// Include a Base64-encoded version of the original binary BGP message as `raw` for all subscriptions
///
/// *Default: false*
#[serde(rename = "includeRaw")]
pub include_raw: Option<bool>,

/// Send a `ris_subscribe_ok` message for all succesful subscriptions
///
/// *Default: false*
pub acknowledge: Option<bool>,
}

#[derive(Default, Debug, Serialize)]
pub struct RisSubscribe {
/// Only include messages collected by a particular RRC
#[serde(skip_serializing_if = "Option::is_none")]
pub host: Option<String>,

/// Only include messages of a given BGP or RIS type
#[serde(rename = "type")]
#[serde(skip_serializing_if = "Option::is_none")]
pub data_type: Option<RisSubscribeType>,

/// Only include messages containing a given key
///
/// Examples:
/// * "announcements"
/// * "withdrawals"
#[serde(skip_serializing_if = "Option::is_none")]
pub require: Option<String>,

/// Only include messages sent by the given BGP peer
#[serde(skip_serializing_if = "Option::is_none")]
pub peer: Option<IpAddr>,

/// ASN or pattern to match against the AS PATH attribute
///
/// Any of:
/// * ASN (integer)
/// * AS path pattern (string)
///
/// Comma-separated pattern describing all or part of the AS path.
/// Can optionally begin with ^ to match the first item of the path (the last traversed ASN),
/// and/or end with $ to match the last item of the path
/// (the originating ASN).
///
/// The entire pattern can be prefixed with ! to invert the match.
/// AS_SETs should be written as JSON arrays of integers in ascending order and with no spaces.
/// Note: this is not a regular expression.
///
/// Examples:
/// * "789$"
/// * "^123,456,789,[789,10111]$"
/// * "!6666$"
/// * "!^3333"
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,

/// Filter UPDATE messages by prefixes in announcements or withdrawals
///
/// Any of:
/// * IPv4 or IPv6 CIDR prefix (string)
/// * Array of CIDR prefixes (array)
///
/// For the purposes of subsequent `ris_unsubscribe` messages,
/// each prefix results in a separate subscription that can be stopped independently
///
/// Array items:
/// * IPv4 or IPv6 CIDR prefix (string)
#[serde(skip_serializing_if = "Option::is_none")]
pub prefix: Option<IpNet>,

/// Match prefixes that are more specific (part of) `prefix`
///
/// *Default: true*
#[serde(rename = "moreSpecific")]
#[serde(skip_serializing_if = "Option::is_none")]
pub more_specific: Option<bool>,

/// Match prefixes that are less specific (contain) `prefix`
///
/// *Default: false*
#[serde(rename = "lessSpecific")]
#[serde(skip_serializing_if = "Option::is_none")]
pub less_specific: Option<bool>,

/// Options that apply to all subscriptions over the current WebSocket.
/// If a new subscription contains `socketOptions` it will override those from previous subscriptions
#[serde(rename = "socketOptions")]
#[serde(skip_serializing_if = "Option::is_none")]
pub socket_options: Option<RisSubscribeSocketOptions>,
}

impl RisSubscribe {
pub fn new() -> Self {
Default::default()
}

pub fn host(mut self, host: &str) -> Self {
self.host = Some(host.to_string());
self
}

pub fn data_type(mut self, data_type: RisSubscribeType) -> Self {
self.data_type = Some(data_type);
self
}

pub fn require(mut self, require: &str) -> Self {
self.require = Some(require.to_string());
self
}

pub fn peer(mut self, peer: IpAddr) -> Self {
self.peer = Some(peer);
self
}

pub fn path(mut self, path: &str) -> Self {
self.path = Some(path.to_string());
self
}

pub fn prefix(mut self, prefix: IpNet) -> Self {
self.prefix = Some(prefix);
self
}

pub fn more_specific(mut self, more_specific: bool) -> Self {
self.more_specific = Some(more_specific);
self
}

pub fn less_specific(mut self, less_specific: bool) -> Self {
self.less_specific = Some(less_specific);
self
}

pub fn include_raw(mut self, include_raw: bool) -> Self {
match self.socket_options.as_mut() {
None => {
self.socket_options = Some(RisSubscribeSocketOptions {
include_raw: Some(include_raw),
acknowledge: None,
});
}
Some(o) => {
o.include_raw = Some(include_raw);
}
}
self
}

pub fn acknowledge(mut self, acknowledge: bool) -> Self {
match self.socket_options.as_mut() {
None => {
self.socket_options = Some(RisSubscribeSocketOptions {
include_raw: None,
acknowledge: Some(acknowledge),
});
}
Some(o) => {
o.acknowledge = Some(acknowledge);
}
}
self
}
}

impl RisLiveClientMessage for RisSubscribe {
fn msg_type(&self) -> &'static str {
"ris_subscribe"
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_new() {
let ris_subscribe = RisSubscribe::new()
.host("rrc00")
.data_type(RisSubscribeType::UPDATE);
assert_eq!(ris_subscribe.host, Some("rrc00".to_string()));

println!("{}", ris_subscribe.to_json_string());
}
}
39 changes: 39 additions & 0 deletions src/parser/rislive/messages/client/ris_unsubscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::rislive::messages::{RisLiveClientMessage, RisSubscribe};
use serde::Serialize;

#[derive(Default, Debug, Serialize)]
pub struct RisUnsubscribe {
#[serde(flatten)]
data: RisSubscribe,
}

impl RisUnsubscribe {
pub fn new(subscribe: RisSubscribe) -> Self {
Self { data: subscribe }
}
}

impl RisLiveClientMessage for RisUnsubscribe {
fn msg_type(&self) -> &'static str {
"ris_unsubscribe"
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::rislive::messages::client::ris_subscribe::RisSubscribeType;

#[test]
fn test_subscribe() {
let subscribe = RisSubscribe::new()
.host("rrc00")
.data_type(RisSubscribeType::UPDATE)
.acknowledge(true);
let unsubscribe = RisUnsubscribe::new(subscribe);

assert_eq!(unsubscribe.msg_type(), "ris_unsubscribe");

println!("{}", unsubscribe.to_json_string());
}
}
Loading