feat: support backfill_rate_limit for source backfill
xxchan committed Nov 19, 2024
1 parent 4469944 commit 9004675
Showing 8 changed files with 177 additions and 19 deletions.
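In short: with a shared source, each materialized view's SourceBackfill fragment now honors BACKFILL_RATE_LIMIT, independently of the source's own SOURCE_RATE_LIMIT, and the limit can be changed at runtime via ALTER MATERIALIZED VIEW. A minimal sketch of the user-facing flow, distilled from the e2e test below (object names are illustrative):

    -- Create the MV with backfill throttled to 0 rows/s. BACKFILL_RATE_LIMIT is a
    -- session variable picked up at CREATE time; pair it with BACKGROUND_DDL so the
    -- statement does not block while backfill makes no progress.
    SET BACKGROUND_DDL = true;
    SET BACKFILL_RATE_LIMIT = 0;
    CREATE MATERIALIZED VIEW mv AS SELECT count(*) FROM my_kafka_source;

    -- Later, raise the backfill limit without recreating the MV.
    ALTER MATERIALIZED VIEW mv SET backfill_rate_limit = 2000;

    -- The source's own ingestion rate is throttled separately.
    ALTER SOURCE my_kafka_source SET source_rate_limit TO 1000;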
134 changes: 134 additions & 0 deletions e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt
@@ -0,0 +1,134 @@
control substitution on

############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

statement ok
create sink kafka_sink
from
kafka_seed_data with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_shared',
type = 'append-only',
force_append_only='true'
);

############## Source from kafka (rate_limit = 0)

# Wait for the topic to be created
skipif in-memory
sleep 5s

statement ok
create source kafka_source (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_shared',
source_rate_limit = 0,
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Check data

skipif in-memory
sleep 3s

############## Create MV on source

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

############## Although the source is rate limited, the MV's SourceBackfill is not.

statement ok
flush;

query I
select * from rl_mv1;
----
1000

############## Insert more data. These rows will not reach the MV while the source rate limit is 0.

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

sleep 3s

query I
select * from rl_mv1;
----
1000

statement ok
SET BACKGROUND_DDL=true;

statement ok
SET BACKFILL_RATE_LIMIT=0;

statement ok
create materialized view rl_mv2 as select count(*) from kafka_source;

sleep 1s

query T
SELECT progress from rw_ddl_progress;
----
0 rows consumed

############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

statement ok
alter source kafka_source set source_rate_limit to 1000;

sleep 3s

query I
select * from rl_mv1;
----
2000

query T
SELECT progress from rw_ddl_progress;
----
0 rows consumed



statement error
alter materialized view rl_mv2 set source_rate_limit = 1000;
----
db error: ERROR: Failed to run the query

Caused by:
sql parser error: expected SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET, found: source_rate_limit
LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000;
^


statement ok
alter materialized view rl_mv2 set backfill_rate_limit = 2000;

sleep 3s

query ?
select * from rl_mv2;
----
2000


############## Cleanup

statement ok
drop source kafka_source cascade;

statement ok
drop table kafka_seed_data cascade;
10 changes: 5 additions & 5 deletions proto/stream_plan.proto
@@ -189,7 +189,7 @@ message StreamSource {
map<string, string> with_properties = 6;
catalog.StreamSourceInfo info = 7;
string source_name = 8;
-// Streaming rate limit
+// Source rate limit
optional uint32 rate_limit = 9;
map<string, secret.SecretRef> secret_refs = 10;
}
@@ -205,7 +205,7 @@ message StreamFsFetch {
map<string, string> with_properties = 6;
catalog.StreamSourceInfo info = 7;
string source_name = 8;
-// Streaming rate limit
+// Source rate limit
optional uint32 rate_limit = 9;
map<string, secret.SecretRef> secret_refs = 10;
}
@@ -231,7 +231,7 @@ message SourceBackfillNode {
catalog.StreamSourceInfo info = 4;
string source_name = 5;
map<string, string> with_properties = 6;
-// Streaming rate limit
+// Backfill rate limit
optional uint32 rate_limit = 7;

// fields above are the same as StreamSource
@@ -609,7 +609,7 @@ message StreamScanNode {
// Used iff `ChainType::Backfill`.
plan_common.StorageTableDesc table_desc = 7;

-// The rate limit for the stream scan node.
+// The backfill rate limit for the stream scan node.
optional uint32 rate_limit = 8;

// Snapshot read every N barriers
@@ -646,7 +646,7 @@ message StreamCdcScanNode {
// The external table that will be backfilled for CDC.
plan_common.ExternalTableDesc cdc_table_desc = 5;

-// The rate limit for the stream cdc scan node.
+// The backfill rate limit for the stream cdc scan node.
optional uint32 rate_limit = 6;

// Whether skip the backfill and only consume from upstream.
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/json.rs
@@ -204,7 +204,7 @@ fn datum_to_json_object(

let data_type = field.data_type();

-tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
+tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);

let value = match (data_type, scalar_ref) {
(DataType::Boolean, ScalarRefImpl::Bool(v)) => {
4 changes: 2 additions & 2 deletions src/meta/service/src/stream_service.rs
@@ -114,12 +114,12 @@ impl StreamManagerService for StreamServiceImpl {
}
ThrottleTarget::Mv => {
self.metadata_manager
-.update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
+.update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::CdcTable => {
self.metadata_manager
-.update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
+.update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::Unspecified => {
13 changes: 5 additions & 8 deletions src/meta/src/controller/streaming_job.rs
@@ -1317,7 +1317,6 @@ impl CatalogController {
.map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
.collect_vec();

-// TODO: limit source backfill?
fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
let mut found = false;
if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
@@ -1385,7 +1384,7 @@ impl CatalogController {

// edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
// return the actor_ids to be applied
-pub async fn update_mv_rate_limit_by_job_id(
+pub async fn update_backfill_rate_limit_by_job_id(
&self,
job_id: ObjectId,
rate_limit: Option<u32>,
@@ -1412,7 +1411,7 @@
fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
let mut found = false;
if (*fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0)
-|| (*fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0)
+|| (*fragment_type_mask & PbFragmentTypeFlag::SourceScan as i32 != 0)
{
visit_stream_node(stream_node, |node| match node {
PbNodeBody::StreamCdcScan(node) => {
@@ -1423,11 +1422,9 @@
node.rate_limit = rate_limit;
found = true;
}
-PbNodeBody::Source(node) => {
-    if let Some(inner) = node.source_inner.as_mut() {
-        inner.rate_limit = rate_limit;
-        found = true;
-    }
+PbNodeBody::SourceBackfill(node) => {
+    node.rate_limit = rate_limit;
+    found = true;
}
_ => {}
});
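For context on the retained-fragment check above: fragment_type_mask is a bitset of PbFragmentTypeFlag values, and a fragment now qualifies for a backfill rate-limit update when either its StreamScan bit or its SourceScan bit is set (plain Source fragments stay on the source rate-limit path). A self-contained toy of the mask test — the flag values below are made up for illustration, not RisingWave's actual discriminants:

    // Toy bitmask check mirroring the `fragment_type_mask` test above.
    // NOTE: flag values are illustrative only.
    #[repr(i32)]
    enum FragmentTypeFlag {
        Source = 1 << 0,
        StreamScan = 1 << 1,
        SourceScan = 1 << 2,
    }

    fn takes_backfill_rate_limit(mask: i32) -> bool {
        (mask & FragmentTypeFlag::StreamScan as i32 != 0)
            || (mask & FragmentTypeFlag::SourceScan as i32 != 0)
    }

    fn main() {
        // A shared-source MV fragment would carry the SourceScan bit.
        assert!(takes_backfill_rate_limit(FragmentTypeFlag::SourceScan as i32));
        // A plain source fragment does not qualify.
        assert!(!takes_backfill_rate_limit(FragmentTypeFlag::Source as i32));
    }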
4 changes: 2 additions & 2 deletions src/meta/src/manager/metadata.rs
@@ -657,14 +657,14 @@ impl MetadataManager {
.collect())
}

-pub async fn update_mv_rate_limit_by_table_id(
+pub async fn update_backfill_rate_limit_by_table_id(
&self,
table_id: TableId,
rate_limit: Option<u32>,
) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
let fragment_actors = self
.catalog_controller
-.update_mv_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
+.update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
.await?;
Ok(fragment_actors
.into_iter()
27 changes: 27 additions & 0 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -570,6 +570,33 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
)
.await?;
}
+Mutation::Throttle(actor_to_apply) => {
+    if let Some(new_rate_limit) =
+        actor_to_apply.get(&self.actor_ctx.id)
+        && *new_rate_limit != self.rate_limit_rps
+    {
+        tracing::info!(
+            "updating rate limit from {:?} to {:?}",
+            self.rate_limit_rps,
+            *new_rate_limit
+        );
+        self.rate_limit_rps = *new_rate_limit;
+        // rebuild reader
+        let (reader, _backfill_info) = self
+            .build_stream_source_reader(
+                &source_desc,
+                backfill_stage
+                    .get_latest_unfinished_splits()?,
+            )
+            .await?;
+
+        backfill_stream = select_with_strategy(
+            input.by_ref().map(Either::Left),
+            reader.map(Either::Right),
+            select_strategy,
+        );
+    }
+}
_ => {}
}
}
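A note on the throttle handling added above: applying a new rate limit rebuilds the source reader and re-merges it with the upstream input, keeping the control stream prioritized. A standalone sketch of that merge pattern with the futures crate (toy streams and a tokio runtime, not RisingWave types; assumes futures and tokio as dependencies):

    use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

    #[tokio::main]
    async fn main() {
        let control = stream::iter(["barrier-1", "barrier-2"]);
        let data = stream::iter(["row-1", "row-2", "row-3"]);
        // Poll the left (control) stream first whenever both are ready,
        // mirroring the `select_strategy` used by the backfill executor.
        let merged = select_with_strategy(control, data, |_: &mut ()| PollNext::Left);
        let out: Vec<&str> = merged.collect().await;
        println!("{out:?}");
    }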
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_executor.rs
@@ -608,7 +608,7 @@ impl<S: StateStore> SourceExecutor<S> {
if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
&& *new_rate_limit != self.rate_limit_rps
{
-tracing::debug!(
+tracing::info!(
"updating rate limit from {:?} to {:?}",
self.rate_limit_rps,
*new_rate_limit
