Skip to content

Commit

Permalink
Merge pull request #91 from atlanhq/Snowflake-miner-s3
Browse files Browse the repository at this point in the history
feat: Add extraction from S3 for Snowflake Miner
  • Loading branch information
0xquark authored Feb 25, 2025
2 parents 4eed4cf + 7f6d3ec commit f3ffd5f
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 4 deletions.
4 changes: 4 additions & 0 deletions atlan/assets/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,10 @@ func handleApiError(response *http.Response, originalError error) error {
if err := json.Unmarshal(body, &errorResponse); err == nil {
fmt.Println(errorResponse)
causes = errorResponse.Causes
// Check for Atlan-specific error code 1006
if errorResponse.ErrorID == "1006" && strings.Contains(errorResponse.Message, "Please provide the required payload") {
return ThrowAtlanError(originalError, PERMISSION_PASSTHROUGH, nil, "API token doesn't have necessary permissions")
}
}
var causesString string
if len(causes) > 0 {
Expand Down
38 changes: 38 additions & 0 deletions atlan/assets/snowflake_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,44 @@ func (s *SnowflakeMiner) Direct(startEpoch int64, database, schema string) *Snow
return s
}

// S3 sets up the miner to extract from S3 using JSON line-separated files.
//
// Parameters:
// - s3Bucket: S3 bucket where the JSON line-separated files are located
// - s3Prefix: Prefix within the S3 bucket where the JSON files are stored
// - sqlQueryKey: JSON key containing the query definition
// - defaultDatabaseKey: JSON key containing the default database name
// - defaultSchemaKey: JSON key containing the default schema name
// - sessionIDKey: JSON key containing the session ID of the SQL query
// - s3BucketRegion: (Optional) Region of the S3 bucket if applicable
//
// Returns:
// - SnowflakeMiner instance, set up to extract from S3
func (s *SnowflakeMiner) S3(
s3Bucket string,
s3Prefix string,
sqlQueryKey string,
defaultDatabaseKey string,
defaultSchemaKey string,
sessionIDKey string,
s3BucketRegion *string,
) *SnowflakeMiner {
s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "extraction-method", Value: "s3"})
s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "extraction-s3-bucket", Value: s3Bucket})
s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "extraction-s3-prefix", Value: s3Prefix})
s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "sql-json-key", Value: sqlQueryKey})
s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "catalog-json-key", Value: defaultDatabaseKey})
s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "schema-json-key", Value: defaultSchemaKey})
s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "session-json-key", Value: sessionIDKey})

// Add S3 bucket region only if provided
if s3BucketRegion != nil {
s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "extraction-s3-region", Value: *s3BucketRegion})
}

return s
}

// ExcludeUsers excludes certain users from being considered in the usage metrics calculation for assets (e.g., system users).
//
// Param:
Expand Down
31 changes: 27 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,43 @@ package main

import (
"fmt"
"log"

"github.com/atlanhq/atlan-go/atlan/assets"
"github.com/atlanhq/atlan-go/atlan/logger"
"github.com/atlanhq/atlan-go/atlan/model/structs"
)

func main() {
ctx := assets.NewContext()
ctx.EnableLogging("debug")

response, err := ctx.UserClient.RemoveUser("singhkaranjot99", "karanjot.singh", nil)
miner := assets.NewSnowflakeMiner("default/snowflake/1739484068").
S3(
"test-s3-bucket",
"test-s3-prefix",
"TEST_QUERY",
"TEST_SNOWFLAKE",
"TEST_SCHEMA",
"TEST_SESSION_ID",
structs.StringPtr("test-s3-bucket-region"),
).
PopularityWindow(30).
NativeLineage(true).
CustomConfig(map[string]interface{}{
"test": true,
"feature": 1234,
}).
ToWorkflow()

Schedule := structs.WorkflowSchedule{CronSchedule: "45 5 * * *", Timezone: "Europe/Paris"}

// Run the workflow
response, err := ctx.WorkflowClient.Run(miner, &Schedule)
if err != nil {
log.Fatalf("Error deleting user: %v", err)
logger.Log.Errorf("Error running workflow: %v", err)
return
}
fmt.Printf("Workflow Execution Response: %+v\n", *response)
fmt.Println(response.Spec)

/*
workflowJSON := `{
Expand Down

0 comments on commit f3ffd5f

Please sign in to comment.