diff --git a/src/database.rs b/src/database.rs
index 963906c..c51c9dd 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -1,5 +1,8 @@
-use rusqlite::Connection;
 use anyhow::Result;
+use bgpkit_parser::models::{ElemType, MetaCommunity};
+use bgpkit_parser::BgpElem;
+use itertools::Itertools;
+use rusqlite::Connection;
 
 pub struct MonocleDatabase {
     pub conn: Connection,
@@ -8,13 +11,128 @@ pub struct MonocleDatabase {
 impl MonocleDatabase {
     pub fn new(path: &Option<String>) -> Result<MonocleDatabase> {
         let conn = match path {
-            Some(p) => {
-                Connection::open(p.as_str())?
-            }
-            None => {
-                Connection::open_in_memory()?
-            }
+            Some(p) => Connection::open(p.as_str())?,
+            None => Connection::open_in_memory()?,
         };
-        Ok(MonocleDatabase{conn})
+        Ok(MonocleDatabase { conn })
+    }
+}
+macro_rules! option_to_string {
+    ($a:expr) => {
+        if let Some(v) = $a {
+            format!("'{}'", v)
+        } else {
+            "NULL".to_string()
+        }
+    };
+}
+
+pub struct MsgStore {
+    db: MonocleDatabase,
+}
+
+impl MsgStore {
+    pub fn new(db_path: &Option<String>, reset: bool) -> MsgStore {
+        let mut db = MonocleDatabase::new(db_path).unwrap();
+        Self::initialize_msgs_db(&mut db, reset);
+        MsgStore { db }
+    }
+
+    fn initialize_msgs_db(db: &mut MonocleDatabase, reset: bool) {
+        db.conn
+            .execute(
+                r#"
+        create table if not exists elems (
+            timestamp INTEGER,
+            elem_type TEXT,
+            peer_ip TEXT,
+            peer_asn INTEGER,
+            prefix TEXT,
+            next_hop TEXT,
+            as_path TEXT,
+            origin_asns TEXT,
+            origin TEXT,
+            local_pref INTEGER,
+            med INTEGER,
+            communities TEXT,
+            atomic TEXT,
+            aggr_asn INTEGER,
+            aggr_ip TEXT
+        );
+        "#,
+                [],
+            )
+            .unwrap();
+
+        if reset {
+            db.conn.execute("delete from elems", []).unwrap();
+        }
+    }
+
+    #[inline(always)]
+    fn option_to_string_communities(o: &Option<Vec<MetaCommunity>>) -> String {
+        if let Some(v) = o {
+            format!("'{}'", v.iter().join(" "))
+        } else {
+            "NULL".to_string()
+        }
+    }
+
+    pub fn insert_elems(&self, elems: &[BgpElem]) {
+        for elems in elems.chunks(10000) {
+            let values = elems
+                .iter()
+                .map(|elem| {
+                    let t = match elem.elem_type {
+                        // bgpkit_parser::ElemType::ANNOUNCE => "A",
+                        // bgpkit_parser::ElemType::WITHDRAW => "W",
+                        ElemType::ANNOUNCE => "A",
+                        ElemType::WITHDRAW => "W",
+                    };
+                    let origin_string = elem.origin_asns.as_ref().map(|asns| asns.get(0).unwrap());
+                    format!(
+                        "('{}','{}','{}','{}','{}', {},{},{},{},{},{},{},{},{},{})",
+                        elem.timestamp as u32,
+                        t,
+                        elem.peer_ip,
+                        elem.peer_asn,
+                        elem.prefix,
+                        option_to_string!(&elem.next_hop),
+                        option_to_string!(&elem.as_path),
+                        option_to_string!(origin_string),
+                        option_to_string!(&elem.origin),
+                        option_to_string!(&elem.local_pref),
+                        option_to_string!(&elem.med),
+                        Self::option_to_string_communities(&elem.communities),
+                        option_to_string!(&elem.atomic),
+                        option_to_string!(&elem.aggr_asn),
+                        option_to_string!(&elem.aggr_ip),
+                    )
+                })
+                .join(", ")
+                .to_string();
+            let query = format!(
+                "INSERT INTO elems (\
+                timestamp, elem_type, peer_ip, peer_asn, prefix, next_hop, \
+                as_path, origin_asns, origin, local_pref, med, communities,\
+                atomic, aggr_asn, aggr_ip)\
+                VALUES {values};"
+            );
+            self.db.conn.execute(query.as_str(), []).unwrap();
+        }
     }
-}
\ No newline at end of file
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use bgpkit_parser::BgpkitParser;
+
+    #[test]
+    fn test_insert() {
+        let store = MsgStore::new(&Some("test.sqlite3".to_string()), false);
+        let url = "https://spaces.bgpkit.org/parser/update-example.gz";
+        let elems: Vec<BgpElem> = BgpkitParser::new(url).unwrap().into_elem_iter().collect();
+        store.insert_elems(&elems);
+    }
+}
diff --git a/src/as2org.rs b/src/datasets/as2org.rs
similarity index 71%
rename from src/as2org.rs
rename to src/datasets/as2org.rs
index 5e385b8..153080b 100644
--- a/src/as2org.rs
+++ b/src/datasets/as2org.rs
@@ -1,18 +1,18 @@
-/// AS2Org data handling utility.
-///
-/// Data source:
-/// The CAIDA AS Organizations Dataset,
-/// http://www.caida.org/data/as-organizations
-
-use serde::{Serialize, Deserialize};
+//! AS2Org data handling utility.
+//!
+//! Data source:
+//! The CAIDA AS Organizations Dataset,
+//! http://www.caida.org/data/as-organizations
+
+use crate::database::MonocleDatabase;
+use crate::CountryLookup;
 use anyhow::{anyhow, Result};
 use itertools::Itertools;
 use regex::Regex;
 use rusqlite::Statement;
+use serde::{Deserialize, Serialize};
 use tabled::Tabled;
 use tracing::info;
-use crate::{CountryLookup, MonocleDatabase};
-
 
 /// Organization JSON format
 ///
@@ -32,7 +32,7 @@ use crate::{CountryLookup, MonocleDatabase};
 /// source : the RIR or NIR database which was contained this entry
 #[derive(Debug, Serialize, Deserialize)]
 pub struct JsonOrg {
-    #[serde(alias="organizationId")]
+    #[serde(alias = "organizationId")]
     org_id: String,
 
     changed: Option<String>,
@@ -45,8 +45,8 @@ pub struct JsonOrg {
     /// The RIR or NIR database that contained this entry
     source: String,
 
-    #[serde(alias="type")]
-    data_type: String
+    #[serde(alias = "type")]
+    data_type: String,
 }
 
 /// AS Json format
@@ -62,7 +62,6 @@ pub struct JsonOrg {
 /// source : the RIR or NIR database which was contained this entry
 #[derive(Debug, Serialize, Deserialize)]
 pub struct JsonAs {
-
     asn: String,
 
     changed: Option<String>,
@@ -70,17 +69,17 @@ pub struct JsonAs {
     #[serde(default)]
     name: String,
 
-    #[serde(alias="opaqueId")]
+    #[serde(alias = "opaqueId")]
     opaque_id: Option<String>,
 
-    #[serde(alias="organizationId")]
+    #[serde(alias = "organizationId")]
     org_id: String,
 
     /// The RIR or NIR database that contained this entry
     source: String,
 
-    #[serde(rename="type")]
-    data_type: String
+    #[serde(rename = "type")]
+    data_type: String,
 }
 
 #[derive(Debug)]
@@ -94,20 +93,15 @@ pub struct As2org {
     country_lookup: CountryLookup,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub enum SearchType {
     AsnOnly,
     NameOnly,
     CountryOnly,
+    #[default]
     Guess,
 }
 
-impl Default for SearchType {
-    fn default() -> Self {
-        SearchType::Guess
-    }
-}
-
 #[derive(Debug, Tabled)]
 pub struct SearchResult {
     pub asn: u32,
@@ -115,7 +109,7 @@ pub struct SearchResult {
     pub org_name: String,
     pub org_id: String,
     pub org_country: String,
-    pub org_size: u32
+    pub org_size: u32,
 }
 
 #[derive(Debug, Tabled)]
@@ -127,15 +121,18 @@ pub struct SearchResultConcise {
 }
 
 impl As2org {
-
     pub fn new(db_path: &Option<String>) -> Result<As2org> {
         let mut db = MonocleDatabase::new(db_path)?;
         As2org::initialize_db(&mut db);
         let country_lookup = CountryLookup::new();
-        Ok(As2org{ db, country_lookup })
+        Ok(As2org { db, country_lookup })
     }
 
-    fn stmt_to_results(&self, stmt: &mut Statement, full_country_name: bool) -> Result<Vec<SearchResult>> {
+    fn stmt_to_results(
+        &self,
+        stmt: &mut Statement,
+        full_country_name: bool,
+    ) -> Result<Vec<SearchResult>> {
         let res_iter = stmt.query_map([], |row| {
             let code: String = row.get(4)?;
             let country: String = match full_country_name {
@@ -145,10 +142,8 @@ impl As2org {
                         None => code,
                         Some(c) => c.to_string(),
                     }
-                },
-                false => {
-                    code
                 }
+                false => code,
             };
             Ok(SearchResult {
                 asn: row.get(0)?,
@@ -156,107 +151,146 @@ impl As2org {
                 org_name: row.get(2)?,
                 org_id: row.get(3)?,
                 org_country: country,
-                org_size: row.get(5)?
+                org_size: row.get(5)?,
             })
         })?;
-        Ok(
-            res_iter.filter_map(|x| x.ok()).collect()
-        )
+        Ok(res_iter.filter_map(|x| x.ok()).collect())
     }
 
     pub fn is_db_empty(&self) -> bool {
-        let count: u32 = self.db.conn.query_row("select count(*) from as2org_as", [],
-            |row| row.get(0),
-        ).unwrap();
+        let count: u32 = self
+            .db
+            .conn
+            .query_row("select count(*) from as2org_as", [], |row| row.get(0))
+            .unwrap();
         count == 0
     }
 
     fn initialize_db(db: &mut MonocleDatabase) {
-        db.conn.execute(r#"
+        db.conn
+            .execute(
+                r#"
        create table if not exists as2org_as (
            asn INTEGER PRIMARY KEY,
            name TEXT,
            org_id TEXT,
            source TEXT
        );
-        "#,[]).unwrap();
-        db.conn.execute(r#"
+        "#,
+                [],
+            )
+            .unwrap();
+        db.conn
+            .execute(
+                r#"
        create table if not exists as2org_org (
            org_id TEXT PRIMARY KEY,
            name TEXT,
            country TEXT,
            source TEXT
        );
-        "#,[]).unwrap();
+        "#,
+                [],
+            )
+            .unwrap();
 
         // views
-        db.conn.execute(r#"
+        db.conn
+            .execute(
+                r#"
        create view if not exists as2org_both as
        select a.asn, a.name as 'as_name', b.name as 'org_name', b.org_id, b.country
        from as2org_as as a join as2org_org as b on a.org_id = b.org_id ;
-        "#,[]).unwrap();
-
-        db.conn.execute(r#"
+        "#,
+                [],
+            )
+            .unwrap();
+
+        db.conn
+            .execute(
+                r#"
        create view if not exists as2org_count as
        select org_id, org_name, count(*) as count from as2org_both
        group by org_name order by count desc;
-        "#,[]).unwrap();
-
-        db.conn.execute(r#"
+        "#,
+                [],
+            )
+            .unwrap();
+
+        db.conn
+            .execute(
+                r#"
        create view if not exists as2org_all as
        select a.*, b.count from as2org_both as a
       join as2org_count as b on a.org_id = b.org_id;
-        "#,[]).unwrap();
+        "#,
+                [],
+            )
+            .unwrap();
     }
 
     fn insert_as(&self, as_entry: &JsonAs) -> Result<()> {
-        self.db.conn.execute( r#"
+        self.db.conn.execute(
+            r#"
        INSERT INTO as2org_as (asn, name, org_id, source) VALUES (?1, ?2, ?3, ?4)
-        "#, (
-            as_entry.asn.parse::<u32>().unwrap(),
-            as_entry.name.as_str(),
-            as_entry.org_id.as_str(),
-            as_entry.source.as_str(),
-        )
+        "#,
+            (
+                as_entry.asn.parse::<u32>().unwrap(),
+                as_entry.name.as_str(),
+                as_entry.org_id.as_str(),
+                as_entry.source.as_str(),
+            ),
         )?;
         Ok(())
     }
 
     fn insert_org(&self, org_entry: &JsonOrg) -> Result<()> {
-        self.db.conn.execute( r#"
+        self.db.conn.execute(
+            r#"
        INSERT INTO as2org_org (org_id, name, country, source) VALUES (?1, ?2, ?3, ?4)
-        "#, (
-            org_entry.org_id.as_str(),
-            org_entry.name.as_str(),
-            org_entry.country.as_str(),
-            org_entry.source.as_str(),
-        )
+        "#,
+            (
+                org_entry.org_id.as_str(),
+                org_entry.name.as_str(),
+                org_entry.country.as_str(),
+                org_entry.source.as_str(),
+            ),
         )?;
         Ok(())
     }
 
     pub fn clear_db(&self) {
-        self.db.conn.execute(r#"
+        self.db
+            .conn
+            .execute(
+                r#"
        DELETE FROM as2org_as
-        "#, []
-        ).unwrap();
-        self.db.conn.execute(r#"
+        "#,
+                [],
+            )
+            .unwrap();
+        self.db
+            .conn
+            .execute(
+                r#"
        DELETE FROM as2org_org
-        "#, []
-        ).unwrap();
+        "#,
+                [],
+            )
+            .unwrap();
     }
 
     /// parse as2org data and insert into monocle sqlite database
-    pub fn parse_insert_as2org(&self, url: Option<&str>) -> Result<()>{
+    pub fn parse_insert_as2org(&self, url: Option<&str>) -> Result<()> {
         self.clear_db();
         let url = match url {
             Some(u) => u.to_string(),
-            None => As2org::get_most_recent_data()
+            None => As2org::get_most_recent_data(),
         };
         info!("start parsing as2org file at {}", url.as_str());
         let entries = As2org::parse_as2org_file(url.as_str())?;
@@ -275,9 +309,17 @@ impl As2org {
         Ok(())
     }
 
-    pub fn search(&self, query: &str, search_type: &SearchType, full_country_name: bool) -> Result<Vec<SearchResult>>{
+    pub fn search(
+        &self,
+        query: &str,
+        search_type: &SearchType,
+        full_country_name: bool,
+    ) -> Result<Vec<SearchResult>> {
+        #[allow(clippy::upper_case_acronyms)]
         enum QueryType {
-            ASN, NAME, COUNTRY
+            ASN,
+            NAME,
+            COUNTRY,
         }
         let res: Vec<SearchResult>;
         let mut query_type = QueryType::ASN;
@@ -305,8 +347,10 @@ impl As2org {
                 if countries.is_empty() {
                     return Err(anyhow!("no country found with the query ({})", query));
                 } else if countries.len() > 1 {
-                    let countries = countries.into_iter().map(|x|x.name).join(" ; ");
-                    return Err(anyhow!("more than one countries found with the query ({query}): {countries}"));
+                    let countries = countries.into_iter().map(|x| x.name).join(" ; ");
+                    return Err(anyhow!(
+                        "more than one countries found with the query ({query}): {countries}"
+                    ));
                 }
 
                 let mut stmt = self.db.conn.prepare(
@@ -315,67 +359,57 @@ impl As2org {
                 )?;
                 res = self.stmt_to_results(&mut stmt, full_country_name)?;
             }
-            SearchType::Guess => {
-                match query.parse::<u32>() {
-                    Ok(asn) => {
-                        query_type = QueryType::ASN;
-                        let mut stmt = self.db.conn.prepare(
+            SearchType::Guess => match query.parse::<u32>() {
+                Ok(asn) => {
+                    query_type = QueryType::ASN;
+                    let mut stmt = self.db.conn.prepare(
                 format!(
                     "SELECT asn, as_name, org_name, org_id, country, count FROM as2org_all where asn='{asn}'").as_str()
                 )?;
-                        res = self.stmt_to_results(&mut stmt, full_country_name)?;
-                    }
-                    Err(_) => {
-                        query_type = QueryType::NAME;
-                        let mut stmt = self.db.conn.prepare(
+                    res = self.stmt_to_results(&mut stmt, full_country_name)?;
+                }
+                Err(_) => {
+                    query_type = QueryType::NAME;
+                    let mut stmt = self.db.conn.prepare(
                 format!(
                     "SELECT asn, as_name, org_name, org_id, country, count FROM as2org_all where org_name like '%{query}%' or as_name like '%{query}%' or org_id like '%{query}%' order by count desc").as_str()
                 )?;
-                        res = self.stmt_to_results(&mut stmt, full_country_name)?;
-                    }
+                    res = self.stmt_to_results(&mut stmt, full_country_name)?;
                 }
-            }
+            },
         }
 
         match res.is_empty() {
             true => {
                 let new_res = match query_type {
-                    QueryType::ASN => {
-                        SearchResult{
-                            asn: query.parse::<u32>().unwrap(),
-                            as_name: "?".to_string(),
-                            org_name: "?".to_string(),
-                            org_id: "?".to_string(),
-                            org_country: "?".to_string(),
-                            org_size: 0,
-                        }
-                    }
-                    QueryType::NAME => {
-                        SearchResult{
-                            asn: 0,
-                            as_name: "?".to_string(),
-                            org_name: query.to_string(),
-                            org_id: "?".to_string(),
-                            org_country: "?".to_string(),
-                            org_size: 0,
-                        }
-                    }
-                    QueryType::COUNTRY => {
-                        SearchResult{
-                            asn: 0,
-                            as_name: "?".to_string(),
-                            org_name: "?".to_string(),
-                            org_id: "?".to_string(),
-                            org_country: query.to_string(),
-                            org_size: 0,
-                        }
-                    }
+                    QueryType::ASN => SearchResult {
+                        asn: query.parse::<u32>().unwrap(),
+                        as_name: "?".to_string(),
+                        org_name: "?".to_string(),
+                        org_id: "?".to_string(),
+                        org_country: "?".to_string(),
+                        org_size: 0,
+                    },
+                    QueryType::NAME => SearchResult {
+                        asn: 0,
+                        as_name: "?".to_string(),
+                        org_name: query.to_string(),
+                        org_id: "?".to_string(),
+                        org_country: "?".to_string(),
+                        org_size: 0,
+                    },
+                    QueryType::COUNTRY => SearchResult {
+                        asn: 0,
+                        as_name: "?".to_string(),
+                        org_name: "?".to_string(),
+                        org_id: "?".to_string(),
+                        org_country: query.to_string(),
+                        org_size: 0,
+                    },
                 };
                 Ok(vec![new_res])
             }
-            false => {
-                Ok(res)
-            }
+            false => Ok(res),
         }
     }
 
@@ -393,7 +427,7 @@ impl As2org {
                     }
                     Err(e) => {
                         eprintln!("error parsing line:\n{}", line.as_str());
-                        return Err(anyhow!(e))
+                        return Err(anyhow!(e));
                     }
                 }
             } else {
@@ -404,7 +438,7 @@ impl As2org {
                     }
                     Err(e) => {
                         eprintln!("error parsing line:\n{}", line.as_str());
-                        return Err(anyhow!(e))
+                        return Err(anyhow!(e));
                     }
                 }
             }
@@ -414,17 +448,21 @@ impl As2org {
 
     pub fn get_most_recent_data() -> String {
         let data_link: Regex = Regex::new(r#".*(........\.as-org2info\.jsonl\.gz).*"#).unwrap();
-        let content = ureq::get("https://publicdata.caida.org/datasets/as-organizations/").call().unwrap().into_string().unwrap();
-        let res: Vec<String> = data_link.captures_iter(content.as_str()).map(|cap| {
-            cap[1].to_owned()
-        }).collect();
+        let content = ureq::get("https://publicdata.caida.org/datasets/as-organizations/")
+            .call()
+            .unwrap()
+            .into_string()
+            .unwrap();
+        let res: Vec<String> = data_link
+            .captures_iter(content.as_str())
+            .map(|cap| cap[1].to_owned())
+            .collect();
         let file = res.last().unwrap().to_string();
         format!("https://publicdata.caida.org/datasets/as-organizations/{file}")
     }
 }
 
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -458,7 +496,9 @@ mod tests {
         let as2org = As2org::new(&Some("./test.sqlite3".to_string())).unwrap();
         as2org.clear_db();
         assert!(as2org.is_db_empty());
-        as2org.parse_insert_as2org(Some("tests/test-as2org.jsonl.gz")).unwrap();
+        as2org
+            .parse_insert_as2org(Some("tests/test-as2org.jsonl.gz"))
+            .unwrap();
 
         let res = as2org.search("400644", &SearchType::AsnOnly, false);
         assert!(res.is_ok());
diff --git a/src/country.rs b/src/datasets/country.rs
similarity index 100%
rename from src/country.rs
rename to src/datasets/country.rs
diff --git a/src/datasets/mod.rs b/src/datasets/mod.rs
new file mode 100644
index 0000000..53ba835
--- /dev/null
+++ b/src/datasets/mod.rs
@@ -0,0 +1,7 @@
+mod as2org;
+mod country;
+mod rpki;
+
+pub use crate::datasets::as2org::*;
+pub use crate::datasets::country::*;
+pub use crate::datasets::rpki::*;
diff --git a/src/rpki/aspa.rs b/src/datasets/rpki/aspa.rs
similarity index 100%
rename from src/rpki/aspa.rs
rename to src/datasets/rpki/aspa.rs
diff --git a/src/rpki/mod.rs b/src/datasets/rpki/mod.rs
similarity index 100%
rename from src/rpki/mod.rs
rename to src/datasets/rpki/mod.rs
diff --git a/src/rpki/roa.rs b/src/datasets/rpki/roa.rs
similarity index 100%
rename from src/rpki/roa.rs
rename to src/datasets/rpki/roa.rs
diff --git a/src/rpki/validator.rs b/src/datasets/rpki/validator.rs
similarity index 100%
rename from src/rpki/validator.rs
rename to src/datasets/rpki/validator.rs
diff --git a/src/lib.rs b/src/lib.rs
index 138a3d9..75f3877 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,9 +1,6 @@
-pub mod as2org;
 mod config;
-pub mod country;
 mod database;
-mod msg_store;
-pub mod rpki;
+mod datasets;
 
 use anyhow::{anyhow, Result};
 use bgpkit_parser::BgpkitParser;
@@ -15,11 +12,9 @@ use std::net::IpAddr;
 use tabled::settings::Style;
 use tabled::{Table, Tabled};
 
-pub use crate::as2org::*;
 pub use crate::config::MonocleConfig;
-pub use crate::country::CountryLookup;
-pub use crate::database::MonocleDatabase;
-pub use crate::msg_store::MsgStore;
+pub use crate::database::*;
+pub use crate::datasets::*;
 
 #[allow(clippy::too_many_arguments)]
 pub fn parser_with_filters(
diff --git a/src/monocle.rs b/src/monocle.rs
index 98f7a4f..bf75596 100644
--- a/src/monocle.rs
+++ b/src/monocle.rs
@@ -9,21 +9,13 @@ use bgpkit_parser::BgpElem;
 use chrono::DateTime;
 use clap::{Args, Parser, Subcommand};
 use ipnetwork::IpNetwork;
+use monocle::*;
 use rayon::prelude::*;
 use serde_json::json;
 use tabled::settings::{Merge, Style};
 use tabled::Table;
 use tracing::{info, Level};
 
-use monocle::rpki::{
-    list_by_asn, list_by_prefix, read_aspa, read_roa, summarize_asn, validate, RoaTableItem,
-    SummaryTableItem,
-};
-use monocle::{
-    parser_with_filters, string_to_time, time_to_table, As2org, CountryLookup, MonocleConfig,
-    MsgStore, SearchResult, SearchResultConcise, SearchType,
-};
-
 trait Validate {
     fn validate(&self) -> Result<()>;
 }
diff --git a/src/msg_store.rs b/src/msg_store.rs
deleted file mode 100644
index d8f914e..0000000
--- a/src/msg_store.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-use crate::MonocleDatabase;
-use bgpkit_parser::models::{ElemType, MetaCommunity};
-use bgpkit_parser::BgpElem;
-use itertools::Itertools;
-
-macro_rules! option_to_string {
-    ($a:expr) => {
-        if let Some(v) = $a {
-            format!("'{}'", v)
-        } else {
-            "NULL".to_string()
-        }
-    };
-}
-
-pub struct MsgStore {
-    db: MonocleDatabase,
-}
-
-impl MsgStore {
-    pub fn new(db_path: &Option<String>, reset: bool) -> MsgStore {
-        let mut db = MonocleDatabase::new(db_path).unwrap();
-        Self::initialize_msgs_db(&mut db, reset);
-        MsgStore { db }
-    }
-
-    fn initialize_msgs_db(db: &mut MonocleDatabase, reset: bool) {
-        db.conn
-            .execute(
-                r#"
-        create table if not exists elems (
-            timestamp INTEGER,
-            elem_type TEXT,
-            peer_ip TEXT,
-            peer_asn INTEGER,
-            prefix TEXT,
-            next_hop TEXT,
-            as_path TEXT,
-            origin_asns TEXT,
-            origin TEXT,
-            local_pref INTEGER,
-            med INTEGER,
-            communities TEXT,
-            atomic TEXT,
-            aggr_asn INTEGER,
-            aggr_ip TEXT
-        );
-        "#,
-                [],
-            )
-            .unwrap();
-
-        if reset {
-            db.conn.execute("delete from elems", []).unwrap();
-        }
-    }
-
-    #[inline(always)]
-    fn option_to_string_communities(o: &Option<Vec<MetaCommunity>>) -> String {
-        if let Some(v) = o {
-            format!("'{}'", v.iter().join(" "))
-        } else {
-            "NULL".to_string()
-        }
-    }
-
-    pub fn insert_elems(&self, elems: &[BgpElem]) {
-        for elems in elems.chunks(10000) {
-            let values = elems
-                .iter()
-                .map(|elem| {
-                    let t = match elem.elem_type {
-                        // bgpkit_parser::ElemType::ANNOUNCE => "A",
-                        // bgpkit_parser::ElemType::WITHDRAW => "W",
-                        ElemType::ANNOUNCE => "A",
-                        ElemType::WITHDRAW => "W",
-                    };
-                    let origin_string = elem.origin_asns.as_ref().map(|asns| asns.get(0).unwrap());
-                    format!(
-                        "('{}','{}','{}','{}','{}', {},{},{},{},{},{},{},{},{},{})",
-                        elem.timestamp as u32,
-                        t,
-                        elem.peer_ip,
-                        elem.peer_asn,
-                        elem.prefix,
-                        option_to_string!(&elem.next_hop),
-                        option_to_string!(&elem.as_path),
-                        option_to_string!(origin_string),
-                        option_to_string!(&elem.origin),
-                        option_to_string!(&elem.local_pref),
-                        option_to_string!(&elem.med),
-                        Self::option_to_string_communities(&elem.communities),
-                        option_to_string!(&elem.atomic),
-                        option_to_string!(&elem.aggr_asn),
-                        option_to_string!(&elem.aggr_ip),
-                    )
-                })
-                .join(", ")
-                .to_string();
-            let query = format!(
-                "INSERT INTO elems (\
-                timestamp, elem_type, peer_ip, peer_asn, prefix, next_hop, \
-                as_path, origin_asns, origin, local_pref, med, communities,\
-                atomic, aggr_asn, aggr_ip)\
-                VALUES {values};"
-            );
-            self.db.conn.execute(query.as_str(), []).unwrap();
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use bgpkit_parser::BgpkitParser;
-
-    #[test]
-    fn test_insert() {
-        let store = MsgStore::new(&Some("test.sqlite3".to_string()), false);
-        let url = "https://spaces.bgpkit.org/parser/update-example.gz";
-        let elems: Vec<BgpElem> = BgpkitParser::new(url).unwrap().into_elem_iter().collect();
-        store.insert_elems(&elems);
-    }
-}