Skip to content

Commit

Permalink
fix(cdc): fix wrong default column matching
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Jan 31, 2025
1 parent 08bdf27 commit f6caf7d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
23 changes: 23 additions & 0 deletions e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ CREATE TABLE test_my_default_value (
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
CREATE TABLE test_my_default_value_disorderd (
city varchar,
id int,
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

Expand All @@ -200,6 +207,10 @@ select * from test_my_default_value;
----
2 jack Shanghai

query II
select * from test_my_default_value_disorderd;
----
Shanghai 2

statement ok
create table kt1 (*) from mysql_source table 'kdb.kt1';
Expand Down Expand Up @@ -650,6 +661,13 @@ CREATE TABLE test_pg_default_value (
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

statement ok
CREATE TABLE test_pg_default_value_disorderd (
city varchar,
id int,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

Expand All @@ -661,6 +679,11 @@ select * from test_pg_default_value;
----
1 noris Shanghai

query II
select * from test_pg_default_value_disorderd;
----
Shanghai 1

### BEGIN reset the password to the original one
onlyif can-use-recover
statement ok
Expand Down
26 changes: 15 additions & 11 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use risingwave_common::license::Feature;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
Expand Down Expand Up @@ -1182,20 +1181,25 @@ pub(super) async fn handle_create_table_plan(
let table: ExternalTableImpl = ExternalTableImpl::connect(config)
.await
.context("failed to auto derive table schema")?;
let external_columns: Vec<_> = table
let external_columns: HashMap<&str, ColumnCatalog> = table
.column_descs()
.iter()
.cloned()
.map(|column_desc| ColumnCatalog {
column_desc,
is_hidden: false,
.map(|column_desc| {
(
column_desc.name(),
ColumnCatalog {
column_desc: column_desc.clone(),
is_hidden: false,
},
)
})
.collect();
for (col, external_col) in
columns.iter_mut().zip_eq_fast(external_columns.into_iter())
{
col.column_desc.generated_or_default_column =
external_col.column_desc.generated_or_default_column;

for col in columns.iter_mut() {
if let Some(external_col) = external_columns.get(col.name()) {
col.column_desc.generated_or_default_column =
external_col.column_desc.generated_or_default_column.clone();
}
}
(columns, pk_names)
}
Expand Down

0 comments on commit f6caf7d

Please sign in to comment.