Skip to content

Commit 8a7a8de

Browse files
authored
feat: support schedule/reschedule resource group (#19955)
Signed-off-by: Shanicky Chen <[email protected]>
1 parent 14a5d3d commit 8a7a8de

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1219
-320
lines changed

proto/catalog.proto

+1
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ message Database {
524524
uint32 id = 1;
525525
string name = 2;
526526
uint32 owner = 3;
527+
string resource_group = 4;
527528
}
528529

529530
message Comment {

proto/common.proto

+6-3
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,14 @@ message WorkerNode {
5656
bool is_unschedulable = 3;
5757
// This is used for frontend node to register its rpc address
5858
string internal_rpc_host_addr = 4;
59-
// Meta may assign labels to worker nodes to partition workload by label.
60-
// This is used for serverless backfilling of materialized views.
61-
optional string node_label = 5;
59+
60+
reserved 5;
61+
reserved "node_label";
6262

6363
uint32 parallelism = 6;
64+
65+
// resource group for scheduling
66+
optional string resource_group = 7;
6467
}
6568
message Resource {
6669
string rw_version = 1;

proto/ddl_service.proto

+12
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ message CreateMaterializedViewRequest {
144144

145145
// The list of object IDs that this materialized view depends on.
146146
repeated uint32 dependencies = 4;
147+
148+
// The specific resource group to use for the materialized view. If not set, the database resource group is used.
149+
optional string specific_resource_group = 5;
147150
}
148151

149152
message CreateMaterializedViewResponse {
@@ -271,6 +274,14 @@ message AlterParallelismRequest {
271274

272275
message AlterParallelismResponse {}
273276

277+
message AlterResourceGroupRequest {
278+
uint32 table_id = 1;
279+
optional string resource_group = 2;
280+
bool deferred = 3;
281+
}
282+
283+
message AlterResourceGroupResponse {}
284+
274285
message AlterOwnerResponse {
275286
common.Status status = 1;
276287
WaitVersion version = 2;
@@ -550,6 +561,7 @@ service DdlService {
550561
rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse);
551562
rpc AlterSetSchema(AlterSetSchemaRequest) returns (AlterSetSchemaResponse);
552563
rpc AlterParallelism(AlterParallelismRequest) returns (AlterParallelismResponse);
564+
rpc AlterResourceGroup(AlterResourceGroupRequest) returns (AlterResourceGroupResponse);
553565
rpc DropTable(DropTableRequest) returns (DropTableResponse);
554566
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
555567
rpc CreateView(CreateViewRequest) returns (CreateViewResponse);

src/common/src/util/worker_util.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414

1515
pub type WorkerNodeId = u32;
1616

17-
pub const DEFAULT_COMPUTE_NODE_LABEL: &str = "default";
17+
pub const DEFAULT_RESOURCE_GROUP: &str = "default";

src/compute/src/lib.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
3737
use risingwave_common::util::resource_util::cpu::total_cpu_available;
3838
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
3939
use risingwave_common::util::tokio_util::sync::CancellationToken;
40-
use risingwave_common::util::worker_util::DEFAULT_COMPUTE_NODE_LABEL;
40+
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
4141
use serde::{Deserialize, Serialize};
4242

4343
/// If `total_memory_bytes` is not specified, the default memory limit will be set to
@@ -113,9 +113,9 @@ pub struct ComputeNodeOpts {
113113
#[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
114114
pub parallelism: usize,
115115

116-
/// The parallelism that the compute node will register to the scheduler of the meta service.
117-
#[clap(long, env = "RW_NODE_LABEL", default_value_t = default_node_label())]
118-
pub node_label: String,
116+
/// Resource group for scheduling, default value is "default"
117+
#[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
118+
pub resource_group: String,
119119

120120
/// Decides whether the compute node can be used for streaming and serving.
121121
#[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
@@ -262,8 +262,8 @@ pub fn default_parallelism() -> usize {
262262
total_cpu_available().ceil() as usize
263263
}
264264

265-
pub fn default_node_label() -> String {
266-
DEFAULT_COMPUTE_NODE_LABEL.to_owned()
265+
pub fn default_resource_group() -> String {
266+
DEFAULT_RESOURCE_GROUP.to_owned()
267267
}
268268

269269
pub fn default_role() -> Role {

src/compute/src/server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ pub async fn compute_node_serve(
129129
is_serving: opts.role.for_serving(),
130130
is_unschedulable: false,
131131
internal_rpc_host_addr: "".to_owned(),
132-
node_label: Some(opts.node_label.clone()),
132+
resource_group: Some(opts.resource_group.clone()),
133133
},
134134
&config.meta,
135135
)

src/frontend/src/catalog/catalog_service.rs

+37-3
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,12 @@ impl CatalogReader {
6565
/// [observer](`crate::observer::FrontendObserverNode`).
6666
#[async_trait::async_trait]
6767
pub trait CatalogWriter: Send + Sync {
68-
async fn create_database(&self, db_name: &str, owner: UserId) -> Result<()>;
68+
async fn create_database(
69+
&self,
70+
db_name: &str,
71+
owner: UserId,
72+
resource_group: &str,
73+
) -> Result<()>;
6974

7075
async fn create_schema(
7176
&self,
@@ -81,6 +86,7 @@ pub trait CatalogWriter: Send + Sync {
8186
table: PbTable,
8287
graph: StreamFragmentGraph,
8388
dependencies: HashSet<ObjectId>,
89+
specific_resource_group: Option<String>,
8490
) -> Result<()>;
8591

8692
async fn create_table(
@@ -214,6 +220,13 @@ pub trait CatalogWriter: Send + Sync {
214220
deferred: bool,
215221
) -> Result<()>;
216222

223+
/// Alters the resource group of the catalog object identified by `table_id`.
///
/// `resource_group` is the requested group name and `deferred` is a flag that
/// accompanies the request.
// NOTE(review): the meaning of `resource_group == None` (presumably "fall back to
// the database's resource group") and the exact effect of `deferred` are decided
// by the implementor / meta service and are not visible here — confirm.
async fn alter_resource_group(
224+
&self,
225+
table_id: u32,
226+
resource_group: Option<String>,
227+
deferred: bool,
228+
) -> Result<()>;
229+
217230
async fn alter_set_schema(
218231
&self,
219232
object: alter_set_schema_request::Object,
@@ -232,13 +245,19 @@ pub struct CatalogWriterImpl {
232245

233246
#[async_trait::async_trait]
234247
impl CatalogWriter for CatalogWriterImpl {
235-
async fn create_database(&self, db_name: &str, owner: UserId) -> Result<()> {
248+
async fn create_database(
249+
&self,
250+
db_name: &str,
251+
owner: UserId,
252+
resource_group: &str,
253+
) -> Result<()> {
236254
let version = self
237255
.meta_client
238256
.create_database(PbDatabase {
239257
name: db_name.to_owned(),
240258
id: 0,
241259
owner,
260+
resource_group: resource_group.to_owned(),
242261
})
243262
.await?;
244263
self.wait_version(version).await
@@ -268,11 +287,12 @@ impl CatalogWriter for CatalogWriterImpl {
268287
table: PbTable,
269288
graph: StreamFragmentGraph,
270289
dependencies: HashSet<ObjectId>,
290+
specific_resource_group: Option<String>,
271291
) -> Result<()> {
272292
let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
273293
let version = self
274294
.meta_client
275-
.create_materialized_view(table, graph, dependencies)
295+
.create_materialized_view(table, graph, dependencies, specific_resource_group)
276296
.await?;
277297
if matches!(create_type, PbCreateType::Foreground) {
278298
self.wait_version(version).await?
@@ -579,6 +599,20 @@ impl CatalogWriter for CatalogWriterImpl {
579599
.await?;
580600
self.wait_version(version).await
581601
}
602+
603+
async fn alter_resource_group(
604+
&self,
605+
table_id: u32,
606+
resource_group: Option<String>,
607+
deferred: bool,
608+
) -> Result<()> {
609+
self.meta_client
610+
.alter_resource_group(table_id, resource_group, deferred)
611+
.await
612+
.map_err(|e| anyhow!(e))?;
613+
614+
Ok(())
615+
}
582616
}
583617

584618
impl CatalogWriterImpl {

src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ struct RwWorkerNode {
3838
system_total_memory_bytes: Option<i64>,
3939
system_total_cpu_cores: Option<i64>,
4040
started_at: Option<Timestamptz>,
41-
label: Option<String>,
41+
resource_group: Option<String>,
4242
}
4343

4444
#[system_catalog(table, "rw_catalog.rw_worker_nodes")]
@@ -82,8 +82,8 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
8282
started_at: worker
8383
.started_at
8484
.map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
85-
label: if is_compute {
86-
property.and_then(|p| p.node_label.clone())
85+
resource_group: if is_compute {
86+
property.and_then(|p| p.resource_group.clone())
8787
} else {
8888
None
8989
},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2025 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use pgwire::pg_response::StatementType;
16+
use risingwave_common::bail;
17+
use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
18+
19+
use super::{HandlerArgs, RwPgResponse};
20+
use crate::catalog::root_catalog::SchemaPath;
21+
use crate::catalog::table_catalog::TableType;
22+
use crate::error::{ErrorCode, Result};
23+
use crate::Binder;
24+
25+
pub async fn handle_alter_resource_group(
26+
handler_args: HandlerArgs,
27+
obj_name: ObjectName,
28+
resource_group: Option<SetVariableValue>,
29+
stmt_type: StatementType,
30+
deferred: bool,
31+
) -> Result<RwPgResponse> {
32+
let session = handler_args.session;
33+
let db_name = session.database();
34+
let (schema_name, real_table_name) =
35+
Binder::resolve_schema_qualified_name(&db_name, obj_name.clone())?;
36+
let search_path = session.config().search_path();
37+
let user_name = &session.auth_context().user_name;
38+
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
39+
40+
let table_id = {
41+
let reader = session.env().catalog_reader().read_guard();
42+
43+
match stmt_type {
44+
StatementType::ALTER_MATERIALIZED_VIEW => {
45+
let (table, schema_name) =
46+
reader.get_created_table_by_name(&db_name, schema_path, &real_table_name)?;
47+
48+
match (table.table_type(), stmt_type) {
49+
(TableType::MaterializedView, StatementType::ALTER_MATERIALIZED_VIEW) => {}
50+
_ => {
51+
return Err(ErrorCode::InvalidInputSyntax(format!(
52+
"cannot alter resource group of {} {} by {}",
53+
table.table_type().to_prost().as_str_name(),
54+
table.name(),
55+
stmt_type,
56+
))
57+
.into());
58+
}
59+
}
60+
61+
session.check_privilege_for_drop_alter(schema_name, &**table)?;
62+
table.id.table_id()
63+
}
64+
_ => bail!(
65+
"invalid statement type for alter resource group: {:?}",
66+
stmt_type
67+
),
68+
}
69+
};
70+
71+
let resource_group = match resource_group {
72+
None => None,
73+
Some(SetVariableValue::Single(SetVariableValueSingle::Ident(ident))) => {
74+
Some(ident.real_value())
75+
}
76+
Some(SetVariableValue::Single(SetVariableValueSingle::Literal(
77+
Value::SingleQuotedString(v),
78+
))) => Some(v),
79+
_ => {
80+
return Err(ErrorCode::InvalidInputSyntax(
81+
"target parallelism must be a valid number or adaptive".to_owned(),
82+
)
83+
.into());
84+
}
85+
};
86+
87+
let mut builder = RwPgResponse::builder(stmt_type);
88+
89+
let catalog_writer = session.catalog_writer()?;
90+
catalog_writer
91+
.alter_resource_group(table_id, resource_group, deferred)
92+
.await?;
93+
94+
if deferred {
95+
builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
96+
}
97+
98+
Ok(builder.into())
99+
}

src/frontend/src/handler/create_database.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use pgwire::pg_response::{PgResponse, StatementType};
16+
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
1617
use risingwave_sqlparser::ast::ObjectName;
1718

1819
use super::RwPgResponse;
@@ -73,7 +74,8 @@ pub async fn handle_create_database(
7374

7475
let catalog_writer = session.catalog_writer()?;
7576
catalog_writer
76-
.create_database(&database_name, database_owner)
77+
// TODO: add support for create database with resource_group
78+
.create_database(&database_name, database_owner, DEFAULT_RESOURCE_GROUP)
7779
.await?;
7880

7981
Ok(PgResponse::empty_result(StatementType::CREATE_DATABASE))

src/frontend/src/handler/create_mv.rs

+19-4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ use crate::session::SessionImpl;
3838
use crate::stream_fragmenter::build_graph;
3939
use crate::utils::ordinal;
4040

41+
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
42+
4143
pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
4244
if columns.is_empty() {
4345
None
@@ -203,9 +205,22 @@ pub async fn handle_create_mv_bound(
203205
return Ok(resp);
204206
}
205207

206-
let (table, graph, dependencies) = {
208+
let (table, graph, dependencies, resource_group) = {
207209
let context = OptimizerContext::from_handler_args(handler_args);
208-
if !context.with_options().is_empty() {
210+
let mut with_options = context.with_options().clone();
211+
212+
let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
213+
214+
if resource_group.is_some()
215+
&& !context
216+
.session_ctx()
217+
.config()
218+
.streaming_use_arrangement_backfill()
219+
{
220+
return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
221+
}
222+
223+
if !with_options.is_empty() {
209224
// get other useful fields by `remove`, the logic here is to reject unknown options.
210225
return Err(RwError::from(ProtocolError(format!(
211226
"unexpected options in WITH clause: {:?}",
@@ -238,7 +253,7 @@ It only indicates the physical clustering of the data, which may improve the per
238253

239254
let graph = build_graph(plan)?;
240255

241-
(table, graph, dependencies)
256+
(table, graph, dependencies, resource_group)
242257
};
243258

244259
// Ensure writes to `StreamJobTracker` are atomic.
@@ -256,7 +271,7 @@ It only indicates the physical clustering of the data, which may improve the per
256271
let session = session.clone();
257272
let catalog_writer = session.catalog_writer()?;
258273
catalog_writer
259-
.create_materialized_view(table, graph, dependencies)
274+
.create_materialized_view(table, graph, dependencies, resource_group)
260275
.await?;
261276

262277
Ok(PgResponse::empty_result(

0 commit comments

Comments
 (0)