Skip to content

Commit

Permalink
feat: gitlab extractors and convertors support incremental mode (#7997)
Browse files Browse the repository at this point in the history
* feat: gitlab mr_extractor support incremental sync

* feat: incr mode transformation support for deployment/issue/job/mr and others

* feat: removed mr_enricher and all gitlab transformers support incr-mode

* fix: some gitlab subtasks are missing connection in left join clause

* fix: test cases failed due to gitlab transformers support incr-mode

* fix: linting

* fix: do not collect all accounts from jihulab.com

* fix: typo

* fix: gitlab mr comments won't be converted to domain layer till next run

* feat: improve gitlab MR comments/commits collection performance

* fix: gitlab issues/mrs child records are not deleted

* docs: update stateful extractor doc

* fix: gitlab mr detail test

* refactor: unify stateful extractor and convertor helper

* docs: update stateful extractor/convertor doc

* refactor: remove useless type hint

* fix: shoud not be deleting records not extracted by current extractor

* fix: jira issue extractor should not deleting sprint_issue

* refactor: remove commit related subtasks

* fix: remove commit conversion e2e test

* feat(gitlab): update ExtractApiChildPipelines (#8016)

* fix: retransform should run in fullsync mode

* fix: gitlab issue assignees are not being converted

---------

Co-authored-by: Lynwee <[email protected]>
  • Loading branch information
klesh and d4x1 committed Nov 5, 2024
1 parent 83ec4a5 commit f9dfac1
Show file tree
Hide file tree
Showing 54 changed files with 843 additions and 1,401 deletions.
74 changes: 65 additions & 9 deletions backend/helpers/pluginhelper/api/api_extractor_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package api

import (
"encoding/json"
"reflect"

"github.com/apache/incubator-devlake/core/dal"
Expand All @@ -27,30 +28,72 @@ import (
)

// StatefulApiExtractorArgs is a struct that contains the arguments for a stateful api extractor
type StatefulApiExtractorArgs struct {
type StatefulApiExtractorArgs[InputType any] struct {
*SubtaskCommonArgs
Extract func(row *RawData) ([]any, errors.Error)
BeforeExtract func(issue *InputType, stateManager *SubtaskStateManager) errors.Error
Extract func(body *InputType, row *RawData) ([]any, errors.Error)
}

type StatefulApiExtractor struct {
*StatefulApiExtractorArgs
// StatefulApiExtractor is a struct that manages the stateful API extraction process.
// It facilitates extracting data from a single _raw_data table and saving it into multiple Tool Layer tables.
// By default, the extractor operates in Incremental Mode, processing only new records added to the raw table since the previous run.
// This approach reduces the amount of data to process, significantly decreasing the execution time.
// The extractor automatically detects if the configuration has changed since the last run. If a change is detected,
// it will automatically switch to Full-Sync mode.
//
// Example:
//
// extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[apiv2models.Issue]{
// SubtaskCommonArgs: &api.SubtaskCommonArgs{
// SubTaskContext: subtaskCtx,
// Table: RAW_ISSUE_TABLE,
// Params: JiraApiParams{
// ConnectionId: data.Options.ConnectionId,
// BoardId: data.Options.BoardId,
// },
// SubtaskConfig: config, // The helper stores this configuration in the state and compares it with the previous one
// // to determine the operating mode (Incremental/FullSync).
// // Ensure that the configuration is serializable and contains only public fields.
// // It is also recommended that the configuration includes only the necessary fields used by the extractor.
// ..},
// BeforeExtract: func(body *IssuesResponse, stateManager *api.SubtaskStateManager) errors.Error {
// if stateManager.IsIncremental() {
// // It is important to delete all existing child-records under DiffSync Mode
// err := db.Delete(
// &models.JiraIssueLabel{},
// dal.Where("connection_id = ? AND issue_id = ?", data.Options.ConnectionId, body.Id),
// )
// }
// return nil
// },
// Extract: func(apiIssue *apiv2models.Issue, row *api.RawData) ([]interface{}, errors.Error) {
// },
// })
//
// if err != nil {
// return err
// }
//
// return extractor.Execute()
type StatefulApiExtractor[InputType any] struct {
*StatefulApiExtractorArgs[InputType]
*SubtaskStateManager
}

// NewStatefulApiExtractor creates a new StatefulApiExtractor
func NewStatefulApiExtractor(args *StatefulApiExtractorArgs) (*StatefulApiExtractor, errors.Error) {
func NewStatefulApiExtractor[InputType any](args *StatefulApiExtractorArgs[InputType]) (*StatefulApiExtractor[InputType], errors.Error) {
stateManager, err := NewSubtaskStateManager(args.SubtaskCommonArgs)
if err != nil {
return nil, err
}
return &StatefulApiExtractor{
return &StatefulApiExtractor[InputType]{
StatefulApiExtractorArgs: args,
SubtaskStateManager: stateManager,
}, nil
}

// Execute sub-task
func (extractor *StatefulApiExtractor) Execute() errors.Error {
func (extractor *StatefulApiExtractor[InputType]) Execute() errors.Error {
// load data from database
db := extractor.GetDal()
logger := extractor.GetLogger()
Expand Down Expand Up @@ -103,7 +146,20 @@ func (extractor *StatefulApiExtractor) Execute() errors.Error {
return errors.Default.Wrap(err, "error fetching row")
}

results, err := extractor.Extract(row)
body := new(InputType)
err = errors.Convert(json.Unmarshal(row.Data, body))
if err != nil {
return err
}

if extractor.BeforeExtract != nil {
err = extractor.BeforeExtract(body, extractor.SubtaskStateManager)
if err != nil {
return err
}
}

results, err := extractor.Extract(body, row)
if err != nil {
return errors.Default.Wrap(err, "error calling plugin Extract implementation")
}
Expand Down Expand Up @@ -137,4 +193,4 @@ func (extractor *StatefulApiExtractor) Execute() errors.Error {
return extractor.SubtaskStateManager.Close()
}

var _ plugin.SubTask = (*StatefulApiExtractor)(nil)
var _ plugin.SubTask = (*StatefulApiExtractor[any])(nil)
2 changes: 1 addition & 1 deletion backend/helpers/pluginhelper/api/api_rawdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type RawData struct {
Data []byte
Url string
Input json.RawMessage `gorm:"type:json"`
CreatedAt time.Time
CreatedAt time.Time `gorm:"index"`
}

type TaskOptions interface {
Expand Down
79 changes: 75 additions & 4 deletions backend/helpers/pluginhelper/api/data_convertor_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,82 @@ import (

type StatefulDataConverterArgs[InputType any] struct {
*SubtaskCommonArgs
Input func(*SubtaskStateManager) (dal.Rows, errors.Error)
Convert func(row *InputType) ([]any, errors.Error)
BatchSize int
Input func(*SubtaskStateManager) (dal.Rows, errors.Error)
BeforeConvert func(issue *InputType, stateManager *SubtaskStateManager) errors.Error
Convert func(row *InputType) ([]any, errors.Error)
BatchSize int
}

// StatefulDataConverter is a struct that manages the stateful data conversion process.
// It facilitates converting data from a database cursor and saving it into arbitrary tables.
// The converter determines the operating mode (Incremental/FullSync) based on the stored state and configuration.
// It then calls the provided `Input` function to obtain the `dal.Rows` (the database cursor) and processes each
// record individually through the `Convert` function, saving the results to the database.
//
// For Incremental mode to work properly, it is crucial to check `stateManager.IsIncremental()` and utilize
// `stateManager.GetSince()` to build your query in the `Input` function, ensuring that only the necessary
// records are fetched.
//
// The converter automatically detects if the configuration has changed since the last run. If a change is detected,
// it will automatically switch to Full-Sync mode.
//
// Example:
//
// converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.JiraIssue]{
// SubtaskCommonArgs: &api.SubtaskCommonArgs{
// SubTaskContext: subtaskCtx,
// Table: RAW_ISSUE_TABLE,
// Params: JiraApiParams{
// ConnectionId: data.Options.ConnectionId,
// BoardId: data.Options.BoardId,
// },
// SubtaskConfig: mappings,
// },
// Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) {
// clauses := []dal.Clause{
// dal.Select("_tool_jira_issues.*"),
// dal.From("_tool_jira_issues"),
// dal.Join(`left join _tool_jira_board_issues
// on _tool_jira_board_issues.issue_id = _tool_jira_issues.issue_id
// and _tool_jira_board_issues.connection_id = _tool_jira_issues.connection_id`),
// dal.Where(
// "_tool_jira_board_issues.connection_id = ? AND _tool_jira_board_issues.board_id = ?",
// data.Options.ConnectionId,
// data.Options.BoardId,
// ),
// }
// if stateManager.IsIncremental() { // IMPORTANT: to filter records for Incremental Mode
// since := stateManager.GetSince()
// if since != nil {
// clauses = append(clauses, dal.Where("_tool_jira_issues.updated_at >= ? ", since))
// }
// }
// return db.Cursor(clauses...)
// },
// BeforeConvert: func(jiraIssue *models.GitlabMergeRequest, stateManager *api.SubtaskStateManager) errors.Error {
// // It is important to delete all existing child-records under DiffSync Mode
// issueId := issueIdGen.Generate(data.Options.ConnectionId, jiraIssue.IssueId)
// if err := db.Delete(&ticket.IssueAssignee{}, dal.Where("issue_id = ?", issueId)); err != nil {
// return err
// }
// ...
// return nil
// },
// Convert: func(jiraIssue *models.JiraIssue) ([]interface{}, errors.Error) {
// },
// })

// if err != nil {
// return err
// }

// return converter.Execute()
type StatefulDataConverter[InputType any] struct {
*StatefulDataConverterArgs[InputType]
*SubtaskStateManager
}

func NewStatefulDataConverter[
OptType any,
InputType any,
](
args *StatefulDataConverterArgs[InputType],
Expand Down Expand Up @@ -91,6 +155,13 @@ func (converter *StatefulDataConverter[InputType]) Execute() errors.Error {
return errors.Default.Wrap(err, "error fetching rows")
}

if converter.BeforeConvert != nil {
err = converter.BeforeConvert(inputRow, converter.SubtaskStateManager)
if err != nil {
return err
}
}

results, err := converter.Convert(inputRow)
if err != nil {
return errors.Default.Wrap(err, "error calling Converter plugin implementation")
Expand Down
61 changes: 59 additions & 2 deletions backend/helpers/pluginhelper/api/enrich_with_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
// TODO: remove Enricher from naming since it is more like a util function
type RegexEnricher struct {
// This field will store compiled regular expression for every pattern
regexpMap map[string]*regexp.Regexp
regexpMap map[string]*regexp.Regexp
regexMapList map[string][]*regexp.Regexp
}

// NewRegexEnricher initialize a regexEnricher
func NewRegexEnricher() *RegexEnricher {
return &RegexEnricher{regexpMap: make(map[string]*regexp.Regexp)}
return &RegexEnricher{regexpMap: make(map[string]*regexp.Regexp), regexMapList: make(map[string][]*regexp.Regexp)}
}

// AddRegexp will add compiled regular expression for pattern to regexpMap
Expand Down Expand Up @@ -105,3 +106,59 @@ func (r *RegexEnricher) ReturnNameIfOmittedOrMatched(name string, targets ...str
}
return r.ReturnNameIfMatched(name, targets...)
}

func (r *RegexEnricher) PlainMap() map[string]string {
m := make(map[string]string)
for k, v := range r.regexpMap {
m[k] = v.String()
}
return m
}

// TryAdd a named regexp if given pattern is not empty
func (r *RegexEnricher) TryAddList(name string, patterns ...string) errors.Error {
if _, ok := r.regexMapList[name]; ok {
return errors.Default.New(fmt.Sprintf("Regex pattern with name: %s already exists", name))
}
var regexList []*regexp.Regexp
for _, pattern := range patterns {
if pattern == "" {
continue
}
regex, err := errors.Convert01(regexp.Compile(pattern))
if err != nil {
return errors.BadInput.Wrap(err, fmt.Sprintf("Fail to compile pattern for regex pattern: %s", pattern))
}
regexList = append(regexList, regex)
}

// Only save non-empty regexList
if len(regexList) > 0 {
r.regexMapList[name] = regexList
}
return nil
}

// ReturnNameIfMatched will return name if any of the targets matches the regex associated with the given name
func (r *RegexEnricher) ReturnNameIfMatchedList(name string, targets ...string) string {
if regexList, ok := r.regexMapList[name]; !ok {
return ""
} else {
for _, regex := range regexList {
for _, target := range targets {
if regex.MatchString(target) {
return name
}
}
}
return "" // If any regex fails to match, return ""
}
}

// ReturnNameIfOmittedOrMatched returns the given name if regex of the given name is omitted or fallback to ReturnNameIfMatched
func (r *RegexEnricher) ReturnNameIfOmittedOrMatchedList(name string, targets ...string) string {
if _, ok := r.regexMapList[name]; !ok {
return name
}
return r.ReturnNameIfMatched(name, targets...)
}
2 changes: 1 addition & 1 deletion backend/helpers/pluginhelper/api/subtask_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type SubtaskCommonArgs struct {
plugin.SubTaskContext
Table string // raw table name
Params any // for filtering rows belonging to the scope (jira board, github repo) of the subtask
SubtaskConfig any // for determining whether the subtask should run in incremental or full sync mode
SubtaskConfig any // for determining whether the subtask should run in Incremental or Full-Sync mode by comparing with the previous config to see if it changed
BatchSize int // batch size for saving data
}

Expand Down
5 changes: 5 additions & 0 deletions backend/impls/dalgorm/encdec_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func (es *EncDecSerializer) Value(ctx context.Context, field *schema.Field, dst
}
target = string(b)
}
if field.GORMDataType == "string" {
println("field.GORMDataType == string", field.Size)
gormTag, ok := field.Tag.Lookup("gorm")
println(ok, gormTag)
}
return plugin.Encrypt(es.encryptionSecret, target)
}

Expand Down
33 changes: 0 additions & 33 deletions backend/plugins/gitlab/e2e/mr_commits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,37 +133,4 @@ func TestGitlabMrCommitDataFlow(t *testing.T) {
CSVRelPath: "./snapshot_tables/pull_request_commits.csv",
IgnoreTypes: []interface{}{common.Model{}},
})

// verify conversion
dataflowTester.FlushTabler(&code.Commit{})
dataflowTester.FlushTabler(&code.RepoCommit{})
dataflowTester.Subtask(tasks.ConvertCommitsMeta, taskData)
dataflowTester.VerifyTable(
code.Commit{},
"./snapshot_tables/commits.csv",
e2ehelper.ColumnWithRawData(
"sha",
"additions",
"deletions",
"dev_eq",
"message",
"author_name",
"author_email",
"authored_date",
"author_id",
"committer_name",
"committer_email",
"committed_date",
"committer_id",
),
)

dataflowTester.VerifyTable(
code.RepoCommit{},
"./snapshot_tables/repo_commits.csv",
e2ehelper.ColumnWithRawData(
"repo_id",
"commit_sha",
),
)
}
2 changes: 2 additions & 0 deletions backend/plugins/gitlab/e2e/mr_detail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func TestGitlabMrDetailDataFlow(t *testing.T) {
},
}
// import raw data table
dataflowTester.FlushTabler(&code.PullRequestAssignee{})
dataflowTester.FlushTabler(&code.PullRequestReviewer{})
dataflowTester.ImportCsvIntoRawTable("./raw_tables/_raw_gitlab_api_merge_requests.csv",
"_raw_gitlab_api_merge_request_details")

Expand Down
Loading

0 comments on commit f9dfac1

Please sign in to comment.