Skip to content

Commit

Permalink
feat(meta): maintain per database job fragment info in barrier manager (
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Oct 29, 2024
1 parent 4c05b7a commit 98baacd
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 265 deletions.
2 changes: 1 addition & 1 deletion src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ pub trait SysCatalogReader: Sync + Send + 'static {

pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;

#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)]
#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
#[display("{database_id}")]
pub struct DatabaseId {
pub database_id: u32,
Expand Down
69 changes: 38 additions & 31 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use risingwave_pb::stream_plan::{
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightGraphInfo};
use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
use crate::barrier::info::BarrierInfo;
use crate::barrier::{GlobalBarrierWorkerContextImpl, InflightSubscriptionInfo};
use crate::controller::fragment::InflightFragmentInfo;
Expand Down Expand Up @@ -78,10 +78,6 @@ pub struct Reschedule {
/// `Source` and `SourceBackfill` are handled together here.
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

/// Whether this fragment is injectable. The injectable means whether the fragment contains
/// any executors that are able to receive barrier.
pub injectable: bool,

pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
}

Expand Down Expand Up @@ -109,28 +105,31 @@ impl ReplaceTablePlan {
fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
let mut fragment_changes = HashMap::new();
for fragment in self.new_table_fragments.fragments.values() {
let fragment_change = CommandFragmentChanges::NewFragment(InflightFragmentInfo {
actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id as i32,
self.new_table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id(),
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| *table_id as ObjectId)
.collect(),
is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
});
let fragment_change = CommandFragmentChanges::NewFragment(
self.streaming_job.database_id().into(),
self.streaming_job.id().into(),
InflightFragmentInfo {
actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id as i32,
self.new_table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id(),
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| *table_id as ObjectId)
.collect(),
},
);
assert!(fragment_changes
.insert(fragment.fragment_id, fragment_change)
.is_none());
Expand Down Expand Up @@ -187,7 +186,6 @@ impl CreateStreamingJobCommandInfo {
.iter()
.map(|table_id| *table_id as ObjectId)
.collect(),
is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
},
)
})
Expand Down Expand Up @@ -252,7 +250,9 @@ pub enum Command {
info: CreateStreamingJobCommandInfo,
job_type: CreateStreamingJobType,
},
MergeSnapshotBackfillStreamingJobs(HashMap<TableId, (SnapshotBackfillInfo, InflightGraphInfo)>),
MergeSnapshotBackfillStreamingJobs(
HashMap<TableId, (SnapshotBackfillInfo, InflightStreamingJobInfo)>,
),
/// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
/// table fragment.
///
Expand Down Expand Up @@ -338,8 +338,15 @@ impl Command {
);
let mut changes: HashMap<_, _> = info
.new_fragment_info()
.map(|(fragment_id, info)| {
(fragment_id, CommandFragmentChanges::NewFragment(info))
.map(|(fragment_id, fragment_info)| {
(
fragment_id,
CommandFragmentChanges::NewFragment(
info.streaming_job.database_id().into(),
info.streaming_job.id().into(),
fragment_info,
),
)
})
.collect();

Expand Down
38 changes: 23 additions & 15 deletions src/meta/src/barrier/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::ops::Bound::{Excluded, Unbounded};
use risingwave_common::catalog::TableId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
Expand All @@ -33,10 +32,11 @@ use crate::barrier::creating_job::barrier_control::CreatingStreamingJobBarrierCo
use crate::barrier::creating_job::status::{
CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus,
};
use crate::barrier::info::{BarrierInfo, InflightGraphInfo};
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::MetaResult;

Expand All @@ -46,7 +46,7 @@ pub(super) struct CreatingStreamingJobControl {
pub(super) snapshot_backfill_info: SnapshotBackfillInfo,
backfill_epoch: u64,

graph_info: InflightGraphInfo,
graph_info: InflightStreamingJobInfo,

barrier_control: CreatingStreamingJobBarrierControl,
status: CreatingStreamingJobStatus,
Expand All @@ -70,19 +70,24 @@ impl CreatingStreamingJobControl {
let snapshot_backfill_actors = info.table_fragments.snapshot_backfill_actor_ids();
let mut create_mview_tracker = CreateMviewProgressTracker::default();
create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat);
let fragment_info: HashMap<_, _> = info.new_fragment_info().collect();
let fragment_infos: HashMap<_, _> = info.new_fragment_info().collect();

let table_id = info.table_fragments.table_id();
let table_id_str = format!("{}", table_id.table_id);

let actors_to_create = info.table_fragments.actors_to_create();
let graph_info = InflightStreamingJobInfo {
job_id: table_id,
database_id: info.streaming_job.database_id().into(),
fragment_infos,
};

Self {
info,
snapshot_backfill_info,
barrier_control: CreatingStreamingJobBarrierControl::new(table_id, backfill_epoch),
backfill_epoch,
graph_info: InflightGraphInfo::new(fragment_info),
graph_info,
status: CreatingStreamingJobStatus::ConsumingSnapshot {
prev_epoch_fake_physical_time: 0,
pending_upstream_barriers: vec![],
Expand All @@ -101,11 +106,11 @@ impl CreatingStreamingJobControl {

pub(super) fn is_wait_on_worker(&self, worker_id: WorkerId) -> bool {
self.barrier_control.is_wait_on_worker(worker_id)
|| (self.status.is_finishing() && self.graph_info.contains_worker(worker_id))
}

pub(super) fn on_new_worker_node_map(&self, node_map: &HashMap<WorkerId, WorkerNode>) {
self.graph_info.on_new_worker_node_map(node_map)
|| (self.status.is_finishing()
&& InflightFragmentInfo::contains_worker(
self.graph_info.fragment_infos(),
worker_id,
))
}

pub(super) fn gen_ddl_progress(&self) -> DdlProgress {
Expand Down Expand Up @@ -163,8 +168,8 @@ impl CreatingStreamingJobControl {
table_id: TableId,
control_stream_manager: &mut ControlStreamManager,
barrier_control: &mut CreatingStreamingJobBarrierControl,
pre_applied_graph_info: &InflightGraphInfo,
applied_graph_info: Option<&InflightGraphInfo>,
pre_applied_graph_info: &InflightStreamingJobInfo,
applied_graph_info: Option<&InflightStreamingJobInfo>,
CreatingJobInjectBarrierInfo {
barrier_info,
new_actors,
Expand All @@ -175,8 +180,11 @@ impl CreatingStreamingJobControl {
Some(table_id),
mutation,
&barrier_info,
pre_applied_graph_info,
applied_graph_info,
pre_applied_graph_info.fragment_infos(),
applied_graph_info
.map(|graph_info| graph_info.fragment_infos())
.into_iter()
.flatten(),
new_actors,
vec![],
vec![],
Expand Down Expand Up @@ -267,7 +275,7 @@ impl CreatingStreamingJobControl {
Ok(())
}

pub(super) fn should_merge_to_upstream(&self) -> Option<InflightGraphInfo> {
pub(super) fn should_merge_to_upstream(&self) -> Option<InflightStreamingJobInfo> {
if let CreatingStreamingJobStatus::ConsumingLogStore {
ref log_store_progress_tracker,
} = &self.status
Expand Down
Loading

0 comments on commit 98baacd

Please sign in to comment.