Skip to content

Commit

Permalink
Merge pull request #49 from bgpkit/refactor
Browse files Browse the repository at this point in the history
organize code into separate modules
  • Loading branch information
digizeph authored Jul 21, 2023
2 parents c1e1c5e + 8f9685a commit 8f91509
Show file tree
Hide file tree
Showing 11 changed files with 308 additions and 280 deletions.
136 changes: 127 additions & 9 deletions src/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use rusqlite::Connection;
use anyhow::Result;
use bgpkit_parser::models::{ElemType, MetaCommunity};
use bgpkit_parser::BgpElem;
use itertools::Itertools;
use rusqlite::Connection;

pub struct MonocleDatabase {
pub conn: Connection,
Expand All @@ -8,13 +11,128 @@ pub struct MonocleDatabase {
impl MonocleDatabase {
pub fn new(path: &Option<String>) -> Result<MonocleDatabase> {
let conn = match path {
Some(p) => {
Connection::open(p.as_str())?
}
None => {
Connection::open_in_memory()?
}
Some(p) => Connection::open(p.as_str())?,
None => Connection::open_in_memory()?,
};
Ok(MonocleDatabase{conn})
Ok(MonocleDatabase { conn })
}
}
macro_rules! option_to_string {
($a:expr) => {
if let Some(v) = $a {
format!("'{}'", v)
} else {
"NULL".to_string()
}
};
}

pub struct MsgStore {
db: MonocleDatabase,
}

impl MsgStore {
pub fn new(db_path: &Option<String>, reset: bool) -> MsgStore {
let mut db = MonocleDatabase::new(db_path).unwrap();
Self::initialize_msgs_db(&mut db, reset);
MsgStore { db }
}

fn initialize_msgs_db(db: &mut MonocleDatabase, reset: bool) {
db.conn
.execute(
r#"
create table if not exists elems (
timestamp INTEGER,
elem_type TEXT,
peer_ip TEXT,
peer_asn INTEGER,
prefix TEXT,
next_hop TEXT,
as_path TEXT,
origin_asns TEXT,
origin TEXT,
local_pref INTEGER,
med INTEGER,
communities TEXT,
atomic TEXT,
aggr_asn INTEGER,
aggr_ip TEXT
);
"#,
[],
)
.unwrap();

if reset {
db.conn.execute("delete from elems", []).unwrap();
}
}

#[inline(always)]
fn option_to_string_communities(o: &Option<Vec<MetaCommunity>>) -> String {
if let Some(v) = o {
format!("'{}'", v.iter().join(" "))
} else {
"NULL".to_string()
}
}

pub fn insert_elems(&self, elems: &[BgpElem]) {
for elems in elems.chunks(10000) {
let values = elems
.iter()
.map(|elem| {
let t = match elem.elem_type {
// bgpkit_parser::ElemType::ANNOUNCE => "A",
// bgpkit_parser::ElemType::WITHDRAW => "W",
ElemType::ANNOUNCE => "A",
ElemType::WITHDRAW => "W",
};
let origin_string = elem.origin_asns.as_ref().map(|asns| asns.get(0).unwrap());
format!(
"('{}','{}','{}','{}','{}', {},{},{},{},{},{},{},{},{},{})",
elem.timestamp as u32,
t,
elem.peer_ip,
elem.peer_asn,
elem.prefix,
option_to_string!(&elem.next_hop),
option_to_string!(&elem.as_path),
option_to_string!(origin_string),
option_to_string!(&elem.origin),
option_to_string!(&elem.local_pref),
option_to_string!(&elem.med),
Self::option_to_string_communities(&elem.communities),
option_to_string!(&elem.atomic),
option_to_string!(&elem.aggr_asn),
option_to_string!(&elem.aggr_ip),
)
})
.join(", ")
.to_string();
let query = format!(
"INSERT INTO elems (\
timestamp, elem_type, peer_ip, peer_asn, prefix, next_hop, \
as_path, origin_asns, origin, local_pref, med, communities,\
atomic, aggr_asn, aggr_ip)\
VALUES {values};"
);
self.db.conn.execute(query.as_str(), []).unwrap();
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use bgpkit_parser::BgpkitParser;

#[test]
fn test_insert() {
let store = MsgStore::new(&Some("test.sqlite3".to_string()), false);
let url = "https://spaces.bgpkit.org/parser/update-example.gz";
let elems: Vec<BgpElem> = BgpkitParser::new(url).unwrap().into_elem_iter().collect();
store.insert_elems(&elems);
}
}
Loading

0 comments on commit 8f91509

Please sign in to comment.