Skip to content

Commit

Permalink
Allow mappings to specify from/to/key in any order (apache#22)
Browse files Browse the repository at this point in the history
Improves UX while preparing for addition of another optional field for column exclusion
  • Loading branch information
serprex authored Nov 2, 2023
1 parent 33f05e4 commit 1a619fd
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 32 deletions.
12 changes: 4 additions & 8 deletions src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1153,16 +1153,12 @@ pub struct MappingOptions {

impl fmt::Display for MappingOptions {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// (from, to, key)
// (from, to, key, exclude)
write!(f, "{{from : {}, to : {}", self.source, self.destination)?;
if let Some(partition_key) = &self.partition_key {
write!(
f,
"{{from : {}, to : {}, key : {}}}",
self.source, self.destination, partition_key
)
} else {
write!(f, "{{from : {}, to : {}}}", self.source, self.destination)
write!(f, ", key : {}", partition_key)?;
}
write!(f, "}}")
}
}

Expand Down
64 changes: 42 additions & 22 deletions src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7269,31 +7269,51 @@ impl<'a> Parser<'a> {
});
}

self.expect_keyword(Keyword::FROM)?;
self.expect_token(&Token::Colon)?;
let source_table_identifier = self.parse_object_name()?;
self.expect_token(&Token::Comma)?;

self.expect_keyword(Keyword::TO)?;
self.expect_token(&Token::Colon)?;
let destination_table_identifier = self.parse_object_name()?;

let has_part_key = self.consume_token(&Token::Comma);
let partition_key = if has_part_key {
self.expect_keyword(Keyword::KEY)?;
let mut source_table_identifier = None;
let mut destination_table_identifier = None;
let mut partition_key = None;
loop {
let kw = self.expect_one_of_keywords(&[Keyword::FROM, Keyword::TO, Keyword::KEY])?;
self.expect_token(&Token::Colon)?;
Some(self.parse_identifier()?)
} else {
None
};

match kw {
Keyword::FROM => {
if source_table_identifier.is_some() {
return Err(ParserError::ParserError("Duplicate FROM".to_string()));
}
source_table_identifier = Some(self.parse_object_name()?);
}
Keyword::TO => {
if destination_table_identifier.is_some() {
return Err(ParserError::ParserError("Duplicate TO".to_string()));
}
destination_table_identifier = Some(self.parse_object_name()?);
}
Keyword::KEY => {
if partition_key.is_some() {
return Err(ParserError::ParserError("Duplicate KEY".to_string()));
}
partition_key = Some(self.parse_identifier()?);
}
// unreachable because expect_one_of_keywords used above
_ => unreachable!(),
}
if !self.consume_token(&Token::Comma) {
break;
}
}
self.expect_token(&Token::RBrace)?;

Ok(MappingOptions {
source: source_table_identifier,
destination: destination_table_identifier,
partition_key,
})
if let (Some(source_table_identifier), Some(destination_table_identifier)) =
(source_table_identifier, destination_table_identifier)
{
Ok(MappingOptions {
source: source_table_identifier,
destination: destination_table_identifier,
partition_key,
})
} else {
Err(ParserError::ParserError("Expected FROM/TO".to_string()))
}
}
}

Expand Down
118 changes: 116 additions & 2 deletions tests/sqlparser_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3199,7 +3199,7 @@ fn parse_drop_mirror() {
#[test]
fn parse_mirror_for_select() {
match pg().verified_stmt("CREATE MIRROR IF NOT EXISTS test_mirror FROM p1 TO p2 FOR $$SELECT 1$$ WITH (key1 = 'value1', key2 = 'value2')") {
Statement::CreateMirror { if_not_exists,create_mirror: MirrorSelect(select) } => {
Statement::CreateMirror { if_not_exists,create_mirror: MirrorSelect(select) } => {
assert!(if_not_exists);
assert_eq!(select.mirror_name, ObjectName(vec![Ident::new("test_mirror")]));
assert_eq!(select.source_peer, ObjectName(vec![Ident::new("p1")]));
Expand All @@ -3210,11 +3210,125 @@ fn parse_mirror_for_select() {
assert_eq!(select.with_options[0].value, Value::SingleQuotedString("value1".into()));
assert_eq!(select.with_options[1].name, Ident::new("key2"));
assert_eq!(select.with_options[1].value, Value::SingleQuotedString("value2".into()));
},
},
_ => unreachable!(),
}
}

#[test]
fn parse_mirror_table_mapping_v1() {
match pg()
.parse_sql_statements(
"CREATE MIRROR test FROM p1 TO p2 WITH TABLE MAPPING (\
schema1.source:schema2.target,\
schema3.source2:schema4.target2\
) WITH (key1 = 'value1')",
)
.unwrap()
.first()
.unwrap()
{
Statement::CreateMirror {
if_not_exists,
create_mirror: CDC(cdc),
} => {
assert!(!if_not_exists);
assert_eq!(cdc.mirror_name, ObjectName(vec![Ident::new("test")]));
assert_eq!(cdc.source_peer, ObjectName(vec![Ident::new("p1")]));
assert_eq!(cdc.target_peer, ObjectName(vec![Ident::new("p2")]));
assert_eq!(cdc.with_options.len(), 1);
assert_eq!(cdc.with_options[0].name, Ident::new("key1"));
assert_eq!(
cdc.with_options[0].value,
Value::SingleQuotedString("value1".into())
);
assert_eq!(cdc.mapping_type, MappingType::Table);
assert_eq!(cdc.mapping_options.len(), 2);
assert_eq!(
cdc.mapping_options[0].source,
ObjectName(vec![Ident::new("schema1"), Ident::new("source")])
);
assert_eq!(
cdc.mapping_options[0].destination,
ObjectName(vec![Ident::new("schema2"), Ident::new("target")])
);
assert_eq!(cdc.mapping_options[0].partition_key, None);
assert_eq!(
cdc.mapping_options[1].source,
ObjectName(vec![Ident::new("schema3"), Ident::new("source2")])
);
assert_eq!(
cdc.mapping_options[1].destination,
ObjectName(vec![Ident::new("schema4"), Ident::new("target2")])
);
}
_ => unreachable!(),
}
}

#[test]
fn parse_mirror_table_mapping_v2() {
match pg()
.parse_sql_statements(
"CREATE MIRROR test FROM p1 TO p2 WITH TABLE MAPPING (\
{from:schema1.source,to:schema2.target},\
{to:schema4.target2,from:schema3.source2,key:k}\
) WITH (key1 = 'value1')",
)
.unwrap()
.first()
.unwrap()
{
Statement::CreateMirror {
if_not_exists,
create_mirror: CDC(cdc),
} => {
assert!(!if_not_exists);
assert_eq!(cdc.mirror_name, ObjectName(vec![Ident::new("test")]));
assert_eq!(cdc.source_peer, ObjectName(vec![Ident::new("p1")]));
assert_eq!(cdc.target_peer, ObjectName(vec![Ident::new("p2")]));
assert_eq!(cdc.with_options.len(), 1);
assert_eq!(cdc.with_options[0].name, Ident::new("key1"));
assert_eq!(
cdc.with_options[0].value,
Value::SingleQuotedString("value1".into())
);
assert_eq!(cdc.mapping_type, MappingType::Table);
assert_eq!(cdc.mapping_options.len(), 2);
assert_eq!(
cdc.mapping_options[0].source,
ObjectName(vec![Ident::new("schema1"), Ident::new("source")])
);
assert_eq!(
cdc.mapping_options[0].destination,
ObjectName(vec![Ident::new("schema2"), Ident::new("target")])
);
assert_eq!(cdc.mapping_options[0].partition_key, None);
assert_eq!(
cdc.mapping_options[1].source,
ObjectName(vec![Ident::new("schema3"), Ident::new("source2")])
);
assert_eq!(
cdc.mapping_options[1].destination,
ObjectName(vec![Ident::new("schema4"), Ident::new("target2")])
);
assert_eq!(cdc.mapping_options[1].partition_key, Some(Ident::new("k")));
}
_ => unreachable!(),
}
}

#[test]
fn parse_mirror_table_mapping_v2_missing() {
assert!(pg()
.parse_sql_statements(
"CREATE MIRROR test FROM p1 TO p2 WITH TABLE MAPPING (\
{from:asdf}\
) WITH (key1 = 'value1')"
)
.is_err());
}

#[test]
fn parse_abort() {
match pg().verified_stmt("ROLLBACK") {
Expand Down

0 comments on commit 1a619fd

Please sign in to comment.