Skip to content

Commit

Permalink
fix: table with fs connector cannot work with rate limit (#19338)
Browse files Browse the repository at this point in the history
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
tabVersion and tabversion authored Nov 12, 2024
1 parent 12fbf12 commit 4dea583
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 1 deletion.
11 changes: 11 additions & 0 deletions e2e_test/source_inline/fs/posix_fs.slt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,19 @@ CREATE TABLE diamonds (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source_inline/fs/data',
source_rate_limit = 0
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 1s

# no output due to rate limit
query TTTT rowsort
select * from diamonds;
----

statement ok
ALTER TABLE diamonds SET source_rate_limit TO DEFAULT;

sleep 10s

query TTTT rowsort
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl StreamNode for StreamFsFetch {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
rate_limit: source_catalog.rate_limit,
secret_refs,
}
});
Expand Down
16 changes: 16 additions & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::WithPropertiesExt;
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::actor_dispatcher::DispatcherType;
use risingwave_meta_model::object::ObjectType;
Expand Down Expand Up @@ -1276,6 +1277,7 @@ impl CatalogController {
MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
})?;

let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
let streaming_job_ids: Vec<ObjectId> =
if let Some(table_id) = source.optional_associated_table_id {
vec![table_id]
Expand Down Expand Up @@ -1330,6 +1332,20 @@ impl CatalogController {
}
});
}
if is_fs_source && *fragment_type_mask == PbFragmentTypeFlag::FragmentUnspecified as i32
{
// when create table with fs connector, the fragment type is unspecified
visit_stream_node(stream_node, |node| {
if let PbNodeBody::StreamFsFetch(node) = node {
if let Some(node_inner) = &mut node.node_inner
&& node_inner.source_id == source_id as u32
{
node_inner.rate_limit = rate_limit;
found = true;
}
}
});
}
found
});

Expand Down
9 changes: 9 additions & 0 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ impl BuildingFragment {
dml_node.table_id = job_id;
dml_node.table_version_id = job.table_version_id().unwrap();
}
NodeBody::StreamFsFetch(fs_fetch_node) => {
if let StreamingJob::Table(table_source, _, _) = job {
if let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
&& let Some(source) = table_source
{
node_inner.source_id = source.id;
}
}
}
NodeBody::Source(source_node) => {
match job {
// Note: For table without connector, it has a dummy Source node.
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
actor_to_apply.get(&self.actor_ctx.id)
&& *new_rate_limit != self.rate_limit_rps
{
tracing::debug!(
"updating rate limit from {:?} to {:?}",
self.rate_limit_rps,
*new_rate_limit
);
self.rate_limit_rps = *new_rate_limit;
need_rebuild_reader = true;
}
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,11 @@ impl<S: StateStore> SourceExecutor<S> {
if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
&& *new_rate_limit != self.rate_limit_rps
{
tracing::debug!(
"updating rate limit from {:?} to {:?}",
self.rate_limit_rps,
*new_rate_limit
);
self.rate_limit_rps = *new_rate_limit;
// recreate from latest_split_info
self.rebuild_stream_reader(&source_desc, &mut stream)
Expand Down

0 comments on commit 4dea583

Please sign in to comment.