From 8dfd2a437789c49e0ab817dae006881b23b8f63f Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Mon, 9 Dec 2024 15:07:25 -0500 Subject: [PATCH] implement a get info rpc --- src/network/server.rs | 39 +++++++++++++++++++++ src/proto/rpc.proto | 14 ++++++++ src/storage/db/rocksdb.rs | 71 ++++++++++++++++++++++++++++++++++++++- src/storage/mod.rs | 2 +- 4 files changed, 124 insertions(+), 2 deletions(-) diff --git a/src/network/server.rs b/src/network/server.rs index 034de45e..829bc76f 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -3,9 +3,15 @@ use crate::mempool::routing; use crate::proto; use crate::proto::hub_service_server::HubService; use crate::proto::Block; +use crate::proto::DbStats; +use crate::proto::GetInfoRequest; +use crate::proto::GetInfoResponse; use crate::proto::HubEvent; use crate::proto::{BlocksRequest, ShardChunksRequest, ShardChunksResponse, SubscribeRequest}; +use crate::storage::constants::OnChainEventPostfix; +use crate::storage::constants::RootPrefix; use crate::storage::db::PageOptions; +use crate::storage::db::RocksDbTransactionBatch; use crate::storage::store::engine::{MempoolMessage, Senders, ShardEngine}; use crate::storage::store::stores::{StoreLimits, Stores}; use crate::storage::store::BlockStore; @@ -260,6 +266,39 @@ impl HubService for MyHubService { } } + async fn get_info( + &self, + _request: Request, + ) -> Result, Status> { + let mut num_fid_registrations = 0; + let mut approx_size = 0; + let mut num_messages = 0; + for (_, shard_store) in self.shard_stores.iter() { + approx_size += shard_store.db.approximate_size(); + num_messages += shard_store.trie.get_count( + &shard_store.db, + &mut RocksDbTransactionBatch::new(), + &[], + ); + num_fid_registrations += shard_store + .db + .count_keys_at_prefix(vec![ + RootPrefix::OnChainEvent as u8, + OnChainEventPostfix::IdRegisterByFid as u8, + ]) + .map_err(|err| Status::from_error(Box::new(err)))? + as u64; + } + + Ok(Response::new(GetInfoResponse { + db_stats: Some(DbStats { + num_fid_registrations, + num_messages, + approx_size, + }), + })) + } + type SubscribeStream = ReceiverStream>; async fn subscribe( diff --git a/src/proto/rpc.proto b/src/proto/rpc.proto index 17d09b26..977e1f7b 100644 --- a/src/proto/rpc.proto +++ b/src/proto/rpc.proto @@ -37,6 +37,19 @@ message SubmitMessageResponse { Message message = 1; } +message DbStats { + uint64 num_messages = 1; + uint64 num_fid_registrations = 2; + uint64 approx_size = 4; +} + +message GetInfoRequest { +} + +// Response Types for the Sync RPC Methods +message GetInfoResponse { + DbStats db_stats = 5; +} service HubService { rpc SubmitMessage(Message) returns (Message); @@ -44,4 +57,5 @@ service HubService { rpc GetBlocks(BlocksRequest) returns (stream Block); rpc GetShardChunks(ShardChunksRequest) returns (ShardChunksResponse); rpc Subscribe(SubscribeRequest) returns (stream HubEvent); + rpc GetInfo(GetInfoRequest) returns (GetInfoResponse); }; diff --git a/src/storage/db/rocksdb.rs b/src/storage/db/rocksdb.rs index 3d4cd15a..69997d81 100644 --- a/src/storage/db/rocksdb.rs +++ b/src/storage/db/rocksdb.rs @@ -390,11 +390,35 @@ impl RocksDB { .map(|metadata| metadata.len()) // Extract the file size. .sum() // Sum the sizes. } + + /** + * Count the number of keys with a given prefix. + */ + pub fn count_keys_at_prefix(&self, prefix: Vec) -> Result { + let iter_opts = RocksDB::get_iterator_options( + Some(prefix.clone()), + Some(increment_vec_u8(&prefix.to_vec())), + &PageOptions::default(), + ); + + let db = self.db(); + let mut iter = db.as_ref().unwrap().raw_iterator_opt(iter_opts.opts); + + let mut count = 0; + iter.seek_to_first(); + while iter.valid() { + count += 1; + + iter.next(); + } + + Ok(count) + } } #[cfg(test)] mod tests { - use crate::storage::db::RocksDbTransactionBatch; + use crate::storage::db::{RocksDB, RocksDbTransactionBatch}; #[test] fn test_merge_rocksdb_transaction() { @@ -503,6 +527,51 @@ mod tests { assert_eq!(txn1.batch.get(&b"key5".to_vec()).unwrap().is_none(), true); } + #[test] + fn test_count_keys_at_prefix() { + let tmp_path = tempfile::tempdir() + .unwrap() + .path() + .as_os_str() + .to_string_lossy() + .to_string(); + let db = RocksDB::new(&tmp_path); + db.open().unwrap(); + + // Add some keys + db.put(b"key100", b"value1").unwrap(); + db.put(b"key101", b"value3").unwrap(); + db.put(b"key104", b"value4").unwrap(); + db.put(b"key200", b"value2").unwrap(); + + // Count all keys + let count = db.count_keys_at_prefix(b"key".to_vec()); + assert_eq!(count.unwrap(), 4); + + // Count keys at prefix + let count = db.count_keys_at_prefix(b"key1".to_vec()); + assert_eq!(count.unwrap(), 3); + + // Count keys at prefix with a specific prefix that doesn't exist + let count = db.count_keys_at_prefix(b"key11".to_vec()); + assert_eq!(count.unwrap(), 0); + + // Count keys at prefix with a specific sub prefix + let count = db.count_keys_at_prefix(b"key10".to_vec()); + assert_eq!(count.unwrap(), 3); + + // Count keys at prefix with a specific prefix + let count = db.count_keys_at_prefix(b"key200".to_vec()); + assert_eq!(count.unwrap(), 1); + + // Count keys at prefix with a specific prefix that doesn't exist + let count = db.count_keys_at_prefix(b"key201".to_vec()); + assert_eq!(count.unwrap(), 0); + + // Cleanup + db.destroy().unwrap(); + } + #[test] fn test_keys_exist_in_db() { let tmp_path = tempfile::tempdir() diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1941a6b6..661a1da0 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,4 +1,4 @@ -mod constants; +pub mod constants; pub mod db; pub mod store; pub mod trie;