chore: add more logs
WenyXu committed Nov 7, 2024
1 parent 499f1a0 commit 0de5018
Showing 4 changed files with 64 additions and 21 deletions.
53 changes: 38 additions & 15 deletions src/metric-engine/src/engine/region_metadata.rs
@@ -14,32 +14,19 @@

//! Implementation of retrieving logical region's region metadata.
use common_telemetry::info;
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
use crate::error::Result;

impl MetricEngineInner {
/// Load column metadata of a logical region.
///
/// The return value is ordered on column name.
pub async fn load_logical_columns(
async fn get_logical_columns(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> {
// First try to load from state cache
if let Some(columns) = self
.state
.read()
.unwrap()
.logical_columns()
.get(&logical_region_id)
{
return Ok(columns.clone());
}

// Else load from metadata region and update the cache.
let _read_guard = self
.metadata_region
@@ -65,6 +52,42 @@ impl MetricEngineInner {
Ok(logical_column_metadata)
}

/// Load column metadata of a logical region.
///
/// The return value is ordered on column name.
pub async fn load_logical_columns(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> {
// First try to load from state cache
if logical_region_id == RegionId::new(1258, 0) {
let columns_without_cache = self
.get_logical_columns(physical_region_id, logical_region_id)
.await?;
info!(
"The column metadata without cache: {:?}",
columns_without_cache
);
}

if let Some(columns) = self
.state
.read()
.unwrap()
.logical_columns()
.get(&logical_region_id)
{
if logical_region_id == RegionId::new(1258, 0) {
info!("The column metadata in cache: {:?}", columns);
}
return Ok(columns.clone());
}

self.get_logical_columns(physical_region_id, logical_region_id)
.await
}

/// Load logical column names of a logical region.
///
/// The return value is ordered on column name alphabetically.
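Net effect of this file's change: the cache lookup is split out of the loading path, `get_logical_columns` now always reads from the metadata region, and the public `load_logical_columns` keeps the cache fast path while, for the hard-coded debug region `RegionId::new(1258, 0)`, logging both the uncached and the cached column metadata so the two can be compared. A hypothetical helper (not part of this commit; assumes `ColumnMetadata` implements `PartialEq` and `Debug`) sketching that comparison:

use common_telemetry::info;
use store_api::metadata::ColumnMetadata;

/// Hypothetical: emit a single log line when the cached column metadata
/// diverges from a fresh load out of the metadata region.
fn log_cache_divergence(cached: &[ColumnMetadata], fresh: &[ColumnMetadata]) {
    if cached != fresh {
        info!(
            "Cached column metadata diverges from metadata region, cached: {:?}, fresh: {:?}",
            cached, fresh
        );
    }
}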
2 changes: 2 additions & 0 deletions src/mito2/src/metrics.rs
@@ -73,6 +73,8 @@ lazy_static! {
/// Histogram of flushed bytes.
pub static ref FLUSH_BYTES_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_bytes_total", "mito flush bytes total").unwrap();
pub static ref READ_BYTES_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_bytes_total", "mito read bytes total", &[TYPE_LABEL]).unwrap();
// ------ End of flush related metrics


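The new counter follows the standard prometheus `IntCounterVec` pattern: one series per `TYPE_LABEL` value, incremented by the number of bytes read. A minimal usage sketch (the "s3" label value is illustrative; in this commit the label is the object store scheme, see the parquet helper change below):

use crate::metrics::READ_BYTES_TOTAL;

/// Sketch: pick a label value for the series, then add the byte count.
fn record_read_bytes(scheme: &str, bytes: usize) {
    READ_BYTES_TOTAL
        .with_label_values(&[scheme])
        .inc_by(bytes as u64);
}

// e.g. record_read_bytes("s3", 4096);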
19 changes: 16 additions & 3 deletions src/mito2/src/sst/parquet/helper.rs
@@ -25,6 +25,7 @@ use snafu::ResultExt;

use crate::error;
use crate::error::Result;
use crate::metrics::READ_BYTES_TOTAL;

// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
/// Convert [format::FileMetaData] to [ParquetMetaData]
@@ -100,14 +101,26 @@ pub async fn fetch_byte_ranges(
object_store: ObjectStore,
ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
Ok(object_store
let schema = object_store.info().scheme().into_static();
let mut total = 0;

let bytes = object_store
.reader_with(file_path)
.concurrent(FETCH_PARALLELISM)
.gap(MERGE_GAP)
.await?
.fetch(ranges.to_vec())
.await?
.into_iter()
.map(|buf| buf.to_bytes())
.collect::<Vec<_>>())
.map(|buf| {
total += buf.len();
buf.to_bytes()
})
.collect::<Vec<_>>();

READ_BYTES_TOTAL
.with_label_values(&[schema])
.inc_by(total as u64);

Ok(bytes)
}
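A hypothetical call site for the updated function (the `&str` path type, the file name, and the byte ranges are assumptions for illustration):

use std::ops::Range;

use bytes::Bytes;
use object_store::ObjectStore;

use crate::sst::parquet::helper::fetch_byte_ranges;

/// Sketch: fetch two byte ranges of a parquet file; the fetched volume is now
/// also recorded in `greptime_mito_read_bytes_total`, labeled by store scheme.
async fn read_two_ranges(store: ObjectStore) -> object_store::Result<Vec<Bytes>> {
    let ranges: Vec<Range<u64>> = vec![0..1024, 4096..8192];
    fetch_byte_ranges("data/part-0.parquet", store, &ranges).await
}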
11 changes: 8 additions & 3 deletions src/store-api/src/metadata.rs
@@ -27,7 +27,7 @@ use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_telemetry::info;
use common_telemetry::{error, info};
use datatypes::arrow::datatypes::FieldRef;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use serde::de::Error;
@@ -336,12 +336,13 @@ impl RegionMetadata {
// is unique in `Schema` so we only check column id here.
if id_names.contains_key(&col.column_id) {
info!(
"The column {} and {} have the same column id {}, region_id: {}, schema: {:?}",
"The column {} and {} have the same column id {}, region_id: {}, schema: {:?}, column metadatas: {:?}",
id_names[&col.column_id],
col.column_schema.name,
col.column_id,
self.region_id,
self.schema,
self.column_metadatas,
);
}
ensure!(
@@ -543,7 +544,11 @@ impl RegionMetadataBuilder {

/// Consumes the builder and build a [RegionMetadata].
pub fn build(self) -> Result<RegionMetadata> {
let skipped = SkippedFields::new(&self.column_metadatas)?;
let skipped = SkippedFields::new(&self.column_metadatas).inspect_err(|err| {
error!(err; "Skipped fields, column_metadatas: {:?}",
self.column_metadatas
);
})?;

let meta = RegionMetadata {
schema: skipped.schema,
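The `inspect_err` used above is the standard-library hook for attaching context to an error without consuming the `Result`: the closure runs only on the error branch and `?` still propagates the original error. A self-contained sketch of the same pattern outside this codebase:

/// Sketch: log the offending input before the parse error bubbles up.
fn parse_port(input: &str) -> Result<u16, std::num::ParseIntError> {
    input.parse::<u16>().inspect_err(|err| {
        eprintln!("failed to parse port from {input:?}: {err}");
    })
}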
