Commit

Merge branch 'main' into hy/drop_dead_code_use_cargo_workspace_unused_pub
yihong0618 authored Jan 17, 2025
2 parents 821dba7 + f8d26b4 commit 38a5de9
Showing 16 changed files with 274 additions and 185 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -124,7 +124,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a173785b3376267c4d62b6e0b0a54ca040822aa" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec801a91aa22f9666063d02805f1f60f7c93458a" }
hex = "0.4"
http = "0.2"
humantime = "2.1"
33 changes: 16 additions & 17 deletions src/flow/src/adapter.rs
@@ -149,7 +149,7 @@ pub type FlowWorkerManagerRef = Arc<FlowWorkerManager>;
pub struct FlowWorkerManager {
/// The handler to the worker that will run the dataflow
/// which is `!Send` so a handle is used
pub worker_handles: Vec<Mutex<WorkerHandle>>,
pub worker_handles: Vec<WorkerHandle>,
/// The selector to select a worker to run the dataflow
worker_selector: Mutex<usize>,
/// The query engine that will be used to parse the query and convert it to a dataflow plan
@@ -236,7 +236,7 @@ impl FlowWorkerManager {

/// Add a worker handle to the manager, meaning the corresponding worker is under its management
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
self.worker_handles.push(Mutex::new(handle));
self.worker_handles.push(handle);
}
}

@@ -577,13 +577,16 @@ impl FlowWorkerManager {
pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
debug!("Starting to run");
let default_interval = Duration::from_secs(1);
let mut tick_interval = tokio::time::interval(default_interval);
// burst mode, so that if we miss a tick, we will run immediately to fully utilize the cpu
tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
let mut avg_spd = 0; // rows/sec
let mut since_last_run = tokio::time::Instant::now();
let run_per_trace = 10;
let mut run_cnt = 0;
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
let row_cnt = self.run_available(true).await.unwrap_or_else(|err| {
let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
common_telemetry::error!(err;"Run available errors");
0
});
@@ -613,9 +613,9 @@ impl FlowWorkerManager {

// for now we want to batch rows until there is around `BATCH_SIZE` rows in send buf
// before trigger a run of flow's worker
// (plus one for prevent div by zero)
let wait_for = since_last_run.elapsed();

// insert speed of the last run
let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
// rapid increase, slow decay
avg_spd = if cur_spd > avg_spd {
@@ -638,7 +641,10 @@

METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
tokio::select! {
_ = tick_interval.tick() => (),
_ = tokio::time::sleep(new_wait) => ()
}
}
// flow is now shut down; drop frontend_invoker early so a ref cycle (in standalone mode) can be prevented:
// FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter
@@ -649,23 +655,17 @@ impl FlowWorkerManager {
/// Run all available subgraph in the flow node
/// This will try to run all dataflow in this node
///
/// set `blocking` to true to wait until lock is acquired
/// and false to return immediately if lock is not acquired
/// return numbers of rows send to worker
/// set `blocking` to true to wait until the worker finishes running,
/// false to just trigger a run and return immediately
/// return the number of rows sent to the worker (inaccurate)
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
let mut row_cnt = 0;

let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
// TODO(discord9): consider how to handle error in individual worker
if blocking {
worker.lock().await.run_available(now, blocking).await?;
} else if let Ok(worker) = worker.try_lock() {
worker.run_available(now, blocking).await?;
} else {
return Ok(row_cnt);
}
worker.run_available(now, blocking).await?;
}
// check row send and rows remain in send buf
let flush_res = if blocking {
@@ -736,7 +736,6 @@ impl FlowWorkerManager {
/// remove a flow by its id
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
for handle in self.worker_handles.iter() {
let handle = handle.lock().await;
if handle.contains_flow(flow_id).await? {
handle.remove_flow(flow_id).await?;
break;
@@ -873,7 +872,7 @@ impl FlowWorkerManager {
.await
.insert(flow_id, err_collector.clone());
// TODO(discord9): load balance?
let handle = &self.get_worker_handle_for_create_flow().await;
let handle = self.get_worker_handle_for_create_flow().await;
let create_request = worker::Request::Create {
flow_id,
plan: flow_plan,
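For illustration, a minimal self-contained sketch of the scheduling pattern the reworked `run` loop adopts above: a fixed-period tick with `Burst` missed-tick behavior is raced against an adaptive sleep via `tokio::select!`, so the loop wakes on whichever fires first. The row counts and wait formula are placeholders rather than the real `run_available`/speed-estimation logic; it assumes tokio with the `rt`, `time`, and `macros` features.

use std::time::Duration;

#[tokio::main]
async fn main() {
    let default_interval = Duration::from_secs(1);
    let mut tick_interval = tokio::time::interval(default_interval);
    // Burst mode: if ticks were missed because the loop body ran long,
    // fire them back to back instead of skipping, so backlog is drained.
    tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
    // Consume the tick that `interval` yields immediately on creation.
    tick_interval.tick().await;

    for round in 0..3u64 {
        // Stand-in for `run_available(false)`: pretend some rows were handled.
        let row_cnt = 100 * (round + 1);
        // Stand-in for the adaptive wait derived from recent insert speed.
        let new_wait = Duration::from_millis(200 * (round + 1));

        // Wake on whichever fires first: the periodic tick or the adaptive sleep.
        tokio::select! {
            _ = tick_interval.tick() => println!("tick fired, {row_cnt} rows handled"),
            _ = tokio::time::sleep(new_wait) => println!("adaptive sleep elapsed, {row_cnt} rows handled"),
        }
    }
}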
18 changes: 15 additions & 3 deletions src/flow/src/adapter/node_context.rs
@@ -130,7 +130,16 @@ impl SourceSender {
// TODO(discord9): send rows instead so it's just moving a point
if let Some(batch) = send_buf.recv().await {
let len = batch.row_count();
self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst);
if let Err(prev_row_cnt) =
self.send_buf_row_cnt
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| x.checked_sub(len))
{
common_telemetry::error!(
"send buf row count underflow, prev = {}, len = {}",
prev_row_cnt,
len
);
}
row_cnt += len;
self.sender
.send(batch)
@@ -162,18 +171,21 @@ impl SourceSender {
batch_datatypes: &[ConcreteDataType],
) -> Result<usize, Error> {
METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _);
// important for backpressure. if send buf is full, block until it's not
while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 {
tokio::task::yield_now().await;
}

// row count metrics is approx so relaxed order is ok
self.send_buf_row_cnt
.fetch_add(rows.len(), Ordering::SeqCst);
let batch = Batch::try_from_rows_with_types(
rows.into_iter().map(|(row, _, _)| row).collect(),
batch_datatypes,
)
.context(EvalSnafu)?;
common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count());

self.send_buf_row_cnt
.fetch_add(batch.row_count(), Ordering::SeqCst);
self.send_buf_tx.send(batch).await.map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send row, error = {:?}", e),
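The `fetch_sub` → `fetch_update` change above keeps `send_buf_row_cnt` from wrapping below zero when more rows are acknowledged than were counted. A standalone sketch of the same pattern on a plain `AtomicUsize`, with `eprintln!` standing in for `common_telemetry::error!`:

use std::sync::atomic::{AtomicUsize, Ordering};

// Underflow-safe decrement: `checked_sub` returns None when the subtraction
// would go below zero, so `fetch_update` leaves the counter untouched and
// reports the previous value as an error instead of wrapping like `fetch_sub`.
fn decrement_row_count(counter: &AtomicUsize, len: usize) {
    if let Err(prev_row_cnt) =
        counter.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| x.checked_sub(len))
    {
        eprintln!("send buf row count underflow, prev = {prev_row_cnt}, len = {len}");
    }
}

fn main() {
    let counter = AtomicUsize::new(5);
    decrement_row_count(&counter, 3); // ok: 5 -> 2
    decrement_row_count(&counter, 10); // would underflow: logged, counter stays at 2
    assert_eq!(counter.load(Ordering::SeqCst), 2);
}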
1 change: 0 additions & 1 deletion src/flow/src/adapter/stat.rs
@@ -22,7 +22,6 @@ impl FlowWorkerManager {
pub async fn gen_state_report(&self) -> FlowStat {
let mut full_report = BTreeMap::new();
for worker in self.worker_handles.iter() {
let worker = worker.lock().await;
match worker.get_state_size().await {
Ok(state_size) => {
full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v)))
21 changes: 10 additions & 11 deletions src/flow/src/adapter/util.rs
@@ -35,19 +35,18 @@ use crate::FlowWorkerManager;

impl FlowWorkerManager {
/// Get a worker handle for creating flow, using round robin to select a worker
pub(crate) async fn get_worker_handle_for_create_flow(
&self,
) -> tokio::sync::MutexGuard<WorkerHandle> {
let mut selector = self.worker_selector.lock().await;

*selector += 1;
if *selector >= self.worker_handles.len() {
*selector = 0
pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle {
let use_idx = {
let mut selector = self.worker_selector.lock().await;
if *selector >= self.worker_handles.len() {
*selector = 0
};
let use_idx = *selector;
*selector += 1;
use_idx
};

// Safety: selector is always in bound
let handle = &self.worker_handles[*selector];
handle.lock().await
&self.worker_handles[use_idx]
}

/// Create table from given schema (will adjust to add auto column if needed), return true if table is created
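A self-contained sketch of the reworked round-robin selection shown above: the selector mutex is held only while computing the index, and a plain reference is returned instead of a `MutexGuard`, which is what lets `worker_handles` drop its per-handle `Mutex`. The generic `Manager` type and names are illustrative, not the real `FlowWorkerManager` API.

use tokio::sync::Mutex;

struct Manager<T> {
    handles: Vec<T>,
    selector: Mutex<usize>,
}

impl<T> Manager<T> {
    // Round-robin pick: lock the selector only long enough to compute the
    // index, then return a plain reference into `handles`.
    async fn pick(&self) -> &T {
        let use_idx = {
            let mut selector = self.selector.lock().await;
            if *selector >= self.handles.len() {
                *selector = 0;
            }
            let use_idx = *selector;
            *selector += 1;
            use_idx
        };
        // use_idx was clamped to < handles.len() above, so indexing is in bounds.
        &self.handles[use_idx]
    }
}

#[tokio::main]
async fn main() {
    let manager = Manager {
        handles: vec!["w0", "w1", "w2"],
        selector: Mutex::new(0),
    };
    for _ in 0..4 {
        println!("picked {}", manager.pick().await); // w0, w1, w2, w0
    }
}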
4 changes: 4 additions & 0 deletions src/flow/src/server.rs
@@ -88,6 +88,10 @@ impl flow_server::Flow for FlowService {
self.manager
.handle(request)
.await
.map_err(|err| {
common_telemetry::error!(err; "Failed to handle flow request");
err
})
.map(Response::new)
.map_err(to_status_with_last_err)
}
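The handler change above is a small "log, then keep propagating" step inserted into the existing `Result` chain. A toy sketch of the pattern, with plain functions and `String` errors standing in for the gRPC types and `to_status_with_last_err`:

fn do_work(input: i32) -> Result<i32, String> {
    if input < 0 {
        Err("negative input".to_string())
    } else {
        Ok(input)
    }
}

fn handle(input: i32) -> Result<i32, String> {
    do_work(input)
        // Log the failure here, then return the same error so the rest of the
        // chain (response wrapping, status conversion) still runs unchanged.
        .map_err(|err| {
            eprintln!("Failed to handle flow request: {err}");
            err
        })
        .map(|v| v + 1) // stand-in for Response::new
        .map_err(|err| format!("status: {err}")) // stand-in for to_status_with_last_err
}

fn main() {
    println!("{:?}", handle(1)); // Ok(2)
    println!("{:?}", handle(-1)); // Err("status: negative input")
}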
1 change: 1 addition & 0 deletions src/index/Cargo.toml
@@ -21,6 +21,7 @@ fastbloom = "0.8"
fst.workspace = true
futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
mockall.workspace = true
pin-project.workspace = true
prost.workspace = true
34 changes: 0 additions & 34 deletions src/index/src/bloom_filter.rs
@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

pub mod applier;
pub mod creator;
pub mod error;
@@ -24,35 +22,3 @@ pub type BytesRef<'a> = &'a [u8];

/// The seed used for the Bloom filter.
pub const SEED: u128 = 42;

/// The Meta information of the bloom filter stored in the file.
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct BloomFilterMeta {
/// The number of rows per segment.
pub rows_per_segment: usize,

/// The number of segments.
pub seg_count: usize,

/// The number of total rows.
pub row_count: usize,

/// The size of the bloom filter excluding the meta information.
pub bloom_filter_segments_size: usize,

/// Offset and size of bloom filters in the file.
pub bloom_filter_segments: Vec<BloomFilterSegmentLocation>,
}

/// The location of the bloom filter segment in the file.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Eq)]
pub struct BloomFilterSegmentLocation {
/// The offset of the bloom filter segment in the file.
pub offset: u64,

/// The size of the bloom filter segment in the file.
pub size: u64,

/// The number of elements in the bloom filter segment.
pub elem_count: usize,
}