Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(naming): query service subscriber list #189

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/console/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use crate::common::error_code::NO_PERMISSION;
use crate::common::string_utils::StringUtils;
use crate::naming::ops::ops_api::query_opt_service_list;
use crate::openapi::naming::instance::{del_instance, get_instance, update_instance};
use crate::openapi::naming::service::{query_service, remove_service, update_service};
use crate::openapi::naming::service::{
query_service, query_subscribers_list, remove_service, update_service,
};
use crate::user_namespace_privilege;
use actix_web::{http::header, web, HttpMessage, HttpRequest, HttpResponse, Responder};
use uuid::Uuid;
Expand Down Expand Up @@ -148,6 +150,10 @@ pub fn console_api_config_v1(config: &mut web::ServiceConfig) {
.route(web::delete().to(remove_service))
.route(web::get().to(query_service)),
)
.service(
web::resource("/ns/service/subscribers")
.route(web::get().to(query_subscribers_list)),
)
.service(
web::resource("/ns/instance")
.route(web::get().to(get_instance))
Expand Down
64 changes: 59 additions & 5 deletions src/naming/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use super::naming_delay_nofity::DelayNotifyActor;
use super::naming_delay_nofity::DelayNotifyCmd;
use super::naming_subscriber::NamingListenerItem;
use super::naming_subscriber::Subscriber;
use super::service::Service;
use super::service::ServiceInfoDto;
use super::service::ServiceMetadata;
use super::service::{Service, SubscriberInfoDto};
use super::service_index::NamespaceIndex;
use super::service_index::ServiceQueryParam;
use super::NamingUtils;
Expand Down Expand Up @@ -647,6 +647,57 @@ impl NamingActor {
(size, service_names)
}

pub fn get_subscribers_list(
&self,
page_size: usize,
page_index: usize,
key: &ServiceKey,
) -> (usize, Vec<Arc<SubscriberInfoDto>>) {
let mut ret = Vec::new();

let res = self.subscriber.fuzzy_match_listener(
&key.group_name,
&key.service_name,
&key.namespace_id,
);

for (service_key, val) in res {
for (ip_port, _) in val {
let parts: Vec<&str> = ip_port.split(':').collect();
if parts.len() == 2 {
if let Ok(port) = parts[1].parse::<u16>() {
let subscriber_info = SubscriberInfoDto {
service_name: service_key.service_name.clone(),
group_name: service_key.group_name.clone(),
namespace_id: service_key.namespace_id.clone(),
ip: Arc::new(parts[0].to_string()),
port,
};

ret.push(Arc::new(subscriber_info));
}
}
}
}

let total = ret.len();
let start = (page_index - 1) * page_size;
ret.sort_by(|a, b| {
a.service_name
.cmp(&b.service_name)
.then(a.group_name.cmp(&b.group_name))
.then(a.ip.cmp(&b.ip))
.then(a.port.cmp(&b.port))
});
let paginated_result = ret
.into_iter()
.skip(start)
.take(page_size)
.collect::<Vec<_>>();

(total, paginated_result)
}

pub fn get_service_info_page(&self, param: ServiceQueryParam) -> (usize, Vec<ServiceInfoDto>) {
let (size, list) = self.namespace_index.query_service_page(&param);

Expand Down Expand Up @@ -855,6 +906,7 @@ pub enum NamingCmd {
QueryListString(ServiceKey, String, bool, Option<SocketAddr>),
QueryServiceInfo(ServiceKey, String, bool),
QueryServicePage(ServiceKey, usize, usize),
QueryServiceSubscribersPage(ServiceKey, usize, usize),
//查询服务实际信息列表
QueryServiceInfoPage(ServiceQueryParam),
//CreateService(ServiceDetailDto),
Expand All @@ -881,6 +933,7 @@ pub enum NamingResult {
InstanceListString(String),
ServiceInfo(ServiceInfo),
ServicePage((usize, Vec<Arc<String>>)),
ServiceSubscribersPage((usize, Vec<Arc<SubscriberInfoDto>>)),
ServiceInfoPage((usize, Vec<ServiceInfoDto>)),
ClientInstanceCount(Vec<(Arc<String>, usize)>),
RewriteToCluster(u64, Instance),
Expand Down Expand Up @@ -975,6 +1028,11 @@ impl Handler<NamingCmd> for NamingActor {
&service_key,
)))
}
NamingCmd::QueryServiceSubscribersPage(service_key, page_size, page_index) => {
Ok(NamingResult::ServiceSubscribersPage(
self.get_subscribers_list(page_size, page_index, &service_key),
))
}
NamingCmd::QueryServiceInfoPage(param) => Ok(NamingResult::ServiceInfoPage(
self.get_service_info_page(param),
)),
Expand Down Expand Up @@ -1127,8 +1185,6 @@ async fn query_healthy_instances() {

#[test]
fn test_add_service() {
use super::*;
use tokio::net::UdpSocket;
let mut naming = NamingActor::new();
let service_key = ServiceKey::new("1", "1", "1");
let service_info = ServiceDetailDto {
Expand All @@ -1147,8 +1203,6 @@ fn test_add_service() {

#[test]
fn test_remove_has_instance_service() {
use super::*;
use tokio::net::UdpSocket;
let mut naming = NamingActor::new();
let mut instance = Instance::new("127.0.0.1".to_owned(), 8080);
instance.namespace_id = Arc::new("public".to_owned());
Expand Down
19 changes: 18 additions & 1 deletion src/naming/naming_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use actix::prelude::*;

use super::{
model::{Instance, ServiceInfo, ServiceKey},
model::ServiceKey,
naming_delay_nofity::{DelayNotifyActor, DelayNotifyCmd},
};

Expand Down Expand Up @@ -185,4 +185,21 @@ impl Subscriber {
}
sum
}

pub fn fuzzy_match_listener(
&self,
group_name: &str,
service_name: &str,
namespace_id: &str,
) -> HashMap<ServiceKey, HashMap<Arc<String>, Option<HashSet<String>>>> {
self.listener
.iter()
.filter(|(key, _)| {
key.group_name.contains(group_name)
&& key.service_name.contains(service_name)
&& key.namespace_id.contains(namespace_id)
}) // 模糊匹配
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
}
}
14 changes: 12 additions & 2 deletions src/naming/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use std::{

use crate::common::constant::EMPTY_ARC_STRING;
use crate::naming::cluster::model::ProcessRange;
use crate::now_millis;
use actix_web::rt;
use inner_mem_cache::TimeoutSet;

use crate::now_millis;
use serde::{Deserialize, Serialize};

use super::{
api_model::QueryListResult,
Expand Down Expand Up @@ -389,3 +389,13 @@ pub struct ServiceInfoDto {
pub metadata: Option<Arc<HashMap<String, String>>>,
pub protect_threshold: Option<f32>,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SubscriberInfoDto {
pub service_name: Arc<String>,
pub group_name: Arc<String>,
pub namespace_id: Arc<String>,
pub ip: Arc<String>,
pub port: u16,
}
9 changes: 8 additions & 1 deletion src/openapi/naming/model.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#![allow(unused_imports, unused_assignments, unused_variables)]
use crate::common::option_utils::OptionUtils;
use crate::naming::model::{Instance, ServiceKey};
use crate::naming::service::SubscriberInfoDto;
use crate::naming::NamingUtils;
use crate::utils::{get_bool_from_string, select_option_by_clone};
use crate::utils::get_bool_from_string;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
Expand Down Expand Up @@ -351,3 +352,9 @@ pub struct ServiceQueryListResponce {
pub count: usize,
pub doms: Vec<Arc<String>>,
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct ServiceQuerySubscribersListResponce {
pub count: usize,
pub subscribers: Vec<Arc<SubscriberInfoDto>>,
}
63 changes: 59 additions & 4 deletions src/openapi/naming/service.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use actix::Addr;
use actix_web::{web, HttpResponse, Responder, Scope};

use crate::merge_web_param;
use crate::naming::api_model::ServiceInfoParam;
use crate::naming::core::{NamingActor, NamingCmd, NamingResult};
use crate::naming::model::ServiceKey;
use crate::naming::NamingUtils;
use crate::openapi::constant::EMPTY;
use crate::openapi::naming::model::{ServiceQueryListRequest, ServiceQueryListResponce};
use crate::openapi::naming::model::{
ServiceQueryListRequest, ServiceQueryListResponce, ServiceQuerySubscribersListResponce,
};
use actix::Addr;
use actix_web::http::header;
use actix_web::{web, HttpResponse, Responder, Scope};

pub(super) fn service() -> Scope {
web::scope("/service")
Expand All @@ -19,6 +21,7 @@ pub(super) fn service() -> Scope {
.route(web::get().to(query_service)),
)
.service(web::resource("/list").route(web::get().to(query_service_list)))
.service(web::resource("/subscribers").route(web::get().to(query_subscribers_list)))
}

pub async fn query_service(
Expand Down Expand Up @@ -107,3 +110,55 @@ pub async fn query_service_list(
Err(_) => HttpResponse::InternalServerError().body("error"),
}
}

pub async fn query_subscribers_list(
param: web::Query<ServiceQueryListRequest>,
naming_addr: web::Data<Addr<NamingActor>>,
) -> impl Responder {
let page_size = param.page_size.unwrap_or(0x7fffffff);
let page_index = param.page_no.unwrap_or(1);
let namespace_id = NamingUtils::default_namespace(
param
.namespace_id
.as_ref()
.unwrap_or(&"".to_owned())
.to_owned(),
);
let group = NamingUtils::default_group(
param
.group_name
.as_ref()
.unwrap_or(&"".to_owned())
.to_owned(),
);
let service = param
.service_name
.as_ref()
.unwrap_or(&"".to_owned())
.to_owned();

let key = ServiceKey::new(&namespace_id, &group, &service);
match naming_addr
.send(NamingCmd::QueryServiceSubscribersPage(
key, page_size, page_index,
))
.await
{
Ok(res) => {
let result: NamingResult = res.unwrap();
match result {
NamingResult::ServiceSubscribersPage((c, v)) => {
let resp = ServiceQuerySubscribersListResponce {
count: c,
subscribers: v,
};
HttpResponse::Ok()
.insert_header(header::ContentType(mime::APPLICATION_JSON))
.body(serde_json::to_string(&resp).unwrap())
}
_ => HttpResponse::InternalServerError().body("error"),
}
}
Err(_) => HttpResponse::InternalServerError().body("error"),
}
}
5 changes: 5 additions & 0 deletions src/user/permission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
/// 2)http请求路径,由后端拦截器控制否支持请求;
use std::{collections::HashSet, hash::Hash, sync::Arc};


use crate::common::constant::{EMPTY_STR, HTTP_METHOD_ALL, HTTP_METHOD_GET};

pub enum Resource {
Expand Down Expand Up @@ -259,6 +260,7 @@ lazy_static::lazy_static! {
//WebResource
R::WebResource("/manage/service"),
R::WebResource("/manage/service/instance"),
R::WebResource("/manage/subscriber"),
R::WebResource("/rnacos/manage/service"),
R::WebResource("/rnacos/manage/service/instance"),
//path
Expand All @@ -267,6 +269,7 @@ lazy_static::lazy_static! {

R::Path("/rnacos/api/console/ns/services",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/service",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/service/subscribers",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/instances",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/instance",HTTP_METHOD_GET),

Expand All @@ -279,6 +282,7 @@ lazy_static::lazy_static! {
//WebResource
R::WebResource("/manage/service"),
R::WebResource("/manage/service/instance"),
R::WebResource("/manage/subscriber"),
R::WebResource("/rnacos/manage/service"),
R::WebResource("/rnacos/manage/service/instance"),
R::WebResource("SERVICE_UPDATE"),
Expand All @@ -288,6 +292,7 @@ lazy_static::lazy_static! {

R::Path("/rnacos/api/console/ns/services",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/service",HTTP_METHOD_ALL),
R::Path("/rnacos/api/console/ns/service/subscribers",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/instances",HTTP_METHOD_GET),
R::Path("/rnacos/api/console/ns/instance",HTTP_METHOD_ALL),

Expand Down
Loading