Skip to content

Commit ffb8e18

Browse files
committed
feat: starrocsk plugin add table config
1 parent 5bbd92b commit ffb8e18

File tree

2 files changed

+24
-5
lines changed

2 files changed

+24
-5
lines changed

Diff for: backend/plugins/starrocks/tasks/task_data.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ limitations under the License.
1717

1818
package tasks
1919

20+
type TableConfig struct {
21+
IncludedColumns []string `mapstructure:"included_columns"`
22+
ExcludedColumns []string `mapstructure:"excluded_columns"`
23+
}
24+
2025
type StarRocksConfig struct {
2126
SourceType string `mapstructure:"source_type"`
2227
SourceDsn string `mapstructure:"source_dsn"`
@@ -29,8 +34,9 @@ type StarRocksConfig struct {
2934
BeHost string `mapstructure:"be_host"`
3035
BePort int `mapstructure:"be_port"`
3136
Tables []string
32-
BatchSize int `mapstructure:"batch_size"`
33-
OrderBy map[string]string `mapstructure:"order_by"`
34-
DomainLayer string `mapstructure:"domain_layer"`
37+
TableConfigs map[string]TableConfig `mapstructure:"table_configs"`
38+
BatchSize int `mapstructure:"batch_size"`
39+
OrderBy map[string]string `mapstructure:"order_by"`
40+
DomainLayer string `mapstructure:"domain_layer"`
3541
Extra map[string]string
3642
}

Diff for: backend/plugins/starrocks/tasks/tasks.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"strings"
3030
"time"
3131

32+
"golang.org/x/exp/slices"
33+
3234
"github.com/apache/incubator-devlake/core/dal"
3335
"github.com/apache/incubator-devlake/core/errors"
3436
"github.com/apache/incubator-devlake/core/plugin"
@@ -138,7 +140,6 @@ func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string,
138140
table := dc.SrcTableName
139141
starrocksTable := dc.DestTableName
140142
starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
141-
142143
columnMetas, err := db.GetColumns(&Table{name: table}, nil)
143144
updateColumn := config.UpdateColumn
144145
columnMap := make(map[string]string)
@@ -163,8 +164,21 @@ func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string,
163164
} else {
164165
return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
165166
}
167+
tableConfig, ok := config.TableConfigs[table]
166168
for _, cm := range columnMetas {
167169
name := cm.Name()
170+
if ok {
171+
if len(tableConfig.ExcludedColumns) > 0 {
172+
if slices.Contains(tableConfig.ExcludedColumns, name) {
173+
continue
174+
}
175+
}
176+
if len(tableConfig.IncludedColumns) > 0 {
177+
if !slices.Contains(tableConfig.IncludedColumns, name) {
178+
continue
179+
}
180+
}
181+
}
168182
if name == updateColumn {
169183
// check update column to detect skip or not
170184
var updatedFrom time.Time
@@ -276,7 +290,6 @@ func copyDataToDst(dc *DataConfigParams, columnMap map[string]string, orderBy st
276290
} else {
277291
return err
278292
}
279-
280293
}
281294
defer rows.Close()
282295

0 commit comments

Comments
 (0)