Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gitlab extractors and convertors support incremental mode #7997

Merged
merged 28 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5ba00cf
feat: gitlab mr_extractor support incremental sync
klesh Sep 2, 2024
9932391
feat: incr mode transformation support for deployment/issue/job/mr an…
klesh Sep 3, 2024
dbd6c1c
feat: removed mr_enricher and all gitlab transformers support incr-mode
klesh Sep 4, 2024
797a819
fix: some gitlab subtasks are missing connection in left join clause
klesh Sep 4, 2024
0bb3453
fix: test cases failed due to gitlab transformers support incr-mode
klesh Sep 4, 2024
ec4ca35
fix: linting
klesh Sep 4, 2024
37611a5
fix: do not collect all accounts from jihulab.com
klesh Sep 5, 2024
25db78e
fix: typo
klesh Sep 5, 2024
5045741
fix: gitlab mr comments won't be converted to domain layer till next run
klesh Sep 5, 2024
191f2e6
feat: improve gitlab MR comments/commits collection performance
klesh Sep 5, 2024
3307ad9
fix: gitlab issues/mrs child records are not deleted
klesh Sep 5, 2024
637b5c5
docs: update stateful extractor doc
klesh Sep 5, 2024
5b9da49
fix: gitlab mr detail test
klesh Sep 6, 2024
a403d48
refactor: unify stateful extractor and convertor helper
klesh Sep 6, 2024
c640547
docs: update stateful extractor/convertor doc
klesh Sep 6, 2024
51f9ccf
refactor: remove useless type hint
klesh Sep 6, 2024
81d2a7a
fix: shoud not be deleting records not extracted by current extractor
klesh Sep 6, 2024
967dba8
fix: jira issue extractor should not deleting sprint_issue
klesh Sep 6, 2024
5e0dc15
refactor: remove commit related subtasks
klesh Sep 6, 2024
4c2a710
Merge remote-tracking branch 'origin/main' into feat-gitlab-tx-inc
klesh Sep 9, 2024
2cb0576
fix: remove commit conversion e2e test
klesh Sep 9, 2024
5e66cc6
Merge remote-tracking branch 'origin/main' into feat-gitlab-tx-inc
klesh Sep 9, 2024
4eabe80
feat(gitlab): update ExtractApiChildPipelines (#8016)
d4x1 Sep 9, 2024
8585d14
Merge branch 'main' into feat-gitlab-tx-inc
klesh Sep 10, 2024
3c6d9fd
fix: retransform should run in fullsync mode
klesh Sep 10, 2024
387ed8e
Merge branch 'main' into feat-gitlab-tx-inc
klesh Sep 11, 2024
6c144fa
fix: gitlab issue assignees are not being converted
klesh Sep 12, 2024
2c54c14
Merge remote-tracking branch 'origin/main' into feat-gitlab-tx-inc
klesh Sep 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions backend/helpers/pluginhelper/api/api_extractor_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package api

import (
"encoding/json"
"reflect"

"github.com/apache/incubator-devlake/core/dal"
Expand All @@ -27,9 +28,10 @@ import (
)

// StatefulApiExtractorArgs is a struct that contains the arguments for a stateful api extractor
type StatefulApiExtractorArgs struct {
type StatefulApiExtractorArgs[InputType any] struct {
*SubtaskCommonArgs
Extract func(row *RawData) ([]any, errors.Error)
BeforeExtract func(issue *InputType, stateManager *SubtaskStateManager) errors.Error
Extract func(body *InputType, row *RawData) ([]any, errors.Error)
}

// StatefulApiExtractor is a struct that manages the stateful API extraction process.
Expand All @@ -41,7 +43,7 @@ type StatefulApiExtractorArgs struct {
//
// Example:
//
// extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs{
// extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[apiv2models.Issue]{
// SubtaskCommonArgs: &api.SubtaskCommonArgs{
// SubTaskContext: subtaskCtx,
// Table: RAW_ISSUE_TABLE,
Expand All @@ -54,8 +56,17 @@ type StatefulApiExtractorArgs struct {
// // Ensure that the configuration is serializable and contains only public fields.
// // It is also recommended that the configuration includes only the necessary fields used by the extractor.
// ..},
// Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
// return extractIssues(data, config, row, userFieldMap)
// BeforeExtract: func(body *IssuesResponse, stateManager *api.SubtaskStateManager) errors.Error {
// if stateManager.IsIncremental() {
// // It is important to delete all existing child-records under DiffSync Mode
// err := db.Delete(
// &models.JiraIssueLabel{},
// dal.Where("connection_id = ? AND issue_id = ?", data.Options.ConnectionId, body.Id),
// )
// }
// return nil
// },
// Extract: func(apiIssue *apiv2models.Issue, row *api.RawData) ([]interface{}, errors.Error) {
// },
// })
//
Expand All @@ -64,25 +75,25 @@ type StatefulApiExtractorArgs struct {
// }
//
// return extractor.Execute()
type StatefulApiExtractor struct {
*StatefulApiExtractorArgs
type StatefulApiExtractor[InputType any] struct {
*StatefulApiExtractorArgs[InputType]
*SubtaskStateManager
}

// NewStatefulApiExtractor creates a new StatefulApiExtractor
func NewStatefulApiExtractor(args *StatefulApiExtractorArgs) (*StatefulApiExtractor, errors.Error) {
func NewStatefulApiExtractor[InputType any](args *StatefulApiExtractorArgs[InputType]) (*StatefulApiExtractor[InputType], errors.Error) {
stateManager, err := NewSubtaskStateManager(args.SubtaskCommonArgs)
if err != nil {
return nil, err
}
return &StatefulApiExtractor{
return &StatefulApiExtractor[InputType]{
StatefulApiExtractorArgs: args,
SubtaskStateManager: stateManager,
}, nil
}

// Execute sub-task
func (extractor *StatefulApiExtractor) Execute() errors.Error {
func (extractor *StatefulApiExtractor[InputType]) Execute() errors.Error {
// load data from database
db := extractor.GetDal()
logger := extractor.GetLogger()
Expand Down Expand Up @@ -135,7 +146,20 @@ func (extractor *StatefulApiExtractor) Execute() errors.Error {
return errors.Default.Wrap(err, "error fetching row")
}

results, err := extractor.Extract(row)
body := new(InputType)
err = errors.Convert(json.Unmarshal(row.Data, body))
if err != nil {
return err
}

if extractor.BeforeExtract != nil {
err = extractor.BeforeExtract(body, extractor.SubtaskStateManager)
if err != nil {
return err
}
}

results, err := extractor.Extract(body, row)
if err != nil {
return errors.Default.Wrap(err, "error calling plugin Extract implementation")
}
Expand Down Expand Up @@ -169,4 +193,4 @@ func (extractor *StatefulApiExtractor) Execute() errors.Error {
return extractor.SubtaskStateManager.Close()
}

var _ plugin.SubTask = (*StatefulApiExtractor)(nil)
var _ plugin.SubTask = (*StatefulApiExtractor[any])(nil)
2 changes: 1 addition & 1 deletion backend/helpers/pluginhelper/api/api_rawdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type RawData struct {
Data []byte
Url string
Input json.RawMessage `gorm:"type:json"`
CreatedAt time.Time
CreatedAt time.Time `gorm:"index"`
}

type TaskOptions interface {
Expand Down
26 changes: 21 additions & 5 deletions backend/helpers/pluginhelper/api/data_convertor_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (

type StatefulDataConverterArgs[InputType any] struct {
*SubtaskCommonArgs
Input func(*SubtaskStateManager) (dal.Rows, errors.Error)
Convert func(row *InputType) ([]any, errors.Error)
BatchSize int
Input func(*SubtaskStateManager) (dal.Rows, errors.Error)
BeforeConvert func(issue *InputType, stateManager *SubtaskStateManager) errors.Error
Convert func(row *InputType) ([]any, errors.Error)
BatchSize int
}

// StatefulDataConverter is a struct that manages the stateful data conversion process.
Expand All @@ -47,7 +48,7 @@ type StatefulDataConverterArgs[InputType any] struct {
//
// Example:
//
// converter, err := api.NewStatefulDataConverter[models.JiraIssue](&api.StatefulDataConverterArgs[models.JiraIssue]{
// converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.JiraIssue]{
// SubtaskCommonArgs: &api.SubtaskCommonArgs{
// SubTaskContext: subtaskCtx,
// Table: RAW_ISSUE_TABLE,
Expand Down Expand Up @@ -78,6 +79,15 @@ type StatefulDataConverterArgs[InputType any] struct {
// }
// return db.Cursor(clauses...)
// },
// BeforeConvert: func(jiraIssue *models.GitlabMergeRequest, stateManager *api.SubtaskStateManager) errors.Error {
// // It is important to delete all existing child-records under DiffSync Mode
// issueId := issueIdGen.Generate(data.Options.ConnectionId, jiraIssue.IssueId)
// if err := db.Delete(&ticket.IssueAssignee{}, dal.Where("issue_id = ?", issueId)); err != nil {
// return err
// }
// ...
// return nil
// },
// Convert: func(jiraIssue *models.JiraIssue) ([]interface{}, errors.Error) {
// },
// })
Expand All @@ -93,7 +103,6 @@ type StatefulDataConverter[InputType any] struct {
}

func NewStatefulDataConverter[
OptType any,
InputType any,
](
args *StatefulDataConverterArgs[InputType],
Expand Down Expand Up @@ -146,6 +155,13 @@ func (converter *StatefulDataConverter[InputType]) Execute() errors.Error {
return errors.Default.Wrap(err, "error fetching rows")
}

if converter.BeforeConvert != nil {
err = converter.BeforeConvert(inputRow, converter.SubtaskStateManager)
if err != nil {
return err
}
}

results, err := converter.Convert(inputRow)
if err != nil {
return errors.Default.Wrap(err, "error calling Converter plugin implementation")
Expand Down
8 changes: 8 additions & 0 deletions backend/helpers/pluginhelper/api/enrich_with_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ func (r *RegexEnricher) ReturnNameIfOmittedOrMatched(name string, targets ...str
return r.ReturnNameIfMatched(name, targets...)
}

func (r *RegexEnricher) PlainMap() map[string]string {
m := make(map[string]string)
for k, v := range r.regexpMap {
m[k] = v.String()
}
return m
}

// TryAdd a named regexp if given pattern is not empty
func (r *RegexEnricher) TryAddList(name string, patterns ...string) errors.Error {
if _, ok := r.regexMapList[name]; ok {
Expand Down
2 changes: 1 addition & 1 deletion backend/helpers/pluginhelper/api/subtask_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type SubtaskCommonArgs struct {
plugin.SubTaskContext
Table string // raw table name
Params any // for filtering rows belonging to the scope (jira board, github repo) of the subtask
SubtaskConfig any // for determining whether the subtask should run in incremental or full sync mode
SubtaskConfig any // for determining whether the subtask should run in Incremental or Full-Sync mode by comparing with the previous config to see if it changed
BatchSize int // batch size for saving data
}

Expand Down
5 changes: 5 additions & 0 deletions backend/impls/dalgorm/encdec_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func (es *EncDecSerializer) Value(ctx context.Context, field *schema.Field, dst
}
target = string(b)
}
if field.GORMDataType == "string" {
println("field.GORMDataType == string", field.Size)
gormTag, ok := field.Tag.Lookup("gorm")
println(ok, gormTag)
}
return plugin.Encrypt(es.encryptionSecret, target)
}

Expand Down
33 changes: 0 additions & 33 deletions backend/plugins/gitlab/e2e/mr_commits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,37 +133,4 @@ func TestGitlabMrCommitDataFlow(t *testing.T) {
CSVRelPath: "./snapshot_tables/pull_request_commits.csv",
IgnoreTypes: []interface{}{common.Model{}},
})

// verify conversion
dataflowTester.FlushTabler(&code.Commit{})
dataflowTester.FlushTabler(&code.RepoCommit{})
dataflowTester.Subtask(tasks.ConvertCommitsMeta, taskData)
dataflowTester.VerifyTable(
code.Commit{},
"./snapshot_tables/commits.csv",
e2ehelper.ColumnWithRawData(
"sha",
"additions",
"deletions",
"dev_eq",
"message",
"author_name",
"author_email",
"authored_date",
"author_id",
"committer_name",
"committer_email",
"committed_date",
"committer_id",
),
)

dataflowTester.VerifyTable(
code.RepoCommit{},
"./snapshot_tables/repo_commits.csv",
e2ehelper.ColumnWithRawData(
"repo_id",
"commit_sha",
),
)
}
2 changes: 2 additions & 0 deletions backend/plugins/gitlab/e2e/mr_detail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func TestGitlabMrDetailDataFlow(t *testing.T) {
},
}
// import raw data table
dataflowTester.FlushTabler(&code.PullRequestAssignee{})
dataflowTester.FlushTabler(&code.PullRequestReviewer{})
dataflowTester.ImportCsvIntoRawTable("./raw_tables/_raw_gitlab_api_merge_requests.csv",
"_raw_gitlab_api_merge_request_details")

Expand Down
79 changes: 0 additions & 79 deletions backend/plugins/gitlab/e2e/mr_enrich_test.go

This file was deleted.

This file was deleted.

Loading
Loading