Skip to content

Commit

Permalink
third argument (num workers) can now be -1 to not constrain worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
lsh-0 committed Nov 8, 2023
1 parent 9610515 commit e5f3d78
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
"sync"
"time"

"github.com/sourcegraph/conc/pool"

"github.com/santhosh-tekuri/jsonschema/v5"
"github.com/sourcegraph/conc/pool"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
Expand Down Expand Up @@ -52,7 +51,6 @@ type Article struct {
Type string // POA or VOR
FileName string
Data interface{} // unmarshalled json data

}

func configure_validator(schema_root string) map[string]Schema {
Expand Down Expand Up @@ -139,7 +137,6 @@ func validate(schema Schema, article interface{}) (time.Duration, error) {
end := time.Now()
elapsed := end.Sub(start)
return elapsed, err

}

func path_exists(path string) bool {
Expand All @@ -154,7 +151,6 @@ func path_is_dir(path string) bool {
}

func validate_article(schema_map map[string]Schema, article Article, capture_error bool) Result {

// read article data and determine schema to use
schema, present := schema_map[article.Type]
if !present {
Expand Down Expand Up @@ -215,6 +211,7 @@ func die(b bool, msg string) {
}
}

/*
// process `file_list` using `num_workers` goroutines
func process_files_in_parallel(num_workers int, file_list []string, schema_map map[string]Schema, capture_errors bool) (time.Time, time.Time, []Result) {
worker_pool := pool.NewWithResults[Result]().WithMaxGoroutines(num_workers)
Expand All @@ -232,7 +229,7 @@ func process_files_in_parallel(num_workers int, file_list []string, schema_map m
end_time := time.Now()
return start_time, end_time, result_list
}

*/
// keep a buffer of N files in memory at once to feed a pool of validators,
// so that disk I/O is not a factor in keeping the CPU busy.
func process_files_with_feeder(buffer_size int, num_workers int, file_list []string, schema_map map[string]Schema, capture_errors bool) (time.Time, time.Time, []Result) {
Expand All @@ -256,7 +253,10 @@ func process_files_with_feeder(buffer_size int, num_workers int, file_list []str

// process articles from `article_chan` until it's closed.

worker_pool := pool.NewWithResults[Result]().WithMaxGoroutines(num_workers)
worker_pool := pool.NewWithResults[Result]()
if num_workers >= 1 {
worker_pool = worker_pool.WithMaxGoroutines(num_workers)
}
start_time := time.Now()
for article := range article_chan {
article := article
Expand Down Expand Up @@ -299,6 +299,10 @@ func do() {
if len(args) > 3 {
num_workers, err = strconv.Atoi(args[3])
die(err != nil, "third argument (number of workers) is not an integer.")
if num_workers <= 0 && num_workers != -1 {
fmt.Printf("third argument (number of workers) must be either -1 (unbounded) or a value > 0")
os.Exit(1)
}
}

// the number of articles to keep in memory at once.
Expand Down

0 comments on commit e5f3d78

Please sign in to comment.