Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tabversion committed Jan 28, 2025
1 parent 6ee39d7 commit c205c60
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 13 deletions.
2 changes: 2 additions & 0 deletions e2e_test/source_inline/kafka/alter_table_drop_connector.slt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ cat <<EOF | rpk topic produce 'test_alter_table_drop_connector' -f "%k^%v\n"
{"ID": 8}^{"ID": 8, "firstName": "Ava", "lastName": "Garcia", "age": 21, "height": 5.2, "weight": 115}
EOF

sleep 1s

# the streaming job does not intake new data
query I
SELECT count(*) FROM plain_students;
Expand Down
14 changes: 1 addition & 13 deletions src/frontend/src/handler/alter_table_drop_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};

use anyhow::Context;
use risingwave_common::error::{NotImplemented, TrackingIssue};
use risingwave_connector::parser::additional_columns::gen_default_addition_col_name;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::{ColumnDef, Ident};
use risingwave_sqlparser::parser::Parser;

use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
Expand Down Expand Up @@ -66,13 +64,6 @@ fn fetch_schema_info(
};
let (source_def, _) =
reader.get_any_source_by_id(db_name.as_str(), schema_path, &source_id.table_id())?;
if source_def.info.use_schema_registry {
return Err(ErrorCode::ProtocolError(format!(
"Table {} is associated with a source that uses schema registry, cannot drop connector for now.",
real_table_name
))
.into());
}
Ok((table_def.clone(), source_def.clone()))
}

Expand Down Expand Up @@ -176,10 +167,7 @@ pub async fn handle_alter_table_drop_connector(
) -> Result<RwPgResponse> {
let session = handler_args.session;
let (table_def, source_def) = fetch_schema_info(&session, table_name.clone())?;
let [original_definition]: [_; 1] = Parser::parse_sql(&table_def.definition)
.context("unable to parse original table definition")?
.try_into()
.unwrap();
let original_definition = table_def.create_sql_ast_purified()?;

let new_statement = rewrite_table_definition(&table_def, &source_def, original_definition)?;
let (_, table, graph, col_index_mapping, _) = get_replace_table_plan(
Expand Down

0 comments on commit c205c60

Please sign in to comment.