diff --git a/meter-core/src/collect.rs b/meter-core/src/collect.rs index e7ff21a..be2de33 100644 --- a/meter-core/src/collect.rs +++ b/meter-core/src/collect.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::data::ReadRecord; -use crate::data::WriteRecord; +use crate::data::MeterRecord; /// Trait representing the methods required to collect read/write record. +/// Save read and write separately for later refactoring. pub trait Collect: Send + Sync { /// Notifies the method that an event about data insertion occurs. - fn on_write(&self, record: WriteRecord); + fn on_write(&self, record: MeterRecord); /// Notifies the method that an event about data query occurs. - fn on_read(&self, record: ReadRecord); + fn on_read(&self, record: MeterRecord); } diff --git a/meter-core/src/data.rs b/meter-core/src/data.rs index ea0bc2e..321892d 100644 --- a/meter-core/src/data.rs +++ b/meter-core/src/data.rs @@ -12,31 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -/// The WriteRecord records some data about data insertion. #[derive(Debug)] -pub struct WriteRecord { - pub catalog: String, - pub schema: String, - pub table: Option, - pub region_num: Option, - - /// Volume of data written in byte. - pub byte_count: u32, - - /// The calculated value of this read record. - /// - /// If present, other fields will be ignored. - pub calculated_value: Option, -} - -/// The ReadRecord records some data about data query. -#[derive(Debug)] -pub struct ReadRecord { - pub catalog: String, - pub schema: String, - pub table: Option, - pub region_num: Option, - +pub struct ReadItem { /// The CPU consumed by query SQL processes. /// /// Unit is nanosecond. @@ -46,14 +23,20 @@ pub struct ReadRecord { /// /// Unit is byte. pub table_scan: u64, +} - /// The size of the network traffic used by the query. - /// - /// Unit is byte. - pub network_egress: u64, +impl ReadItem { + pub fn new(cpu_time: u64, table_scan: u64) -> Self { + Self { + cpu_time, + table_scan, + } + } +} - /// The calculated value of this read record. - /// - /// If present, other fields will be ignored. - pub calculated_value: Option, +#[derive(Debug)] +pub struct MeterRecord { + pub catalog: String, + pub schema: String, + pub value: u64, } diff --git a/meter-core/src/lib.rs b/meter-core/src/lib.rs index ab58851..5cad338 100644 --- a/meter-core/src/lib.rs +++ b/meter-core/src/lib.rs @@ -16,4 +16,7 @@ pub mod collect; pub mod data; pub mod global; pub mod registry; -pub mod write_calc; + +pub trait ItemCalculator: Send + Sync { + fn calc(&self, value: &T) -> u64; +} diff --git a/meter-core/src/registry.rs b/meter-core/src/registry.rs index 175c470..ba9e67b 100644 --- a/meter-core/src/registry.rs +++ b/meter-core/src/registry.rs @@ -18,9 +18,8 @@ use std::sync::Arc; use parking_lot::RwLock; use crate::collect::Collect; -use crate::data::ReadRecord; -use crate::data::WriteRecord; -use crate::write_calc::WriteCalculator; +use crate::data::MeterRecord; +use crate::ItemCalculator; type CalculatorMap = anymap::Map; @@ -53,22 +52,22 @@ impl Registry { /// Register the calculation formula of 'insert request' -> 'byte count' pub fn register_calculator( &self, - calculator: Arc>, + calculator: Arc>, ) { let mut guard = self.inner.calculator.write(); guard.insert(calculator); } /// Obtain the calculation formula corresponding to the insert request. - pub fn get_calculator(&self) -> Option>> { + pub fn get_calculator(&self) -> Option>> { let guard = self.inner.calculator.read(); - (*guard).get::>>().cloned() + (*guard).get::>>().cloned() } } impl Registry { /// A base API for recording information about data insertion. - pub fn record_write(&self, record: WriteRecord) { + pub fn record_write(&self, record: MeterRecord) { let collector = self.inner.collector.read(); let collector = match collector.as_ref() { @@ -80,7 +79,7 @@ impl Registry { } /// A base API for recording information about data query. - pub fn record_read(&self, record: ReadRecord) { + pub fn record_read(&self, record: MeterRecord) { let collector = self.inner.collector.read(); let collector = match collector.as_ref() { diff --git a/meter-core/src/write_calc.rs b/meter-core/src/write_calc.rs deleted file mode 100644 index d41a016..0000000 --- a/meter-core/src/write_calc.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2024 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// Trait representing how to calculate the byte count of a custom type `T`. -/// Implement `WriteCalculator` and register it to [`crate::registry::Registry`] -/// and then you can use `write_meter!()` with input of type `T` directly. -/// -/// see `meter-macros` crate for example. -pub trait WriteCalculator: Send + Sync { - fn calc_byte(&self, value: &T) -> u32; -} diff --git a/meter-example/examples/simple.rs b/meter-example/examples/simple.rs index 8a7fb09..7bc5cb6 100644 --- a/meter-example/examples/simple.rs +++ b/meter-example/examples/simple.rs @@ -14,11 +14,13 @@ use std::sync::Arc; use std::time::Duration; +use tracing::info; -use meter_core::data::ReadRecord; -use meter_core::data::WriteRecord; +use meter_core::data::MeterRecord; +use meter_core::data::ReadItem; use meter_core::global::global_registry; -use meter_core::write_calc::WriteCalculator; + +use meter_core::ItemCalculator; use meter_example::collector::SimpleCollector; use meter_example::reporter::SimpleReporter; use meter_example::CalcImpl; @@ -41,19 +43,22 @@ async fn run() { } async fn setup_global_registry() { - let collector = Arc::new(SimpleCollector::new(wcu_calc, rcu_calc)); + let collector = Arc::new(SimpleCollector::new(w_calc, r_calc)); let reporter = Arc::new(SimpleReporter::new(collector.clone())); let r = global_registry(); r.set_collector(collector); let calc_impl = Arc::new(CalcImpl); - let string_insert_calc = calc_impl.clone() as Arc>; + let string_insert_calc = calc_impl.clone() as Arc>; r.register_calculator(string_insert_calc); - let mock_insert_calc = calc_impl as Arc>; + let mock_insert_calc = calc_impl.clone() as Arc>; r.register_calculator(mock_insert_calc); + let read_item_calc = calc_impl as Arc>; + r.register_calculator(read_item_calc); + tokio::spawn(async move { reporter.start().await; }); @@ -62,40 +67,29 @@ async fn setup_global_registry() { async fn do_some_record() { for _i in 0..20 { let insert_req = "String insert req".to_string(); - write_meter!("greptime", "db1", insert_req); - - read_meter!("greptime", "db1", cpu_time: 100000); - read_meter!("greptime", "db1", table_scan: 100000); - read_meter!("greptime", "db1", network_egress: 100000); - - read_meter!("greptime", "db2", 100000, 100000, 100000); - - read_meter!("greptime", "db3", calculated_value: 100000); + let w = write_meter!("greptime", "db1", insert_req); + info!("w: {}", w); + + let r = read_meter!( + "greptime", + "db1", + ReadItem { + cpu_time: 100000, + table_scan: 100000, + } + ); + info!("r: {}", r); tokio::time::sleep(Duration::from_secs(1)).await; } } -fn wcu_calc(w_info: &WriteRecord) -> u32 { - let WriteRecord { byte_count, .. } = w_info; - - byte_count / 1024 +fn w_calc(w_info: &MeterRecord) -> u64 { + let MeterRecord { value, .. } = w_info; + *value } -fn rcu_calc(r_info: &ReadRecord) -> u32 { - let ReadRecord { - cpu_time, - table_scan, - network_egress, - calculated_value, - .. - } = r_info; - - if let Some(calculated_value) = calculated_value { - (*calculated_value).try_into().unwrap() - } else { - (*cpu_time / 3 + table_scan / 4096 + network_egress / 4096) - .try_into() - .unwrap() - } +fn r_calc(r_info: &MeterRecord) -> u64 { + let MeterRecord { value, .. } = r_info; + *value } diff --git a/meter-example/src/collector.rs b/meter-example/src/collector.rs index 11bdec4..d613c88 100644 --- a/meter-example/src/collector.rs +++ b/meter-example/src/collector.rs @@ -16,14 +16,13 @@ use std::collections::HashMap; use dashmap::DashMap; use meter_core::collect::Collect; -use meter_core::data::ReadRecord; -use meter_core::data::WriteRecord; +use meter_core::data::MeterRecord; pub struct SimpleCollector { - read_data: DashMap>, - write_data: DashMap>, - wcu_calc: W, - rcu_calc: R, + read_data: DashMap>, + write_data: DashMap>, + w_calc: W, + r_calc: R, } /// The SchemaId identifies a database. @@ -34,50 +33,50 @@ pub struct SchemaId { } impl SimpleCollector { - pub fn new(wcu_calc: W, rcu_calc: R) -> Self { + pub fn new(w_calc: W, r_calc: R) -> Self { Self { read_data: DashMap::default(), write_data: DashMap::default(), - wcu_calc, - rcu_calc, + w_calc, + r_calc, } } } impl SimpleCollector where - R: Fn(&ReadRecord) -> u32 + Send + Sync, - W: Fn(&WriteRecord) -> u32 + Send + Sync, + R: Fn(&MeterRecord) -> u64 + Send + Sync, + W: Fn(&MeterRecord) -> u64 + Send + Sync, { pub fn clear(&self) { self.read_data.clear(); self.write_data.clear(); } - pub fn schema_wcus(&self) -> HashMap { + pub fn schema_ws(&self) -> HashMap { self.write_data .iter() .map(|write_infos| { - let wcus: u32 = write_infos + let ws: u64 = write_infos .value() .iter() - .map(|wcu_info| (self.wcu_calc)(wcu_info)) + .map(|w_info| (self.w_calc)(w_info)) .sum(); - (write_infos.key().clone(), wcus) + (write_infos.key().clone(), ws) }) .collect() } - pub fn schema_rcus(&self) -> HashMap { + pub fn schema_rs(&self) -> HashMap { self.read_data .iter() .map(|read_infos| { - let rcus: u32 = read_infos + let rs: u64 = read_infos .value() .iter() - .map(|read_info| (self.rcu_calc)(read_info)) + .map(|read_info| (self.r_calc)(read_info)) .sum(); - (read_infos.key().clone(), rcus) + (read_infos.key().clone(), rs) }) .collect() } @@ -88,7 +87,7 @@ where R: Send + Sync, W: Send + Sync, { - fn on_read(&self, record: ReadRecord) { + fn on_read(&self, record: MeterRecord) { let schema_id = SchemaId { catalog: record.catalog.clone(), schema: record.schema.clone(), @@ -99,7 +98,7 @@ where entry.push(record) } - fn on_write(&self, record: WriteRecord) { + fn on_write(&self, record: MeterRecord) { let schema_id = SchemaId { catalog: record.catalog.clone(), schema: record.schema.clone(), diff --git a/meter-example/src/lib.rs b/meter-example/src/lib.rs index 62f98c9..781e84f 100644 --- a/meter-example/src/lib.rs +++ b/meter-example/src/lib.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use meter_core::write_calc::WriteCalculator; +use meter_core::data::ReadItem; +use meter_core::ItemCalculator; pub mod collector; pub mod reporter; @@ -21,14 +22,20 @@ pub struct MockInsertRequest; pub struct CalcImpl; -impl WriteCalculator for CalcImpl { - fn calc_byte(&self, _value: &MockInsertRequest) -> u32 { +impl ItemCalculator for CalcImpl { + fn calc(&self, _value: &MockInsertRequest) -> u64 { + 1024 + } +} + +impl ItemCalculator for CalcImpl { + fn calc(&self, _value: &String) -> u64 { 1024 * 10 } } -impl WriteCalculator for CalcImpl { - fn calc_byte(&self, _value: &String) -> u32 { +impl ItemCalculator for CalcImpl { + fn calc(&self, _value: &ReadItem) -> u64 { 1024 * 100 } } diff --git a/meter-example/src/reporter.rs b/meter-example/src/reporter.rs index 7002915..3c4463b 100644 --- a/meter-example/src/reporter.rs +++ b/meter-example/src/reporter.rs @@ -16,13 +16,12 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use meter_core::data::ReadRecord; -use meter_core::data::WriteRecord; +use meter_core::data::MeterRecord; use tracing::info; use crate::collector::SimpleCollector; -/// A simple reporter that outputs wrcu information to stdout. +/// A simple reporter that outputs w/r information to stdout. pub struct SimpleReporter { collector: Arc>, p1: PhantomData, @@ -41,31 +40,31 @@ impl SimpleReporter { impl SimpleReporter where - W: Fn(&WriteRecord) -> u32 + Send + Sync, - R: Fn(&ReadRecord) -> u32 + Send + Sync, + W: Fn(&MeterRecord) -> u64 + Send + Sync, + R: Fn(&MeterRecord) -> u64 + Send + Sync, { pub async fn start(&self) { loop { tokio::time::sleep(Duration::from_secs(5)).await; info!("==============================================================="); - let wcus = self.collector.schema_wcus(); - let rcus = self.collector.schema_rcus(); + let ws = self.collector.schema_ws(); + let rs = self.collector.schema_rs(); self.collector.clear(); - info!("The number of WCUs consumed in the last 5 seconds:"); - for (id, wcu_number) in wcus { + info!("The number of Ws consumed in the last 5 seconds:"); + for (id, w_number) in ws { info!( - "catalog {}, schema {}, wcus: {}", - id.catalog, id.schema, wcu_number + "catalog {}, schema {}, ws: {}", + id.catalog, id.schema, w_number ); } - info!("The number of RCUs consumed in the last 5 seconds:"); - for (id, rcu_number) in rcus { + info!("The number of Rs consumed in the last 5 seconds:"); + for (id, r_number) in rs { info!( - "catalog {}, schema {}, rcus: {}", - id.catalog, id.schema, rcu_number + "catalog {}, schema {}, rs: {}", + id.catalog, id.schema, r_number ); } } diff --git a/meter-macros/src/read_meter.rs b/meter-macros/src/read_meter.rs index 3d50bfd..29867a3 100644 --- a/meter-macros/src/read_meter.rs +++ b/meter-macros/src/read_meter.rs @@ -15,21 +15,10 @@ #[cfg(feature = "noop")] #[macro_export] macro_rules! read_meter { - ($catalog: expr, $schema: expr, calculated_value: $calculated_value: expr) => { - let _ = ($catalog, $schema, $calculated_value); - }; - ($catalog: expr, $schema: expr, cpu_time: $cpu_time: expr) => { - let _ = ($catalog, $schema, $cpu_time); - }; - ($catalog: expr, $schema: expr, table_scan: $table_scan: expr) => { - let _ = ($catalog, $schema, $table_scan); - }; - ($catalog: expr, $schema: expr, network_egress: $network_egress: expr) => { - let _ = ($catalog, $schema, $network_egress); - }; - ($catalog: expr, $schema: expr, $cpu_time: expr, $table_scan: expr, $network_egress: expr) => { - let _ = ($catalog, $schema, $cpu_time, $table_scan, $network_egress); - }; + ($catalog: expr, $schema: expr, $item: expr) => {{ + let _ = ($catalog, $schema, $item); + 0 as u64 + }}; } /// Record some about data query. @@ -37,90 +26,54 @@ macro_rules! read_meter { /// # Examples /// /// ```rust +/// use std::sync::Arc; +/// +/// use meter_core::ItemCalculator; +/// use meter_core::global::global_registry; /// use meter_macros::read_meter; +/// use meter_core::data::ReadItem; /// /// let cpu_time_ns = 1000000000; /// let table_scan_bytes = 10224378; -/// let network_egress_bytes = 1023123; /// -/// read_meter!("greptime", "public", cpu_time: cpu_time_ns); -/// read_meter!("greptime", "public", table_scan: table_scan_bytes); -/// read_meter!("greptime", "public", network_egress: network_egress_bytes); +/// // A struct about insert request +/// struct MockInsert; +/// +/// // A byte count calculator of insert request +/// struct MockInsertCalculator; +/// +/// impl ItemCalculator for MockInsertCalculator { +/// fn calc(&self, _: &ReadItem) -> u64 { +/// 10 * 1024 +/// } +/// } +/// +/// let calculator = MockInsertCalculator; +/// +/// // Register a calculator to [registry]. +/// let registry = global_registry(); +/// registry.register_calculator(Arc::new(MockInsertCalculator)); /// -/// read_meter!( -/// "greptime", -/// "public", -/// cpu_time_ns, -/// table_scan_bytes, -/// network_egress_bytes -/// ); +/// read_meter!("greptime", "public", ReadItem { +/// cpu_time: cpu_time_ns, +/// table_scan: table_scan_bytes, +/// }); /// ``` #[cfg(not(feature = "noop"))] #[macro_export] macro_rules! read_meter { - ($catalog: expr, $schema: expr, calculated_value: $calculated_value: expr) => { - let record = meter_core::data::ReadRecord { - catalog: $catalog.into(), - schema: $schema.into(), - table: None, - region_num: None, - cpu_time: 0, - table_scan: 0, - network_egress: 0, - calculated_value: Some($calculated_value), - }; - meter_core::global::global_registry().record_read(record); - }; - ($catalog: expr, $schema: expr, cpu_time: $cpu_time: expr) => { - let record = meter_core::data::ReadRecord { - catalog: $catalog.into(), - schema: $schema.into(), - table: None, - region_num: None, - cpu_time: $cpu_time, - table_scan: 0, - network_egress: 0, - calculated_value: None, - }; - meter_core::global::global_registry().record_read(record); - }; - ($catalog: expr, $schema: expr, table_scan: $table_scan: expr) => { - let record = meter_core::data::ReadRecord { - catalog: $catalog.into(), - schema: $schema.into(), - table: None, - region_num: None, - cpu_time: 0, - table_scan: $table_scan, - network_egress: 0, - calculated_value: None, - }; - meter_core::global::global_registry().record_read(record); - }; - ($catalog: expr, $schema: expr, network_egress: $network_egress: expr) => { - let record = meter_core::data::ReadRecord { - catalog: $catalog.into(), - schema: $schema.into(), - table: None, - region_num: None, - cpu_time: 0, - table_scan: 0, - network_egress: $network_egress, - calculated_value: None, - }; - meter_core::global::global_registry().record_read(record); - }; - ($catalog: expr, $schema: expr, $cpu_time: expr, $table_scan: expr, $network_egress: expr) => { - let record = meter_core::data::ReadRecord { - catalog: $catalog.into(), - schema: $schema.into(), - table: None, - region_num: None, - cpu_time: $cpu_time, - table_scan: $table_scan, - network_egress: $network_egress, - calculated_value: None, - }; - meter_core::global::global_registry().record_read(record); - }; + ($catalog: expr, $schema: expr, $item: expr) => {{ + let r = meter_core::global::global_registry(); + let mut value = 0; + if let Some(calc) = r.get_calculator() { + value = calc.calc(&$item); + let record = meter_core::data::MeterRecord { + catalog: $catalog.into(), + schema: $schema.into(), + value: value, + }; + meter_core::global::global_registry().record_read(record); + } + value + }}; } diff --git a/meter-macros/src/write_meter.rs b/meter-macros/src/write_meter.rs index 4692463..2a74b01 100644 --- a/meter-macros/src/write_meter.rs +++ b/meter-macros/src/write_meter.rs @@ -15,15 +15,10 @@ #[cfg(feature = "noop")] #[macro_export] macro_rules! write_meter { - ($catalog: expr, $schema: expr, $write_calc: expr) => { + ($catalog: expr, $schema: expr, $write_calc: expr) => {{ let _ = ($catalog, $schema, &$write_calc); - }; - ($catalog: expr, $schema: expr, $table: expr, $write_calc: expr) => { - let _ = ($catalog, $schema, $table, &$write_calc); - }; - ($catalog: expr, $schema: expr, $table: expr, $region: expr, $write_calc: expr) => { - let _ = ($catalog, $schema, $table, $region, &$write_calc); - }; + 0 as u64 + }}; } /// Record some about data insertion. @@ -33,7 +28,7 @@ macro_rules! write_meter { /// ```rust /// use std::sync::Arc; /// -/// use meter_core::write_calc::WriteCalculator; +/// use meter_core::ItemCalculator; /// use meter_core::global::global_registry; /// use meter_macros::write_meter; /// @@ -43,8 +38,8 @@ macro_rules! write_meter { /// // A byte count calculator of insert request /// struct MockInsertCalculator; /// -/// impl WriteCalculator for MockInsertCalculator { -/// fn calc_byte(&self, _: &MockInsert) -> u32 { +/// impl ItemCalculator for MockInsertCalculator { +/// fn calc(&self, _: &MockInsert) -> u64 { /// 10 * 1024 /// } /// } @@ -60,58 +55,20 @@ macro_rules! write_meter { #[cfg(not(feature = "noop"))] #[macro_export] macro_rules! write_meter { - ($catalog: expr, $schema: expr, $write_calc: expr) => { + ($catalog: expr, $schema: expr, $req_item: expr) => {{ let r = meter_core::global::global_registry(); - - if let Some(calc) = r.get_calculator() { - let byte_count = calc.calc_byte(&$write_calc); - - let record = meter_core::data::WriteRecord { - catalog: $catalog.into(), - schema: $schema.into(), - table: None, - region_num: None, - byte_count, - calculated_value: None, - }; - - r.record_write(record); - }; - }; - - ($catalog: expr, $schema: expr, $table: expr, $write_calc: expr) => { - let r = meter_core::global::global_registry(); - - if let Some(calc) = r.get_calculator() { - let byte_count = calc.calc_byte(&$write_calc); - - let record = meter_core::data::WriteRecord { - catalog: $catalog.into(), - schema: $schema.into(), - table: Some($table.into()), - region_num: None, - byte_count, - }; - - r.record_write(record); - }; - }; - - ($catalog: expr, $schema: expr, $table: expr, $region: expr, $write_calc: expr) => { - let r = meter_core::global::global_registry(); - + let mut value = 0; if let Some(calc) = r.get_calculator() { - let byte_count = calc.calc_byte(&$write_calc); + value = calc.calc(&$req_item); - let record = meter_core::data::WriteRecord { + let record = meter_core::data::MeterRecord { catalog: $catalog.into(), schema: $schema.into(), - table: Some($table.into()), - region_num: Some($region), - byte_count, + value: value, }; r.record_write(record); }; - }; + value + }}; }