refactor(meta): use in memory struct for table fragments (#20190)
wenym1 authored Jan 17, 2025
1 parent 37635a6 commit 14a5d3d
Showing 6 changed files with 40 additions and 66 deletions.
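At its core, the commit makes `CatalogController` compose and return the in-memory `StreamJobFragments` model rather than the protobuf `PbTableFragments`, so in-process callers no longer round-trip through `StreamJobFragments::from_protobuf`; conversion to protobuf happens only at RPC boundaries via `to_protobuf()`. Below is a minimal, self-contained sketch of the before/after call pattern; the trimmed types are illustrative stand-ins, not the real definitions in `src/meta/src/model/stream.rs`.

```rust
#[derive(Debug)]
struct PbTableFragments {
    table_id: u32,
}

#[derive(Debug)]
struct StreamJobFragments {
    stream_job_id: u32,
}

impl StreamJobFragments {
    /// Kept by this commit: the in-memory -> protobuf direction, now used
    /// only where an RPC response needs the wire format.
    fn to_protobuf(&self) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id,
        }
    }
}

/// Stand-in for `CatalogController::get_job_fragments_by_id`, which after
/// this commit returns the in-memory struct directly.
fn get_job_fragments_by_id(job_id: u32) -> StreamJobFragments {
    StreamJobFragments { stream_job_id: job_id }
}

fn main() {
    // Before: callers received protobuf and immediately converted it:
    //     let tf = StreamJobFragments::from_protobuf(pb);   // now deleted
    let tf = get_job_fragments_by_id(42);
    println!("{:?}", tf.to_protobuf());
}
```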
2 changes: 1 addition & 1 deletion src/meta/service/src/scale_service.rs
```diff
@@ -69,7 +69,7 @@ impl ScaleService for ScaleServiceImpl {
             .table_fragments()
             .await?
             .values()
-            .cloned()
+            .map(|tf| tf.to_protobuf())
             .collect();
 
         let worker_nodes = self
```
8 changes: 4 additions & 4 deletions src/meta/service/src/stream_service.rs
```diff
@@ -208,15 +208,15 @@ impl StreamManagerService for StreamServiceImpl {
 
         let mut info = HashMap::new();
         for job_id in table_ids {
-            let pb_table_fragments = self
+            let table_fragments = self
                 .metadata_manager
                 .catalog_controller
                 .get_job_fragments_by_id(job_id as _)
                 .await?;
             info.insert(
-                pb_table_fragments.table_id,
+                table_fragments.stream_job_id().table_id,
                 TableFragmentInfo {
-                    fragments: pb_table_fragments
+                    fragments: table_fragments
                         .fragments
                         .into_iter()
                         .map(|(id, fragment)| FragmentInfo {
@@ -232,7 +232,7 @@ impl StreamManagerService for StreamServiceImpl {
                                 .collect_vec(),
                         })
                         .collect_vec(),
-                    ctx: pb_table_fragments.ctx,
+                    ctx: Some(table_fragments.ctx.to_protobuf()),
                 },
             );
         }
```
3 changes: 1 addition & 2 deletions src/meta/src/barrier/context/recovery.rs
```diff
@@ -79,11 +79,10 @@ impl GlobalBarrierWorkerContextImpl {
 
         try_join_all(mviews.into_iter().map(|mview| async move {
             let table_id = TableId::new(mview.table_id as _);
-            let table_fragments = mgr
+            let stream_job_fragments = mgr
                 .catalog_controller
                 .get_job_fragments_by_id(mview.table_id)
                 .await?;
-            let stream_job_fragments = StreamJobFragments::from_protobuf(table_fragments);
             assert_eq!(stream_job_fragments.stream_job_id(), table_id);
             Ok((mview.definition, stream_job_fragments))
         }))
```
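For context on the surrounding code: `try_join_all` drives all the per-mview fetch futures concurrently and short-circuits on the first error, and the refactor simply drops the `from_protobuf` step inside each future. A stand-alone sketch of that concurrency pattern, assuming the `futures` and `tokio` crates; the job ids and fetch function are hypothetical stand-ins:

```rust
use futures::future::try_join_all;

/// Hypothetical stand-in for the per-job catalog lookup in recovery.
async fn fetch_job(id: u32) -> Result<(u32, String), String> {
    if id == 0 {
        return Err("job not found".to_owned());
    }
    Ok((id, format!("fragments of job {id}")))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let ids = vec![1, 2, 3];
    // All fetches run concurrently; the first Err cancels the rest.
    let jobs = try_join_all(ids.into_iter().map(fetch_job)).await?;
    for (id, fragments) in jobs {
        println!("job {id}: {fragments}");
    }
    Ok(())
}
```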
45 changes: 23 additions & 22 deletions src/meta/src/controller/fragment.rs
```diff
@@ -64,8 +64,8 @@ use crate::controller::utils::{
     FragmentDesc, PartialActorLocation, PartialFragmentStateTables,
 };
 use crate::manager::LocalNotification;
-use crate::model::TableParallelism;
-use crate::stream::SplitAssignment;
+use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
+use crate::stream::{build_actor_split_impls, SplitAssignment};
 use crate::{MetaError, MetaResult};
 
 #[derive(Clone, Debug)]
@@ -344,39 +344,37 @@ impl CatalogController {
         )>,
         parallelism: StreamingParallelism,
         max_parallelism: usize,
-    ) -> MetaResult<PbTableFragments> {
-        let mut pb_fragments = HashMap::new();
+    ) -> MetaResult<StreamJobFragments> {
+        let mut pb_fragments = BTreeMap::new();
         let mut pb_actor_splits = HashMap::new();
-        let mut pb_actor_status = HashMap::new();
+        let mut pb_actor_status = BTreeMap::new();
 
         for (fragment, actors, actor_dispatcher) in fragments {
             let (fragment, fragment_actor_status, fragment_actor_splits) =
                 Self::compose_fragment(fragment, actors, actor_dispatcher)?;
 
             pb_fragments.insert(fragment.fragment_id, fragment);
 
-            pb_actor_splits.extend(fragment_actor_splits.into_iter());
+            pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
             pb_actor_status.extend(fragment_actor_status.into_iter());
         }
 
-        let table_fragments = PbTableFragments {
-            table_id,
+        let table_fragments = StreamJobFragments {
+            stream_job_id: table_id.into(),
             state: state as _,
             fragments: pb_fragments,
             actor_status: pb_actor_status,
             actor_splits: pb_actor_splits,
-            ctx: Some(ctx.unwrap_or_default()),
-            parallelism: Some(
-                match parallelism {
-                    StreamingParallelism::Custom => TableParallelism::Custom,
-                    StreamingParallelism::Adaptive => TableParallelism::Adaptive,
-                    StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
-                }
-                .into(),
-            ),
-            node_label: "".to_owned(),
-            backfill_done: true,
-            max_parallelism: Some(max_parallelism as _),
+            ctx: ctx
+                .as_ref()
+                .map(StreamContext::from_protobuf)
+                .unwrap_or_default(),
+            assigned_parallelism: match parallelism {
+                StreamingParallelism::Custom => TableParallelism::Custom,
+                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
+                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
+            },
+            max_parallelism,
         };
 
         Ok(table_fragments)
@@ -680,7 +678,10 @@ impl CatalogController {
         Ok(select.into_tuple().all(&inner.db).await?)
     }
 
-    pub async fn get_job_fragments_by_id(&self, job_id: ObjectId) -> MetaResult<PbTableFragments> {
+    pub async fn get_job_fragments_by_id(
+        &self,
+        job_id: ObjectId,
+    ) -> MetaResult<StreamJobFragments> {
         let inner = self.inner.read().await;
         let fragment_actors = Fragment::find()
             .find_with_related(Actor)
@@ -832,7 +833,7 @@ impl CatalogController {
     }
 
     // TODO: This function is too heavy, we should avoid using it and implement others on demand.
-    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, PbTableFragments>> {
+    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
         let inner = self.inner.read().await;
         let jobs = StreamingJob::find().all(&inner.db).await?;
         let mut table_fragments = BTreeMap::new();
```
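Two details in `compose_table_fragments` above show the new direction: `ctx` is now parsed from protobuf into the in-memory `StreamContext` instead of being wrapped in `Some(...)`, and the `StreamingParallelism` match feeds `assigned_parallelism` directly with no `.into()` to a protobuf representation. A simplified, self-contained sketch of that parallelism mapping follows; the enums are trimmed stand-ins for the real catalog and model types:

```rust
/// Trimmed stand-in for the persisted catalog enum.
enum StreamingParallelism {
    Custom,
    Adaptive,
    Fixed(i32),
}

/// Trimmed stand-in for the in-memory model enum.
#[derive(Debug)]
enum TableParallelism {
    Custom,
    Adaptive,
    Fixed(usize),
}

/// Mirrors the match in `compose_table_fragments`: the in-memory struct
/// stores the enum directly, with no protobuf wrapping.
fn assigned_parallelism(p: StreamingParallelism) -> TableParallelism {
    match p {
        StreamingParallelism::Custom => TableParallelism::Custom,
        StreamingParallelism::Adaptive => TableParallelism::Adaptive,
        StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
    }
}

fn main() {
    println!("{:?}", assigned_parallelism(StreamingParallelism::Fixed(4)));
}
```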
19 changes: 8 additions & 11 deletions src/meta/src/manager/metadata.rs
```diff
@@ -543,11 +543,9 @@ impl MetadataManager {
         &self,
         job_id: &TableId,
     ) -> MetaResult<StreamJobFragments> {
-        let pb_table_fragments = self
-            .catalog_controller
+        self.catalog_controller
             .get_job_fragments_by_id(job_id.table_id as _)
-            .await?;
-        Ok(StreamJobFragments::from_protobuf(pb_table_fragments))
+            .await
     }
 
     pub async fn get_running_actors_of_fragment(
@@ -581,20 +579,19 @@ impl MetadataManager {
     ) -> MetaResult<Vec<StreamJobFragments>> {
         let mut table_fragments = vec![];
         for id in ids {
-            let pb_table_fragments = self
-                .catalog_controller
-                .get_job_fragments_by_id(id.table_id as _)
-                .await?;
-            table_fragments.push(StreamJobFragments::from_protobuf(pb_table_fragments));
+            table_fragments.push(
+                self.catalog_controller
+                    .get_job_fragments_by_id(id.table_id as _)
+                    .await?,
+            );
         }
         Ok(table_fragments)
     }
 
     pub async fn all_active_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
         let table_fragments = self.catalog_controller.table_fragments().await?;
         let mut actor_maps = HashMap::new();
-        for (_, fragments) in table_fragments {
-            let tf = StreamJobFragments::from_protobuf(fragments);
+        for (_, tf) in table_fragments {
             for actor in tf.active_actors() {
                 actor_maps
                     .try_insert(actor.actor_id, actor)
```
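One other point worth noting in `all_active_actors`: `HashMap::try_insert` fails when the key is already occupied, which is how duplicate actor ids across jobs surface as errors here. `try_insert` is still a nightly-only API (`map_try_insert`) at the time of writing, so a stable-Rust sketch of the same duplicate check would use the entry API; the actor type below is a hypothetical, trimmed stand-in:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical, trimmed stand-in for the real `StreamActor`.
#[derive(Debug, Clone)]
struct StreamActor {
    actor_id: u32,
}

/// Collects actors by id and errors on duplicates, mimicking what
/// `try_insert` enforces in `all_active_actors`.
fn collect_actors(actors: Vec<StreamActor>) -> Result<HashMap<u32, StreamActor>, String> {
    let mut actor_maps = HashMap::new();
    for actor in actors {
        match actor_maps.entry(actor.actor_id) {
            Entry::Vacant(v) => {
                v.insert(actor);
            }
            Entry::Occupied(o) => return Err(format!("duplicate actor id {}", o.key())),
        }
    }
    Ok(actor_maps)
}

fn main() {
    let actors = vec![StreamActor { actor_id: 1 }, StreamActor { actor_id: 1 }];
    // The second insert of actor_id 1 is rejected.
    println!("{:?}", collect_actors(actors));
}
```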
29 changes: 3 additions & 26 deletions src/meta/src/model/stream.rs
```diff
@@ -36,7 +36,7 @@ use risingwave_pb::stream_plan::{FragmentTypeFlag, PbStreamContext, StreamActor,
 
 use super::{ActorId, FragmentId};
 use crate::model::MetadataModelResult;
-use crate::stream::{build_actor_connector_splits, build_actor_split_impls, SplitAssignment};
+use crate::stream::{build_actor_connector_splits, SplitAssignment};
 
 /// The parallelism for a `TableFragments`.
 #[derive(Debug, Copy, Clone, Eq, PartialEq)]
@@ -93,10 +93,10 @@ impl From<TableParallelism> for PbTableParallelism {
 #[derive(Debug, Clone)]
 pub struct StreamJobFragments {
     /// The table id.
-    stream_job_id: TableId,
+    pub stream_job_id: TableId,
 
     /// The state of the table fragments.
-    state: State,
+    pub state: State,
 
     /// The table fragments.
     pub fragments: BTreeMap<FragmentId, Fragment>,
@@ -174,29 +174,6 @@ impl StreamJobFragments {
             max_parallelism: Some(self.max_parallelism as _),
         }
     }
-
-    pub fn from_protobuf(prost: PbTableFragments) -> Self {
-        let ctx = StreamContext::from_protobuf(prost.get_ctx().unwrap());
-
-        let default_parallelism = PbTableParallelism {
-            parallelism: Some(Parallelism::Custom(PbCustomParallelism {})),
-        };
-
-        let state = prost.state();
-
-        Self {
-            stream_job_id: TableId::new(prost.table_id),
-            state,
-            fragments: prost.fragments.into_iter().collect(),
-            actor_status: prost.actor_status.into_iter().collect(),
-            actor_splits: build_actor_split_impls(&prost.actor_splits),
-            ctx,
-            assigned_parallelism: prost.parallelism.unwrap_or(default_parallelism).into(),
-            max_parallelism: prost
-                .max_parallelism
-                .map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _),
-        }
-    }
 }
 
 impl StreamJobFragments {
```
