Skip to content

Commit

Permalink
implement a get info rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
aditiharini committed Dec 9, 2024
1 parent af9e7ae commit 8dfd2a4
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 2 deletions.
39 changes: 39 additions & 0 deletions src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ use crate::mempool::routing;
use crate::proto;
use crate::proto::hub_service_server::HubService;
use crate::proto::Block;
use crate::proto::DbStats;
use crate::proto::GetInfoRequest;
use crate::proto::GetInfoResponse;
use crate::proto::HubEvent;
use crate::proto::{BlocksRequest, ShardChunksRequest, ShardChunksResponse, SubscribeRequest};
use crate::storage::constants::OnChainEventPostfix;
use crate::storage::constants::RootPrefix;
use crate::storage::db::PageOptions;
use crate::storage::db::RocksDbTransactionBatch;
use crate::storage::store::engine::{MempoolMessage, Senders, ShardEngine};
use crate::storage::store::stores::{StoreLimits, Stores};
use crate::storage::store::BlockStore;
Expand Down Expand Up @@ -260,6 +266,39 @@ impl HubService for MyHubService {
}
}

async fn get_info(
&self,
_request: Request<GetInfoRequest>,
) -> Result<Response<GetInfoResponse>, Status> {
let mut num_fid_registrations = 0;
let mut approx_size = 0;
let mut num_messages = 0;
for (_, shard_store) in self.shard_stores.iter() {
approx_size += shard_store.db.approximate_size();
num_messages += shard_store.trie.get_count(
&shard_store.db,
&mut RocksDbTransactionBatch::new(),
&[],
);
num_fid_registrations += shard_store
.db
.count_keys_at_prefix(vec![
RootPrefix::OnChainEvent as u8,
OnChainEventPostfix::IdRegisterByFid as u8,
])
.map_err(|err| Status::from_error(Box::new(err)))?
as u64;
}

Ok(Response::new(GetInfoResponse {
db_stats: Some(DbStats {
num_fid_registrations,
num_messages,
approx_size,
}),
}))
}

type SubscribeStream = ReceiverStream<Result<HubEvent, Status>>;

async fn subscribe(
Expand Down
14 changes: 14 additions & 0 deletions src/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,25 @@ message SubmitMessageResponse {
Message message = 1;
}

message DbStats {
uint64 num_messages = 1;
uint64 num_fid_registrations = 2;
uint64 approx_size = 4;
}

message GetInfoRequest {
}

// Response Types for the Sync RPC Methods
message GetInfoResponse {
DbStats db_stats = 5;
}

service HubService {
rpc SubmitMessage(Message) returns (Message);
rpc SubmitMessageWithOptions(SubmitMessageRequest) returns (SubmitMessageResponse);
rpc GetBlocks(BlocksRequest) returns (stream Block);
rpc GetShardChunks(ShardChunksRequest) returns (ShardChunksResponse);
rpc Subscribe(SubscribeRequest) returns (stream HubEvent);
rpc GetInfo(GetInfoRequest) returns (GetInfoResponse);
};
71 changes: 70 additions & 1 deletion src/storage/db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,35 @@ impl RocksDB {
.map(|metadata| metadata.len()) // Extract the file size.
.sum() // Sum the sizes.
}

/**
* Count the number of keys with a given prefix.
*/
pub fn count_keys_at_prefix(&self, prefix: Vec<u8>) -> Result<u32, HubError> {
let iter_opts = RocksDB::get_iterator_options(
Some(prefix.clone()),
Some(increment_vec_u8(&prefix.to_vec())),
&PageOptions::default(),
);

let db = self.db();
let mut iter = db.as_ref().unwrap().raw_iterator_opt(iter_opts.opts);

let mut count = 0;
iter.seek_to_first();
while iter.valid() {
count += 1;

iter.next();
}

Ok(count)
}
}

#[cfg(test)]
mod tests {
use crate::storage::db::RocksDbTransactionBatch;
use crate::storage::db::{RocksDB, RocksDbTransactionBatch};

#[test]
fn test_merge_rocksdb_transaction() {
Expand Down Expand Up @@ -503,6 +527,51 @@ mod tests {
assert_eq!(txn1.batch.get(&b"key5".to_vec()).unwrap().is_none(), true);
}

#[test]
fn test_count_keys_at_prefix() {
let tmp_path = tempfile::tempdir()
.unwrap()
.path()
.as_os_str()
.to_string_lossy()
.to_string();
let db = RocksDB::new(&tmp_path);
db.open().unwrap();

// Add some keys
db.put(b"key100", b"value1").unwrap();
db.put(b"key101", b"value3").unwrap();
db.put(b"key104", b"value4").unwrap();
db.put(b"key200", b"value2").unwrap();

// Count all keys
let count = db.count_keys_at_prefix(b"key".to_vec());
assert_eq!(count.unwrap(), 4);

// Count keys at prefix
let count = db.count_keys_at_prefix(b"key1".to_vec());
assert_eq!(count.unwrap(), 3);

// Count keys at prefix with a specific prefix that doesn't exist
let count = db.count_keys_at_prefix(b"key11".to_vec());
assert_eq!(count.unwrap(), 0);

// Count keys at prefix with a specific sub prefix
let count = db.count_keys_at_prefix(b"key10".to_vec());
assert_eq!(count.unwrap(), 3);

// Count keys at prefix with a specific prefix
let count = db.count_keys_at_prefix(b"key200".to_vec());
assert_eq!(count.unwrap(), 1);

// Count keys at prefix with a specific prefix that doesn't exist
let count = db.count_keys_at_prefix(b"key201".to_vec());
assert_eq!(count.unwrap(), 0);

// Cleanup
db.destroy().unwrap();
}

#[test]
fn test_keys_exist_in_db() {
let tmp_path = tempfile::tempdir()
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod constants;
pub mod constants;
pub mod db;
pub mod store;
pub mod trie;
Expand Down

0 comments on commit 8dfd2a4

Please sign in to comment.