Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix several issues about the public schema #20201

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 155 additions & 117 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use pgwire::pg_protocol::truncated_fmt;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::pg_server::Session;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, DEFAULT_SCHEMA_NAME};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
use risingwave_common::types::{DataType, Fields, Timestamptz};
use risingwave_common::util::addr::HostAddr;
Expand All @@ -33,8 +33,11 @@ use risingwave_sqlparser::ast::{

use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt};
use crate::binder::{Binder, Relation};
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::schema_catalog::SchemaCatalog;
use crate::catalog::{CatalogError, IndexCatalog};
use crate::error::Result;
use crate::error::{Result, RwError};
use crate::handler::create_connection::print_connection_params;
use crate::handler::HandlerArgs;
use crate::session::cursor_manager::SubscriptionCursor;
Expand Down Expand Up @@ -102,12 +105,6 @@ pub fn get_indexes_from_table(
Ok(indexes)
}

fn schema_or_default(schema: &Option<Ident>) -> String {
schema
.as_ref()
.map_or_else(|| DEFAULT_SCHEMA_NAME.to_owned(), |s| s.real_value())
}

fn schema_or_search_path(
session: &Arc<SessionImpl>,
schema: &Option<Ident>,
Expand All @@ -130,6 +127,28 @@ fn schema_or_search_path(
}
}

fn iter_schema_items<F, T>(
session: &Arc<SessionImpl>,
schema: &Option<Ident>,
reader: &CatalogReadGuard,
mut f: F,
) -> Vec<T>
where
F: FnMut(&SchemaCatalog) -> Vec<T>,
{
let search_path = session.config().search_path();

schema_or_search_path(session, schema, &search_path)
.into_iter()
.filter_map(|schema| {
reader
.get_schema_by_name(&session.database(), schema.as_ref())
.ok()
})
.flat_map(|s| f(s).into_iter())
.collect()
}

#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowObjectRow {
Expand Down Expand Up @@ -307,78 +326,72 @@ pub async fn handle_show_object(
let catalog_reader = session.env().catalog_reader();

let names = match command {
// If not include schema name, use default schema name
ShowObject::Table { schema } => {
let search_path = session.config().search_path();
let mut table_names_in_schema = vec![];
for schema in schema_or_search_path(&session, &schema, &search_path) {
// If the schema is not found, skip it
if let Ok(schema_catalog) = catalog_reader
.read_guard()
.get_schema_by_name(&session.database(), schema.as_ref())
{
table_names_in_schema
.extend(schema_catalog.iter_user_table().map(|t| t.name.clone()));
}
}

table_names_in_schema
let reader = catalog_reader.read_guard();
iter_schema_items(&session, &schema, &reader, |schema| {
schema.iter_user_table().map(|t| t.name.clone()).collect()
})
}
ShowObject::InternalTable { schema } => {
let reader = catalog_reader.read_guard();
iter_schema_items(&session, &schema, &reader, |schema| {
schema
.iter_internal_table()
.map(|t| t.name.clone())
.collect()
})
}
ShowObject::InternalTable { schema } => catalog_reader
.read_guard()
.get_schema_by_name(&session.database(), &schema_or_default(&schema))?
.iter_internal_table()
.map(|t| t.name.clone())
.collect(),
ShowObject::Database => catalog_reader.read_guard().get_all_database_names(),
ShowObject::Schema => catalog_reader
.read_guard()
.get_all_schema_names(&session.database())?,
ShowObject::View { schema } => catalog_reader
.read_guard()
.get_schema_by_name(&session.database(), &schema_or_default(&schema))?
.iter_view()
.map(|t| t.name.clone())
.collect(),
ShowObject::MaterializedView { schema } => catalog_reader
.read_guard()
.get_schema_by_name(&session.database(), &schema_or_default(&schema))?
.iter_created_mvs()
.map(|t| t.name.clone())
.collect(),
ShowObject::Source { schema } => catalog_reader
.read_guard()
.get_schema_by_name(&session.database(), &schema_or_default(&schema))?
.iter_source()
.map(|t| t.name.clone())
.chain(session.temporary_source_manager().keys())
.collect(),
ShowObject::Sink { schema } => catalog_reader
.read_guard()
.get_schema_by_name(&session.database(), &schema_or_default(&schema))?
.iter_sink()
.map(|t| t.name.clone())
.collect(),
ShowObject::View { schema } => {
let reader = catalog_reader.read_guard();
iter_schema_items(&session, &schema, &reader, |schema| {
schema.iter_view().map(|t| t.name.clone()).collect()
})
}
ShowObject::MaterializedView { schema } => {
let reader = catalog_reader.read_guard();
iter_schema_items(&session, &schema, &reader, |schema| {
schema.iter_created_mvs().map(|t| t.name.clone()).collect()
})
}
ShowObject::Source { schema } => {
let reader = catalog_reader.read_guard();
let mut sources = iter_schema_items(&session, &schema, &reader, |schema| {
schema.iter_source().map(|t| t.name.clone()).collect()
});
sources.extend(session.temporary_source_manager().keys());
sources
}
ShowObject::Sink { schema } => {
let reader = catalog_reader.read_guard();
iter_schema_items(&session, &schema, &reader, |schema| {
schema.iter_sink().map(|t| t.name.clone()).collect()
})
}
ShowObject::Subscription { schema } => {
let rows = catalog_reader
.read_guard()
.get_schema_by_name(&session.database(), &schema_or_default(&schema))?
.iter_subscription()
.map(|t| ShowSubscriptionRow {
name: t.name.clone(),
retention_seconds: t.retention_seconds as i64,
})
.collect_vec();
let reader = catalog_reader.read_guard();
let rows = iter_schema_items(&session, &schema, &reader, |schema| {
schema
.iter_subscription()
.map(|t| ShowSubscriptionRow {
name: t.name.clone(),
retention_seconds: t.retention_seconds as i64,
})
.collect()
});
return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.rows(rows)
.into());
}
ShowObject::Secret { schema } => catalog_reader
.read_guard()
.get_schema_by_name(&session.database(), &schema_or_default(&schema))?
.iter_secret()
.map(|t| t.name.clone())
.collect(),
ShowObject::Secret { schema } => {
let reader = catalog_reader.read_guard();
iter_schema_items(&session, &schema, &reader, |schema| {
schema.iter_secret().map(|t| t.name.clone()).collect()
})
}
ShowObject::Columns { table } => {
let Ok(columns) = get_columns_from_table(&session, table.clone())
.or(get_columns_from_sink(&session, table.clone()))
Expand All @@ -404,10 +417,8 @@ pub async fn handle_show_object(
}
ShowObject::Connection { schema } => {
let reader = catalog_reader.read_guard();
let schema =
reader.get_schema_by_name(&session.database(), &schema_or_default(&schema))?;
let rows = schema
.iter_connections()
let rows = iter_schema_items(&session, &schema, &reader, |schema| {
schema.iter_connections()
.map(|c| {
let name = c.name.clone();
let r#type = match &c.info {
Expand All @@ -420,13 +431,13 @@ pub async fn handle_show_object(
};
let source_names = schema
.get_source_ids_by_connection(c.id)
.unwrap_or(Vec::new())
.unwrap_or_default()
.into_iter()
.filter_map(|sid| schema.get_source_by_id(&sid).map(|catalog| catalog.name.as_str()))
.collect_vec();
let sink_names = schema
.get_sink_ids_by_connection(c.id)
.unwrap_or(Vec::new())
.unwrap_or_default()
.into_iter()
.filter_map(|sid| schema.get_sink_by_id(&sid).map(|catalog| catalog.name.as_str()))
.collect_vec();
Expand All @@ -452,23 +463,26 @@ pub async fn handle_show_object(
r#type,
properties,
}
});
}).collect_vec()
});
return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.rows(rows)
.into());
}
ShowObject::Function { schema } => {
let reader = catalog_reader.read_guard();
let rows = reader
.get_schema_by_name(&session.database(), &schema_or_default(&schema))?
.iter_function()
.map(|t| ShowFunctionRow {
name: t.name.clone(),
arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
return_type: t.return_type.to_string(),
language: t.language.clone(),
link: t.link.clone(),
});
let rows = iter_schema_items(&session, &schema, &reader, |schema| {
schema
.iter_function()
.map(|t| ShowFunctionRow {
name: t.name.clone(),
arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "),
return_type: t.return_type.to_string(),
language: t.language.clone(),
link: t.link.clone(),
})
.collect()
});
return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.rows(rows)
.into());
Expand Down Expand Up @@ -628,57 +642,81 @@ pub fn handle_show_create_object(
let database = session.database();
let (schema_name, object_name) =
Binder::resolve_schema_qualified_name(&database, name.clone())?;
let schema_name = schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
let schema = catalog_reader.get_schema_by_name(&database, &schema_name)?;
let sql = match show_create_type {
let search_path = session.config().search_path();
let user_name = &session.user_name();
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let (sql, schema_name) = match show_create_type {
ShowCreateType::MaterializedView => {
let mv = schema
.get_created_table_by_name(&object_name)
.filter(|t| t.is_mview())
let (mv, schema) = schema_path
.try_find(|schema_name| {
Ok::<_, RwError>(
catalog_reader
.get_schema_by_name(&database, schema_name)?
.get_created_table_by_name(&object_name)
.filter(|t| t.is_mview()),
)
})?
.ok_or_else(|| CatalogError::NotFound("materialized view", name.to_string()))?;
mv.create_sql()
(mv.create_sql(), schema)
}
ShowCreateType::View => {
let view = schema
.get_view_by_name(&object_name)
.ok_or_else(|| CatalogError::NotFound("view", name.to_string()))?;
view.create_sql(schema.name())
let (view, schema) =
catalog_reader.get_view_by_name(&database, schema_path, &object_name)?;
(view.create_sql(schema.to_owned()), schema)
}
ShowCreateType::Table => {
let table = schema
.get_created_table_by_name(&object_name)
.filter(|t| t.is_user_table())
let (table, schema) = schema_path
.try_find(|schema_name| {
Ok::<_, RwError>(
catalog_reader
.get_schema_by_name(&database, schema_name)?
.get_created_table_by_name(&object_name)
.filter(|t| t.is_user_table()),
)
})?
.ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
table.create_sql_purified()

(table.create_sql_purified(), schema)
}
ShowCreateType::Sink => {
let sink = schema
.get_sink_by_name(&object_name)
.ok_or_else(|| CatalogError::NotFound("sink", name.to_string()))?;
sink.create_sql()
let (sink, schema) =
catalog_reader.get_sink_by_name(&database, schema_path, &object_name)?;
(sink.create_sql(), schema)
}
ShowCreateType::Source => {
let source = schema
.get_source_by_name(&object_name)
.filter(|s| s.associated_table_id.is_none())
let (source, schema) = schema_path
.try_find(|schema_name| {
Ok::<_, RwError>(
catalog_reader
.get_schema_by_name(&database, schema_name)?
.get_source_by_name(&object_name)
.filter(|s| s.associated_table_id.is_none()),
)
})?
.ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
source.create_sql_purified()
(source.create_sql_purified(), schema)
}
ShowCreateType::Index => {
let index = schema
.get_created_table_by_name(&object_name)
.filter(|t| t.is_index())
let (index, schema) = schema_path
.try_find(|schema_name| {
Ok::<_, RwError>(
catalog_reader
.get_schema_by_name(&database, schema_name)?
.get_created_table_by_name(&object_name)
.filter(|t| t.is_index()),
)
})?
.ok_or_else(|| CatalogError::NotFound("index", name.to_string()))?;
index.create_sql()
(index.create_sql(), schema)
}
ShowCreateType::Function => {
bail_not_implemented!("show create on: {}", show_create_type);
}
ShowCreateType::Subscription => {
let subscription = schema
.get_subscription_by_name(&object_name)
.ok_or_else(|| CatalogError::NotFound("subscription", name.to_string()))?;
subscription.create_sql()
let (subscription, schema) =
catalog_reader.get_subscription_by_name(&database, schema_path, &object_name)?;
(subscription.create_sql(), schema)
}
};
let name = format!("{}.{}", schema_name, object_name);
Expand Down
Loading
Loading