Skip to content

Commit

Permalink
Merge pull request bruin-data#287 from Barbar1432/filter-struct
Browse files Browse the repository at this point in the history
added filter-struct
  • Loading branch information
Barbar1432 authored Dec 6, 2024
2 parents e14a2ed + 788e45b commit 02dd853
Show file tree
Hide file tree
Showing 3 changed files with 775 additions and 63 deletions.
145 changes: 83 additions & 62 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,78 +288,27 @@ func Run(isDebug *bool) *cli.Command {
return cli.Exit("", 1)
}

runMain := true
runChecks := true
runPushMetadata := c.Bool("push-metadata") || foundPipeline.MetadataPush.HasAnyEnabled()
if runPushMetadata {
foundPipeline.MetadataPush.Global = true
}

onlyFlags := c.StringSlice("only")
if len(onlyFlags) > 0 {
runMain = slices.Contains(onlyFlags, "main")
runChecks = slices.Contains(onlyFlags, "checks")
runPushMetadata = slices.Contains(onlyFlags, "push-metadata")

for _, flag := range onlyFlags {
if flag != "main" && flag != "checks" && flag != "push-metadata" {
errorPrinter.Printf("Invalid value for '--only' flag: '%s', available values are 'main', 'checks', and 'push-metadata'\n", flag)
return cli.Exit("", 1)
}
}
filter := &Filter{
IncludeTag: c.String("tag"),
OnlyTaskTypes: c.StringSlice("only"),
IncludeDownstream: runDownstreamTasks,
PushMetaData: c.Bool("push-metadata"),
SingleTask: task,
}

s := scheduler.NewScheduler(logger, foundPipeline)

// mark all the instances to be skipped, then conditionally mark the ones to run to be pending
s.MarkAll(scheduler.Pending)
if task != nil {
logger.Debug("marking single task to run: ", task.Name)
s.MarkAll(scheduler.Succeeded)
s.MarkAsset(task, scheduler.Pending, runDownstreamTasks)

if c.String("tag") != "" {
errorPrinter.Printf("You cannot use the '--tag' flag when running a single asset.\n")
return cli.Exit("", 1)
}
}

sendTelemetry(s, c)

tag := c.String("tag")
if tag != "" {
assetsByTag := foundPipeline.GetAssetsByTag(tag)
if len(assetsByTag) == 0 {
errorPrinter.Printf("No assets found with the tag '%s'\n", tag)
return cli.Exit("", 1)
}

logger.Debugf("marking assets with tag '%s' to run", tag)
s.MarkAll(scheduler.Succeeded)
s.MarkByTag(tag, scheduler.Pending, runDownstreamTasks)

infoPrinter.Printf("Running only the assets with tag '%s', found %d assets.\n", tag, len(assetsByTag))
}

if !runMain {
logger.Debug("disabling main instances if any")
s.MarkPendingInstancesByType(scheduler.TaskInstanceTypeMain, scheduler.Succeeded)
}
if !runChecks {
logger.Debug("disabling check instances if any")
s.MarkPendingInstancesByType(scheduler.TaskInstanceTypeColumnCheck, scheduler.Succeeded)
s.MarkPendingInstancesByType(scheduler.TaskInstanceTypeCustomCheck, scheduler.Succeeded)
}
if !runPushMetadata {
logger.Debug("disabling metadata push instances if any")
s.MarkPendingInstancesByType(scheduler.TaskInstanceTypeMetadataPush, scheduler.Succeeded)
// Apply the filter to mark assets based on include/exclude tags
if err := filter.ApplyFiltersAndMarkAssets(foundPipeline, s); err != nil {
errorPrinter.Printf("Failed to filter assets: %v\n", err)
return cli.Exit("", 1)
}

if s.InstanceCountByStatus(scheduler.Pending) == 0 {
warningPrinter.Println("No tasks to run.")
return nil
}

sendTelemetry(s, c)
infoPrinter.Printf("\nStarting the pipeline execution...\n")
infoPrinter.Println()

Expand Down Expand Up @@ -756,3 +705,75 @@ func sendTelemetry(s *scheduler.Scheduler, c *cli.Context) {

telemetry.SendEventWithAssetStats("run_assets", assetStats, c)
}

type Filter struct {
IncludeTag string // Tag to include assets (from `--tag`)
OnlyTaskTypes []string // Task types to include (from `--only`)
IncludeDownstream bool // Whether to include downstream tasks (from `--downstream`)
PushMetaData bool
SingleTask *pipeline.Asset
}

func (f *Filter) ApplyFiltersAndMarkAssets(pipeline *pipeline.Pipeline, s *scheduler.Scheduler) error {
// Initially mark all tasks as pending
s.MarkAll(scheduler.Pending)

// Handle single-task execution
if f.SingleTask != nil {
// Skip all other tasks
s.MarkAll(scheduler.Succeeded)

// Mark the single task and optionally its downstream tasks
s.MarkAsset(f.SingleTask, scheduler.Pending, f.IncludeDownstream)
// Validate that --tag is not allowed with single task execution
if f.IncludeTag != "" {
return errors.New("you cannot use the '--tag' flag when running a single asset")
}
}
// Default task execution flags
runMain := true
runChecks := true
runPushMetadata := f.PushMetaData || pipeline.MetadataPush.HasAnyEnabled()

if runPushMetadata {
pipeline.MetadataPush.Global = true
}

// Apply task-type filtering if specified
if len(f.OnlyTaskTypes) > 0 {
// Validate the provided task types
for _, taskType := range f.OnlyTaskTypes {
if taskType != "main" && taskType != "checks" && taskType != "push-metadata" {
return fmt.Errorf("invalid value for '--only' flag: '%s', available values are 'main', 'checks', and 'push-metadata'", taskType)
}
}

// Update task execution flags based on filtering
runMain = slices.Contains(f.OnlyTaskTypes, "main")
runChecks = slices.Contains(f.OnlyTaskTypes, "checks")
runPushMetadata = slices.Contains(f.OnlyTaskTypes, "push-metadata")
}

// Handle include tag: Mark included assets as pending
if f.IncludeTag != "" {
includedAssets := pipeline.GetAssetsByTag(f.IncludeTag)
if len(includedAssets) == 0 {
return fmt.Errorf("no assets found with include tag '%s'", f.IncludeTag)
}
s.MarkAll(scheduler.Succeeded) // Skip everything first
s.MarkByTag(f.IncludeTag, scheduler.Pending, f.IncludeDownstream)
}
// Mark tasks in the scheduler
if !runMain {
s.MarkPendingInstancesByType(scheduler.TaskInstanceTypeMain, scheduler.Succeeded)
}
if !runChecks {
s.MarkPendingInstancesByType(scheduler.TaskInstanceTypeColumnCheck, scheduler.Succeeded)
s.MarkPendingInstancesByType(scheduler.TaskInstanceTypeCustomCheck, scheduler.Succeeded)
}
if !runPushMetadata {
s.MarkPendingInstancesByType(scheduler.TaskInstanceTypeMetadataPush, scheduler.Succeeded)
}

return nil
}
Loading

0 comments on commit 02dd853

Please sign in to comment.