Skip to content

Commit

Permalink
refactor: remove unused code & update calc logic (#23)
Browse files Browse the repository at this point in the history
* chore: remove unused code

* chore: add return value for write meter

* chore: refactor calc
  • Loading branch information
shuiyisong authored Jan 26, 2024
1 parent 9984cee commit 80b7271
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 290 deletions.
8 changes: 4 additions & 4 deletions meter-core/src/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::data::ReadRecord;
use crate::data::WriteRecord;
use crate::data::MeterRecord;

/// Trait representing the methods required to collect read/write record.
/// Save read and write separately for later refactoring.
pub trait Collect: Send + Sync {
/// Notifies the method that an event about data insertion occurs.
fn on_write(&self, record: WriteRecord);
fn on_write(&self, record: MeterRecord);

/// Notifies the method that an event about data query occurs.
fn on_read(&self, record: ReadRecord);
fn on_read(&self, record: MeterRecord);
}
47 changes: 15 additions & 32 deletions meter-core/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/// The WriteRecord records some data about data insertion.
#[derive(Debug)]
pub struct WriteRecord {
pub catalog: String,
pub schema: String,
pub table: Option<String>,
pub region_num: Option<u32>,

/// Volume of data written in byte.
pub byte_count: u32,

/// The calculated value of this read record.
///
/// If present, other fields will be ignored.
pub calculated_value: Option<u64>,
}

/// The ReadRecord records some data about data query.
#[derive(Debug)]
pub struct ReadRecord {
pub catalog: String,
pub schema: String,
pub table: Option<String>,
pub region_num: Option<u32>,

pub struct ReadItem {
/// The CPU consumed by query SQL processes.
///
/// Unit is nanosecond.
Expand All @@ -46,14 +23,20 @@ pub struct ReadRecord {
///
/// Unit is byte.
pub table_scan: u64,
}

/// The size of the network traffic used by the query.
///
/// Unit is byte.
pub network_egress: u64,
impl ReadItem {
pub fn new(cpu_time: u64, table_scan: u64) -> Self {
Self {
cpu_time,
table_scan,
}
}
}

/// The calculated value of this read record.
///
/// If present, other fields will be ignored.
pub calculated_value: Option<u64>,
#[derive(Debug)]
pub struct MeterRecord {
pub catalog: String,
pub schema: String,
pub value: u64,
}
5 changes: 4 additions & 1 deletion meter-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ pub mod collect;
pub mod data;
pub mod global;
pub mod registry;
pub mod write_calc;

pub trait ItemCalculator<T>: Send + Sync {
fn calc(&self, value: &T) -> u64;
}
15 changes: 7 additions & 8 deletions meter-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ use std::sync::Arc;
use parking_lot::RwLock;

use crate::collect::Collect;
use crate::data::ReadRecord;
use crate::data::WriteRecord;
use crate::write_calc::WriteCalculator;
use crate::data::MeterRecord;
use crate::ItemCalculator;

type CalculatorMap = anymap::Map<dyn Any + Send + Sync>;

Expand Down Expand Up @@ -53,22 +52,22 @@ impl Registry {
/// Register the calculation formula of 'insert request' -> 'byte count'
pub fn register_calculator<T: Send + Sync + 'static>(
&self,
calculator: Arc<dyn WriteCalculator<T>>,
calculator: Arc<dyn ItemCalculator<T>>,
) {
let mut guard = self.inner.calculator.write();
guard.insert(calculator);
}

/// Obtain the calculation formula corresponding to the insert request.
pub fn get_calculator<T: Send + Sync + 'static>(&self) -> Option<Arc<dyn WriteCalculator<T>>> {
pub fn get_calculator<T: Send + Sync + 'static>(&self) -> Option<Arc<dyn ItemCalculator<T>>> {
let guard = self.inner.calculator.read();
(*guard).get::<Arc<dyn WriteCalculator<T>>>().cloned()
(*guard).get::<Arc<dyn ItemCalculator<T>>>().cloned()
}
}

impl Registry {
/// A base API for recording information about data insertion.
pub fn record_write(&self, record: WriteRecord) {
pub fn record_write(&self, record: MeterRecord) {
let collector = self.inner.collector.read();

let collector = match collector.as_ref() {
Expand All @@ -80,7 +79,7 @@ impl Registry {
}

/// A base API for recording information about data query.
pub fn record_read(&self, record: ReadRecord) {
pub fn record_read(&self, record: MeterRecord) {
let collector = self.inner.collector.read();

let collector = match collector.as_ref() {
Expand Down
22 changes: 0 additions & 22 deletions meter-core/src/write_calc.rs

This file was deleted.

64 changes: 29 additions & 35 deletions meter-example/examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

use std::sync::Arc;
use std::time::Duration;
use tracing::info;

use meter_core::data::ReadRecord;
use meter_core::data::WriteRecord;
use meter_core::data::MeterRecord;
use meter_core::data::ReadItem;
use meter_core::global::global_registry;
use meter_core::write_calc::WriteCalculator;

use meter_core::ItemCalculator;
use meter_example::collector::SimpleCollector;
use meter_example::reporter::SimpleReporter;
use meter_example::CalcImpl;
Expand All @@ -41,19 +43,22 @@ async fn run() {
}

async fn setup_global_registry() {
let collector = Arc::new(SimpleCollector::new(wcu_calc, rcu_calc));
let collector = Arc::new(SimpleCollector::new(w_calc, r_calc));
let reporter = Arc::new(SimpleReporter::new(collector.clone()));

let r = global_registry();
r.set_collector(collector);

let calc_impl = Arc::new(CalcImpl);
let string_insert_calc = calc_impl.clone() as Arc<dyn WriteCalculator<String>>;
let string_insert_calc = calc_impl.clone() as Arc<dyn ItemCalculator<String>>;
r.register_calculator(string_insert_calc);

let mock_insert_calc = calc_impl as Arc<dyn WriteCalculator<MockInsertRequest>>;
let mock_insert_calc = calc_impl.clone() as Arc<dyn ItemCalculator<MockInsertRequest>>;
r.register_calculator(mock_insert_calc);

let read_item_calc = calc_impl as Arc<dyn ItemCalculator<ReadItem>>;
r.register_calculator(read_item_calc);

tokio::spawn(async move {
reporter.start().await;
});
Expand All @@ -62,40 +67,29 @@ async fn setup_global_registry() {
async fn do_some_record() {
for _i in 0..20 {
let insert_req = "String insert req".to_string();
write_meter!("greptime", "db1", insert_req);

read_meter!("greptime", "db1", cpu_time: 100000);
read_meter!("greptime", "db1", table_scan: 100000);
read_meter!("greptime", "db1", network_egress: 100000);

read_meter!("greptime", "db2", 100000, 100000, 100000);

read_meter!("greptime", "db3", calculated_value: 100000);
let w = write_meter!("greptime", "db1", insert_req);
info!("w: {}", w);

let r = read_meter!(
"greptime",
"db1",
ReadItem {
cpu_time: 100000,
table_scan: 100000,
}
);
info!("r: {}", r);

tokio::time::sleep(Duration::from_secs(1)).await;
}
}

fn wcu_calc(w_info: &WriteRecord) -> u32 {
let WriteRecord { byte_count, .. } = w_info;

byte_count / 1024
fn w_calc(w_info: &MeterRecord) -> u64 {
let MeterRecord { value, .. } = w_info;
*value
}

fn rcu_calc(r_info: &ReadRecord) -> u32 {
let ReadRecord {
cpu_time,
table_scan,
network_egress,
calculated_value,
..
} = r_info;

if let Some(calculated_value) = calculated_value {
(*calculated_value).try_into().unwrap()
} else {
(*cpu_time / 3 + table_scan / 4096 + network_egress / 4096)
.try_into()
.unwrap()
}
fn r_calc(r_info: &MeterRecord) -> u64 {
let MeterRecord { value, .. } = r_info;
*value
}
41 changes: 20 additions & 21 deletions meter-example/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ use std::collections::HashMap;

use dashmap::DashMap;
use meter_core::collect::Collect;
use meter_core::data::ReadRecord;
use meter_core::data::WriteRecord;
use meter_core::data::MeterRecord;

pub struct SimpleCollector<W, R> {
read_data: DashMap<SchemaId, Vec<ReadRecord>>,
write_data: DashMap<SchemaId, Vec<WriteRecord>>,
wcu_calc: W,
rcu_calc: R,
read_data: DashMap<SchemaId, Vec<MeterRecord>>,
write_data: DashMap<SchemaId, Vec<MeterRecord>>,
w_calc: W,
r_calc: R,
}

/// The SchemaId identifies a database.
Expand All @@ -34,50 +33,50 @@ pub struct SchemaId {
}

impl<W, R> SimpleCollector<W, R> {
pub fn new(wcu_calc: W, rcu_calc: R) -> Self {
pub fn new(w_calc: W, r_calc: R) -> Self {
Self {
read_data: DashMap::default(),
write_data: DashMap::default(),
wcu_calc,
rcu_calc,
w_calc,
r_calc,
}
}
}

impl<W, R> SimpleCollector<W, R>
where
R: Fn(&ReadRecord) -> u32 + Send + Sync,
W: Fn(&WriteRecord) -> u32 + Send + Sync,
R: Fn(&MeterRecord) -> u64 + Send + Sync,
W: Fn(&MeterRecord) -> u64 + Send + Sync,
{
pub fn clear(&self) {
self.read_data.clear();
self.write_data.clear();
}

pub fn schema_wcus(&self) -> HashMap<SchemaId, u32> {
pub fn schema_ws(&self) -> HashMap<SchemaId, u64> {
self.write_data
.iter()
.map(|write_infos| {
let wcus: u32 = write_infos
let ws: u64 = write_infos
.value()
.iter()
.map(|wcu_info| (self.wcu_calc)(wcu_info))
.map(|w_info| (self.w_calc)(w_info))
.sum();
(write_infos.key().clone(), wcus)
(write_infos.key().clone(), ws)
})
.collect()
}

pub fn schema_rcus(&self) -> HashMap<SchemaId, u32> {
pub fn schema_rs(&self) -> HashMap<SchemaId, u64> {
self.read_data
.iter()
.map(|read_infos| {
let rcus: u32 = read_infos
let rs: u64 = read_infos
.value()
.iter()
.map(|read_info| (self.rcu_calc)(read_info))
.map(|read_info| (self.r_calc)(read_info))
.sum();
(read_infos.key().clone(), rcus)
(read_infos.key().clone(), rs)
})
.collect()
}
Expand All @@ -88,7 +87,7 @@ where
R: Send + Sync,
W: Send + Sync,
{
fn on_read(&self, record: ReadRecord) {
fn on_read(&self, record: MeterRecord) {
let schema_id = SchemaId {
catalog: record.catalog.clone(),
schema: record.schema.clone(),
Expand All @@ -99,7 +98,7 @@ where
entry.push(record)
}

fn on_write(&self, record: WriteRecord) {
fn on_write(&self, record: MeterRecord) {
let schema_id = SchemaId {
catalog: record.catalog.clone(),
schema: record.schema.clone(),
Expand Down
17 changes: 12 additions & 5 deletions meter-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use meter_core::write_calc::WriteCalculator;
use meter_core::data::ReadItem;
use meter_core::ItemCalculator;

pub mod collector;
pub mod reporter;
Expand All @@ -21,14 +22,20 @@ pub struct MockInsertRequest;

pub struct CalcImpl;

impl WriteCalculator<MockInsertRequest> for CalcImpl {
fn calc_byte(&self, _value: &MockInsertRequest) -> u32 {
impl ItemCalculator<MockInsertRequest> for CalcImpl {
fn calc(&self, _value: &MockInsertRequest) -> u64 {
1024
}
}

impl ItemCalculator<String> for CalcImpl {
fn calc(&self, _value: &String) -> u64 {
1024 * 10
}
}

impl WriteCalculator<String> for CalcImpl {
fn calc_byte(&self, _value: &String) -> u32 {
impl ItemCalculator<ReadItem> for CalcImpl {
fn calc(&self, _value: &ReadItem) -> u64 {
1024 * 100
}
}
Loading

0 comments on commit 80b7271

Please sign in to comment.