Skip to content

Commit

Permalink
feat(naming): query service subscriber list
Browse files Browse the repository at this point in the history
  • Loading branch information
a981008 committed Dec 29, 2024
1 parent 17ba6b2 commit 311867a
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 11 deletions.
8 changes: 7 additions & 1 deletion src/console/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use crate::common::error_code::NO_PERMISSION;
use crate::common::string_utils::StringUtils;
use crate::naming::ops::ops_api::query_opt_service_list;
use crate::openapi::naming::instance::{del_instance, get_instance, update_instance};
use crate::openapi::naming::service::{query_service, remove_service, update_service};
use crate::openapi::naming::service::{
query_service, query_subscribers_list, remove_service, update_service,
};
use crate::user_namespace_privilege;
use actix_web::{http::header, web, HttpMessage, HttpRequest, HttpResponse, Responder};
use uuid::Uuid;
Expand Down Expand Up @@ -148,6 +150,10 @@ pub fn console_api_config_v1(config: &mut web::ServiceConfig) {
.route(web::delete().to(remove_service))
.route(web::get().to(query_service)),
)
.service(
web::resource("/ns/service/subscribers")
.route(web::get().to(query_subscribers_list)),
)
.service(
web::resource("/ns/instance")
.route(web::get().to(get_instance))
Expand Down
58 changes: 53 additions & 5 deletions src/naming/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use super::naming_delay_nofity::DelayNotifyActor;
use super::naming_delay_nofity::DelayNotifyCmd;
use super::naming_subscriber::NamingListenerItem;
use super::naming_subscriber::Subscriber;
use super::service::Service;
use super::service::ServiceInfoDto;
use super::service::ServiceMetadata;
use super::service::{Service, SubscriberInfoDto};
use super::service_index::NamespaceIndex;
use super::service_index::ServiceQueryParam;
use super::NamingUtils;
Expand Down Expand Up @@ -647,6 +647,51 @@ impl NamingActor {
(size, service_names)
}

pub fn get_subscribers_list(
&self,
page_size: usize,
page_index: usize,
key: &ServiceKey,
) -> (usize, Vec<Arc<SubscriberInfoDto>>) {
let mut ret = Vec::new();

let res = self.subscriber.fuzzy_match_listener(
&key.group_name,
&key.service_name,
&key.namespace_id,
);

for (service_key, val) in res {
for (ip_port, _) in val {
let parts: Vec<&str> = ip_port.split(':').collect();
if parts.len() == 2 {
if let Ok(port) = parts[1].parse::<u16>() {
let subscriber_info = SubscriberInfoDto {
service_name: service_key.service_name.clone(),
group_name: service_key.group_name.clone(),
namespace_id: service_key.namespace_id.clone(),
ip: Arc::new(parts[0].to_string()),
port,
};

ret.push(Arc::new(subscriber_info));
}
}
}
}

let total = ret.len();
let start = (page_index - 1) * page_size;

let paginated_result = ret
.into_iter()
.skip(start)
.take(page_size)
.collect::<Vec<_>>();

(total, paginated_result)
}

pub fn get_service_info_page(&self, param: ServiceQueryParam) -> (usize, Vec<ServiceInfoDto>) {
let (size, list) = self.namespace_index.query_service_page(&param);

Expand Down Expand Up @@ -855,6 +900,7 @@ pub enum NamingCmd {
QueryListString(ServiceKey, String, bool, Option<SocketAddr>),
QueryServiceInfo(ServiceKey, String, bool),
QueryServicePage(ServiceKey, usize, usize),
QueryServiceSubscribersPage(ServiceKey, usize, usize),
//查询服务实际信息列表
QueryServiceInfoPage(ServiceQueryParam),
//CreateService(ServiceDetailDto),
Expand All @@ -881,6 +927,7 @@ pub enum NamingResult {
InstanceListString(String),
ServiceInfo(ServiceInfo),
ServicePage((usize, Vec<Arc<String>>)),
ServiceSubscribersPage((usize, Vec<Arc<SubscriberInfoDto>>)),
ServiceInfoPage((usize, Vec<ServiceInfoDto>)),
ClientInstanceCount(Vec<(Arc<String>, usize)>),
RewriteToCluster(u64, Instance),
Expand Down Expand Up @@ -975,6 +1022,11 @@ impl Handler<NamingCmd> for NamingActor {
&service_key,
)))
}
NamingCmd::QueryServiceSubscribersPage(service_key, page_size, page_index) => {
Ok(NamingResult::ServiceSubscribersPage(
self.get_subscribers_list(page_size, page_index, &service_key),
))
}
NamingCmd::QueryServiceInfoPage(param) => Ok(NamingResult::ServiceInfoPage(
self.get_service_info_page(param),
)),
Expand Down Expand Up @@ -1127,8 +1179,6 @@ async fn query_healthy_instances() {

#[test]
fn test_add_service() {
use super::*;
use tokio::net::UdpSocket;
let mut naming = NamingActor::new();
let service_key = ServiceKey::new("1", "1", "1");
let service_info = ServiceDetailDto {
Expand All @@ -1147,8 +1197,6 @@ fn test_add_service() {

#[test]
fn test_remove_has_instance_service() {
use super::*;
use tokio::net::UdpSocket;
let mut naming = NamingActor::new();
let mut instance = Instance::new("127.0.0.1".to_owned(), 8080);
instance.namespace_id = Arc::new("public".to_owned());
Expand Down
19 changes: 18 additions & 1 deletion src/naming/naming_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use actix::prelude::*;

use super::{
model::{Instance, ServiceInfo, ServiceKey},
model::ServiceKey,
naming_delay_nofity::{DelayNotifyActor, DelayNotifyCmd},
};

Expand Down Expand Up @@ -185,4 +185,21 @@ impl Subscriber {
}
sum
}

pub fn fuzzy_match_listener(
&self,
group_name: &str,
service_name: &str,
namespace_id: &str,
) -> HashMap<ServiceKey, HashMap<Arc<String>, Option<HashSet<String>>>> {
self.listener
.iter()
.filter(|(key, _)| {
key.group_name.contains(group_name)
&& key.service_name.contains(service_name)
&& key.namespace_id.contains(namespace_id)
}) // 模糊匹配
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
}
}
13 changes: 11 additions & 2 deletions src/naming/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use std::{

use crate::common::constant::EMPTY_ARC_STRING;
use crate::naming::cluster::model::ProcessRange;
use crate::now_millis;
use actix_web::rt;
use inner_mem_cache::TimeoutSet;

use crate::now_millis;
use serde::{Deserialize, Serialize};

use super::{
api_model::QueryListResult,
Expand Down Expand Up @@ -389,3 +389,12 @@ pub struct ServiceInfoDto {
pub metadata: Option<Arc<HashMap<String, String>>>,
pub protect_threshold: Option<f32>,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct SubscriberInfoDto {
pub service_name: Arc<String>,
pub group_name: Arc<String>,
pub namespace_id: Arc<String>,
pub ip: Arc<String>,
pub port: u16,
}
9 changes: 8 additions & 1 deletion src/openapi/naming/model.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#![allow(unused_imports, unused_assignments, unused_variables)]
use crate::common::option_utils::OptionUtils;
use crate::naming::model::{Instance, ServiceKey};
use crate::naming::service::SubscriberInfoDto;
use crate::naming::NamingUtils;
use crate::utils::{get_bool_from_string, select_option_by_clone};
use crate::utils::get_bool_from_string;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
Expand Down Expand Up @@ -351,3 +352,9 @@ pub struct ServiceQueryListResponce {
pub count: usize,
pub doms: Vec<Arc<String>>,
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct ServiceQuerySubscribersListResponce {
pub count: usize,
pub subscribers: Vec<Arc<SubscriberInfoDto>>,
}
55 changes: 54 additions & 1 deletion src/openapi/naming/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use crate::naming::core::{NamingActor, NamingCmd, NamingResult};
use crate::naming::model::ServiceKey;
use crate::naming::NamingUtils;
use crate::openapi::constant::EMPTY;
use crate::openapi::naming::model::{ServiceQueryListRequest, ServiceQueryListResponce};
use crate::openapi::naming::model::{
ServiceQueryListRequest, ServiceQueryListResponce, ServiceQuerySubscribersListResponce,
};

pub(super) fn service() -> Scope {
web::scope("/service")
Expand All @@ -19,6 +21,7 @@ pub(super) fn service() -> Scope {
.route(web::get().to(query_service)),
)
.service(web::resource("/list").route(web::get().to(query_service_list)))
.service(web::resource("/subscribers").route(web::get().to(query_subscribers_list)))
}

pub async fn query_service(
Expand Down Expand Up @@ -107,3 +110,53 @@ pub async fn query_service_list(
Err(_) => HttpResponse::InternalServerError().body("error"),
}
}

pub async fn query_subscribers_list(
param: web::Query<ServiceQueryListRequest>,
naming_addr: web::Data<Addr<NamingActor>>,
) -> impl Responder {
let page_size = param.page_size.unwrap_or(0x7fffffff);
let page_index = param.page_no.unwrap_or(1);
let namespace_id = NamingUtils::default_namespace(
param
.namespace_id
.as_ref()
.unwrap_or(&"".to_owned())
.to_owned(),
);
let group = NamingUtils::default_group(
param
.group_name
.as_ref()
.unwrap_or(&"".to_owned())
.to_owned(),
);
let service = param
.service_name
.as_ref()
.unwrap_or(&"".to_owned())
.to_owned();

let key = ServiceKey::new(&namespace_id, &group, &service);
match naming_addr
.send(NamingCmd::QueryServiceSubscribersPage(
key, page_size, page_index,
))
.await
{
Ok(res) => {
let result: NamingResult = res.unwrap();
match result {
NamingResult::ServiceSubscribersPage((c, v)) => {
let resp = ServiceQuerySubscribersListResponce {
count: c,
subscribers: v,
};
HttpResponse::Ok().body(serde_json::to_string(&resp).unwrap())
}
_ => HttpResponse::InternalServerError().body("error"),
}
}
Err(_) => HttpResponse::InternalServerError().body("error"),
}
}
3 changes: 3 additions & 0 deletions src/user/permission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
/// 2)http请求路径,由后端拦截器控制否支持请求;
use std::{collections::HashSet, hash::Hash, sync::Arc};


use crate::common::constant::{EMPTY_STR, HTTP_METHOD_ALL, HTTP_METHOD_GET};

pub enum Resource {
Expand Down Expand Up @@ -267,6 +268,7 @@ lazy_static::lazy_static! {

R::Path("/rnacos/api/console/ns/services",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/service",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/service/subscribers",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/instances",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/instance",HTTP_METHOD_GET),

Expand All @@ -288,6 +290,7 @@ lazy_static::lazy_static! {

R::Path("/rnacos/api/console/ns/services",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/service",HTTP_METHOD_ALL),
R::Path("/rnacos/api/console/ns/service/subscribers",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/instances",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/instance",HTTP_METHOD_ALL),

Expand Down

0 comments on commit 311867a

Please sign in to comment.