Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions plugins/extractors/maxcompute/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ source:
- schema_b
tables:
- schema_c.table_a
min_table_lifecycle: 8
concurrency: 10
```

## Inputs

| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :-- |
| `project_name` | `string` | `goto_test` | MaxCompute Project Name | *required* |
| `endpoint_project` | `string` | `http://goto_test-maxcompute.com` | Endpoint Project URL | *required* |
| `access_key.id` | `string` | `access_key_id` | Access Key ID | *required* |
| `access_key.secret` | `string` | `access_key_secret` | Access Key Secret | *required* |
| `schema_name` | `string` | `DEFAULT` | Default schema name | *optional* |
| `exclude.schemas` | `[]string` | `["schema_a", "schema_b"]` | List of schemas to exclude | *optional* |
| `exclude.tables` | `[]string` | `["schema_c.table_a"]` | List of tables to exclude | *optional* |
| `concurrency` | `int` | `10` | Number of concurrent requests to MaxCompute | *optional* |
| Key | Value | Example | Description | |
|:------------------------------|:-----------|:----------------------------------|:----------------------------------------------------------------------------------------| :-- |
| `project_name` | `string` | `goto_test` | MaxCompute Project Name | *required* |
| `endpoint_project` | `string` | `http://goto_test-maxcompute.com` | Endpoint Project URL | *required* |
| `access_key.id` | `string` | `access_key_id` | Access Key ID | *required* |
| `access_key.secret` | `string` | `access_key_secret` | Access Key Secret | *required* |
| `schema_name` | `string` | `DEFAULT` | Default schema name | *optional* |
| `exclude.schemas` | `[]string` | `["schema_a", "schema_b"]` | List of schemas to exclude | *optional* |
| `exclude.tables` | `[]string` | `["schema_c.table_a"]` | List of tables to exclude | *optional* |
| `exclude.min_table_lifecycle` | `int` | `8` | Exclude tables with a lifecycle less than this value (in days). Value must more than 1. | *optional* |
| `concurrency` | `int` | `10` | Number of concurrent requests to MaxCompute | *optional* |

### *Notes*

Expand Down
5 changes: 3 additions & 2 deletions plugins/extractors/maxcompute/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ type Config struct {
} `mapstructure:"access_key"`
SchemaName string `mapstructure:"schema_name,omitempty"`
Exclude struct {
Schemas []string `mapstructure:"schemas"`
Tables []string `mapstructure:"tables"`
Schemas []string `mapstructure:"schemas"`
Tables []string `mapstructure:"tables"`
MinTableLifecycle int `mapstructure:"min_table_lifecycle"`
} `mapstructure:"exclude,omitempty"`
MaxPreviewRows int `mapstructure:"max_preview_rows,omitempty"`
MixValues bool `mapstructure:"mix_values,omitempty"`
Expand Down
33 changes: 29 additions & 4 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
attributesDataResourceURL = "resource_url"
attributesDataPartitionFields = "partition_fields"
attributesDataLabel = "label"
attributesDataResourceType = "resource_type"
attributesDataLifecycle = "lifecycle"
)

type Extractor struct {
Expand Down Expand Up @@ -145,6 +145,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
continue
}
if contains(e.config.Exclude.Schemas, schema.Name()) {
e.logger.Info("skipping schema as it is in the exclude list", "schema", schema.Name())
continue
}

Expand All @@ -164,7 +165,9 @@ func (e *Extractor) fetchTablesFromSchema(ctx context.Context, schema *odps.Sche
}

for _, table := range tables {
if contains(e.config.Exclude.Tables, fmt.Sprintf("%s.%s", table.SchemaName(), table.Name())) {
tableName := fmt.Sprintf("%s.%s", schema.Name(), table.Name())
if contains(e.config.Exclude.Tables, tableName) {
e.logger.Info("skipping table as it is in the exclude list", "table", tableName)
continue
}

Expand All @@ -183,6 +186,18 @@ func (e *Extractor) processTable(ctx context.Context, schema *odps.Schema, table
return err
}

// If lifecycle is less than the minimum lifecycle (days), skip the table
if e.config.Exclude.MinTableLifecycle > 1 {
lifecyclePermanent := tableSchema.Lifecycle == -1
lifecycleNotConfigured := tableSchema.Lifecycle == 0
if !lifecyclePermanent && !lifecycleNotConfigured && tableSchema.Lifecycle < e.config.Exclude.MinTableLifecycle {
tableName := fmt.Sprintf("%s.%s", schema.Name(), table.Name())
e.logger.Info("skipping table due to lifecycle less than minimum configured lifecycle",
"table", tableName, "lifecycle", tableSchema.Lifecycle)
return nil
}
}

asset, err := e.buildAsset(ctx, schema, table, tableType, tableSchema)
if err != nil {
e.logger.Error("failed to build asset", "table", table.Name(), "error", err)
Expand Down Expand Up @@ -222,7 +237,7 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema,
Service: maxcomputeService,
}

tableAttributesData := e.buildTableAttributesData(schemaName, tableType, table, tableSchema)
tableAttributesData := e.buildTableAttributesData(schemaName, tableType, tableSchema)

if tableType == config.TableTypeView {
query := tableSchema.ViewText
Expand Down Expand Up @@ -266,7 +281,13 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema,
columns = append(columns, columnData)
}

tableProfile := &v1beta2.TableProfile{}
if tableSchema.RecordNum >= 0 {
tableProfile.TotalRows = int64(tableSchema.RecordNum)
}

tableData := &v1beta2.Table{
Profile: tableProfile,
Attributes: utils.TryParseMapToProto(tableAttributesData),
Columns: columns,
CreateTime: timestamppb.New(time.Time(tableSchema.CreateTime)),
Expand Down Expand Up @@ -326,7 +347,7 @@ func buildColumns(dataType datatype.DataType) []*v1beta2.Column {
return columns
}

func (e *Extractor) buildTableAttributesData(schemaName, tableType string, table *odps.Table, tableInfo *tableschema.TableSchema) map[string]interface{} {
func (e *Extractor) buildTableAttributesData(schemaName, tableType string, tableInfo *tableschema.TableSchema) map[string]interface{} {
attributesData := map[string]interface{}{}

attributesData[attributesDataProjectName] = e.config.ProjectName
Expand All @@ -340,6 +361,10 @@ func (e *Extractor) buildTableAttributesData(schemaName, tableType string, table
attributesData[attributesDataSQL] = tableInfo.ViewText
}

if tableInfo.Lifecycle != 0 {
attributesData[attributesDataLifecycle] = tableInfo.Lifecycle
}

var partitionNames []interface{}
if tableInfo.PartitionColumns != nil && len(tableInfo.PartitionColumns) > 0 {
partitionNames = make([]interface{}, len(tableInfo.PartitionColumns))
Expand Down
37 changes: 32 additions & 5 deletions plugins/extractors/maxcompute/maxcompute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ func TestExtract(t *testing.T) {
odps.NewTable(nil, projectID, "my_schema", "new_table"),
}

table2 := []*odps.Table{
odps.NewTable(nil, projectID, "my_schema", "dummy_table"),
odps.NewTable(nil, projectID, "my_schema", "new_table"),
odps.NewTable(nil, projectID, "my_schema", "table_lifecycle_3"),
odps.NewTable(nil, projectID, "my_schema", "table_lifecycle_8"),
}

c1 := tableschema.Column{
Name: "id",
Type: datatype.BigIntType,
Expand Down Expand Up @@ -139,6 +146,7 @@ func TestExtract(t *testing.T) {
newTableSchemaBuilder.Name("new_table").
Columns(c3, c4)
newTableSchema := newTableSchemaBuilder.Build()
newTableSchema.TableName = "new_table"
newTableSchema.ViewText = "SELECT user_id, email FROM test-project-id.my_schema.new_table"
newCreateTime, err := time.Parse(time.RFC3339, "2024-11-18T08:00:00Z")
if err != nil {
Expand All @@ -147,10 +155,26 @@ func TestExtract(t *testing.T) {
newTableSchema.CreateTime = common.GMTTime(newCreateTime)
newTableSchema.LastModifiedTime = common.GMTTime(newCreateTime)

// Schema for table_lifecycle_3
tableLifecycle3Schema := newTableSchema // copy
tableLifecycle3Schema.TableName = "table_lifecycle_3"
tableLifecycle3Schema.ViewText = "SELECT user_id, email FROM test-project-id.my_schema.table_lifecycle_3"
tableLifecycle3Schema.Lifecycle = 3
tableLifecycle3Schema.RecordNum = 100

// Schema for table_lifecycle_8
tableLifecycle8Schema := newTableSchema // copy
tableLifecycle8Schema.TableName = "table_lifecycle_8"
tableLifecycle8Schema.ViewText = "SELECT user_id, email FROM test-project-id.my_schema.table_lifecycle_8"
tableLifecycle8Schema.Lifecycle = 8
tableLifecycle8Schema.RecordNum = 200

// Schema mapping
schemaMapping := map[string]*tableschema.TableSchema{
"dummy_table": &dummyTableSchema,
"new_table": &newTableSchema,
"dummy_table": &dummyTableSchema,
"new_table": &newTableSchema,
"table_lifecycle_3": &tableLifecycle3Schema,
"table_lifecycle_8": &tableLifecycle8Schema,
}

runTest := func(t *testing.T, cfg plugins.Config, mockSetup func(mockClient *mocks.MaxComputeClient), randomizer func(seed int64) func(int64) int64) ([]*v1beta2.Asset, error) {
Expand Down Expand Up @@ -289,13 +313,16 @@ func TestExtract(t *testing.T) {
},
"endpoint_project": "https://example.com/some-api",
"exclude": map[string]interface{}{
"tables": []string{"my_schema.dummy_table"},
"tables": []string{"my_schema.dummy_table"},
"min_table_lifecycle": 8,
},
},
}, func(mockClient *mocks.MaxComputeClient) {
mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil)
mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table1[1:], nil)
mockClient.EXPECT().GetTableSchema(mock.Anything, table1[1]).Return("MANAGED_TABLE", schemaMapping[table1[1].Name()], nil)
mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table2, nil)
mockClient.EXPECT().GetTableSchema(mock.Anything, table2[1]).Return("MANAGED_TABLE", schemaMapping[table2[1].Name()], nil)
mockClient.EXPECT().GetTableSchema(mock.Anything, table2[2]).Return("MANAGED_TABLE", schemaMapping[table2[2].Name()], nil)
mockClient.EXPECT().GetTableSchema(mock.Anything, table2[3]).Return("MANAGED_TABLE", schemaMapping[table2[3].Name()], nil)
mockClient.EXPECT().GetMaskingPolicies(mock.Anything).Return(map[string][]string{}, nil)
}, nil)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,55 +1,123 @@
[
{
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table",
"name": "new_table",
"service": "maxcompute",
"type": "table",
"url": "",
"description": "",
"data": {
"@type": "type.googleapis.com/gotocompany.assets.v1beta2.Table",
"profile": null,
"columns": [
{
"name": "user_id",
"description": "Unique identifier for users",
"data_type": "BIGINT",
"is_nullable": false,
"length": "0",
"profile": null,
"columns": [],
"attributes": {}
},
{
"name": "email",
"description": "User email address",
"data_type": "STRING",
"is_nullable": false,
"length": "0",
"profile": null,
"columns": [],
"attributes": {}
}
],
"preview_fields": [],
"preview_rows": null,
"attributes": {
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/schemas/my_schema/tables/new_table",
"schema": "my_schema",
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table",
"type": "MANAGED_TABLE"
},
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
{
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table",
"name": "new_table",
"service": "maxcompute",
"type": "table",
"url": "",
"description": "",
"data": {
"@type": "type.googleapis.com/gotocompany.assets.v1beta2.Table",
"profile": {
"common_joins": [],
"filters": [],
"partition_key": "",
"partition_value": "",
"total_rows": "0",
"usage_count": "0"
},
"columns": [
{
"name": "user_id",
"description": "Unique identifier for users",
"data_type": "BIGINT",
"is_nullable": false,
"length": "0",
"profile": null,
"columns": [],
"attributes": {}
},
"owners": [],
"lineage": null,
"is_deleted": false,
"labels": {},
"refreshed_at": null,
"event": null,
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
}
{
"name": "email",
"description": "User email address",
"data_type": "STRING",
"is_nullable": false,
"length": "0",
"profile": null,
"columns": [],
"attributes": {}
}
],
"preview_fields": [],
"preview_rows": null,
"attributes": {
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/schemas/my_schema/tables/new_table",
"schema": "my_schema",
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table",
"type": "MANAGED_TABLE"
},
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
},
"owners": [],
"lineage": null,
"is_deleted": false,
"labels": {},
"refreshed_at": null,
"event": null,
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
},
{
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.table_lifecycle_8",
"name": "table_lifecycle_8",
"service": "maxcompute",
"type": "table",
"url": "",
"description": "",
"data": {
"@type": "type.googleapis.com/gotocompany.assets.v1beta2.Table",
"profile": {
"common_joins": [],
"filters": [],
"partition_key": "",
"partition_value": "",
"total_rows": "200",
"usage_count": "0"
},
"columns": [
{
"name": "user_id",
"description": "Unique identifier for users",
"data_type": "BIGINT",
"is_nullable": false,
"length": "0",
"profile": null,
"columns": [],
"attributes": {}
},
{
"name": "email",
"description": "User email address",
"data_type": "STRING",
"is_nullable": false,
"length": "0",
"profile": null,
"columns": [],
"attributes": {}
}
],
"preview_fields": [],
"preview_rows": null,
"attributes": {
"lifecycle": 8,
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/schemas/my_schema/tables/table_lifecycle_8",
"schema": "my_schema",
"sql": "SELECT user_id, email FROM test-project-id.my_schema.table_lifecycle_8",
"type": "MANAGED_TABLE"
},
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
},
"owners": [],
"lineage": null,
"is_deleted": false,
"labels": {},
"refreshed_at": null,
"event": null,
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
}
]
Loading
Loading