Skip to content

Commit 509ec7a

Browse files
authored
feat: add specific_resource_group to rw_streaming_jobs (#20207)
Signed-off-by: Shanicky Chen <[email protected]>
1 parent 1b74a95 commit 509ec7a

File tree

13 files changed

+128
-35
lines changed

13 files changed

+128
-35
lines changed

proto/meta.proto

+7-5
Original file line numberDiff line numberDiff line change
@@ -232,16 +232,18 @@ message ListTableFragmentsResponse {
232232
map<uint32, TableFragmentInfo> table_fragments = 1;
233233
}
234234

235-
message ListTableFragmentStatesRequest {}
235+
message ListStreamingJobStatesRequest {}
236236

237-
message ListTableFragmentStatesResponse {
238-
message TableFragmentState {
237+
message ListStreamingJobStatesResponse {
238+
message StreamingJobState {
239239
uint32 table_id = 1;
240240
TableFragments.State state = 2;
241241
TableParallelism parallelism = 3;
242242
uint32 max_parallelism = 4;
243+
string name = 5;
244+
string resource_group = 6;
243245
}
244-
repeated TableFragmentState states = 1;
246+
repeated StreamingJobState states = 1;
245247
}
246248

247249
message ListFragmentDistributionRequest {}
@@ -333,7 +335,7 @@ service StreamManagerService {
333335
rpc Resume(ResumeRequest) returns (ResumeResponse);
334336
rpc CancelCreatingJobs(CancelCreatingJobsRequest) returns (CancelCreatingJobsResponse);
335337
rpc ListTableFragments(ListTableFragmentsRequest) returns (ListTableFragmentsResponse);
336-
rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse);
338+
rpc ListStreamingJobStates(ListStreamingJobStatesRequest) returns (ListStreamingJobStatesResponse);
337339
rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse);
338340
rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse);
339341
rpc ListActorSplits(ListActorSplitsRequest) returns (ListActorSplitsResponse);

src/frontend/src/catalog/system_catalog/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use risingwave_common::error::BoxedError;
3232
use risingwave_common::session_config::SessionConfig;
3333
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
3434
use risingwave_common::types::DataType;
35-
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
35+
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
3636
use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
3737
use risingwave_pb::user::grant_privilege::Object;
3838

@@ -216,7 +216,7 @@ pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
216216
)
217217
}
218218

219-
fn extract_parallelism_from_table_state(state: &TableFragmentState) -> String {
219+
fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
220220
match state
221221
.parallelism
222222
.as_ref()

src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,5 @@ mod rw_actor_id_to_ddl;
6262
mod rw_actor_splits;
6363
mod rw_fragment_id_to_ddl;
6464
mod rw_internal_table_info;
65+
mod rw_streaming_jobs;
6566
mod rw_worker_actor_count;

src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ struct RwActorInfo {
3333
async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorInfo>> {
3434
let table_ids = reader
3535
.meta_client
36-
.list_table_fragment_states()
36+
.list_streaming_job_states()
3737
.await?
3838
.into_iter()
3939
.map(|fragment| fragment.table_id)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2025 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use risingwave_common::types::Fields;
16+
use risingwave_frontend_macro::system_catalog;
17+
18+
use crate::catalog::system_catalog::{extract_parallelism_from_table_state, SysCatalogReaderImpl};
19+
use crate::error::Result;
20+
21+
#[derive(Fields)]
22+
struct RwStreamingJob {
23+
#[primary_key]
24+
job: i32,
25+
name: String,
26+
status: String,
27+
parallelism: String,
28+
max_parallelism: i32,
29+
resource_group: String,
30+
}
31+
32+
#[system_catalog(table, "rw_catalog.rw_streaming_jobs")]
33+
async fn read_rw_streaming_jobs(reader: &SysCatalogReaderImpl) -> Result<Vec<RwStreamingJob>> {
34+
let states = reader.meta_client.list_streaming_job_states().await?;
35+
36+
Ok(states
37+
.into_iter()
38+
.map(|state| {
39+
let parallelism = extract_parallelism_from_table_state(&state);
40+
RwStreamingJob {
41+
job: state.table_id as i32,
42+
status: state.state().as_str_name().into(),
43+
name: state.name,
44+
parallelism: parallelism.to_uppercase(),
45+
max_parallelism: state.max_parallelism as i32,
46+
resource_group: state.resource_group,
47+
}
48+
})
49+
.collect())
50+
}

src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ struct RwTableFragment {
3131
async fn read_rw_table_fragments_info(
3232
reader: &SysCatalogReaderImpl,
3333
) -> Result<Vec<RwTableFragment>> {
34-
let states = reader.meta_client.list_table_fragment_states().await?;
34+
let states = reader.meta_client.list_streaming_job_states().await?;
3535

3636
Ok(states
3737
.into_iter()

src/frontend/src/meta_client.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState;
3434
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
3535
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
3636
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
37-
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
37+
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
3838
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
3939
use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus};
4040
use risingwave_rpc_client::error::Result;
@@ -64,7 +64,7 @@ pub trait FrontendMetaClient: Send + Sync {
6464
table_ids: &[u32],
6565
) -> Result<HashMap<u32, TableFragmentInfo>>;
6666

67-
async fn list_table_fragment_states(&self) -> Result<Vec<TableFragmentState>>;
67+
async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>>;
6868

6969
async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>>;
7070

@@ -163,8 +163,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
163163
self.0.list_table_fragments(table_ids).await
164164
}
165165

166-
async fn list_table_fragment_states(&self) -> Result<Vec<TableFragmentState>> {
167-
self.0.list_table_fragment_states().await
166+
async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
167+
self.0.list_streaming_job_states().await
168168
}
169169

170170
async fn list_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {

src/frontend/src/test_utils.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState;
5959
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
6060
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
6161
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
62-
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
62+
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
6363
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
6464
use risingwave_pb::meta::{
6565
EventLog, PbTableParallelism, PbThrottleTarget, RecoveryStatus, SystemParams,
@@ -990,7 +990,7 @@ impl FrontendMetaClient for MockFrontendMetaClient {
990990
Ok(HashMap::default())
991991
}
992992

993-
async fn list_table_fragment_states(&self) -> RpcResult<Vec<TableFragmentState>> {
993+
async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
994994
Ok(vec![])
995995
}
996996

src/meta/service/src/stream_service.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -243,10 +243,10 @@ impl StreamManagerService for StreamServiceImpl {
243243
}
244244

245245
#[cfg_attr(coverage, coverage(off))]
246-
async fn list_table_fragment_states(
246+
async fn list_streaming_job_states(
247247
&self,
248-
_request: Request<ListTableFragmentStatesRequest>,
249-
) -> Result<Response<ListTableFragmentStatesResponse>, Status> {
248+
_request: Request<ListStreamingJobStatesRequest>,
249+
) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
250250
let job_infos = self
251251
.metadata_manager
252252
.catalog_controller
@@ -258,8 +258,10 @@ impl StreamManagerService for StreamServiceImpl {
258258
|StreamingJobInfo {
259259
job_id,
260260
job_status,
261+
name,
261262
parallelism,
262263
max_parallelism,
264+
resource_group,
263265
..
264266
}| {
265267
let parallelism = match parallelism {
@@ -268,17 +270,19 @@ impl StreamManagerService for StreamServiceImpl {
268270
StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
269271
};
270272

271-
list_table_fragment_states_response::TableFragmentState {
273+
list_streaming_job_states_response::StreamingJobState {
272274
table_id: job_id as _,
275+
name,
273276
state: PbState::from(job_status) as _,
274277
parallelism: Some(parallelism.into()),
275278
max_parallelism: max_parallelism as _,
279+
resource_group,
276280
}
277281
},
278282
)
279283
.collect_vec();
280284

281-
Ok(Response::new(ListTableFragmentStatesResponse { states }))
285+
Ok(Response::new(ListStreamingJobStatesResponse { states }))
282286
}
283287

284288
#[cfg_attr(coverage, coverage(off))]

src/meta/src/controller/catalog/get_op.rs

+2-7
Original file line numberDiff line numberDiff line change
@@ -300,12 +300,9 @@ impl CatalogController {
300300
streaming_job_ids: Vec<ObjectId>,
301301
) -> MetaResult<HashMap<ObjectId, String>> {
302302
let inner = self.inner.read().await;
303-
let txn = inner.db.begin().await?;
304-
305303
let mut resource_groups = HashMap::new();
306-
307304
for job_id in streaming_job_ids {
308-
let resource_group = get_existing_job_resource_group(&txn, job_id).await?;
305+
let resource_group = get_existing_job_resource_group(&inner.db, job_id).await?;
309306
resource_groups.insert(job_id, resource_group);
310307
}
311308

@@ -317,14 +314,12 @@ impl CatalogController {
317314
streaming_job_id: ObjectId,
318315
) -> MetaResult<String> {
319316
let inner = self.inner.read().await;
320-
let txn = inner.db.begin().await?;
321-
322317
let database_id: ObjectId = StreamingJob::find_by_id(streaming_job_id)
323318
.select_only()
324319
.join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
325320
.column(object::Column::DatabaseId)
326321
.into_tuple()
327-
.one(&txn)
322+
.one(&inner.db)
328323
.await?
329324
.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", streaming_job_id))?;
330325

src/meta/src/controller/fragment.rs

+16-4
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ use risingwave_meta_model::fragment::DistributionType;
2626
use risingwave_meta_model::object::ObjectType;
2727
use risingwave_meta_model::prelude::{Actor, Fragment, Sink, StreamingJob};
2828
use risingwave_meta_model::{
29-
actor, actor_dispatcher, fragment, object, sink, source, streaming_job, table, ActorId,
30-
ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus,
31-
ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap,
32-
WorkerId,
29+
actor, actor_dispatcher, database, fragment, object, sink, source, streaming_job, table,
30+
ActorId, ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array,
31+
JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
32+
VnodeBitmap, WorkerId,
3333
};
3434
use risingwave_meta_model_migration::{Alias, SelectStatement};
3535
use risingwave_pb::common::PbActorLocation;
@@ -90,6 +90,7 @@ pub struct StreamingJobInfo {
9090
pub job_status: JobStatus,
9191
pub parallelism: StreamingParallelism,
9292
pub max_parallelism: i32,
93+
pub resource_group: String,
9394
}
9495

9596
impl CatalogControllerInner {
@@ -728,6 +729,7 @@ impl CatalogController {
728729
.select_only()
729730
.column(streaming_job::Column::JobId)
730731
.join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
732+
.join(JoinType::InnerJoin, object::Relation::Database2.def())
731733
.column(object::Column::ObjType)
732734
.join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
733735
.join(JoinType::LeftJoin, source::Relation::Object.def().rev())
@@ -750,6 +752,16 @@ impl CatalogController {
750752
streaming_job::Column::Parallelism,
751753
streaming_job::Column::MaxParallelism,
752754
])
755+
.column_as(
756+
Expr::if_null(
757+
Expr::col((
758+
streaming_job::Entity,
759+
streaming_job::Column::SpecificResourceGroup,
760+
)),
761+
Expr::col((database::Entity, database::Column::ResourceGroup)),
762+
),
763+
"resource_group",
764+
)
753765
.into_model()
754766
.all(&inner.db)
755767
.await?;

src/rpc_client/src/meta_client.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
7878
use risingwave_pb::meta::list_actor_states_response::ActorState;
7979
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
8080
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
81-
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
81+
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
8282
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
8383
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
8484
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
@@ -997,10 +997,10 @@ impl MetaClient {
997997
Ok(resp.table_fragments)
998998
}
999999

1000-
pub async fn list_table_fragment_states(&self) -> Result<Vec<TableFragmentState>> {
1000+
pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
10011001
let resp = self
10021002
.inner
1003-
.list_table_fragment_states(ListTableFragmentStatesRequest {})
1003+
.list_streaming_job_states(ListStreamingJobStatesRequest {})
10041004
.await?;
10051005
Ok(resp.states)
10061006
}
@@ -2097,7 +2097,7 @@ macro_rules! for_all_meta_rpc {
20972097
,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
20982098
,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
20992099
,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2100-
,{ stream_client, list_table_fragment_states, ListTableFragmentStatesRequest, ListTableFragmentStatesResponse }
2100+
,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
21012101
,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
21022102
,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
21032103
,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }

src/tests/simulation/tests/integration_tests/scale/resource_group.rs

+29
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use risingwave_common::config::default;
2020
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
2121
use risingwave_simulation::cluster::{Cluster, Configuration};
2222
use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains};
23+
use risingwave_simulation::utils::AssertResult;
2324
use tokio::time::sleep;
2425

2526
#[tokio::test]
@@ -107,5 +108,33 @@ async fn test_resource_group() -> Result<()> {
107108
assert_eq!(union_fragment.inner.actors.len(), 2);
108109
assert_eq!(mat_fragment.inner.actors.len(), 4);
109110

111+
session
112+
.run("select resource_group from rw_streaming_jobs where name = 'm'")
113+
.await?
114+
.assert_result_eq("test");
115+
116+
let _ = session
117+
.run("alter materialized view m reset resource_group;")
118+
.await?;
119+
120+
session
121+
.run("select resource_group from rw_streaming_jobs where name = 'm'")
122+
.await?
123+
.assert_result_eq(DEFAULT_RESOURCE_GROUP);
124+
125+
let union_fragment = cluster
126+
.locate_one_fragment([identity_contains("union")])
127+
.await?;
128+
129+
let mat_fragment = cluster
130+
.locate_one_fragment([
131+
identity_contains("materialize"),
132+
no_identity_contains("union"),
133+
])
134+
.await?;
135+
136+
assert_eq!(union_fragment.inner.actors.len(), 2);
137+
assert_eq!(mat_fragment.inner.actors.len(), 2);
138+
110139
Ok(())
111140
}

0 commit comments

Comments
 (0)