Skip to content

Commit

Permalink
use update_sql when specified
Browse files Browse the repository at this point in the history
  • Loading branch information
jacekszubert committed Oct 12, 2022
1 parent d426380 commit 684c6ec
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
17 changes: 16 additions & 1 deletion mysql/resource_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func resourcePipeline() *schema.Resource {
ForceNew: true,
},

"table_name": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Default: "",
},

"kafka_endpoint": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -197,6 +204,7 @@ func pipelineConfigSQL(verb string, d *schema.ResourceData) string {
databaseName := d.Get("database_name").(string)
defaultKafkaEndpoint := d.Get("kafka_endpoint").(string)
defaultKafkaTopic := d.Get("kafka_topic").(string)
defaultTableName := d.Get("table_name").(string)
defaultConfig := d.Get("config").(string)
defaultTableMapping := d.Get("table_mapping").(string)
defaultSchema := d.Get("schema").(string)
Expand All @@ -208,6 +216,7 @@ func pipelineConfigSQL(verb string, d *schema.ResourceData) string {
var schemaClause string
var onDuplicateKeyUpdateClause string
var setClause string
var tableName string

if defaultKafkaEndpoint != "" {
pipelineClause = fmt.Sprintf("KAFKA '%s/%s' %s", defaultKafkaEndpoint, defaultKafkaTopic, defaultConfig)
Expand All @@ -225,13 +234,19 @@ func pipelineConfigSQL(verb string, d *schema.ResourceData) string {
setClause = fmt.Sprintf("SET %s", defaultSet)
}

if defaultTableName != "" {
tableName = defaultTableName
} else {
tableName = name
}

return fmt.Sprintf(
"BEGIN; USE %s; %s PIPELINE %s AS LOAD DATA %s INTO TABLE %s %s %s %s %s; COMMIT;",
databaseName,
verb,
name,
pipelineClause,
name,
tableName,
tableMappingClause,
schemaClause,
setClause,
Expand Down
2 changes: 1 addition & 1 deletion mysql/resource_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func UpdateSql(d *schema.ResourceData, meta interface{}) error {
return err
}
name := d.Get("name").(string)
updateSql := d.Get("create_sql").(string)
updateSql := d.Get("update_sql").(string)

log.Println("Executing SQL", updateSql)

Expand Down

0 comments on commit 684c6ec

Please sign in to comment.